Flink, JSON, and Twitter

While Twitter and WordCount are probably two of the most common ways to get started with streaming data, there’s a good reason (for Twitter, at least) – it’s a fun way to work with actual real-time, unbounded streams of interesting data. In this post, I’m going to explore some techniques to normalize, window, and introspect that data.


Getting started

So let’s outline our goals: We’re going to construct an application to read a Twitter stream, filter out tweets containing some interesting words, collect the hashtags included in them, and display a running, ordered total of the Top 20 most common ones. We’ll produce those to a Kafka topic as well.

We’re going to use Java for this – but if you’re using Scala, most of the same concepts apply.

While the example is based around Twitter, this illustrates a pretty common use case for Flink, regardless of datasource – building a scalable “leaderboard” type system to identify and report trends for data with a flexible schema.


Reading from Twitter

To start, let’s configure the Twitter connector. Flink provides an out-of-the-box Twitter connector that exposes hooks for various customizations. First, I’m going to construct an endpoint initializer to filter the words I’m interested in:
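Here’s a minimal sketch of that initializer – the track terms are placeholders, and I’m assuming the flink-connector-twitter dependency (which brings in Twitter’s hbc client classes):

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.List;

    import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
    import com.twitter.hbc.core.endpoint.StreamingEndpoint;
    import org.apache.flink.streaming.connectors.twitter.TwitterSource;

    // Points the TwitterSource at a filtered endpoint that only delivers
    // tweets containing at least one of our track terms.
    public static class FilterEndpoint implements TwitterSource.EndpointInitializer, Serializable {
        private final List<String> terms;

        public FilterEndpoint(final String... terms) {
            this.terms = Arrays.asList(terms);
        }

        @Override
        public StreamingEndpoint createEndpoint() {
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
            endpoint.trackTerms(terms);
            return endpoint;
        }
    }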

Now I’ll configure a stream around it:
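Something like this, with your own OAuth credentials in place of the placeholders (FilterEndpoint and the track terms come from the sketch above):

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.twitter.TwitterSource;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Twitter API credentials – replace with your own.
    Properties props = new Properties();
    props.setProperty(TwitterSource.CONSUMER_KEY, "<consumer-key>");
    props.setProperty(TwitterSource.CONSUMER_SECRET, "<consumer-secret>");
    props.setProperty(TwitterSource.TOKEN, "<token>");
    props.setProperty(TwitterSource.TOKEN_SECRET, "<token-secret>");

    TwitterSource twitterSource = new TwitterSource(props);
    twitterSource.setCustomEndpointInitializer(new FilterEndpoint("space", "NASA", "rocket"));

    // One JSON-encoded tweet per event, delivered as a raw String.
    DataStream<String> streamSource = env.addSource(twitterSource);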


Mapping simple strings to JSON

This builds a running Twitter stream (in “streamSource”) consisting of one JSON-encoded tweet per event – but right now each event is delivered as a String, so I’ll need to map it into something more usable. First, I need a mapper, and I’ll use a FlatMapFunction, since it allows me to emit zero or more events per input (useful when there’s no data to emit, such as when a tweet has no hashtags).
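Here’s a sketch of that mapper – I’m assuming Jackson for the JSON parsing, and pulling the hashtags out of the tweet text with a simple regex:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Parses the raw JSON string, pulls out the tweet text, and emits one
    // (hashtag, 1) Tuple per hashtag found. Tweets without hashtags emit nothing.
    public static class HashtagExtractor implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final Pattern HASHTAG = Pattern.compile("#\\w+");
        private transient ObjectMapper mapper;

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            if (mapper == null) {
                mapper = new ObjectMapper();  // ObjectMapper isn't serializable, so build it lazily
            }
            JsonNode tweet = mapper.readTree(value);
            JsonNode text = tweet.get("text");
            if (text == null) {
                return;  // not a tweet (a delete notice, for example), so skip it
            }
            Matcher matcher = HASHTAG.matcher(text.asText());
            while (matcher.find()) {
                out.collect(new Tuple2<>(matcher.group(), 1));
            }
        }
    }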


Build the keyed stream

Now I’ll use the FlatMapFunction to construct a stream of Tuples containing the hashtag and a count – keyed by the hashtag. DataStreams are instantiated using generic types, so in this case, I’m indicating that I’m using a Tuple2 object (one of the TupleN classes provided by Flink), consisting of a String (our hashtag) and an Integer (the counter).
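Wired together, that looks something like this (HashtagExtractor is the mapper sketched above):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // One (hashtag, 1) Tuple per hashtag, partitioned by the hashtag itself.
    DataStream<Tuple2<String, Integer>> hashTags = streamSource
            .flatMap(new HashtagExtractor())
            .keyBy(0);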

Important note: Flink is flexible about types; you could just as easily use a plain Java object here, which would give you additional flexibility and a more rigid ‘schema’ for the events in the stream. More information about how Flink handles types and hinting is available here:

Flink 1.3 Type Serialization

The .keyBy() argument is positional, so in this case, I’m saying “key by the first field in the Tuple”, which is the hashtag. If this were a POJO, I could use a named field (e.g. keyBy(“hashtag”)) instead. For more complex data, Flink provides a KeySelector interface that lets you extend the key-extraction logic as needed.
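For illustration, here’s what a KeySelector version of the same keying might look like – the case-normalization is a made-up example of why you’d want one:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // Key by the lowercased hashtag, so #NASA and #nasa land in the same group.
    KeyedStream<Tuple2<String, Integer>, String> byTag = streamSource
            .flatMap(new HashtagExtractor())
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) {
                    return value.f0.toLowerCase();
                }
            });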


What’s trending?

Now if I stop and .print() this stream, I’ll see a stream of hashtags extracted from the tweets. If that’s all I needed, I could stop here (or decide to write them to Kafka here), but the stream quickly accumulates thousands of terms, and I want to see what’s trending. To do this, I’ll build a function to group and count them. Since I need to compare all of the terms against each other to rank them, I’ll need to use an AllWindowFunction:
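Here’s a sketch of it – the class names (ValueComparator, TopHashtags) are mine, and the top-20 limit comes from our goals above:

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // Orders keys by their counts, highest first. It deliberately never returns 0,
    // so a TreeMap built with it keeps hashtags whose counts are equal.
    public static class ValueComparator implements Comparator<String> {
        private final Map<String, Integer> counts;

        public ValueComparator(Map<String, Integer> counts) {
            this.counts = counts;
        }

        @Override
        public int compare(String a, String b) {
            return counts.get(a) >= counts.get(b) ? -1 : 1;
        }
    }

    // Receives every (hashtag, count) Tuple in the window, sums the counts,
    // and emits the top 20 as a LinkedHashMap ordered by count.
    public static class TopHashtags
            implements AllWindowFunction<Tuple2<String, Integer>, LinkedHashMap<String, Integer>, TimeWindow> {

        @Override
        public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values,
                          Collector<LinkedHashMap<String, Integer>> out) {
            // Sum the count for each hashtag seen in this window.
            Map<String, Integer> counts = new HashMap<>();
            for (Tuple2<String, Integer> t : values) {
                counts.merge(t.f0, t.f1, Integer::sum);
            }

            // Sort by count, then keep the top 20 in insertion order.
            TreeMap<String, Integer> sorted = new TreeMap<>(new ValueComparator(counts));
            sorted.putAll(counts);

            LinkedHashMap<String, Integer> top = new LinkedHashMap<>();
            for (Map.Entry<String, Integer> entry : sorted.entrySet()) {
                if (top.size() >= 20) {
                    break;
                }
                top.put(entry.getKey(), entry.getValue());
            }
            out.collect(top);
        }
    }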

This is a little wordy, but it’s not really doing anything complex. Let’s break it down quickly:

  • I’m implementing an AllWindowFunction, which expects to receive the entire contents of a window
  • I’m provided with an Iterable of the (hashtag, count) Tuples collected in that window
  • I iterate over these and, using a simple ValueComparator class, build an ordered map (via a LinkedHashMap, which preserves insertion order)


Windowed data

Now let’s plug in the window function; I’ll specify a sliding timeWindowAll of 300 seconds, emitting the window results every 5 seconds, and call .apply() to send the window contents to the new ranking function.
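In code, that’s roughly (hashTags and TopHashtags come from the sketches above):

    import java.util.LinkedHashMap;

    import org.apache.flink.streaming.api.windowing.time.Time;

    // A 300-second sliding window that fires every 5 seconds.
    DataStream<LinkedHashMap<String, Integer>> ranked = hashTags
            .timeWindowAll(Time.seconds(300), Time.seconds(5))
            .apply(new TopHashtags());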


Ready for takeoff

That’s it – to make it easy to debug, I’ll print everything to stdout, and then tell the environment to execute:
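Something like this – the job name is arbitrary:

    ranked.print();
    env.execute("Flink Twitter TopN");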

At this point, we have a working Twitter-hashtag-ranking application!

If you execute it locally, you should see something like this:


Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1024616487] with leader session id d4c464da-873d-449b-aab7-dddba4c4653e.
12/11/2017 22:49:39 Job execution switched to status RUNNING.
12/11/2017 22:49:39 Source: Custom Source -> Flat Map(1/1) switched to SCHEDULED
12/11/2017 22:49:39 TriggerWindow(SlidingProcessingTimeWindows(300000, 5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@ac9223a1}, ProcessingTimeTrigger(), AllWindowedStream.main(FlinkTwitter.java:111)) -> (Sink: Unnamed, Sink: Kafka Sink)(1/1) switched to SCHEDULED
12/11/2017 22:49:39 Source: Custom Source -> Flat Map(1/1) switched to DEPLOYING
12/11/2017 22:49:39 TriggerWindow(SlidingProcessingTimeWindows(300000, 5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@ac9223a1}, ProcessingTimeTrigger(), AllWindowedStream.main(FlinkTwitter.java:111)) -> (Sink: Unnamed, Sink: Kafka Sink)(1/1) switched to DEPLOYING
12/11/2017 22:49:39 Source: Custom Source -> Flat Map(1/1) switched to RUNNING
12/11/2017 22:49:39 TriggerWindow(SlidingProcessingTimeWindows(300000, 5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@ac9223a1}, ProcessingTimeTrigger(), AllWindowedStream.main(FlinkTwitter.java:111)) -> (Sink: Unnamed, Sink: Kafka Sink)(1/1) switched to RUNNING
{#Mars=1, #Browns=1, #100andChange=1, #GoFundMe=1}
{#Mars=1, #Aliens=1, #Browns=1, #Jupiter=1, #100andChange=1, #GoFundMe=1}
{#100andChange=2, #GameOfThrones=1, #Mars=1, #Aliens=1, #Browns=1, #Jupiter=1, #Dragon=1, #GoFundMe=1}
{#100andChange=2, #GameOfThrones=1, #Mars=1, #Aliens=1, #Flashback=1, #Browns=1, #Jupiter=1, #cyberattacks=1, #Dragon=1, #GoFundMe=1}
{#USA=4, #Tshirt=4, #science=3, #NASA=3, #exploration=3, #TheMartian=3, #SPACE=3, #100andChange=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #FakeNew=1, #liemachine=1, #GoFundMe=1, #MoonLander=1, #Jupiter=1, #Dragon=1, #Space=1, #GameOfThrones=1, #Nasa=1}
{#USA=4, #Tshirt=4, #science=3, #NASA=3, #exploration=3, #TheMartian=3, #SPACE=3, #100andChange=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #GoFundMe=2, #Browns=2, #FakeNew=1, #liemachine=1, #MoonLander=1, #Jupiter=1, #Dragon=1, #Space=1, #GameOfThrones=1}
{#NASA=5, #USA=4, #Tshirt=4, #100andChange=4, #science=3, #exploration=3, #Jupiter=3, #TheMartian=3, #SPACE=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #GoFundMe=2, #Browns=2, #FakeNew=1, #AGU17=1, #liemachine=1, #ISS=1, #MoonLander=1, #scifi=1}

You can see the list grow and the counts increase as the hashtag collection starts to populate.


Persistence

So that works great – but what if I want to persist this back to Kafka? Simple: I’ll just add a Kafka sink to the stream:
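Here’s a sketch using the Kafka 0.10 connector; I’m assuming a ParameterTool built from the command-line arguments holds the broker list and topic name:

    import java.util.LinkedHashMap;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    final ParameterTool params = ParameterTool.fromArgs(args);

    // Serialize each ranked map as its plain String form and write it to Kafka.
    ranked.map(new MapFunction<LinkedHashMap<String, Integer>, String>() {
                @Override
                public String map(LinkedHashMap<String, Integer> top) {
                    return top.toString();
                }
            })
            .addSink(new FlinkKafkaProducer010<>(
                    params.get("bootstrap.servers"), params.get("topic"), new SimpleStringSchema()))
            .name("Kafka Sink");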

This will write to the topic I specified on the command line (--topic), serializing each event as the simple String representation of the LinkedHashMap. You can use kafkacat (or any consumer, like kafka-console-consumer) to check that the topic is receiving data:
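For example, against a local broker (the broker address and topic name are placeholders):

    kafkacat -b localhost:9092 -t mytopic -C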

Looks good!


Conclusion

Thanks for reading! I hope this walk-through helps you get started with your own application. The complete source code is available at:

Flink_Twitter_TopN Github Repository

As always if you have questions feel free to ping us at support@eventador.io, and don’t forget to join us on the Streamprocessing Slack too!

One Response to Flink, JSON, and Twitter

  1. Mike

    Great post, this helped me get familiar with Flink. Just wanted to point out that the tweet JSON coming through has the hashtags populated for you at JSON path $.entities.hashtags, so you don’t have to use the regex.
