The Flink Table and SQL API with Apache Flink 1.6

Since my initial post on the Flink Table and SQL API there have been some massive and, frankly, awesome changes. Let’s go over the changes that Flink 1.6 brought to the table step-by-step.

Quick Recap of the Flink Dataflow Model

Quickly, let’s go over the basic structure of the Flink APIs and the dataflow programming model they present. There haven’t been any changes to the dataflow model since 1.3, just API refinements. The following diagram still represents the levels of abstraction Flink exposes.

Apache Flink Levels of Abstraction

Check out the documentation for the specifics. For this example, we will be using the Table API and SQL level of abstraction. The release notes are also a must-read. We will focus on the Table API changes, but there are many other changes that are huge steps forward, especially the state improvements and containerization support.

In a nutshell, the Table and SQL APIs have been massively reworked, simplified, and made much more powerful. We will focus on the Kafka-specific changes in FLINK-9846 and FLINK-8866/FLINK-8558. These changes constitute an entirely new interface for the Table API and make the entire model much easier to use effectively. Other sinks and sources have changed in similar ways.

An example scenario

As an example, we will start with a simple Python producer for Kafka. We create temperature messages for each IoT device in, in this case, a fictitious brewery. We will produce the data at three-second intervals into a Kafka topic (the source), then roll up the data, performing a simple computation over a one-minute window, and emit the result back to a Kafka topic (the sink). The Flink job using the Table API handles the reads from the source, the computation, and the write to the sink. Our source data will be JSON, and so will our aggregated output data.

Apache Flink Diagram

Let’s do this

First let’s create some topics to be the source and sink topics, then create something to generate sample data, and finally the Flink computation. For this example, we will be using the Eventador Stack – so all the steps are fairly Eventador specific, but you can also change things to run this example locally.


Create topics

We need to create two topics, one for the source and one for the sink. You can use the command line tool bin/ --create...., or the Eventador console. Check out the Eventador documentation for a step-by-step guide.

Create the following topics:

  • brewery_source (replication_factor 1, partitions 1)
  • brewery_sink (replication_factor 1, partitions 1)

The data generator

This data generator utilizes the Python Kafka driver to produce data to a Kafka cluster. See the install instructions here if you don’t already have this configured for your system.

The generator code is available as a Gist. You can download it from GitHub and run it.

This can be run on the command line to generate data continuously. Be sure to change the connect string to the one exposed in the Eventador Console application. Running the program should produce output like this:

Let this run for a while, or better yet, background it and let it continue to produce data to Kafka.
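If you just want a feel for what the generator does before grabbing the Gist, here is a minimal sketch of the idea. It is not the Gist itself: the sensor ids, the field names (sensor, temp), and the temperature range are assumptions for illustration, and the actual Kafka call is left out and passed in as a plain send function so the sketch runs without a broker.

```python
import json
import random
import time

# Hypothetical device ids; the real generator may use different names.
SENSORS = ["sensor1", "sensor2"]


def make_reading(sensor, rng):
    """Build one temperature reading. Field names are assumed."""
    return {"sensor": sensor, "temp": rng.randint(60, 80)}


def encode(reading):
    """Serialize a reading to UTF-8 JSON bytes, the usual Kafka payload form."""
    return json.dumps(reading).encode("utf-8")


def produce(send, count, interval_s=3.0, rng=None, sleep=time.sleep):
    """Emit `count` rounds of readings, one per sensor, pausing between rounds.

    `send` is any callable that accepts the encoded payload.
    Returns the number of messages handed to `send`.
    """
    rng = rng or random.Random()
    sent = 0
    for i in range(count):
        for sensor in SENSORS:
            send(encode(make_reading(sensor, rng)))
            sent += 1
        if i < count - 1:
            sleep(interval_s)
    return sent
```

With the Python Kafka driver, send could be something like lambda payload: producer.send("brewery_source", payload), where producer is a KafkaProducer pointed at your connect string.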

The Flink Job

We want to aggregate data (group by in SQL) by a time bucket, and then write the result out to Kafka. For this computation, we will write a simple Flink job using the Table API and SQL.

The Flink job is available on GitHub here. It can be run inside the Eventador Console directly from GitHub, with no need to deal with Maven at all. The job is then submitted to run on a Flink cluster.

First, let’s create a project using this repo. You will need to clone the example repo, then specify it as a new Project in Eventador. Choose the option to import a project into Eventador. Once a project is created we can choose to run it on our Flink cluster.

Stepping through the code

There are 3 main components to our Flink job:

  • Registering a table for reading the data from a Kafka topic and associating the input schema.
  • Registering a table for writing the data into a Kafka topic using the table schema.
  • Performing the computation – a SQL group by and insert using Flink SQL from the source table into the sink table.

Let’s step through each part:

Registering a table for reading data

We use the Flink Kafka connector to connect to Kafka and consume data. The API has been updated, and it is now very straightforward to set properties including bootstrap.servers, the topic, and the consumer group to use. You can see all the options available in the docs here.

In this case, the data is JSON, but Avro is a popular choice as well. We use .withFormat(new Json().deriveSchema()) so that the JSON format schema is derived from the table schema rather than declared separately.

There are properties that can be set that control what offset to use to start reading from the Kafka topic – .startFromEarliest() and .startFromLatest() are a couple of common ones.

Timestamps are of particular importance when reading data from Kafka. In this example, we create a virtual column and populate it via .rowtime(), passing a new Rowtime object that takes its values from the Kafka record timestamps with .timestampsFromSource(). Alternatively, a time value from the data itself can be used via .timestampsFromField(). For more information on setting timestamps from Kafka, check out the docs here.
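To make the difference between the two timestamp sources concrete, here is a small conceptual sketch in plain Python. It is not Flink code: the record layout (a dict with kafka_timestamp_ms and value keys) and the ts payload field are hypothetical, and the function only mirrors the choice between the Kafka record timestamp and a field in the data.

```python
from datetime import datetime, timezone


def event_time(record, field=None):
    """Pick the event time for one consumed record.

    field=None mirrors the idea of .timestampsFromSource(): use the
    timestamp Kafka attached to the record (milliseconds since the epoch).
    A field name mirrors .timestampsFromField(): read the time from the
    JSON payload instead. The record layout here is hypothetical.
    """
    if field is None:
        millis = record["kafka_timestamp_ms"]
    else:
        millis = record["value"][field]
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
```

The choice matters for windowing: the Kafka timestamp reflects when the message was produced or appended, while a payload field can carry the moment the sensor actually took the reading.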

Registering a table for writing data

Registering a table as a sink is very similar to registering a table as a source. It’s important to point out that the sink schema should match the output columns and datatypes from the SQL being run. In this case, we are doing a group by and outputting a key sensor, an aggregate column avgTemp and a couple of timestamps hopStart and hopEnd.

A Kafka sink expects a partitioning strategy. By default it will use .sinkPartitionerFixed(), but .sinkPartitionerRoundRobin() is what we use in this example. It’s also possible to specify a custom partitioner via .sinkPartitionerCustom(MyCustom.class).
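Here is a rough mental model of the two built-in strategies, sketched in plain Python. This is a simplification based on my understanding, not Flink's implementation: the function names are made up, and the assumption is that the fixed strategy pins each parallel sink instance to a single partition while round robin spreads records across all of them.

```python
from itertools import cycle


def fixed_partition(subtask_index, partitions):
    """Fixed strategy (assumed model): each parallel sink instance always
    writes to the same partition, chosen from its own index."""
    return partitions[subtask_index % len(partitions)]


def round_robin(partitions):
    """Round-robin strategy (assumed model): successive records rotate
    through all partitions. Returns an endless iterator of targets."""
    return cycle(partitions)
```

With a single-partition topic like brewery_sink, both strategies land every record on partition 0. The difference only shows up once the topic has more partitions than the sink has parallel instances, where the fixed strategy would leave some partitions empty.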

Perform the computation – SQL group by using Flink SQL.

Lastly, we run a SQL statement that continuously pulls data from the source table (the FROM clause) and inserts it into the sink table (the INTO clause). The output columns must match the registered sink table columns in order and name. It’s possible to use string concatenation to specify tables like "INSERT INTO " + params.getRequired("write-topic") + "...".
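As a sketch of what such a statement might look like when the table names come from parameters, here is a small Python helper that builds the string. The statement is an assumption, not the job's actual SQL: the rowtime column name ts and the output aliases are placeholders, so adjust them to match the columns you registered. Because the names are spliced into the SQL text, the helper also rejects anything that is not a plain identifier.

```python
import re

# Only plain identifiers are allowed, since the names are spliced into SQL text.
_TABLE_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_insert_sql(source_table, sink_table):
    """Build a one-minute tumbling-window roll-up statement.

    Column names (sensor, temp, ts) and aliases are assumptions.
    """
    for name in (source_table, sink_table):
        if not _TABLE_NAME.match(name):
            raise ValueError("not a valid table name: %r" % (name,))
    return (
        "INSERT INTO {sink} "
        "SELECT sensor, AVG(temp) AS avgTemp, "
        "TUMBLE_START(ts, INTERVAL '1' MINUTE) AS tumbleStart, "
        "TUMBLE_END(ts, INTERVAL '1' MINUTE) AS tumbleEnd "
        "FROM {source} "
        "GROUP BY sensor, TUMBLE(ts, INTERVAL '1' MINUTE)"
    ).format(sink=sink_table, source=source_table)
```

Calling build_insert_sql("brewery_source", "brewery_sink") yields a single string that could be handed to sqlUpdate() in the Java job.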

The SQL statement is declared as a string and then executed via sqlUpdate(), which is new compared to simply using the sqlQuery() method. It lets us aggregate the data and write it to a table sink in a single step.

The output data

The output data looks similar to the output below: one JSON document per sensor, averaging temp as avgTemp over the time period. As an example, if you use kafkacat to sample the data, you should see two new documents every minute with the average over that period. The difference between tumbleStart and tumbleEnd will be one minute.
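If it helps to see the roll-up logic spelled out, here is a plain Python sketch that computes the same kind of result over a list of readings. It is only an illustration of the semantics: it works on a finite list rather than a stream, ignores watermarks and late data, and the exact timestamp formatting of the real output may differ.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=1)


def tumble_start(ts):
    """Start of the one-minute tumbling window that contains ts."""
    return ts.replace(second=0, microsecond=0)


def roll_up(readings):
    """Average temp per sensor per one-minute window.

    readings: iterable of (timestamp, sensor, temp) tuples.
    Returns one dict per (window, sensor), ordered by window then sensor.
    """
    buckets = defaultdict(list)
    for ts, sensor, temp in readings:
        buckets[(tumble_start(ts), sensor)].append(temp)
    return [
        {
            "sensor": sensor,
            "avgTemp": sum(temps) / len(temps),
            "tumbleStart": start.isoformat(),
            "tumbleEnd": (start + WINDOW).isoformat(),
        }
        for (start, sensor), temps in sorted(buckets.items())
    ]
```

Each window produces one document per sensor, which is why two sensors give two new documents every minute.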

Putting it together

Let’s take a peek at the entire Flink job together.

3 Responses to The Flink Table and SQL API with Apache Flink 1.6

  1. Ravi Sankar

    Your code is working like a charm. Thanks a lot. Can we define the parallelism for the Kafka connector? My data input rate is very high.

    • Kenny Gorman


      Thanks for the comment. I think the correct way to handle this is to run the job itself with a level of parallelism (Execution Environment Level). Flink does a great job of managing this as well as the associated state, save point, etc that comes along with running something like this in a parallel fashion. So I would set setParallelism() to the number of task managers configured. On the Eventador ESP platform you can add as many task managers as needed under the task managers tab. Hope this helps.

  2. code4fun

    Hi Kenny,

    A good article. Would really appreciate a blog on Elasticsearch Sink too 🙂
