Joins are an important and powerful part of the SQL language. You can perform joins in SQLStreamBuilder to enrich data and create net new streams of useful data. You can also direct the results to a sink conditionally (sort of an opposite join).
For these examples we use our ADS-B dataset and enrich the data based on various aircraft attributes for a variety of hypothetical national security, airspace management, and efficiency management use cases. In a nutshell, the airplanes Kafka topic is streaming aircraft telemetry, and we want to join the data against various enrichment sources using the ICAO field, which is the primary key of the data.
Let’s dive in.
A note on Virtual Tables in SQLStreamBuilder
It’s important to understand that when using SQLStreamBuilder you have two options when defining a job:
- Create and execute SQL – with results sampled to the screen only
- Create and execute SQL – with the results being sent to a virtual table sink
Today we support both Kafka and S3 sinks. Because both the source (the query predicate) and the sink are virtual tables, they can live on different clusters and even be of mixed type! Thus, one query can span multiple virtual tables, but may only have one sink (currently). To logically split output into multiple sinks, define one job per sink.
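As a rough sketch of the one-job-per-sink pattern, the two statements below would be saved as two separate jobs, each pointed at its own sink when the job is defined. The altitude threshold and the choice of which sink receives which rows are assumptions made purely for illustration.

```sql
-- Job 1: sink is a Kafka virtual table (selected on the job, not in the SQL)
SELECT icao, lat, lon, altitude
FROM airplanes
WHERE altitude < 10000;   -- hypothetical threshold

-- Job 2: sink is an S3 virtual table
SELECT icao, lat, lon, altitude
FROM airplanes
WHERE altitude >= 10000;  -- complement of the filter in job 1
```

Because the two filters are complementary, every record with a non-NULL altitude lands in exactly one sink.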
Regular SQL joins vs streaming SQL joins
A SQL join weaves together two data tables using a common key. You specify all the tables required to fulfill the query and the condition to join them on. There must always be one fewer join condition than the number of tables. For example, if you SELECT from two tables of data, then you must specify one ON clause to join them.
Streaming joins aren’t much different, except we are joining streams, not tables of data. SQLStreamBuilder uses the Apache Calcite/Apache Flink SQL grammar. Here is a simple example:
|SELECT a.cost, b.item|
|FROM prices a|
|JOIN items b|
|ON a.item_id = b.item_id;|
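To illustrate the rule of one fewer join condition than tables, here is a sketch that extends the two-table example to three tables, which therefore needs two ON clauses. The `suppliers` stream and the `supplier_id` and `supplier_name` columns are hypothetical and not part of the original example.

```sql
-- three tables, so two join conditions
SELECT a.cost, b.item, c.supplier_name
FROM prices a
JOIN items b
  ON a.item_id = b.item_id            -- condition 1: prices to items
JOIN suppliers c                      -- hypothetical third stream
  ON b.supplier_id = c.supplier_id;   -- condition 2: items to suppliers
```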
SQLStreamBuilder supports all the different types of joins available in Flink itself, including the following types (Flink 1.9+):
tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
All of them use the same join condition grammar:
joinCondition: ON booleanExpression | USING '(' column [, column ]* ')'
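As a brief sketch of how this grammar reads in practice, the statements below reuse the `prices` and `items` tables from the earlier example. The first swaps the inner join for a LEFT join, and the second expresses the same inner join with USING instead of ON. Whether a given sink can accept the updates that an outer join may produce on unbounded streams is something to verify for your setup.

```sql
-- LEFT JOIN: keep every price row; b.item is NULL when no item matches
SELECT a.cost, b.item
FROM prices a
LEFT JOIN items b
  ON a.item_id = b.item_id;

-- USING: shorthand when the join column has the same name on both sides
SELECT a.cost, b.item
FROM prices a
JOIN items b USING (item_id);
```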
But joins can be tricky when they are performed in a streaming context. When using time-windowed joins, care must be taken to ensure the join is actually possible and makes sense for the use case. Join conditions are also hard to debug: when a condition is not met, you often simply see no results, which can be very confusing. Luckily, SQLStreamBuilder gives rich and instant feedback to help you get the query correct.
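For reference, a time-windowed join in Flink SQL bounds how far apart in time two matching records may be. The sketch below assumes both `prices` and `items` expose an `eventTimestamp` time attribute; that column on these two tables, and the ten-minute bound, are assumptions for illustration.

```sql
-- match a price to an item only if their timestamps are within 10 minutes
SELECT a.cost, b.item
FROM prices a, items b
WHERE a.item_id = b.item_id
  AND a.eventTimestamp BETWEEN b.eventTimestamp - INTERVAL '10' MINUTE
                           AND b.eventTimestamp + INTERVAL '10' MINUTE;
```

If the window is narrower than the typical gap between related records on the two streams, the query runs but returns nothing, which is the silent failure described above.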
Joining streams from two Kafka topics
First let’s look at a join of the input ADS-B source and a topic to enrich the data based on the ICAO field. Maybe we want to join the data to get the airframe registration details and understand the country of origin of each aircraft along with its type:
|-- display reg and country for aircraft|
|SELECT a.icao, a.speed, b.type_aircraft, b.country|
|FROM airplanes a -- first table/stream|
|JOIN airplanes_registrations b -- second table/stream|
|ON a.icao = b.icao; -- join key|
Joining streams from two topics from different Kafka clusters
Using SQLStreamBuilder we can also join data from two entirely different Kafka clusters. Maybe we have a topic on a different cluster that indicates aircraft of interest via a simple message, and we want to know when one of those aircraft is in our airspace. In SQLStreamBuilder, it’s simply a matter of setting up two virtual tables on two different clusters as sources. This can be done with a time window as well.
We often refer to these joins as “HyperJoins” because of how powerful they are.
|-- find aircraft that have been flagged for further scrutiny|
|SELECT a.icao, a.lat, a.lon, a.type_aircraft, a.type_registrant|
|FROM airplanes a, -- virtual table for aircraft ADS-B cluster|
|airplanes_alert_notifications b -- virtual table for notifications cluster|
|WHERE a.icao = b.icao|
|AND a.eventTimestamp -- kafka timestamp|
|BETWEEN current_timestamp - INTERVAL '5' HOUR|
|AND current_timestamp -- last 5 hours of data|
Routing data using SQL
SQLStreamBuilder is designed to support various sources (Kafka today) and sinks (Kafka, S3, etc.), and data is routed between the two. You can perform a join reading two different sources with a Continuous SQL statement, and you can also conditionally write or route data via a Continuous SQL statement. Let’s say, for example, you want to route all data for aircraft with the new, lower-noise LEAP engines to S3, where we load the data into Snowflake. We can accomplish this with a simple SQLStreamBuilder job.
|-- job to route LEAP engine aircraft to S3 bucket|
|SELECT a.icao, a.speed, a.altitude, a.lat, a.lon, a.mfr_mdl_code,|
|b.mfr as aircraft_mfr, c.mfr as engine_mfr|
|FROM airplanes a,|
|WHERE a.icao = b.icao|
|AND b.mfr_mdl_code = c.code|
|AND c.mfr = 'CFM INTL'|
|AND c.model like 'LEAP%' -- code name for new quiet and fuel efficient motors|
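The same routing logic can also be written with explicit JOIN clauses, which keeps each join condition next to the table it applies to. In the sketch below, `aircraft_registry` (alias `b`) and `engine_reference` (alias `c`) are hypothetical table names chosen for illustration; substitute the virtual tables defined in your own environment.

```sql
-- sketch only: aircraft_registry and engine_reference are hypothetical names
SELECT a.icao, a.speed, a.altitude, a.lat, a.lon, a.mfr_mdl_code,
       b.mfr AS aircraft_mfr, c.mfr AS engine_mfr
FROM airplanes a
JOIN aircraft_registry b
  ON a.icao = b.icao               -- telemetry to registration record
JOIN engine_reference c
  ON b.mfr_mdl_code = c.code       -- registration record to engine model
WHERE c.mfr = 'CFM INTL'
  AND c.model LIKE 'LEAP%';        -- LEAP engine family only
```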
To route this data to an S3 sink, we need to create a new S3 sink via the SQLStreamBuilder interface and assign it a bucket and a simple ruleset for file chunking.
Conversely, maybe we want to route all non-US military air traffic to a machine learning application fed via a compacted Kafka topic. We could both filter data and define the sink topic partition key via:
|-- job to identify and route military aircraft to ML pipeline|
|SELECT a.icao as _eventKey, -- New Kafka partition key|
|a.lat, b.lon, b.mfr, b.model, b.type_acft|
|ON a.icao = b.icao|
|WHERE a.type_registrant = 5 -- Government registration|
|AND country <> 'US' -- non-US registrations only|
Try it yourself
If you want to play with this example, get yourself set up with a free Eventador.io SQLStreamBuilder account. You can create an environment and start running Continuous SQL against your topics in just a few steps.