Flink, JSON and Twitter

December 12, 2017 in Streaming Tutorials




While Twitter and WordCount are probably the two most common ways to get started with streaming data, there’s good reason (for Twitter at least): it’s a fun way to work with an actual real-time, unbounded stream of interesting data. In this post, I’m going to explore some techniques to normalize, window, and introspect that data.

Getting started

So let’s outline our goals: we’re going to construct an application that reads a Twitter stream, filters for tweets containing some interesting words, collects the hashtags included in them, and displays a running, ordered total of the Top 20 most common ones. We’ll produce those results to a Kafka topic as well.

We’re going to use Java for this – but if you’re using Scala, most of the same concepts apply.

While the example is based around Twitter, this illustrates a pretty common use case for Flink, regardless of datasource – building a scalable “leaderboard” type system to identify and report trends for data with a flexible schema.
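
One housekeeping note before diving in: the snippets below refer to a StreamExecutionEnvironment named env, a ParameterTool named params (for command-line arguments like --topic), and a Properties object named props holding the Twitter API credentials. That boilerplate isn’t shown in the snippets themselves, so here’s a minimal sketch of how it might look. The command-line flag names are my own invention; the TwitterSource property keys are real:

import java.util.Properties;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;

public class FlinkTwitter {
    public static void main(String[] args) throws Exception {
        // Command-line parameters, e.g. --topic hashtags --consumer-key ...
        ParameterTool params = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Twitter API credentials for the TwitterSource
        Properties props = new Properties();
        props.setProperty(TwitterSource.CONSUMER_KEY, params.getRequired("consumer-key"));
        props.setProperty(TwitterSource.CONSUMER_SECRET, params.getRequired("consumer-secret"));
        props.setProperty(TwitterSource.TOKEN, params.getRequired("token"));
        props.setProperty(TwitterSource.TOKEN_SECRET, params.getRequired("token-secret"));

        // ... the pipeline built up over the rest of this post goes here ...
    }
}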

Reading from Twitter

To start, let’s configure the Twitter connector. Flink provides an out-of-the-box Twitter connector that exposes hooks for various customizations. First, I’m going to construct an endpoint initializer to filter the words I’m interested in:

public static class TweetFilter implements TwitterSource.EndpointInitializer, Serializable {
    @Override
    public StreamingEndpoint createEndpoint() {
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.trackTerms(Arrays.asList("NASA", "Discovery", "Interstellar"));
        return endpoint;
    }
}

Now I’ll configure a stream around it:

// Configure Twitter source
TwitterSource twitterSource = new TwitterSource(props);
TweetFilter customFilterInitializer = new TweetFilter();
twitterSource.setCustomEndpointInitializer(customFilterInitializer);
DataStream<String> streamSource = env.addSource(twitterSource);

Mapping simple strings to JSON

This builds a running Twitter stream (in “streamSource”) consisting of one JSON-encoded tweet per event – but right now each event is delivered as a raw String, so I’ll need to map it into something more usable. First, I need a mapper, and I’ll use a FlatMapFunction, since it allows me to emit zero or more events, which is useful when there’s no data to emit, such as when a tweet has no hashtags.

private static class TweetFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String tweet, Collector<Tuple2<String, Integer>> out) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String tweetString = null;

        Pattern p = Pattern.compile("#\\w+");

        try {
            JsonNode jsonNode = mapper.readValue(tweet, JsonNode.class);
            tweetString = jsonNode.get("text").textValue();
        } catch (Exception e) {
            // That's ok, received a malformed document
        }

        if (tweetString != null) {
            Matcher matcher = p.matcher(tweetString);

            // Emit a (hashtag, 1) tuple for every hashtag found in the tweet text
            while (matcher.find()) {
                out.collect(new Tuple2<>(matcher.group(0), 1));
            }
        }
    }
}

Build the keyed stream

Now I’ll use the FlatMapFunction to construct a stream of tuples containing the hashtag and a count, keyed by the hashtag. DataStreams are instantiated using generic types, so in this case, I’m indicating that the stream holds Tuple2 objects (one of Flink’s TupleN types), each consisting of a String (our hashtag) and an Integer (the counter).

Important note: Flink is flexible about types; you could just as easily use a plain Java object here, which would give you named fields and a somewhat more rigid ‘schema’ for the events in the stream. More information about how Flink handles types and type hints is available in the Flink documentation.

// Parse JSON tweets, flatmap and emit keyed stream
DataStream<Tuple2<String, Integer>> jsonTweets = streamSource.flatMap(new TweetFlatMapper())
                                                             .keyBy(0);

The .keyBy() is positional, so in this case, I’m saying “key by the first field in the Tuple”, which is the hashtag. If this were a POJO, I could use a named field (e.g. keyBy(“hashtag”)) instead. For more complex data, Flink provides a KeySelector interface to allow you to extend the functionality as needed.
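
To make that concrete, here’s a quick sketch of the POJO variant. The HashtagCount class (and its field names) is hypothetical, not part of the actual application, but it shows both keying styles:

public static class HashtagCount {
    // Public fields plus a public no-arg constructor make this a Flink POJO type
    public String hashtag;
    public int count;

    public HashtagCount() {}

    public HashtagCount(String hashtag, int count) {
        this.hashtag = hashtag;
        this.count = count;
    }
}

// Given a DataStream<HashtagCount> named tagStream, key by field name:
//   tagStream.keyBy("hashtag");
//
// Or use a KeySelector (org.apache.flink.api.java.functions.KeySelector)
// for arbitrary key-extraction logic:
//   tagStream.keyBy(new KeySelector<HashtagCount, String>() {
//       @Override
//       public String getKey(HashtagCount value) {
//           return value.hashtag;
//       }
//   });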

What’s trending?

Now if I stop and .print() this stream, we’ll see a stream of hashtags extracted from the tweets. If that’s all I needed, I could stop here (or write them straight to Kafka), but the stream quickly accumulates thousands of terms, and I want to see what’s trending. To do this, I’ll build a function to group and count them. Since I need to compare all of the terms to rank them, I’ll need to use an AllWindowFunction:

public static class MostPopularTags implements AllWindowFunction<Tuple2<String, Integer>, LinkedHashMap<String, Integer>, TimeWindow> {
    @Override
    public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> tweets, Collector<LinkedHashMap<String, Integer>> collector) throws Exception {
        // Total the counts for each hashtag seen in this window
        HashMap<String, Integer> hmap = new HashMap<>();

        for (Tuple2<String, Integer> t : tweets) {
            int count = 0;
            if (hmap.containsKey(t.f0)) {
                count = hmap.get(t.f0);
            }
            hmap.put(t.f0, count + t.f1);
        }

        // Sort the hashtags by count, descending
        Comparator<String> comparator = new ValueComparator(hmap);
        TreeMap<String, Integer> sortedMap = new TreeMap<>(comparator);

        sortedMap.putAll(hmap);

        // Keep the top HASHTAG_LIMIT entries (20 for our Top 20), preserving order
        LinkedHashMap<String, Integer> sortedTopN = sortedMap
            .entrySet()
            .stream()
            .limit(HASHTAG_LIMIT)
            .collect(LinkedHashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), Map::putAll);

        collector.collect(sortedTopN);
    }
}

public static class ValueComparator implements Comparator<String> {
    HashMap<String, Integer> map = new HashMap<>();

    public ValueComparator(HashMap<String, Integer> map) {
        this.map.putAll(map);
    }

    @Override
    public int compare(String s1, String s2) {
        // Never returns 0, so the TreeMap keeps distinct hashtags that
        // happen to have equal counts
        if (map.get(s1) >= map.get(s2)) {
            return -1;
        } else {
            return 1;
        }
    }
}

This is a little wordy, but it’s not really doing anything complex. Let’s break it down quickly:

  • I’m implementing an AllWindowFunction, which receives the entire contents of a window
  • I’m provided with an Iterable of (hashtag, count) tuples
  • I iterate over these to total the counts, then, using a simple ValueComparator class, build an ordered map, copying the top entries into a LinkedHashMap to preserve the sorted insertion order

For example, a window containing (#NASA, 1), (#Mars, 1), (#NASA, 1) comes out as {#NASA=2, #Mars=1}, with #NASA first because it has the higher count.

Windowed data

Now let’s plug in the window function: I’ll specify a sliding timeWindowAll of 300 seconds that emits the window results every 5 seconds, and call .apply() to send the window contents to the new ranking function.

// Ordered topN list of most popular hashtags
DataStream<LinkedHashMap<String, Integer>> ds = jsonTweets.timeWindowAll(Time.seconds(300), Time.seconds(5))
                                                          .apply(new MostPopularTags());
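
One caveat: with no other configuration, timeWindowAll uses processing time; you can see SlidingProcessingTimeWindows in the log output below. Under that default, the call above is shorthand for the more explicit windowAll form, sketched here for reference:

// Equivalent under the default processing-time characteristic
DataStream<LinkedHashMap<String, Integer>> ds = jsonTweets
    .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(300), Time.seconds(5)))
    .apply(new MostPopularTags());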

Ready for takeoff

That’s it – to make it easy to debug, I’ll output everything to stdout, and then tell it to execute:

// Print to stdout
ds.print();

env.execute("Streaming Tweets");

At this point, we have a working Twitter-hashtag-ranking application!

If you execute it locally, you should see something like this:

 [erik@erikthinkpad] ~/code/flink_twitter >> mvn exec:java

... Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1024616487] with leader session id d4c464da-873d-449b-aab7-dddba4c4653e.
12/11/2017 22:49:39 Job execution switched to status RUNNING.
12/11/2017 22:49:39 Source: Custom Source -> Flat Map(1/1) switched to SCHEDULED
12/11/2017 22:49:39 TriggerWindow(SlidingProcessingTimeWindows(300000, 5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@ac9223a1}, ProcessingTimeTrigger(), AllWindowedStream.main(FlinkTwitter.java:111)) -> (Sink: Unnamed, Sink: Kafka Sink)(1/1) switched to SCHEDULED
12/11/2017 22:49:39 Source: Custom Source -> Flat Map(1/1) switched to DEPLOYING
12/11/2017 22:49:39 TriggerWindow(SlidingProcessingTimeWindows(300000, 5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@ac9223a1}, ProcessingTimeTrigger(), AllWindowedStream.main(FlinkTwitter.java:111)) -> (Sink: Unnamed, Sink: Kafka Sink)(1/1) switched to DEPLOYING
12/11/2017 22:49:39 Source: Custom Source -> Flat Map(1/1) switched to RUNNING
12/11/2017 22:49:39 TriggerWindow(SlidingProcessingTimeWindows(300000, 5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@ac9223a1}, ProcessingTimeTrigger(), AllWindowedStream.main(FlinkTwitter.java:111)) -> (Sink: Unnamed, Sink: Kafka Sink)(1/1) switched to RUNNING
{#Mars=1, #Browns=1, #100andChange=1, #GoFundMe=1}
{#Mars=1, #Aliens=1, #Browns=1, #Jupiter=1, #100andChange=1, #GoFundMe=1}
{#100andChange=2, #GameOfThrones=1, #Mars=1, #Aliens=1, #Browns=1, #Jupiter=1, #Dragon=1, #GoFundMe=1}
{#100andChange=2, #GameOfThrones=1, #Mars=1, #Aliens=1, #Flashback=1, #Browns=1, #Jupiter=1, #cyberattacks=1, #Dragon=1, #GoFundMe=1}
{#USA=4, #Tshirt=4, #science=3, #NASA=3, #exploration=3, #TheMartian=3, #SPACE=3, #100andChange=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #FakeNew=1, #liemachine=1, #GoFundMe=1, #MoonLander=1, #Jupiter=1, #Dragon=1, #Space=1, #GameOfThrones=1, #Nasa=1}
{#USA=4, #Tshirt=4, #science=3, #NASA=3, #exploration=3, #TheMartian=3, #SPACE=3, #100andChange=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #GoFundMe=2, #Browns=2, #FakeNew=1, #liemachine=1, #MoonLander=1, #Jupiter=1, #Dragon=1, #Space=1, #GameOfThrones=1}
{#NASA=5, #USA=4, #Tshirt=4, #100andChange=4, #science=3, #exploration=3, #Jupiter=3, #TheMartian=3, #SPACE=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #GoFundMe=2, #Browns=2, #FakeNew=1, #AGU17=1, #liemachine=1, #ISS=1, #MoonLander=1, #scifi=1}

You can see the list grow and the counts increase as the hashtag collection starts to populate.

Persistence

So that works great – but what if I want to persist this back to Kafka? Simple: I’ll just add a Kafka sink to the stream:

// Write to Kafka
ds.addSink(new FlinkKafkaProducer010<>(
            params.getRequired("topic"),
            new SerializationSchema<LinkedHashMap<String, Integer>>() {
                @Override
                public byte[] serialize(LinkedHashMap<String, Integer> element) {
                    return element.toString().getBytes();
                }
            },
            params.getProperties())
    ).name("Kafka Sink");

This will write to the topic I specified on the command line (--topic), serialized as a simple String representation of the LinkedHashMap. You can use kafkacat (or any consumer, like kafka-console-consumer) to check that the topic is receiving data:

 [erik@erikthinkpad] ~ >> kafkacat -C -b $BROKERS -t hashtags -o end
    % Reached end of topic hashtags [0] at offset 14367
    {#NASA=7, #100andChange=5, #USA=4, #Jupiter=4, #Tshirt=4, #science=3, #exploration=3, #TheMartian=3, #SPACE=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #GoFundMe=2, #Juno=2, #GreatRedSpot=2, #SpaceProbe=2, #Browns=2, #NASAFilm=2, #FakeNew=1, #AGU17=1}
    {#NASA=7, #100andChange=5, #USA=4, #Jupiter=4, #Tshirt=4, #science=3, #exploration=3, #TheMartian=3, #SPACE=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #GoFundMe=2, #Juno=2, #GreatRedSpot=2, #SpaceProbe=2, #Browns=2, #NASAFilm=2, #FakeNew=1, #AGU17=1}

Looks good!
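
One caveat on the serialization above: element.toString() emits Java’s {key=value, ...} map formatting, which is easy to eyeball but awkward for downstream consumers to parse. If you’d rather put real JSON on the topic, a minimal sketch (my variation, not the post’s code) that reuses Jackson might look like this:

// Drop-in replacement for the anonymous SerializationSchema above
new SerializationSchema<LinkedHashMap<String, Integer>>() {
    @Override
    public byte[] serialize(LinkedHashMap<String, Integer> element) {
        try {
            // Jackson renders a LinkedHashMap as a JSON object, preserving order
            return new ObjectMapper().writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            return new byte[0]; // or log and drop the malformed element
        }
    }
}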

Conclusion

Thanks for reading! I hope this walk-through helps you get started with your own application. The complete source code is available at:

As always if you have questions feel free to ping us at support@eventador.io!
