Python Examples - Eventador.io

Python Examples

Kafka-Python

Kafka-Python is a Python client Apache Kafka distributed stream processing system. Kafka-python is designed to function much like the official java client, with a sprinkling of pythonic interfaces (e.g., consumer iterators).

Prerequisites

Step 1: Get your endpoint

Note: make sure you have added the IP address of your machine or you will not be able to access your cluster (see “Add an ACL” in the getting started guide).

Step 2: Create a producer

A producer is a Kafka client that publishes records to the Kafka cluster. KafkaProducer has many different configurations which you can read about here. For this example we will stick with a basic producer.

Tip: If you have a single broker (developer plan) this will be a single endpoint ending in eventador.io:9092. If you have a plan with multiple brokers, you can add additional endpoints separated by a comma. Ex: 'endpoint0:9092,endpoint1:9092,endpoint2:9092'

# producer.py

KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1

Now we add a the producer. You will need to import KafkaProducer

# producer.py

from kafka import KafkaProducer

KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1

producer = KafkaProducer(bootstrap_servers=KAFKA_BROKERS)

Step 3: Send some messages

# producer.py

from kafka import KafkaProducer

KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1

producer = KafkaProducer(bootstrap_servers=KAFKA_BROKERS)

# Must send bytes
messages = [b'hello kafka', b'I am sending', b'3 test messages']

# Send the messages
for m in messages:
    producer.send(KAFKA_TOPIC, m)

Run this script python consumer.py

This will publish these three messages to your topic.

Tip: Kafka clients work with bytes. If your messages are not already bytes you can use value_serializer to tell the client how to encode your message.

Example producer that handles json messages

producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                         bootstrap_servers=KAFKA_BROKERS)

Step 4: Create a consumer

Now we want to create a consumer so we can read back our messages. Consumers can be added at any time. In most production applications you will have N consumers subscribed to a topic. These messages will be consumed in real time as they are produced. A key feature of Kafka is the ability to read a topic starting at the earliest stored message. Any new consumer can read all historic messages when they connect. We will use that feature now.

# consumer.py

KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1

This time we’ll import KafkaConsumer

# consumer.py

from __future__ import print_function  # python 2/3 compatibility

import sys # used to exit
from kafka import KafkaConsumer

KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS, 
                         auto_offset_reset='earliest')

auto_offset_reset tells the consumer to start reading from start of the topic. If left blank it will default to current offset (only new messages).

Step 5: Read the messages:

# consumer.py

KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1
from __future__ import print_function  # python 2/3 compatibility

import sys # used to exit
from kafka import KafkaConsumer

KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS, 
                         auto_offset_reset='earliest')

try:
    for message in consumer:
        print(message.value)
except KeyboardInterrupt:
    sys.exit()

Run the script python conumer.py

> python consumer.py
hello kafka
I am sending
3 test messages

If there are other messages in your topic, those will be print out as well. Once the consumer is caught up it will sit there waiting for new messages to be sent to the subscribed topic. If you want to see your consumer in action you can open a new terminal and re-run the producer.py. New messages will show up real in real time.

You can view the code used for this tutorial in the tutorials folder of our examples repository.

Questions, feedback or find any of these steps unclear? Please contact us. We love helping people get started.