Connecting to Kafka

A basic example of connecting to Apache Kafka


by Kenny Gorman, Founder and CEO
   15 Dec 2016

Getting connected, producing (publishing), and consuming (subscribing) messages is relatively easy in Apache Kafka. In this post we will go over connecting, producing a simple message, and consuming that message using one of a couple of native Python clients. Most languages are similar, and there are a host of native drivers to choose from.

For this example we will use kafka-python. We need to first install the library:

pip install kafka-python

Producing

Producing a message, also called publishing, means sending a message to Kafka.

Ok, so let’s write a simple producer. We need to import the library. The producer works asynchronously, and thus returns as soon as data is handed to it. There are multiple modes in which to use the driver; consult the docs for more information. In this case we are going to use JSON, so we also want to import that library, as we will use it later to simplify serialization of dictionaries to JSON.

from kafka import KafkaProducer
import json

Now, let’s create a producer object; in this case we want to send a JSON payload:

producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                         bootstrap_servers='localhost:9092')

Now send some data to it. Because we set value_serializer to use json.dumps to serialize a dictionary to JSON, we can send native dictionaries without worry. We need to specify the topic we want to produce to, and then the actual data being sent.

kafka_topic = "testing123"
kafka_payload = {"water_depth": 432}

producer.send(kafka_topic, kafka_payload)
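Because of the value_serializer above, each dictionary is turned into UTF-8 JSON bytes before it hits the wire. A minimal sketch of that transformation on its own, with no broker involved (the helper name to_json_bytes is ours):

```python
import json

def to_json_bytes(value):
    # Same transformation as the value_serializer lambda above:
    # dict -> JSON string -> UTF-8 bytes
    return json.dumps(value).encode('utf-8')

payload = to_json_bytes({"water_depth": 432})
print(payload)  # b'{"water_depth": 432}'
```

Note also that producer.send returns a future; if you need to block until the broker acknowledges a message, you can call .get(timeout=...) on it.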

You may want to occasionally call flush to make sure that all messages are actually sent to the server. If you are looping over some data structure, you might do this every N iterations, or once at the end of the entire block of code. Flushing is done like this:

producer.flush()
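To illustrate the flush-every-N-iterations idea without a live broker, here is a sketch using a stub in place of KafkaProducer. The StubProducer class and the batch size of 100 are our inventions for illustration, not part of kafka-python:

```python
import json

class StubProducer:
    """Stand-in for KafkaProducer, used only to illustrate the
    flush-every-N pattern without a running broker."""
    def __init__(self):
        self.sent = []
        self.flushes = 0

    def send(self, topic, value):
        # Mimic the JSON value_serializer from the real producer
        self.sent.append((topic, json.dumps(value).encode('utf-8')))

    def flush(self):
        self.flushes += 1

producer = StubProducer()
FLUSH_EVERY = 100  # hypothetical batch size

for i, depth in enumerate(range(250)):
    producer.send("testing123", {"water_depth": depth})
    if (i + 1) % FLUSH_EVERY == 0:  # flush every N messages
        producer.flush()
producer.flush()  # final flush for any remainder
```

With 250 messages and a batch size of 100, this performs two in-loop flushes plus the final one.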

Consuming

Consuming data is done by listening for messages on a particular topic (subscribing).

The consuming code is similar to the producing code, with a couple more available options. In this case, the Python client automatically handles partition assignment and offset management. The available options are in the docs.

First the import:

from kafka import KafkaConsumer

Now let’s set up a consumer object and grab some data:

kafka_topic = "testing123"

consumer = KafkaConsumer(kafka_topic, bootstrap_servers='localhost:9092')

for msg in consumer:
    print(msg)

It should be noted that messages need to be consumed continuously, so the code should be designed to keep fetching data and processing it in some way. In this case we simply connect, return messages up to the current offset, and stop. A real use case would likely use a more continuous construct.
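One common refinement for such a continuous loop is to mirror the producer's JSON serialization with a value_deserializer, so each msg.value arrives as a dictionary. The deserializer below is testable on its own; the consumer construction is shown in comments since it needs a running broker, and the group_id name is our invention:

```python
import json

def from_json_bytes(raw):
    # Inverse of the producer's value_serializer: UTF-8 bytes -> dict
    return json.loads(raw.decode('utf-8'))

# Hypothetical long-running consumer (requires a broker on localhost:9092):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer("testing123",
#                          bootstrap_servers='localhost:9092',
#                          group_id='depth-readers',  # example name, ours
#                          value_deserializer=from_json_bytes)
# for msg in consumer:
#     process(msg.value)  # msg.value is already a dict here

print(from_json_bytes(b'{"water_depth": 432}'))  # {'water_depth': 432}
```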

There are myriad options available for both producing and consuming data that you should check out as you build applications around Kafka. There are also a huge number of clients available for many programming languages. You can check out our examples to see a full example and how other clients work.