0.8: PrestoDB & Eventador.io

Eventador.io now has PrestoDB baked in

by Kenny Gorman, Founder and CEO
19 Feb 2017

Since the very first release of Eventador.io we have had a SQL interface. We strongly believe that SQL is an incredible language for dealing with streaming data, not only semantically, but also because it opens up a whole ecosystem of tools and utilities. As we have grown and gathered customer feedback over the past several months, we have heard one consistent sentiment: give us PrestoDB! So, as of Eventador 0.8, we are replacing PipelineDB with PrestoDB 0.166 in our stack going forward. It turns out that what happens when you pair Kafka and PrestoDB is pretty magical.

What is PrestoDB?

If you aren’t already familiar, PrestoDB is an open source project that was initially created at Facebook. In a nutshell, it’s a distributed SQL query engine designed to query large datasets on heterogeneous data sources. In our case one of those data sources is your Kafka stream.

PrestoDB allows you to issue simple SQL calls against the live Kafka stream and view the data in real time. One of the problems with pub/sub messaging is that it’s hard to get a quick glimpse of what data is actually coming over the pipeline at any given point. A simple SQL interface allows the developer or engineer to quickly inspect the data for correctness, spot trends, perform aggregations and other math, or utilize the myriad of built-in functions.

PrestoDB on Eventador.io

In building PrestoDB into our service, our design philosophy was to keep it as seamless as possible. After all, one of the main benefits of using SQL against streaming data is simplicity, and we didn’t want to detract from that in any way. PrestoDB ships with a Kafka connector, and we make use of it. But as a service, we allow for easy creation of topics, which presented a problem with PrestoDB’s static catalog configuration. So we wrote a patch for PrestoDB (PR forthcoming) that dynamically updates the catalog based on the current topic list in Kafka. This is all seamless to the end user: if you write data to a Kafka topic, you can immediately query that data from PrestoDB!

Here is how it all looks in practice:

[Image: PrestoDB on Eventador.io architecture]
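To make the write-then-query loop concrete, here is a minimal sketch in Python. This is an illustration, not our helper code: it assumes the kafka-python and PyHive client libraries, and the hostname, ports, topic name, and kafka catalog name are hypothetical stand-ins for the values listed in your Eventador.io console.

import json
import time

from kafka import KafkaProducer
from pyhive import presto

# Produce a JSON message to a Kafka topic (the endpoint is hypothetical).
producer = KafkaProducer(
    bootstrap_servers="your-deployment.eventador.io:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("aircraft", {"icao": "A0C6D9", "timestamp": int(time.time())})
producer.flush()

# The topic is already visible as a table in the kafka catalog, so it can
# be queried immediately; there is no catalog configuration step in between.
cursor = presto.connect(
    host="your-deployment.eventador.io",  # hypothetical
    port=8080,
    catalog="kafka",
    schema="default",
).cursor()
cursor.execute("SELECT _message FROM aircraft LIMIT 1")
print(cursor.fetchone())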

How to use PrestoDB on Eventador.io

First, let’s talk about how to get up and running with PrestoDB. When you create a deployment on Eventador.io, you automatically get a PrestoDB endpoint deployed. The endpoint is protected behind the same whitelist ACL you created for your Kafka endpoints. To start accessing data via SQL, you can either use the command line client (CLI) to run queries or use the built-in SQL Query Interface.

To use the CLI, you must first install it on your machine. The CLI connect string is listed under deployments->connections in the Eventador.io console. Simply copy/paste that into your terminal and start running queries.

To use the built-in SQL Query Interface there is zero setup: simply go to the deployments->SQL tab and start typing SQL!

Let’s run some queries

PrestoDB has been pre-configured to speak to your Kafka cluster and dynamically senses changes to your Kafka topics. Kafka topics are represented as tables, but PrestoDB has no physical storage backing these tables; they are simply logical views of the topics at any one point in time.

PrestoDB has a rich set of SQL operators and functions. You can find the complete list in the PrestoDB documentation.

The easiest way to show how awesome PrestoDB is when paired with Apache Kafka is with some examples. In this case we are inspecting a pipeline of data streaming off aircraft. It’s common to use JSON as the message payload, so that’s what we are doing here.

-- Show tables
SHOW TABLES;
       Table        
--------------------
 __consumer_offsets
 aircraft

-- describe the stream, this is a topic, no setup needed, it just shows up in PrestoDB.
DESC aircraft;

      Column       |  Type   | Extra |                   Comment
-------------------+---------+-------+---------------------------------------------
_partition_id     | bigint  |       | Partition Id                                
_partition_offset | bigint  |       | Offset for the message within the partition
_segment_start    | bigint  |       | Segment start offset                        
_segment_end      | bigint  |       | Segment end offset                          
_segment_count    | bigint  |       | Running message count per segment           
_key              | varchar |       | Key text                                    
_key_corrupt      | boolean |       | Key data is corrupt                         
_key_length       | bigint  |       | Total number of key bytes                   
_message          | varchar |       | Message text                                
_message_corrupt  | boolean |       | Message data is corrupt                     
_message_length   | bigint  |       | Total number of message bytes               
(21 rows)

Query 20170220_231734_00125_cfg2h, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [21 rows, 1.75KB] [72 rows/s, 6.05KB/s]

-- how many messages are in kafka?
SELECT count(*) FROM aircraft;
  _col0  
---------
 1229797
(1 row)

Query 20170220_231804_00127_cfg2h, FINISHED, 1 node
Splits: 81 total, 81 done (100.00%)
0:07 [1.23M rows, 258MB] [186K rows/s, 39MB/s]

-- query it again, see the data changed?
SELECT count(*) FROM aircraft;
  _col0  
---------
 1229802
(1 row)

Query 20170220_231926_00130_cfg2h, FINISHED, 1 node
Splits: 81 total, 81 done (100.00%)
0:06 [1.23M rows, 258MB] [190K rows/s, 39.9MB/s]

-- look at the message field for the data payload, in this case JSON
SELECT _message FROM aircraft LIMIT 1;
_message
--------
{"flight": "", "timestamp_verbose": "2017-02-07 15:50:33.022883", "msg_type": "8", "track": "", "timestamp": 1486504233, "altitude": "", "counter": 1360, "lon": "", "icao": "A0C6D9", "vr": "", "lat": "", "speed": ""}

-- pull out the data from JSON and see the timespan the messages are from.
SELECT
MIN(from_unixtime(try_cast(json_extract(_message, '$.timestamp') as integer))) as min,
MAX(from_unixtime(try_cast(json_extract(_message, '$.timestamp') as integer))) as max
FROM aircraft;
           min           |           max           
-------------------------+-------------------------
 2017-02-07 15:36:21.000 | 2017-02-20 17:21:15.000
(1 row)

Query 20170220_232123_00133_cfg2h, FINISHED, 1 node
Splits: 81 total, 81 done (100.00%)
0:12 [1.23M rows, 258MB] [104K rows/s, 21.8MB/s]

-- now, select the average speed by day
SELECT
round(avg(try_cast(json_extract(_message, '$.speed') as integer))) as avg_speed,
date_format(from_unixtime(try_cast(json_extract(_message, '$.timestamp') as integer)), '%d') as thedate
FROM aircraft
GROUP BY 2
ORDER BY 2;
avg_speed | thedate
-----------+---------
    272.0 | 07      
    287.0 | 08      
    269.0 | 09      
    279.0 | 10      
    234.0 | 11      
    271.0 | 12      
    315.0 | 13      
    284.0 | 14      
    269.0 | 15      
    269.0 | 16      
    289.0 | 17      
    250.0 | 18      
    296.0 | 19      
    285.0 | 20      
(14 rows)

Query 20170220_232211_00134_cfg2h, FINISHED, 1 node
Splits: 113 total, 113 done (100.00%)
0:27 [1.23M rows, 258MB] [45.7K rows/s, 9.6MB/s]

We’ve created a repository with some examples you can use to build your own SQL queries to run against your data pipeline.

Using PrestoDB inside Eventador Notebooks

Eventador notebooks have been available since 0.5, and for 0.8 we are updating the helper code to be compatible with PrestoDB. This means it’s easy to pull data out of Kafka via PrestoDB and use it to perform analysis, create models, run experiments, or create reports. For instance, you can pull some data from the stream and push it directly into a DataFrame in Python, as in the sketch below.

[Image: querying PrestoDB from an Eventador notebook]
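Here is a minimal sketch of that flow, assuming PyHive’s Presto client and pandas; the hostname and catalog name are hypothetical, so substitute the connection details from your own deployment.

import pandas as pd
from pyhive import presto

conn = presto.connect(
    host="your-deployment.eventador.io",  # hypothetical
    port=8080,
    catalog="kafka",
    schema="default",
)

# Unpack a couple of JSON fields from the raw Kafka messages into
# DataFrame columns for analysis.
df = pd.read_sql(
    """
    SELECT
      try_cast(json_extract_scalar(_message, '$.speed') AS integer) AS speed,
      from_unixtime(try_cast(json_extract_scalar(_message, '$.timestamp') AS integer)) AS ts
    FROM aircraft
    LIMIT 10000
    """,
    conn,
)
print(df.describe())

From here the DataFrame is ordinary pandas: plot it, feed it to a model, or write it out as a report.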

Third party tools and the larger SQL ecosystem

Because we expose the PrestoDB port like any other service, you can wire up any number of third-party services, or point your existing reporting tooling at real-time streaming data.


For instance, check out the many third-party services and tools that already speak PrestoDB, or use the ODBC and JDBC connectors from your own application or tooling.
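As one sketch of the latter, the same endpoint is also reachable from Python via SQLAlchemy, assuming PyHive’s Presto dialect is installed (pip install 'pyhive[presto]'); the hostname is hypothetical.

from sqlalchemy import create_engine, text

# SQLAlchemy URL for the PyHive Presto dialect:
# presto://<host>:<port>/<catalog>/<schema>
engine = create_engine("presto://your-deployment.eventador.io:8080/kafka/default")

with engine.connect() as conn:
    row_count = conn.execute(text("SELECT count(*) FROM aircraft")).scalar()
    print(row_count)

Anything that can speak to a SQLAlchemy engine, or to the ODBC/JDBC drivers directly, can treat the live stream as just another SQL database.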

Going Forward

These examples are just the beginning. We hope customers will find new ways to use PrestoDB in conjunction with Kafka as well as a host of other data sources, tools, and applications. Our feature roadmap includes scaling the PrestoDB cluster, additional SQL operators, easy configuration of other heterogeneous services, and more helper functions. If you like what you see, ping us and let us know what you would like us to tackle next!

If you aren’t already using Eventador.io for your data pipelines and want to try this out, you can sign up and try it for free. Happy streaming!