Flink, JSON, and Twitter

Twitter and WordCount are probably two of the most common ways to get started with streaming data, and (for Twitter at least) for good reason: it’s a fun way to work with actual real-time, unbounded streams of interesting data. In this post, I’m going to explore some techniques to normalize, window, and introspect that data.


Getting started

So let’s outline our goals: we’re going to construct an application that reads a Twitter stream, filters for tweets containing some interesting words, collects the hashtags included in them, and displays a running, ordered total of the Top 20 most common ones. We’ll produce those results to a Kafka topic as well.

We’re going to use Java for this – but if you’re using Scala, most of the same concepts apply.

While the example is based around Twitter, it illustrates a pretty common use case for Flink, regardless of data source: building a scalable “leaderboard” type system to identify and report trends in data with a flexible schema.


Reading from Twitter

To start, let’s configure the Twitter connector. Flink provides an out-of-the-box Twitter connector that exposes hooks for various customizations. First, I’m going to construct an endpoint initializer to filter the words I’m interested in:
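A minimal sketch of such an initializer follows. The Flink connector builds on Twitter’s hosebird client, so I’m using its StatusesFilterEndpoint; the class name FilterEndpoint and the tracked terms are placeholders:

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.streaming.connectors.twitter.TwitterSource;

    import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
    import com.twitter.hbc.core.endpoint.StreamingEndpoint;

    // Narrows the stream to tweets mentioning the terms we care about.
    public class FilterEndpoint implements TwitterSource.EndpointInitializer, Serializable {
        private final List<String> terms;

        public FilterEndpoint(final String... terms) {
            this.terms = Arrays.asList(terms);
        }

        @Override
        public StreamingEndpoint createEndpoint() {
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
            endpoint.trackTerms(terms);
            return endpoint;
        }
    }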

Now I’ll configure a stream around it:
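Something along these lines should work; the credentials here are placeholders, and in the real application you’d read them from the command line or a config file:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.twitter.TwitterSource;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Twitter API credentials (placeholders).
    Properties props = new Properties();
    props.setProperty(TwitterSource.CONSUMER_KEY, "<consumer-key>");
    props.setProperty(TwitterSource.CONSUMER_SECRET, "<consumer-secret>");
    props.setProperty(TwitterSource.TOKEN, "<token>");
    props.setProperty(TwitterSource.TOKEN_SECRET, "<token-secret>");

    TwitterSource twitterSource = new TwitterSource(props);
    twitterSource.setCustomEndpointInitializer(new FilterEndpoint("flink", "kafka"));

    // One JSON-encoded tweet per event, delivered as a raw String.
    DataStream<String> streamSource = env.addSource(twitterSource);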


Mapping simple strings to JSON

This builds a running Twitter stream (in “streamSource”) consisting of one JSON-encoded tweet per event. Right now, though, each event is delivered as a String, so I’ll need to map it into something more usable. First, I need a mapper, and I’ll use a FlatMapFunction, since it allows me to emit zero or more events (useful in the case where I don’t find any data to emit, such as when a tweet has no hashtags).
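Here’s a minimal sketch of such a mapper; the class name HashtagExtractor is illustrative, and I’m assuming Jackson for the JSON parsing:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Parses the raw JSON and emits one (hashtag, 1) pair per hashtag found.
    public class HashtagExtractor implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private transient ObjectMapper mapper;

        @Override
        public void flatMap(String tweet, Collector<Tuple2<String, Integer>> out) throws Exception {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            JsonNode root = mapper.readTree(tweet);
            // Tweets carry hashtags under entities.hashtags[].text; if that
            // path is missing, the loop body never runs and nothing is emitted.
            for (JsonNode tag : root.path("entities").path("hashtags")) {
                String text = tag.path("text").asText();
                if (!text.isEmpty()) {
                    out.collect(new Tuple2<>(text, 1));
                }
            }
        }
    }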


Build the keyed stream

Now I’ll use the FlatMapFunction to construct a stream of Tuples containing the hashtag and a count, keyed by the hashtag. DataStreams are instantiated using generic types, so in this case, I’m indicating that I’m using a Tuple2 (one of the TupleN classes provided by Flink), consisting of a String (our hashtag) and an Integer (the counter).
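With the HashtagExtractor sketched above, that looks something like this:

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // A stream of (hashtag, count) pairs, keyed by field 0: the hashtag.
    KeyedStream<Tuple2<String, Integer>, Tuple> hashtags = streamSource
            .flatMap(new HashtagExtractor())
            .keyBy(0);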

Important note: Flink is flexible about types; you could just as easily use a plain Java object (POJO) here, which would give you additional flexibility, as well as a more rigid ‘schema’ for the events in the stream. More information about how Flink handles types and type hinting is available here:

Flink 1.3 Type Serialization

The .keyBy() is positional, so in this case I’m saying “key by the first field in the Tuple”, which is the hashtag. If this were a POJO, I could use a named field (e.g. keyBy(“hashtag”)) instead. For more complex data, Flink provides a KeySelector interface that lets you extend the keying logic as needed.
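For illustration, here’s the same keying expressed as a KeySelector (an alternative to the positional form, not used further below):

    import org.apache.flink.api.java.functions.KeySelector;

    // Equivalent keying with an explicit KeySelector instead of a field position.
    KeyedStream<Tuple2<String, Integer>, String> byTag = streamSource
            .flatMap(new HashtagExtractor())
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) {
                    return value.f0; // the hashtag
                }
            });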


What’s trending?

Now, if I stop and .print() this stream, we’ll see a stream of hashtags extracted from the tweets. If that were all I needed, I could stop here (or write them to Kafka at this point), but the stream quickly accumulates thousands of terms, and I want to see what’s trending. To do this, I’ll build a function to group and count them. Since I need to compare all of the terms to rank them, I’ll need to use an AllWindowFunction.
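Here’s a sketch of what that function can look like; the class name MostPopularTags is illustrative:

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // Orders keys by their associated counts, highest first. Note that it
    // never returns 0, so a TreeMap built with it keeps keys with equal counts.
    class ValueComparator implements Comparator<String> {
        private final Map<String, Integer> base;

        public ValueComparator(Map<String, Integer> base) {
            this.base = base;
        }

        @Override
        public int compare(String a, String b) {
            return base.get(a) >= base.get(b) ? -1 : 1;
        }
    }

    // Tallies every hashtag in the window, ranks them, and emits the Top 20.
    public class MostPopularTags implements
            AllWindowFunction<Tuple2<String, Integer>, LinkedHashMap<String, Integer>, TimeWindow> {

        @Override
        public void apply(TimeWindow window,
                          Iterable<Tuple2<String, Integer>> values,
                          Collector<LinkedHashMap<String, Integer>> out) {
            // Group and count the hashtags in this window.
            Map<String, Integer> counts = new HashMap<>();
            for (Tuple2<String, Integer> tag : values) {
                counts.merge(tag.f0, tag.f1, Integer::sum);
            }

            // Sort by count via the ValueComparator, then copy the Top 20 into
            // a LinkedHashMap, which preserves insertion (i.e. ranking) order.
            TreeMap<String, Integer> sorted = new TreeMap<>(new ValueComparator(counts));
            sorted.putAll(counts);

            LinkedHashMap<String, Integer> ranked = new LinkedHashMap<>();
            int rank = 0;
            for (Map.Entry<String, Integer> entry : sorted.entrySet()) {
                if (rank++ == 20) {
                    break;
                }
                ranked.put(entry.getKey(), entry.getValue());
            }
            out.collect(ranked);
        }
    }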

This is a little wordy, but it’s not really doing anything complex. Let’s break it down real quick:

  • I’m implementing an AllWindowFunction, which expects to receive the entire contents of a window
  • I’m provided with an Iterable of the Hashtag and the Counter
  • I’ll iterate over these and, using a simple ValueComparator class, build an ordered map (via a LinkedHashMap to preserve insertion order)


Windowed data

Now let’s plug in the window function. I’ll specify a sliding timeWindowAll of 300 seconds that emits the window results every 5 seconds, and call .apply() to send the window contents to the new ranking function.
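Continuing with the names from the sketches above:

    import org.apache.flink.streaming.api.windowing.time.Time;

    // A 300-second window that slides (and emits results) every 5 seconds.
    DataStream<LinkedHashMap<String, Integer>> ranked = hashtags
            .timeWindowAll(Time.seconds(300), Time.seconds(5))
            .apply(new MostPopularTags());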


Ready for takeoff

That’s it! To make it easy to debug, I’ll output everything to stdout, and then tell it to execute.
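Something like this (the job name is arbitrary):

    // Print each ranked window to stdout, then launch the job.
    ranked.print();
    env.execute("Twitter Top 20 Hashtags");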

At this point, we have a working Twitter-hashtag-ranking application!

If you execute it locally, you should see something like this:
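(The tags and counts below are purely illustrative; .print() prefixes each line with the index of the subtask that produced it.)

    1> {FollowFriday=28, Flink=21, Kafka=17, streaming=9, BigData=7}
    1> {FollowFriday=31, Flink=24, Kafka=18, streaming=11, BigData=8}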

You can see the list grow and the counts increase as the hashtag collection starts to populate.


Persistence

So that works great, but what if I want to persist this back to Kafka? Simple: I’ll just add a Kafka sink to the stream.
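A sketch using the Kafka 0.10 connector; the --bootstrap.servers flag and its default are assumptions here, while --topic comes from the command line as described below:

    import java.util.LinkedHashMap;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    // Parse the command-line args, e.g. --bootstrap.servers localhost:9092 --topic hashtags
    ParameterTool params = ParameterTool.fromArgs(args);

    ranked
        // Flatten each ranked LinkedHashMap to its toString() form.
        .map(new MapFunction<LinkedHashMap<String, Integer>, String>() {
            @Override
            public String map(LinkedHashMap<String, Integer> top) {
                return top.toString();
            }
        })
        .addSink(new FlinkKafkaProducer010<>(
                params.get("bootstrap.servers", "localhost:9092"),
                params.get("topic"),
                new SimpleStringSchema()));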

This will write to the topic I specified on the command line (--topic), serialized as a simple String representation of the LinkedHashMap. You can use kafkacat (or any consumer, like kafka-console-consumer) to check the topic and see if it’s working:
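For example (the broker address and topic name are whatever you configured, and the output line is illustrative):

    $ kafkacat -b localhost:9092 -t hashtags -C
    {FollowFriday=31, Flink=24, Kafka=18, streaming=11, BigData=8}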

Looks good!


Conclusion

Thanks for reading! I hope this walk-through helps you get started with your own application. The complete source code is available at:

Flink_Twitter_TopN Github Repository

As always, if you have questions, feel free to ping us at support@eventador.io, and don’t forget to join us on the Streamprocessing Slack too!
