Apache Flink offers two simple APIs for accessing streaming data with declarative semantics: the Table and SQL APIs. In this post we dive in and build a simple processor in Java using these relatively new APIs.
Flink offers various APIs for accessing streaming data. Let’s go over these basic API abstractions. The Core APIs offer the most robust feature coverage and are the most mature. The Table and SQL APIs offer a more approachable and declarative design, but are newer.
For this example we use our example data generator, Planestream. The generated ADS-B data is sparse: not every message has every key in it. Thus, one can’t just expect that each message can be processed by a common key. But what if our downstream system expects particular keys, perhaps because it’s plotting altitude over time? We need to write a simple data processor to ensure we have high-quality data for some downstream sink.
Our processor needs to ignore messages without the altitude key, as well as shorten the message to only the specific keys we are interested in.
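To make that requirement concrete, here is a minimal plain-Java sketch of the per-message logic, with no Flink involved. It assumes each message has already been parsed into a Map<String, String>; the class name SparseMessageFilter is hypothetical, and the list of wanted keys mirrors the columns selected by the SQL query used in this post.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration; not part of Flink or of the original project.
final class SparseMessageFilter {

    // The keys the downstream consumer expects, in output order.
    static final List<String> WANTED =
            Arrays.asList("timestamp_verbose", "icao", "lat", "lon", "altitude");

    // Returns the trimmed message, or empty if altitude is missing or blank.
    static Optional<Map<String, String>> process(Map<String, String> message) {
        String altitude = message.get("altitude");
        if (altitude == null || altitude.isEmpty()) {
            return Optional.empty();
        }
        Map<String, String> out = new LinkedHashMap<>();
        for (String key : WANTED) {
            // Keys absent from a sparse message are carried through as null.
            out.put(key, message.get(key));
        }
        return Optional.of(out);
    }
}
```

The Table and SQL APIs let us express the same two steps, a filter and a projection, as a single declarative query instead of hand-written code like this.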
In our case our source data is in Kafka, and the sink/destination is also Kafka. We need a simple way to read data from a Kafka topic, ensure that a key is present, and then write a subset of fields into another Kafka topic.
There are some core components we use to access and process the data. Let’s talk about these components of the processor first, then I will show the entire processor program. This should help you quickly absorb the core meat of the program and differentiate it from the basic scaffolding of a Flink program.
To get the data we need to configure a source. Flink provides a few sources; in our case, because the data is already JSON and sits in a Kafka topic, the KafkaXXJsonTableSource class works perfectly (XX stands for the Kafka version, for example Kafka09JsonTableSource). It accepts a topic, properties, and a ROW type that describes the JSON documents being passed through Kafka as input, and returns a KafkaTableSource object.
KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
    params.getRequired("read-topic"),
    params.getProperties(),
    typeInfo
);
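The params object above is presumably Flink's ParameterTool, which turns "--key value" command-line arguments into lookups and into a java.util.Properties object for the Kafka client. The following is a hand-rolled, standard-library-only sketch of that idea, not ParameterTool itself; the class name ArgProperties and its error messages are hypothetical.

```java
import java.util.Properties;

// Hypothetical stand-in that mimics the shape of "--key value" argument parsing.
final class ArgProperties {

    // Converts ["--key", "value", ...] into a Properties object.
    static Properties fromArgs(String[] args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Expected --key value pairs");
        }
        Properties props = new Properties();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Not a key: " + args[i]);
            }
            // Strip the leading "--" so the key matches the Kafka config name.
            props.setProperty(args[i].substring(2), args[i + 1]);
        }
        return props;
    }

    // Fails fast when a mandatory argument such as "read-topic" is absent.
    static String getRequired(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required key: " + key);
        }
        return value;
    }
}
```

With this shape, a Kafka consumer would typically need at least bootstrap.servers and group.id among the arguments, alongside the read-topic and write-topic keys the processor looks up itself.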
After that we want to specify, via SQL, the data that we want from the KafkaTableSource object. We register the table source, passing a logical name that we use in the query (in the FROM clause) along with the tableSource object we are reading from, and then call the sql method with our SQL string. It’s important to remember that this registers a SQL query at run time against a dynamic data table. The query runs continuously against the stream; it is not a one-off, point-in-time query.
String sql = "SELECT timestamp_verbose, icao, lat, lon, altitude "
    + "FROM flights "
    + "WHERE altitude <> '' ";

tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);
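One subtlety of the WHERE clause is worth spelling out. Standard SQL uses three-valued logic: comparing NULL with <> yields UNKNOWN rather than TRUE, and WHERE keeps only rows whose predicate is TRUE. Assuming the JSON source surfaces a missing key as NULL (an assumption about the source's configuration that is worth verifying), the predicate drops both empty-string and missing altitudes. The sketch below models this in plain Java, using a nullable Boolean for UNKNOWN; the class and method names are hypothetical.

```java
// Hypothetical model of SQL three-valued logic; not Flink code.
final class SqlNotEquals {

    // SQL "a <> b": returns null (UNKNOWN) if either operand is NULL.
    static Boolean notEquals(String a, String b) {
        if (a == null || b == null) {
            return null;
        }
        return !a.equals(b);
    }

    // WHERE keeps a row only when the predicate is TRUE, not FALSE or UNKNOWN.
    static boolean whereKeeps(Boolean predicate) {
        return Boolean.TRUE.equals(predicate);
    }

    public static void main(String[] args) {
        String[] altitudes = {"35000", "", null};
        for (String altitude : altitudes) {
            boolean kept = whereKeeps(notEquals(altitude, ""));
            System.out.println("altitude=" + altitude + " kept=" + kept);
        }
    }
}
```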
Now we have a result table object that holds our filtered data. We need to write that to a Kafka topic. This time we use the matching sink class, KafkaXXJsonTableSink. We pass it the topic to write to, properties, and a partition object. The partition object determines the type of partitioning to use when writing the data to Kafka. Finally, we call the writeToSink method on the result, specifying the sink to write to.
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
    params.getRequired("write-topic"),
    params.getProperties(),
    partition
);

result.writeToSink(kafkaTableSink);
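The post does not say which partitioning strategy the partition object implements. As one illustration, a "fixed" strategy pins each parallel sink instance to a single Kafka partition, chosen by the instance index modulo the number of partitions. The sketch below shows only that arithmetic in plain Java; it is an assumption about one possible strategy rather than the connector's actual class, and the names are hypothetical.

```java
// Hypothetical illustration of a fixed partitioning strategy; not the Flink connector class.
final class FixedPartitionSketch {

    // Maps a parallel sink instance to one partition id from the topic's partition list.
    static int targetPartition(int parallelInstanceId, int[] partitions) {
        if (partitions.length == 0) {
            throw new IllegalArgumentException("Topic has no partitions");
        }
        if (parallelInstanceId < 0) {
            throw new IllegalArgumentException("Instance id must be non-negative");
        }
        return partitions[parallelInstanceId % partitions.length];
    }
}
```

Under this strategy, running fewer sink instances than the topic has partitions leaves some partitions without data, which is one reason the choice of partitioner matters.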
That’s it. That’s our processor in a nutshell.
Let’s see the entire thing put together in a functional program. At a high level the anatomy of a Flink program that uses the Flink Table API is detailed here (for 1.3). You can find the entire source of the project on GitHub. Feel free to fork it and experiment on your own.
The Flink Table API + SQL is very powerful and quite compact in terms of the amount of code required to create stream processing applications. It’s pretty easy to get started and build from there. Be sure to check out the docs here (1.3) for additional reading.
As always if you have questions feel free to ping us at email@example.com.