Normalizing JSON Data in Kafka Topics? SQLStreamBuilder Input Transforms to the Rescue

November 12, 2019 in Apache Kafka and SQL

SQLStreamBuilder works just like you would expect a database to work. You have rows and columns and you run SQL queries on them – except, it’s a boundless stream of data, not a cursor with an endpoint.

In order to do this, you must have a valid schema to query against, which means, essentially, a list of columns and their associated datatypes. This works great when you specify your schema as Avro and are disciplined about schema management and schema hygiene (for example, by using Schema Registry).

My data is a freaking mess

What if you are using strings of JSON data as your format? Or maybe you have sources you don't explicitly control. Maybe your data has missing or non-uniform keys. Maybe you have anonymous arrays (arrays without a key). Maybe some of the data is junk and you want to ignore it rather than transform it. Maybe your schema has evolved over time. Perhaps your data format is just... messy. Then what?

Input Transforms to the rescue

Input Transforms allow you to write a JavaScript function that operates on each message after it's consumed from Kafka (or any other source) but before you write SQL against it. You define your schema against the output of the transform, and the transform itself can be any bit of JavaScript you want. It just needs to take the input as a string and return a string. Most commonly, JSON is used for both input and output, but it could be anything.

  • Input Transforms are defined for a virtual table source.
  • There is one transform per source.
  • The transform takes the record as a JSON-formatted string in an input variable. The input is always named record.
  • The transform emits the output of its last line to the calling JVM. The output variable can have any name; in our examples we use out and emit it as a JSON-formatted string. It must be JSON formatted.

A basic input transformation looks like this:

var out = JSON.parse(record); // record is input, parse JSON formatted string to object
// do whatever you want here..
JSON.stringify(out); // emit JSON formatted string of object
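
Because the input only needs to be a string, a transform can also parse non-JSON payloads into the JSON the schema expects. Here is a rough sketch of that idea (the comma-separated layout and the SENSOR_ID/TEMP field names are hypothetical, not part of the examples below):

// hypothetical CSV-style input like: "sensor-1,44"
var parts = record.split(",");        // record is the raw input string
var out = {};
out["SENSOR_ID"] = parts[0];
out["TEMP"] = parseInt(parts[1], 10); // convert the reading to an integer
JSON.stringify(out);                  // emit JSON formatted string of object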

These transforms can be handy and very powerful when your input data isn't quite perfect. Let's dig into a couple of sample use cases to show what they can do. Starting with a simple example: say the data has mixed-case keys, and we just want to normalize them by uppercasing them.

A simple transform

The input data looks like the example below. It has two keys that differ only in case but are logically the same thing. It's messy, so we need to clean it up with a simple transform.

{"temp": 12}
{"Temp": 44}

The input transform would look like:

// normalize the case of keys
var parsedVal = JSON.parse(record);
var out = {};
Object.keys(parsedVal).forEach(function(key) {
  out[key.toUpperCase()] = parsedVal[key];
});
JSON.stringify(out);

and the resulting data would look like:

{"TEMP": 12}
{"TEMP": 44}

and we would make a schema in SQLStreamBuilder that looked like:

{
  "namespace": "temps",
  "type": "record",
  "name": "com.eventador.temps",
  "fields": [
    {
      "name": "TEMP",
      "type": "int"
    }
  ]
}

Finally, we would query it like:

SELECT temp FROM my_virtual_table WHERE temp > 20;

The resulting datastream would look like:

{"TEMP": 44}

Anonymous array transforms

Let’s take a more difficult example: anonymous arrays. Perhaps the data comes in like:

[["cheese", "meat", "lettuce"], ["hotsauce", "mustard"]]
[["cheese", "meat", "onions"], ["ketchup", "mustard"]]

If you wanted to query index 1 of this array, it would be impossible without some key (column name) and type. In this case we can give the keys predictable names based on position, then query them with SQL.

The transform would look like:

var parsedVal = JSON.parse(record);
var out = {};
for (var index = 0; index < parsedVal.length; index++) {
  var key = "data_" + index;
  out[key] = parsedVal[index];
}
JSON.stringify(out);

And the output data would look like:

{"data_0":["cheese","meat","lettuce"],"data_1":["hotsauce","mustard"]}
{"data_0":["cheese","meat","onions"],"data_1":["ketchup","mustard"]}

The schema looks like:

{
  "name": "ingredients",
  "type": "record",
  "namespace": "com.eventador.ingredients",
  "fields": [
    {
      "name": "data_0",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "data_1",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

The query looks like:

SELECT r."EXPR$0" AS ingredient
FROM mytopic a,
UNNEST("a"."data_0") AS r

You may note the special expression EXPR$0 being used. This is the pseudo-column generated from the unnamed array, with each such column named EXPR$n, where n is a unique ordered sequence for each column returned.

The resulting datastream would look like:

{"ingredient":"cheese"}
{"ingredient":"meat"}
{"ingredient":"lettuce"}
{"ingredient":"cheese"}
{"ingredient":"meat"}
{"ingredient":"onion"}

Trying it for yourself

We hope you find powerful new ways to use this functionality. If you are interested in trying it out, you can sign up for a free trial of SQLStreamBuilder here and get started right away.

