The Flink Table and SQL API with Apache Flink 1.6
Since my initial post on the Flink Table and SQL API there have been some massive and, frankly, awesome changes. Let’s go over the changes that Flink 1.6 brought to the table, step by step.
Quick Recap of the Flink Dataflow Model
Quickly, let’s go over the basic structure of the Flink APIs and the dataflow programming model it presents. There haven’t been any changes to the data flow model since 1.3 – just API refinements. The following diagram still represents the levels of abstraction Flink exposes.
Check out the documentation for the specifics. For this example, we will be using the Table API and SQL levels of abstraction. The release notes are also a must-read. We will focus on the Table API changes, but there are many other changes that are huge steps forward – especially regarding state improvements and containerization support.
In a nutshell, the Table and SQL APIs have been massively reworked, simplified, and made much more powerful. We will focus on the Kafka-specific changes in FLINK-9846 and FLINK-8866/FLINK-8558. These changes constitute an entirely new interface for the Table API and make the entire model much easier to use effectively. Other sinks/sources have changed in similar ways.
An example scenario
As an example, we will start with a simple Python producer for Kafka. We create temperature messages for each IoT device in a fictitious brewery. We will produce the data at three-second intervals into a Kafka topic (the source), then roll up the data, performing a simple aggregation over a one-minute window, and emit the results back to another Kafka topic (the sink). The Flink job, using the Table API, handles the reads from the source, the computation, and the writes to the sink. Our source data will be JSON, and so will our aggregated output data.
Let’s do this
First, let’s create the source and sink topics, then something to generate sample data, and finally the Flink computation itself. For this example, we will be using the Eventador Stack – so the steps are fairly Eventador-specific, but you can also change things to run this example locally.
Prerequisites
- A Github account.
- An Eventador account. I recommend you auth with Github.
- Create a Kafka + Flink deployment on Eventador (free for 30 days).
- The Kafka Python driver installed.
Create topics
We need to create two topics, one for the source and one for the sink. You can use the command line tool bin/kafka-topics.sh --create..., or the Eventador console. Check out the Eventador documentation for a step-by-step guide.
Create the following topics:
- brewery_source (replication_factor 1, partitions 1)
- brewery_sink (replication_factor 1, partitions 1)
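If you’d rather create the topics programmatically instead of using the CLI or the console, a minimal sketch with Kafka’s AdminClient could look like the following. The broker address is a placeholder – use the connect string from the Eventador Console.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateBreweryTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder broker address – replace with your cluster's connect string.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "myserver:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // One partition and replication factor 1, matching the settings above.
            admin.createTopics(Arrays.asList(
                    new NewTopic("brewery_source", 1, (short) 1),
                    new NewTopic("brewery_sink", 1, (short) 1)))
                 .all().get();
        }
    }
}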
The data generator
This data generator utilizes the Python Kafka driver to produce data to a Kafka cluster. See the install instructions here if you don’t already have this configured for your system.
The generator code is available as a Gist; you can download it from Github and run it.
This can be run on the command line to generate data continuously. Be sure to change the connect string to the one exposed in the Eventador Console application. Executing the program and the resulting output should look like this:
python ./brewery_example.py
connected to myserver:9092
{'sensor': 'MashTun1', 'temp': 32}
{'sensor': 'MashTun2', 'temp': 82}
{'sensor': 'MashTun1', 'temp': 34}
{'sensor': 'MashTun2', 'temp': 37}
Let this run for a while, or better yet, background it and let it continue to produce data to Kafka.
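The generator in the Gist is written in Python, but if you’d prefer to produce equivalent sample data from Java, a rough sketch using the standard Kafka producer might look like this. The sensor names and value ranges simply mimic the output above, and the broker address is again a placeholder.

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BreweryProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "myserver:9092"); // placeholder connect string
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Random rand = new Random();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                for (String sensor : new String[]{"MashTun1", "MashTun2"}) {
                    String json = String.format("{\"sensor\": \"%s\", \"temp\": %d}",
                            sensor, 30 + rand.nextInt(60));
                    producer.send(new ProducerRecord<>("brewery_source", json));
                }
                Thread.sleep(3000); // produce a reading per sensor every three seconds
            }
        }
    }
}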
The Flink Job
We want to aggregate data (group by in SQL) by a time bucket, and then write the result out to Kafka. For this computation, we will write a simple Flink job using the Table API and SQL.
The Flink job is available on Github here. It can be run inside the Eventador Console directly from Github, no need to deal with Maven at all. The job is then submitted to run on a Flink cluster.
First, let’s create a project using this repo. You will need to clone the example repo, then specify it as a new Project in Eventador. Choose the option to import a project into Eventador. Once a project is created, we can choose to run it on our Flink cluster.
Stepping through the code
There are three main components to our Flink job:
- Registering a table for reading the data from a Kafka topic and associating the input schema.
- Registering a table for writing the data into a Kafka topic using the table schema.
- Performing the computation – a SQL group by and insert from the source table into the sink table using Flink SQL.
Let’s step through each part:
Registering a table for reading data
tableEnv.connect(new Kafka()
        .version("0.11")
        .topic(params.getRequired("read-topic"))
        .property("bootstrap.servers", params.getRequired("bootstrap.servers")))
    .withSchema(new Schema()
        .field("sensor", Types.STRING())
        .field("temp", Types.LONG())
        .field("ts", Types.SQL_TIMESTAMP())
            .rowtime(new Rowtime()
                .timestampsFromSource()
                .watermarksPeriodicBounded(1000)
            )
    )
    .withFormat(new Json().deriveSchema())
    .inAppendMode()
    .registerTableSource("sourceTopic");
We use the Flink Kafka connector to connect to Kafka and consume data. The updated API makes it very straightforward to set properties, including bootstrap.servers, topic, and group.id for the consumer group to use. You can see all the options available in the docs here.
In this case, we are serializing JSON, but Avro is a popular choice as well. We use .withFormat(new Json().deriveSchema()) to derive the JSON format schema from the table schema.
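If you’d rather not derive the format from the table schema, the Json descriptor can also be given an explicit JSON schema string. A minimal sketch for our two payload fields follows (shown only as an alternative – the example job sticks with deriveSchema()):

.withFormat(new Json()
    .failOnMissingField(false)   // tolerate records that are missing a field
    .jsonSchema(
        "{" +
        "  type: 'object'," +
        "  properties: {" +
        "    sensor: { type: 'string' }," +
        "    temp:   { type: 'number' }" +
        "  }" +
        "}"))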
There are properties that can be set to control which offset to start reading from on the Kafka topic – .startFromEarliest() and .startFromLatest() are a couple of common ones.
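For example, a variant of the source registration above that pins the consumer group and always starts from the earliest offset might look like the following sketch. The group.id parameter simply mirrors the job’s command-line usage; passing it to the connector this way is an assumption for illustration.

// Same connector as above, but with an explicit consumer group and starting offset.
tableEnv.connect(new Kafka()
        .version("0.11")
        .topic(params.getRequired("read-topic"))
        .property("bootstrap.servers", params.getRequired("bootstrap.servers"))
        .property("group.id", params.getRequired("group.id"))
        .startFromEarliest())
    .withSchema(new Schema()
        .field("sensor", Types.STRING())
        .field("temp", Types.LONG()))
    .withFormat(new Json().deriveSchema())
    .inAppendMode()
    .registerTableSource("sourceTopicEarliest");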
Timestamps are of particular importance when reading data from Kafka. In this example, we create a virtual column and populate it via .rowtime(), creating a new Rowtime object and populating it with Kafka timestamps using .timestampsFromSource(). Alternatively, using a time value from the data itself is possible via .timestampsFromField(). For more information on setting timestamps from Kafka, check out the docs here.
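For instance, if the JSON payload carried its own event-time field, the schema section could use .timestampsFromField() instead. A sketch (our generator does not actually emit a ts field, so this is purely illustrative):

// Drop-in replacement for the .withSchema(...) section of the source registration,
// assuming the JSON records contain their own "ts" timestamp field.
.withSchema(new Schema()
    .field("sensor", Types.STRING())
    .field("temp", Types.LONG())
    .field("ts", Types.SQL_TIMESTAMP())
        .rowtime(new Rowtime()
            .timestampsFromField("ts")          // read event time from the data itself
            .watermarksPeriodicBounded(1000)))  // tolerate up to 1 second of out-of-orderness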
Registering a table for writing data
tableEnv.connect(new Kafka()
        .version("0.11")
        .topic(params.getRequired("write-topic"))
        .property("bootstrap.servers", params.getRequired("bootstrap.servers"))
        .sinkPartitionerRoundRobin())
    .withSchema(new Schema()
        .field("sensor", Types.STRING())
        .field("tumbleStart", Types.SQL_TIMESTAMP())
        .field("tumbleEnd", Types.SQL_TIMESTAMP())
        .field("avgTemp", Types.LONG())
    )
    .withFormat(new Json().deriveSchema())
    .inAppendMode()
    .registerTableSink("sinkTopic");
Registering a table as a sink is very similar to registering a table as a source. It’s important to point out that the sink schema should match the output columns and datatypes from the SQL being run. In this case, we are doing a group by and outputting a key sensor, an aggregate column avgTemp, and a couple of timestamps, tumbleStart and tumbleEnd.
When writing to Kafka, a partitioning strategy is expected. By default it will use .sinkPartitionerFixed(), but .sinkPartitionerRoundRobin() is what we use in this example. It’s also possible to specify a custom partitioner via .sinkPartitionerCustom(MyCustom.class).
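As a concrete illustration of the custom option, MyCustom could be a FlinkKafkaPartitioner that keys partitions off the sensor name so all readings for a sensor land on the same partition. This is a sketch only and not part of the example job:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

// Hypothetical partitioner: hash the first field of the row (the sensor name)
// so each sensor's readings always go to the same Kafka partition.
public class MyCustom extends FlinkKafkaPartitioner<Row> {
    @Override
    public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Object sensor = record.getField(0);
        int hash = (sensor == null) ? 0 : sensor.hashCode();
        return partitions[(hash & Integer.MAX_VALUE) % partitions.length];
    }
}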
Performing the computation – SQL group by using Flink SQL
String sql = "INSERT INTO sinkTopic " +
             "SELECT sensor, " +
             "TUMBLE_START(ts, INTERVAL '1' MINUTE) as tumbleStart, " +
             "TUMBLE_END(ts, INTERVAL '1' MINUTE) as tumbleEnd, " +
             "AVG(temp) AS avgTemp " +
             "FROM sourceTopic " +
             "WHERE sensor IS NOT null " +
             "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sensor";

tableEnv.sqlUpdate(sql);
Lastly, we perform a SQL statement that continuously pulls data from the source topic (the table in the FROM clause) and inserts it into the sink topic (the table in INSERT INTO). The output columns must match the registered table columns in order and name. It’s possible to use string concatenation to specify tables, like "INSERT INTO " + params.getRequired("write-topic") + "...".
The SQL statement is declared as a string and then executed via sqlUpdate(), which is new compared to simply using the sqlQuery() method – it makes aggregating data and writing it to a table sink a simple one-step process.
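For comparison, the older two-step style would run the query with sqlQuery() and then write the resulting Table into the registered sink explicitly. A sketch of the equivalent logic (it assumes an additional import of org.apache.flink.table.api.Table):

// Step one: run the query and get a Table back.
Table result = tableEnv.sqlQuery(
    "SELECT sensor, " +
    "TUMBLE_START(ts, INTERVAL '1' MINUTE) as tumbleStart, " +
    "TUMBLE_END(ts, INTERVAL '1' MINUTE) as tumbleEnd, " +
    "AVG(temp) AS avgTemp " +
    "FROM sourceTopic " +
    "WHERE sensor IS NOT null " +
    "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sensor");

// Step two: explicitly write the result into the registered sink table.
result.insertInto("sinkTopic");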
The output data
The output data looks similar to the output below – one record per sensor, averaging temp as avgTemp over the time period, in JSON format. As an example, if you use kafkacat to sample the data, you should see two new documents every minute with the average data over that period. The difference between tumbleStart and tumbleEnd will be one minute.
{"sensor":"MashTun1","tumbleStart":1542420360000,"tumbleEnd":1542420420000,"avgTemp":62}
{"sensor":"MashTun2","tumbleStart":1542420360000,"tumbleEnd":1542420420000,"avgTemp":59}
Putting it together
Let’s take a peek at the entire Flink job together.
package io.eventador;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.Rowtime;

import java.util.Properties;
import java.util.UUID;

public class FlinkKafkaTable {
    public static void main(String[] args) throws Exception {
        // Read parameters from command line
        final ParameterTool params = ParameterTool.fromArgs(args);

        if (params.getNumberOfParameters() < 4) {
            System.out.println("\nUsage: FlinkReadKafka --read-topic <topic> --write-topic <topic> --bootstrap.servers <kafka brokers> --group.id <groupid>");
            return;
        }

        Properties kparams = params.getProperties();
        kparams.setProperty("auto.offset.reset", "earliest");
        kparams.setProperty("flink.starting-position", "earliest");
        kparams.setProperty("group.id", UUID.randomUUID().toString());

        // setup streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(300000); // 300 seconds
        env.getConfig().setGlobalJobParameters(params);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        tableEnv.connect(new Kafka()
                .version("0.11")
                .topic(params.getRequired("read-topic"))
                .property("bootstrap.servers", params.getRequired("bootstrap.servers")))
            .withSchema(new Schema()
                .field("sensor", Types.STRING())
                .field("temp", Types.LONG())
                .field("ts", Types.SQL_TIMESTAMP())
                    .rowtime(new Rowtime()
                        .timestampsFromSource()
                        .watermarksPeriodicBounded(1000)
                    )
            )
            .withFormat(new Json().deriveSchema())
            .inAppendMode()
            .registerTableSource("sourceTopic");

        tableEnv.connect(new Kafka()
                .version("0.11")
                .topic(params.getRequired("write-topic"))
                .property("bootstrap.servers", params.getRequired("bootstrap.servers"))
                .sinkPartitionerRoundRobin())
            .withSchema(new Schema()
                .field("sensor", Types.STRING())
                .field("tumbleStart", Types.SQL_TIMESTAMP())
                .field("tumbleEnd", Types.SQL_TIMESTAMP())
                .field("avgTemp", Types.LONG())
            )
            .withFormat(new Json().deriveSchema())
            .inAppendMode()
            .registerTableSink("sinkTopic");

        String sql = "INSERT INTO sinkTopic " +
                     "SELECT sensor, " +
                     "TUMBLE_START(ts, INTERVAL '1' MINUTE) as tumbleStart, " +
                     "TUMBLE_END(ts, INTERVAL '1' MINUTE) as tumbleEnd, " +
                     "AVG(temp) AS avgTemp " +
                     "FROM sourceTopic " +
                     "WHERE sensor IS NOT null " +
                     "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sensor";

        tableEnv.sqlUpdate(sql);

        env.execute("FlinkKafkaTable");
    }
}
Your code is working like a charm. Thanks a lot. Can we define parallelism for the Kafka connector? My data input rate is very high.
Ravi,
Thanks for the comment. I think the correct way to handle this is to run the job itself with a level of parallelism (Execution Environment Level). Flink does a great job of managing this, as well as the associated state, savepoints, etc. that come with running something like this in a parallel fashion: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/parallel.html. So I would set setParallelism() to the number of task managers configured. On the Eventador ESP platform you can add as many task managers as needed under the task managers tab. Hope this helps.
Hi Kenny,
A good article. Would really appreciate a blog on the Elasticsearch sink too 🙂