Joining Kafka Streams Using SQL to Enrich and Route Data

October 23, 2019 in Apache Kafka and SQL

Joining Kafka Streams Using SQL to Enrich and Route Data
Joining Kafka Streams Using SQL | Eventador Blog

Joins are an important and powerful part of the SQL language. You can perform joins in SQLStreamBuilder to enrich data and create net new streams of useful data. You can also direct the results to a sink conditionally (sort of an opposite join).

For these examples we are using our ADS-B dataset, and we are enriching the data based on various aircraft attributes for a variety of hypothetical national security, airspace management, and efficiency management use cases. In a nutshell, the airplanes Kafka topic is streaming aircraft telemetry, and we want to join the data against various enrichment sources using ICAO (primary key) of the data.

Let’s dive in.

A note on Virtual Tables in SQLStreamBuilder

It’s important to understand that when using SQLStreamBuilder you have two options when defining a job:

  1. Create and execute SQL – with results sampled to the screen only
  2. Create and execute SQL – with the results being sent to a virtual table sink.

Today we support both Kafka and S3 sinks. Because both a source (the query predicate) and the sink are virtual tables they can be different clusters and even of mixed type! Thus, one query can span multiple virtual tables, but may only have one sink (currently). To logically split output into multiple sinks define one job per sink.

Regular SQL joins vs streaming SQL joins

A SQL join is weaving together two data tables using a common key. You specify all the tables required to fulfill the query and specify the condition to join them on. There always must be one less join condition than number of tables. For example, if you SELECT two tables of data, then you must specify one ON clause to join them.

Streaming joins aren’t much different, except we are joining streams not tables of data. SQLStreamBuilder uses Apache Calcite/Apache Flink SQL grammar, here is a simple example:

SELECT a.cost, b.item
FROM prices a
JOIN items b
ON a.item_id = b.item_id;
view raw joins.sql hosted with ❤ by GitHub

SQLStreamBuilder supports all the different types of joins available in Flink itself, including the following types (Flink 1.9+):

  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

All of them joining on the same condition:

  ON booleanExpression
  | USING '(' column [, column ]* ')'

But joins can be tricky when they are performed in a streaming context. When using time-windowed joins, care must be taken to ensure the join is actually possible and makes sense for the use case. Join conditions are also hard to debug; because, it can be tricky to identify when a condition is not met, often times you simply don’t see any results, and it can be very confusing. Luckily SQLStreamBuilder gives rich and instant feedback to help you get the query correct.

Joining streams from two Kafka topics

First let’s look at a join of the input ADS-B source and a topic to enrich the data based on the ICAO field. Maybe we want to join the data to get the airframe registration details and understand the country of origin of each aircraft along with it’s type:

-- display reg and country for aircraft
SELECT a.icao, a.speed, b.type_aircraft,
FROM airplanes a -- first table/stream
JOIN airplanes_registrations b -- second table/stream
ON a.icao = b.icao; -- join key

Joining streams from two topics from different Kafka clusters

Using SQLStreamBuilder we can also join data from two entirely different Kafka clusters. Maybe we have a topic on a different cluster that indicates aircraft of interest via a simple message, and we want to know when one of those aircraft is in our airspace. In SQLStreamBuilder, it’s simply a matter of setting up two virtual tables on two different clusters as sources. This can be done with a time window as well.

We often refer to these joins as “HyperJoins” because of how powerful they are.

-- find aircraft that have been flagged for further scrutiny
SELECT a.icao,, a.lon, a.type_aircraft, a.type_registrant
FROM airplanes a, -- virtual table for aircraft ADS-B cluster
airplanes_alert_notifications b -- virtual table for notifications cluster
WHERE a.icao = b.icao
AND a.eventTimestamp -- kafka timestamp
BETWEEN current_timestamp - INTERVAL '5' HOUR
AND current_timestamp -- last 5 hours of data
view raw hyperjoins.sql hosted with ❤ by GitHub

Routing data using SQL

SQLStreamBuilder is designed to support various sources (Kafka today) and Sinks (Kafka, S3, etc), and data is routed between the two. You can perform a join reading two different sources with a Continuous SQL statement, and you can also conditionally write or route data via a Continuous SQL statement. Let’s say, for example, you want to route all data of aircraft with the new, lower noise Leap engines to S3 where we load the data into Snowflake. We can accomplish this with a simple SQLStreamBuilder job.

-- job to route LEAP engine aircraft to S3 bucket
SELECT a.icao, a.speed, a.altitude,, a.lon, a.mfr_mdl_code,
b.mfr as aircraft_mfr, c.mfr as engine_mfr
FROM airplanes a,
airplanes_registrations b,
airplanes_engines c
WHERE a.icao = b.icao
AND b.mfr_mdl_code = c.code
AND c.mfr = 'CFM INTL'
AND c.model like 'LEAP%' -- code name for new quiet and fuel efficient motors
view raw jointos3.sql hosted with ❤ by GitHub

In order to route this data to an S3 sink, we need to create a new S3 sink via the SQLStreamBuilder interface, and assign it a bucket and simple ruleset for file chunking:

Using S3 as a sink in SQLStreamBuilder

Conversely, maybe we want to route all non-US military air traffic to a machine learning application fed via compacted Kafka topic. We could both filter data and define the sink topic partition key via:

-- job to identify and route military aircraft to ML pipeline
SELECT a.icao as _eventKey, -- New Kafka partition key, b.lon, b.mfr, b.model, b.type_acft
FROM airplanes
JOIN airplanes_registrations
ON a.icao = b.icao
WHERE a.type_registrant = 5 -- Government registration
AND country <> 'US' -- US registrations only
view raw filteranddefine.sql hosted with ❤ by GitHub

Try it yourself

If you want to play with this example, get yourself setup with a free SQLStreamBuilder account. You can create an environment and start running Continuous SQL against your topics in just a few steps.

Leave a Reply

Your email address will not be published. Required fields are marked *