kafka-python is a Python client for the Apache Kafka distributed stream processing system. It is designed to function much like the official Java client, with a sprinkling of Pythonic interfaces (e.g., consumer iterators).
Install kafka-python via pip:
pip install kafka-python
Plain Text Endpoint
Note: Make sure you have added the IP address of your machine, or you will not be able to access your cluster (see “Add an ACL” in the getting started guide).
A producer is a Kafka client that publishes records to the Kafka cluster. KafkaProducer has many different configurations which you can read about here. For this example we will stick with a basic producer.
In producer.py, we will publish to the demo topic.
Tip: If you have a single broker (developer plan), the plain text endpoint will be a single endpoint ending in eventador.io:9092. If you have a plan with multiple brokers, you can add additional endpoints separated by commas. Ex: 'endpoint0:9092,endpoint1:9092,endpoint2:9092'
# producer.py
KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1
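The comma-separated endpoint string described in the tip can be split and sanity-checked before it reaches the client. This is a minimal sketch: parse_brokers is a hypothetical helper, not part of kafka-python, and the endpoint names are placeholders. kafka-python accepts bootstrap_servers as either a single string or a list of 'host:port' strings, so the resulting list could be passed in directly.

```python
def parse_brokers(brokers):
    """Split a comma-separated 'host:port' string into a list of endpoints.

    Hypothetical helper for illustration; not part of kafka-python.
    """
    endpoints = []
    for entry in brokers.split(','):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and stray whitespace
        host, sep, port = entry.rpartition(':')
        if not sep or not host or not port.isdigit():
            raise ValueError('expected host:port, got %r' % entry)
        endpoints.append(entry)
    return endpoints


# Placeholder endpoints, as in the tip above
print(parse_brokers('endpoint0:9092, endpoint1:9092,endpoint2:9092'))
```

Failing early on a malformed entry tends to give a clearer error than a connection timeout later on.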
Now we add the producer. You will need to import KafkaProducer.
# producer.py
from kafka import KafkaProducer
KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKERS)
# producer.py
from kafka import KafkaProducer
KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKERS)
# Must send bytes
messages = [b'hello kafka', b'I am sending', b'3 test messages']
# Send the messages
for m in messages:
    producer.send(KAFKA_TOPIC, m)
# send() is asynchronous; flush so the messages go out before the script exits
producer.flush()
Run this script: python producer.py
This will publish these three messages to your topic.
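If you want confirmation that each message actually reached the broker, the future returned by send() can be waited on. The sketch below assumes an object that behaves like KafkaProducer (send() returns a future, get() blocks until the record is acknowledged); send_and_confirm is a hypothetical helper name and the 10-second timeout is an arbitrary choice.

```python
def send_and_confirm(producer, topic, messages, timeout=10):
    """Send each message and wait for the broker to acknowledge it.

    `producer` is expected to behave like kafka.KafkaProducer: send() returns
    a future whose get() blocks until the record is acknowledged, or raises.
    Returns a list of (partition, offset) pairs, one per message.
    Hypothetical helper for illustration.
    """
    futures = [producer.send(topic, m) for m in messages]
    producer.flush()  # push any buffered records out before waiting
    results = []
    for future in futures:
        metadata = future.get(timeout=timeout)  # raises on delivery failure
        results.append((metadata.partition, metadata.offset))
    return results
```

With the producer from this tutorial, send_and_confirm(producer, KAFKA_TOPIC, messages) would return one pair per message; the actual offsets depend on what is already stored in the topic.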
Tip: Kafka clients work with bytes. If your messages are not already bytes, you can use value_serializer to tell the client how to encode them.
Example producer that handles JSON messages:
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                         bootstrap_servers=KAFKA_BROKERS)
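A serializer on the producer side usually needs a matching deserializer on the consumer side. The sketch below checks that pairing without a broker; the function names and the sample payload are made up for illustration. In a real client, serialize would be passed as value_serializer to KafkaProducer and deserialize as value_deserializer to KafkaConsumer.

```python
import json


def serialize(value):
    """Encode a JSON-compatible Python object as UTF-8 bytes."""
    return json.dumps(value).encode('utf-8')


def deserialize(raw):
    """Decode UTF-8 JSON bytes back into a Python object."""
    return json.loads(raw.decode('utf-8'))


event = {'sensor': 'demo-1', 'reading': 21.5}  # made-up sample payload
payload = serialize(event)
assert isinstance(payload, bytes)  # what Kafka actually stores
print(deserialize(payload) == event)  # True
```

Testing the round trip locally makes encoding mistakes show up before any message is published.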
Now we want to create a consumer so we can read back our messages. Consumers can be added at any time. In most production applications you will have N consumers subscribed to a topic. These messages will be consumed in real time as they are produced. A key feature of Kafka is the ability to read a topic starting at the earliest stored message. Any new consumer can read all historic messages when they connect. We will use that feature now.
consumer.py
# consumer.py
KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1
This time we’ll import KafkaConsumer
# consumer.py
from __future__ import print_function # python 2/3 compatibility
import sys # used to exit
from kafka import KafkaConsumer
KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1
consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS,
auto_offset_reset='earliest')
auto_offset_reset='earliest' tells the consumer to start reading from the start of the topic when it has no previously committed offset. If left unset, it defaults to 'latest' (only new messages).
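The effect of auto_offset_reset can be pictured with a small toy model. This is not kafka-python code, only an illustration of the decision under stated assumptions: starting_offset is a hypothetical function, earliest is the oldest offset still stored, and latest is the offset the next new message would receive.

```python
def starting_offset(policy, earliest, latest, committed=None):
    """Toy model of where a consumer begins reading a partition.

    Not part of kafka-python; the names and arguments are illustrative.
    A valid committed offset wins; otherwise the reset policy decides.
    """
    if committed is not None and earliest <= committed <= latest:
        return committed
    if policy == 'earliest':
        return earliest  # replay everything still stored
    if policy == 'latest':
        return latest  # only messages produced from now on
    raise ValueError('unknown policy: %r' % policy)


# A topic holding three messages (offsets 0..2) and no committed offset
print(starting_offset('earliest', 0, 3))  # 0
print(starting_offset('latest', 0, 3))    # 3
```

The consumer in this tutorial has no committed offset, so the policy alone decides where it starts.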
# consumer.py
from __future__ import print_function # python 2/3 compatibility
import sys # used to exit
from kafka import KafkaConsumer
KAFKA_TOPIC = 'demo'
KAFKA_BROKERS = '<value_from_plain_text_endpoint>' # see step 1
consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS,
                         auto_offset_reset='earliest')
try:
    for message in consumer:
        # message.value is bytes; decode it for display
        print(message.value.decode('utf-8'))
except KeyboardInterrupt:
    sys.exit()
Run the script:
> python consumer.py
hello kafka
I am sending
3 test messages
If there are other messages in your topic, those will be printed out as well. Once the consumer is caught up, it will sit there waiting for new messages to be sent to the subscribed topic. If you want to see your consumer in action, you can open a new terminal and re-run producer.py. New messages will show up in real time.
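If you would rather have the consumer stop once it has caught up, kafka-python's consumer_timeout_ms setting ends iteration after that many milliseconds without a new message. The sketch below is one way to use it; drain is a hypothetical helper, and the 5000 ms value in the comment is an arbitrary choice.

```python
def drain(consumer):
    """Collect decoded message values until the consumer stops iterating.

    `consumer` is any iterable of records with a bytes `.value`, such as a
    kafka.KafkaConsumer created with consumer_timeout_ms set, for example:

        KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS,
                      auto_offset_reset='earliest',
                      consumer_timeout_ms=5000)  # arbitrary idle limit

    Hypothetical helper for illustration.
    """
    return [record.value.decode('utf-8') for record in consumer]
```

Without consumer_timeout_ms the iterator blocks indefinitely, which is the behavior the tutorial's consumer relies on.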
You can view the code used for this tutorial in the tutorials folder of our examples repository.
Have questions or feedback, or find any of these steps unclear? Please contact us. We love helping people get started.