Apache Kafka in a Docker Container and Implementing Its Functionalities with Python

brown brick wall during daytime
Photo by Francisco De Nova on Unsplash

According to its official website, Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

In this post, I am going to share a basic way to start using Apache Kafka with Python.

What are the tools needed?

  • docker-compose

Docker Compose will be used to run Kafka, the ZooKeeper instance it depends on, and a web UI for inspecting the cluster:

  1. zookeeper
  2. kafka
  3. kafka-ui

The compose file:

version: "2"

services:
  zookeeper:
    container_name: zookeeper
    image: docker.io/bitnami/zookeeper:3.7
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    container_name: kafka
    image: docker.io/bitnami/kafka:2
    ports:
      - "9092:9092"
      - "9093:9093"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "18080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

As you can see, I have used the ZooKeeper and Kafka images from Bitnami. To get a general view of the Kafka instance, I have used an open-source project from GitHub called kafka-ui, which is maintained by Provectus. It is a web UI that, as the port mapping in the compose file shows, can be reached from the host at http://localhost:18080.
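
The compose file advertises two listeners: CLIENT as kafka:9092, which is what other containers such as kafka-ui use, and EXTERNAL as localhost:9093, which is what programs on the host use. As a small illustration, the sketch below parses that setting to show which address a Python script on the host should connect to. The helper name `parse_listeners` is hypothetical and is not part of Kafka or any client library.

```python
def parse_listeners(spec: str) -> dict:
    """Map each listener name to a (host, port) tuple.

    `spec` uses the same format as KAFKA_CFG_ADVERTISED_LISTENERS.
    """
    listeners = {}
    for entry in spec.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the compose file above.
ADVERTISED = "CLIENT://kafka:9092,EXTERNAL://localhost:9093"

listeners = parse_listeners(ADVERTISED)
print(listeners["CLIENT"])    # address used inside the compose network
print(listeners["EXTERNAL"])  # address used from the host machine
```

This is why the Python scripts in this post connect to localhost:9093 rather than 9092.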

Suppose you want to produce some messages and send them to Kafka.

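# kproducer.py
import datetime

from kafka import KafkaProducer

# Port 9093 is the EXTERNAL listener, advertised as localhost in the compose file.
producer = KafkaProducer(bootstrap_servers='localhost:9093')

try:
    for i in range(100):
        the_dt = str(datetime.datetime.utcnow())
        # Kafka values are raw bytes, so encode the text before sending.
        val = f"Count: {i} at {the_dt}".encode(encoding='utf8')
        # send() is asynchronous; records are buffered and sent in the background.
        producer.send(topic="KafkaExplored", value=val)
except Exception as ex:
    print(ex)
finally:
    # Close the producer even if sending failed part-way through.
    producer.close()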
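
The producer above encodes each string to bytes by hand. As an alternative, kafka-python's `KafkaProducer` accepts a `value_serializer` callable, so structured payloads can be sent as JSON. The following is only a sketch under assumptions: the helper names `serialize`, `build_event`, and `produce_events`, and the payload fields `count` and `at`, are made up for illustration. The kafka import is done inside the function so the helpers can be tried without a broker.

```python
import json
from datetime import datetime, timezone


def serialize(payload: dict) -> bytes:
    """Turn a dict into UTF-8 encoded JSON bytes."""
    return json.dumps(payload).encode("utf-8")


def build_event(count: int) -> dict:
    """Build a hypothetical payload similar to the text messages above."""
    return {"count": count, "at": datetime.now(timezone.utc).isoformat()}


def produce_events(n: int = 100, topic: str = "KafkaExplored") -> None:
    """Send n JSON events; requires kafka-python and a running broker."""
    from kafka import KafkaProducer  # imported here so the helpers work standalone

    producer = KafkaProducer(
        bootstrap_servers="localhost:9093",
        value_serializer=serialize,  # applied to every value passed to send()
    )
    try:
        for i in range(n):
            producer.send(topic, value=build_event(i))
        producer.flush()  # block until buffered records have been sent
    finally:
        producer.close()
```

With the containers running, calling `produce_events()` would send 100 JSON messages. A consumer reading them would then need `json.loads(msg.value)` instead of a plain `decode()`.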

To consume the messages:

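# kconsumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer('KafkaExplored', bootstrap_servers='localhost:9093')

# Iterating over the consumer blocks and yields ConsumerRecord objects as they arrive.
for msg in consumer:
    # Access fields by name rather than by tuple index.
    topic = msg.topic
    value = msg.value
    print(msg)
    print(f"{topic}:{value.decode()}")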

There are several Kafka clients for Python; I have used kafka-python, which can be installed with pip install kafka-python.

First, run kconsumer.py. Then, from another terminal, run kproducer.py to produce the messages.
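
The order matters because, as far as I understand kafka-python's defaults, a consumer without committed offsets starts at the latest offset and so only sees messages produced after it starts. A minimal sketch of a consumer that instead reads the topic from the beginning is shown below. The function names `consumer_settings` and `read_all` are hypothetical, and the 5-second timeout is an arbitrary choice so that the loop ends instead of blocking forever.

```python
def consumer_settings() -> dict:
    """Keyword arguments for KafkaConsumer that read a topic from the start."""
    return {
        "bootstrap_servers": "localhost:9093",
        # Start from the oldest available message when no offset is committed.
        "auto_offset_reset": "earliest",
        # Stop iterating after 5 seconds without new messages (arbitrary value).
        "consumer_timeout_ms": 5000,
    }


def read_all(topic: str = "KafkaExplored") -> None:
    """Print every message in the topic; requires kafka-python and a running broker."""
    from kafka import KafkaConsumer  # imported here so the settings work standalone

    consumer = KafkaConsumer(topic, **consumer_settings())
    try:
        for msg in consumer:
            print(f"{msg.topic}:{msg.value.decode()}")
    finally:
        consumer.close()
```

With these settings, calling `read_all()` after the producer has already finished should still print the earlier messages.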

In the kconsumer.py terminal, you will see the messages:

ConsumerRecord(topic='KafkaExplored', partition=0, offset=98, timestamp=1625160633441, timestamp_type=0, key=None, value=b'Count: 98 at 2021-07-01 17:30:33.441293', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
KafkaExplored:Count: 98 at 2021-07-01 17:30:33.441293
ConsumerRecord(topic='KafkaExplored', partition=0, offset=99, timestamp=1625160633441, timestamp_type=0, key=None, value=b'Count: 99 at 2021-07-01 17:30:33.441448', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
KafkaExplored:Count: 99 at 2021-07-01 17:30:33.441448
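
The `timestamp` field in each ConsumerRecord is expressed in milliseconds since the Unix epoch. As a quick check, the sketch below converts the value from the output above into a readable UTC time. The helper name `record_time` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def record_time(timestamp_ms: int) -> datetime:
    """Convert a millisecond epoch timestamp to a timezone-aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=timestamp_ms)


# Timestamp taken from the ConsumerRecord output above.
print(record_time(1625160633441).isoformat())
# prints 2021-07-01T17:30:33.441000+00:00
```

The result matches, to the millisecond, the time the producer embedded in the message value.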

If you want to explore the rest of the API that kafka-python offers for working with Kafka, see the kafka-python documentation.