What is Continuous SQL? Stream Processing with Continuous SQL Explained.

January 16, 2020 in Continuous SQL



During the technology boom of the last two decades, a trend has come into focus: data is the lifeblood of a modern company and nearly every company is, essentially, a data company. Organizations have realized that investing in data systems and infrastructure can make them more competitive in their marketplaces and allow for new exciting innovations.

In the last few years, this paradigm has exploded as it has become clear that streaming, or real-time, data has more value than data-at-rest. Organizations can add streaming data systems to their arsenal and massively boost their ability to provide highly differentiated, competitive, and compelling user experiences.

Overlapping this trend is the growing popularity of machine-generated data, enabled by cloud computing. The Internet of Things is an obvious example, but lesser known is the trend to instrument everything in your business: every web click, visitor session, page view, purchase, interaction, and chat message. In 1995, companies stored who the customer was and what they purchased. In 2020, companies store every single interaction customers have with the business, from the manufacturing of the product to the data generated as the product is used. In fact, some companies exist only because of streaming data; without it, they could not have been created.

But the velocity and volume of streaming data are massive, which has dictated that new data system architectures be invented. Ingesting the firehose of data, distilling it down to the useful parts, and routing it to this new breed of applications requires new and specialized designs. Because these streaming systems are not siloed, they are typically used alongside more traditional technologies like relational database systems (PostgreSQL, MySQL, Oracle) or even NoSQL systems (MongoDB, Elasticsearch).

Further, the modern enterprise is on a collision course between the desire to capture data at an increasing rate and the ability to process that data. For more than 30 years, SQL has been the de facto standard for querying data, but run-once queries against data at rest can no longer keep up. Continuous SQL solves this problem by running SQL queries continuously on the boundless stream of business data.

The distributed log

Append-only distributed log systems like Apache Kafka, Pravega, Pulsar, and NATS provide relatively simple, scalable, and high-performance architectures for managing the input/output and storage of streams of data. Designs differ, but a common trait is that these systems persist data at very high volume and concurrency while giving up things like transactional semantics. Typically, they offer a range of durability and delivery guarantees, from “at least once” to “exactly once” processing.
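
To make the difference between those two guarantees concrete, here is a minimal Python sketch. It is a toy model, not how Kafka or any other system implements delivery: the in-memory `log`, the helper names, and the crash scenario are all assumptions made for illustration. It shows a consumer that crashes before committing its position, gets the same records again on restart, and avoids double-processing by skipping offsets it has already applied.

```python
log = ["a", "b", "c", "d"]  # hypothetical append-only log; list index = offset


def read_from(log, committed_offset):
    """Return (offset, value) pairs starting at the last committed offset."""
    return list(enumerate(log))[committed_offset:]


# First run: the consumer processes offsets 0-2, then crashes before
# committing its position, so the committed offset is still 0.
first_run = read_from(log, 0)[:3]
# After the restart it resumes from offset 0, so offsets 0-2 arrive again.
second_run = read_from(log, 0)
deliveries = first_run + second_run


def apply_naive(deliveries):
    """At least once: every delivery is applied, duplicates included."""
    return [value for _, value in deliveries]


def apply_idempotent(deliveries):
    """Skip offsets already applied, which gives an exactly-once effect."""
    applied, last_offset = [], -1
    for offset, value in deliveries:
        if offset > last_offset:
            applied.append(value)
            last_offset = offset
    return applied


print(apply_naive(deliveries))       # ['a', 'b', 'c', 'a', 'b', 'c', 'd']
print(apply_idempotent(deliveries))  # ['a', 'b', 'c', 'd']
```

Real systems reach “exactly once” with more machinery than this, but the underlying idea of making redelivery harmless is the same.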

Kafka, for example, provides a massively scalable and redundant architecture coupled with straightforward APIs. Data is organized into namespaces called topics, which support highly concurrent writes and reads. High performance and scalability come from a partitioning scheme. Programs write (produce) and read (consume) data via language-specific drivers. The data can be in various formats, with Apache Avro and JSON being two common ones.
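
The idea of a partitioned topic can be sketched in a few lines of Python. This is an in-memory stand-in, not a Kafka client: the `Topic` class, its `produce` and `consume` methods, and the use of CRC32 as the partitioning hash are all assumptions chosen for illustration (Kafka's own partitioner works differently). The point is only that a stable hash of the key sends every record for that key to the same append-only partition.

```python
import zlib


class Topic:
    """Toy in-memory model of a partitioned, append-only topic."""

    def __init__(self, name, num_partitions=3):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def produce(self, key, value):
        """Append a record and return its (partition, offset)."""
        # A stable hash keeps all records for one key in one partition,
        # which preserves their relative order.
        partition = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        records = self.partitions[partition]
        records.append((key, value))
        return partition, len(records) - 1

    def consume(self, partition, offset=0):
        """Read records from one partition, starting at the given offset."""
        return self.partitions[partition][offset:]


topic = Topic("paymentauths")
first = topic.produce("card-1", {"amount": 12})
second = topic.produce("card-1", {"amount": 40})
same_partition = first[0] == second[0]  # both records share a partition
```

Because each partition is an independent log, different consumers can read different partitions in parallel, which is where the scalability comes from.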

The distributed log gives us part of a solution for most organizations: APIs to write and read data in an insanely fast, yet durable, manner.

Stream processing

The stream processing paradigm was invented to perform continuous parallel computations on these streams of data. Stream processors are programs that interact with the streams of data. You don’t have to use them in combination with a distributed log, but when you do, they are massively powerful.

Stream processing frameworks present an API for writing processors that run in parallel to perform computations and to reduce and package data into a usable format for a business or application to consume. These processors are often called jobs. Stream processing frameworks like Apache Flink, Samza, and Storm manage state, handle interprocess communication, and provide high availability, restartability, and scalability for groups of jobs. Jobs can also be created as independent processors using APIs like Kafka Streams, where Kafka itself is used for many of these functions.

Stream processing jobs are typically written using the specific API of the processing framework itself, and Java and Scala are prevalent languages for this. The APIs tend to be fantastically powerful and rich. For instance, Apache Flink has multiple APIs with varying degrees of functionality and complexity. They range from very low-level operations (the DataStream API) up to higher-level abstractions (the SQL API).

Stream processors tend to process data from an input (source) to an output (sink). Typical sources are Apache Kafka or Amazon Kinesis. Typical sinks can be anything from Kafka to traditional RDBMSs like PostgreSQL or even object stores like Amazon S3. Jobs can be chained in process (parsed to a DAG) or across processes by using the sink of one job as the source for an entirely different job. A chain of these processors is called a data pipeline.
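
As a rough illustration of the source, processor, and sink shape, here is a small Python sketch built from generators. Nothing here is a real framework API: the function names, the sample events, and the list used as a sink are hypothetical stand-ins. Each stage pulls records from the previous one, so records flow through the chain one at a time rather than as a finished batch.

```python
def source(events):
    """Source: emit raw events one at a time (a stand-in for a Kafka topic)."""
    yield from events


def only_large(auths, minimum):
    """Processor 1: keep auths above a minimum amount."""
    for auth in auths:
        if auth["amount"] > minimum:
            yield auth


def mask_card(auths):
    """Processor 2: reshape each record for downstream consumers."""
    for auth in auths:
        yield {"card": "****" + auth["card"][-4:], "amount": auth["amount"]}


def sink(records, table):
    """Sink: append each result to a list (a stand-in for a database table)."""
    for record in records:
        table.append(record)


events = [
    {"card": "4111111111111111", "amount": 5},
    {"card": "4111111111111111", "amount": 25},
    {"card": "5500000000000004", "amount": 90},
]
table = []
# Chain the stages into a tiny pipeline: source -> filter -> reshape -> sink.
sink(mask_card(only_large(source(events), minimum=10)), table)
```

A real framework adds what this sketch leaves out: parallelism, state management, and recovery when a stage fails.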

Continuous SQL

Continuous SQL (sometimes called Streaming SQL, StreamSQL, Continuous Query, Real-Time SQL, or even “Push SQL”) is the usage of Structured Query Language (SQL) for stream processing. SQL is inherently suitable for stream processing because the semantics and grammar of the language are designed to ask for data in a declarative way. Moreover, relational SQL uses sets of typed tuples (called relations or tables) to express the schema of the data. These relations differ fundamentally from traditional RDBMS relations because, as streams, they must have a time element.

Because of SQL’s rich history, it is widely known and easy to write. Developers, data engineers, data scientists, and others do not need to use complicated, low-level APIs to create processors; they can create them in SQL. More importantly, they can issue SQL against the stream of data and receive feedback right away. This allows them to explore and reason about the data stream itself using a familiar paradigm. Continuous SQL should be familiar to anyone who has used SQL with an RDBMS, but it does have some important differences.

In a relational database system (RDBMS), SQL is interpreted and validated, an execution plan is created, a cursor is spawned, and results are gathered into that cursor and then iterated over for a point-in-time picture of the data. This picture is a result set; it has a start and an end. The phases are described as parse, execute, and fetch.

  • Parse: validate the SQL statement, create an execution plan, give feedback to user.
  • Execute: run the SQL statement using the execution plan.
  • Fetch: open a cursor and return the data to the user, closing the cursor when the data is done being returned.
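
The phases above can be seen in miniature with Python's built-in `sqlite3` module, used here only as a convenient stand-in for an RDBMS. The table contents are made up for the example. Note that in `sqlite3` a single `execute` call covers both parsing and the start of execution, so the phases are less visibly separate than in the list.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table paymentauths (card text, amount integer)")
conn.executemany(
    "insert into paymentauths values (?, ?)",
    [("card-1", 12), ("card-2", 5), ("card-1", 40)],
)

# Parse: an invalid statement is rejected before any data is touched.
try:
    conn.execute("selec card from paymentauths")
    parse_failed = False
except sqlite3.OperationalError:
    parse_failed = True

# Execute: run a valid statement through a cursor.
cursor = conn.cursor()
cursor.execute(
    "select card, count(*) from paymentauths where amount > 10 group by card"
)

# Fetch: pull the result set, then close the cursor. The result has an end.
rows = cursor.fetchall()
cursor.close()

# Data that arrives later is not part of the result already fetched.
conn.execute("insert into paymentauths values ('card-2', 99)")
conn.close()
```

The last insert is the important part: `rows` still describes the table as it was when the query ran, and the only way to see the new row is to run the query again.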

In contrast, Continuous SQL queries continuously process results to a sink of some type. The SQL statement is interpreted and validated against a schema (the set of tuples). The statement is then executed, and the results matching the criteria are continuously returned. Jobs defined in SQL look a lot like regular stream processing jobs; the difference is that they were created using SQL rather than something like Java, Scala, or Python. The data emitted via Continuous SQL are the continuous results: there is a beginning but no end, a boundless stream of tuples.

  • Parse: validate the SQL statement, give feedback to user.
  • Execute: run the SQL statement.
  • Continuously Process: push the results of the query to a sink

Continuous SQL looks a lot like standard SQL:

-- detect fraudulent auths
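select card as fraudulent_card,
       count(*) as auth_count,
       max(amount) as max_amount,
       -- end of the one-second window each result row belongs to
       tumble_end(eventTimestamp, interval '1' second) as ts_end
from paymentauths
where amount > 10
-- one group per card per one-second tumbling window
group by card, tumble(eventTimestamp, interval '1' second)
-- keep only cards with more than two auths in a window
having count(*) > 2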
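
To show what a query like this does as events arrive, here is a rough Python simulation of the same logic. It is not how a stream processor executes SQL. It assumes events arrive in timestamp order (no late data or watermarks), uses a hypothetical `ts` field holding epoch seconds, and the function names are invented for the example. A window's results are emitted as soon as an event from a later window shows up.

```python
WINDOW_SECONDS = 1


def close_window(window_start, stats):
    """Emit one row per card that passes the HAVING filter for a finished window."""
    for card, (count, max_amount) in stats.items():
        if count > 2:  # having count(*) > 2
            yield {
                "fraudulent_card": card,
                "auth_count": count,
                "max_amount": max_amount,
                "ts_end": window_start + WINDOW_SECONDS,
            }


def detect_fraud(auths):
    """Consume auths in timestamp order and yield alerts as windows close."""
    current_window, stats = None, {}
    for auth in auths:
        window_start = int(auth["ts"] // WINDOW_SECONDS) * WINDOW_SECONDS
        if current_window is not None and window_start != current_window:
            yield from close_window(current_window, stats)
            stats = {}
        current_window = window_start
        if auth["amount"] > 10:  # where amount > 10
            count, max_amount = stats.get(auth["card"], (0, auth["amount"]))
            stats[auth["card"]] = (count + 1, max(max_amount, auth["amount"]))
    # A real stream never ends; this flush exists only for finite test input.
    if current_window is not None:
        yield from close_window(current_window, stats)


auths = [
    {"card": "A", "amount": 20, "ts": 0.1},
    {"card": "A", "amount": 50, "ts": 0.3},
    {"card": "B", "amount": 15, "ts": 0.5},
    {"card": "A", "amount": 30, "ts": 0.7},
    {"card": "A", "amount": 5, "ts": 0.9},
    {"card": "A", "amount": 100, "ts": 1.2},
    {"card": "B", "amount": 11, "ts": 1.4},
]
alerts = list(detect_fraud(auths))
```

With this input, card A has three qualifying auths in the first one-second window, so it produces a single alert; nothing in the second window crosses the threshold.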

Schema

Distributed log systems like Kafka don’t inherently enforce a schema on the data flowing through them. They ingest and store messages over time, and data can be in any format. Clearly, without some known schema, data would be a mess and nearly impossible to query or filter. Thus, there must be some schema assigned to the data. Common formats are JSON and Avro. In the case of Avro, schemas are defined and versioned using a separate storage system. In the case of Apache Kafka, drivers can be schema-aware and enforce compliance at produce time by validating against a central repository (like Schema Registry). In the case of JSON, a schema must be defined in order to query by values nested in the data structure, as well as to assign types.

In order to run Continuous SQL, a schema must exist. This schema is the set of tuples and types that are part of the query (the columns and data types). It also provides the definition against which the parser validates the statement (naming, operators, types, etc.).

-- example schema for paymentauths
card varchar
amount integer
eventTimestamp timestamp
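
As a sketch of what assigning a schema to JSON messages can look like, the Python below checks each raw message against the three columns above and converts the timestamp. The `SCHEMA` mapping, the `parse_auth` helper, and the assumption that timestamps arrive as ISO 8601 strings are all illustrative choices, not part of any particular product.

```python
import json
from datetime import datetime

# Column name -> Python type, mirroring the example schema for paymentauths.
SCHEMA = {"card": str, "amount": int, "eventTimestamp": datetime}


def parse_auth(raw):
    """Parse one JSON message into a typed row, or raise ValueError."""
    message = json.loads(raw)
    row = {}
    for column, expected in SCHEMA.items():
        if column not in message:
            raise ValueError(f"missing column: {column}")
        value = message[column]
        if expected is datetime:
            # Assumption: timestamps are ISO 8601 strings.
            if not isinstance(value, str):
                raise ValueError(f"{column}: expected an ISO 8601 string")
            value = datetime.fromisoformat(value)
        elif isinstance(value, bool) or not isinstance(value, expected):
            # bool is rejected explicitly because it is a subclass of int.
            raise ValueError(f"{column}: expected {expected.__name__}")
        row[column] = value
    return row


row = parse_auth(
    '{"card": "card-1", "amount": 12, "eventTimestamp": "2020-01-16T12:00:00"}'
)
```

A message whose `amount` is the string `"12"` rather than a number would be rejected here, which is the kind of mismatch a schema is meant to catch before a query ever sees the data.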

Running SQL against boundless streams of data requires a bit of a mindset change. While much of the SQL grammar remains the same, how the queries work, the results that are shown, and the overall processing paradigm are different from traditional RDBMSs. Filters, subqueries, joins, functions, literals, and the myriad other useful language features generally exist but may behave differently. New constructs around time, specifically windowing, are introduced.

Materializing results

One key difference between Continuous SQL and traditional SQL is how the results of the query are handled. In traditional SQL, a result set is returned to the calling application as a cursor consumes the results. With Continuous SQL, the results are continuously returned to a sink. Sinks can be streams of data like Kafka topics, or they can be more traditional systems like an RDBMS. More than one stream can write to the same sink, sometimes joined with other streams.

But careful consideration must be given to how the results of a stream of data are ultimately persisted. An impedance mismatch exists between streams and traditional storage systems like databases. Streams are a boundless set of tuples, and databases store the latest state of a tuple. This impedance mismatch must be handled in some manner. Systems like Apache Flink provide a number of handlers for sinks that describe the behavior of the transition. Typical constructs are:

  • Append: every message is a new insert
  • Upsert: update by key, insert if key missing (idempotent)
  • Retract: upsert with delete if key no longer present in window
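
The three behaviors above can be compared with a short Python sketch. It follows the descriptions in this list rather than the exact semantics of any engine, and the function names, the `(is_add, key, row)` message shape, and the sample updates are assumptions made for illustration. A list stands in for an append-only store and a dict for a keyed table.

```python
def apply_append(table, row):
    """Append: every message becomes a new row."""
    table.append(row)


def apply_upsert(table, key, row):
    """Upsert: replace the row for a key, or insert it if the key is new."""
    table[key] = row


def apply_retract(table, is_add, key, row):
    """Retract: like upsert, but a retraction message deletes the key."""
    if is_add:
        table[key] = row
    else:
        table.pop(key, None)


# The same running count for one card, delivered as three updates.
updates = [{"card": "card-1", "auth_count": n} for n in (1, 2, 3)]

append_table, upsert_table, retract_table = [], {}, {}
for row in updates:
    apply_append(append_table, row)
    apply_upsert(upsert_table, row["card"], row)
    apply_retract(retract_table, True, row["card"], row)

# The card drops out of the window, so a retraction removes it.
apply_retract(retract_table, False, "card-1", None)
```

After this runs, the append table holds all three versions of the row, the upsert table holds only the latest one, and the retract table is empty because the key was withdrawn.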

Which one to use is highly dependent on the type of sink, its native capabilities, and the use case. For instance, an RDBMS could work with a retract stream, but a time-series database would support only append.

It’s extremely common to have some sort of persistent storage (a database) at the end of a stream of data. This makes it easy for developers to consume the data from an application. An example is a JavaScript application that plots vehicle locations on a map using streamed geolocations, pulling the locations over REST at periodic intervals.

Persisting a stream in a database presents many new management requirements like schema migrations, indexing strategies, write amplification, fragmentation, and type mismatch handling. Ultimately, creating a materialized view on a stream (materializing the results of a query) is one of the important and tricky parts of a streaming data pipeline.
