
Flink, JSON, and Twitter

Building streams on JSON data with Twitter


by Erik Beebe, Founder and CTO
11 Dec 2017

Twitter and WordCount are probably the two most common ways to get started with streaming data, and there's good reason for that (for Twitter, at least) - it's a fun way to work with an actual real-time, unbounded stream of interesting data. In this post, I'm going to explore some techniques to normalize, window, and introspect that data.

Getting started

So let’s outline our goals: We’re going to construct an application to read a Twitter stream, filter out tweets containing some interesting words, collect the hashtags included in them, and display a running, ordered total of the Top 20 most common ones. We’ll produce those to a Kafka topic as well.

We’re going to use Java for this - but if you’re using Scala, most of the same concepts apply.

While the example is built around Twitter, it illustrates a pretty common use case for Flink, regardless of data source: building a scalable "leaderboard"-type system to identify and report trends in data with a flexible schema.

Reading from Twitter

To start, let’s configure the Twitter connector. Flink provides an out-of-the-box Twitter connector that exposes hooks for various customizations. First, I’m going to construct an endpoint initializer to filter the words I’m interested in:

public static class TweetFilter implements TwitterSource.EndpointInitializer, Serializable {
    @Override
    public StreamingEndpoint createEndpoint() {
        // Narrow the stream to tweets containing any of these terms
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.trackTerms(Arrays.asList("NASA", "Discovery", "Interstellar"));
        return endpoint;
    }
}

Now I’ll configure a stream around it:

// Configure Twitter source
TwitterSource twitterSource = new TwitterSource(props);
TweetFilter customFilterInitializer = new TweetFilter();
twitterSource.setCustomEndpointInitializer(customFilterInitializer);
DataStream<String> streamSource = env.addSource(twitterSource);
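
Two things in that snippet aren't shown being created: env and props. As an assumption (they're not part of the original snippets), env is the usual StreamExecutionEnvironment and props holds the OAuth credentials the Twitter connector needs, along these lines:

// Assumed setup for `env` and `props`, which aren't shown above.
// The Twitter connector authenticates via OAuth; substitute your app's credentials.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "<consumer-key>");
props.setProperty(TwitterSource.CONSUMER_SECRET, "<consumer-secret>");
props.setProperty(TwitterSource.TOKEN, "<access-token>");
props.setProperty(TwitterSource.TOKEN_SECRET, "<access-token-secret>");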

Mapping simple strings to JSON

This builds a running Twitter stream (in "streamSource") consisting of one JSON-encoded tweet per event - but right now each event is delivered as a String, so I'll need to map it into something more usable. First, I need a mapper, and I'll use a FlatMapFunction, since it allows me to emit zero or more events (useful when there's nothing to emit, such as when a tweet has no hashtags).

private static class TweetFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
    // Build these once rather than on every invocation; as statics they also
    // sidestep serialization of the function instance
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final Pattern HASHTAG = Pattern.compile("#\\w+");

    @Override
    public void flatMap(String tweet, Collector<Tuple2<String, Integer>> out) throws Exception {
        String tweetString = null;

        try {
            JsonNode jsonNode = MAPPER.readValue(tweet, JsonNode.class);
            tweetString = jsonNode.get("text").textValue();
        } catch (Exception e) {
            // That's ok - we received a malformed document
        }

        if (tweetString != null) {
            Matcher matcher = HASHTAG.matcher(tweetString);

            // Emit one (hashtag, 1) pair per hashtag found in the tweet text
            while (matcher.find()) {
                out.collect(new Tuple2<>(matcher.group(0), 1));
            }
        }
    }
}

Build the keyed stream

Now I'll use the FlatMapFunction to construct a stream of Tuples containing the hashtag and a count, keyed by the hashtag. DataStreams are instantiated using generic types, so in this case I'm indicating that I'm using a TupleN object (provided by Flink) consisting of a String (our hashtag) and an Integer (the counter).

Important note: Flink is flexible about types; you could just as easily use a plain Java object here, which would give you named fields and a bit more of a rigid 'schema' for the events in the stream. More information about how Flink handles types and hinting is available here:

Flink 1.3 Type Serialization

// Parse JSON tweets, flatmap and emit keyed stream
DataStream<Tuple2<String, Integer>> jsonTweets = streamSource.flatMap(new TweetFlatMapper())
                                                             .keyBy(0);

The .keyBy(0) call is positional, so in this case I'm saying "key by the first field in the Tuple", which is the hashtag. If this were a POJO, I could use a named field (e.g. keyBy("hashtag")) instead. For more complex data, Flink provides a KeySelector interface that lets you extract keys however you need.
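
As a sketch of that last point (not from the original code - the variable name is mine), here's the same keying expressed with a KeySelector, equivalent to keyBy(0):

// Equivalent to .keyBy(0): extract the hashtag (field f0) as the key
KeyedStream<Tuple2<String, Integer>, String> byHashtag = streamSource
    .flatMap(new TweetFlatMapper())
    .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> tuple) throws Exception {
            return tuple.f0; // the hashtag
        }
    });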

Now if I stop and .print() this stream, I'll see a stream of hashtags extracted from the tweets. If that were all I needed, I could stop here (or write them to Kafka at this point), but this quickly accumulates thousands of terms, and I want to see what's trending. To do that, I'll build a function to group and count them. Since I need to compare all of the terms against each other to rank them, I'll use an AllWindowFunction:

public static class MostPopularTags implements AllWindowFunction<Tuple2<String, Integer>, LinkedHashMap<String, Integer>, TimeWindow> {
    @Override
    public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> tweets, Collector<LinkedHashMap<String, Integer>> collector) throws Exception {
        HashMap<String, Integer> hmap = new HashMap<>();

        // Sum the counts for each hashtag across the whole window
        for (Tuple2<String, Integer> t: tweets) {
            int count = 0;
            if (hmap.containsKey(t.f0)) {
                count = hmap.get(t.f0);
            }
            hmap.put(t.f0, count + t.f1);
        }

        // Re-sort the totals in descending order of count
        Comparator<String> comparator = new ValueComparator(hmap);
        TreeMap<String, Integer> sortedMap = new TreeMap<>(comparator);

        sortedMap.putAll(hmap);

        // Keep only the top HASHTAG_LIMIT entries, preserving their order
        LinkedHashMap<String, Integer> sortedTopN = sortedMap
            .entrySet()
            .stream()
            .limit(HASHTAG_LIMIT)
            .collect(LinkedHashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), Map::putAll);

        collector.collect(sortedTopN);
    }
}

// Orders keys by descending count; deliberately never returns 0 so that
// keys with equal counts aren't collapsed into one entry by the TreeMap
public static class ValueComparator implements Comparator<String> {
    HashMap<String, Integer> map = new HashMap<>();

    public ValueComparator(HashMap<String, Integer> map){
        this.map.putAll(map);
    }

    @Override
    public int compare(String s1, String s2) {
        if (map.get(s1) >= map.get(s2)) {
            return -1;
        } else {
            return 1;
        }
    }
}

This is a little wordy, but it's not doing anything complex. Let's break it down quickly:

  • I'm implementing an AllWindowFunction, which receives the entire contents of a window
  • I'm handed an Iterable of (hashtag, count) Tuples
  • I iterate over those to total the counts, use the simple ValueComparator class to sort the totals in descending order, and copy the top HASHTAG_LIMIT entries into a LinkedHashMap (which preserves insertion order)
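
One assumption worth making explicit: HASHTAG_LIMIT is a class-level constant that doesn't appear in the snippets above. Given the "Top 20" goal, it's presumably something like:

// Presumed constant (not shown in the original snippets) controlling the leaderboard size
private static final int HASHTAG_LIMIT = 20;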

Windowed data

Now let's plug in the window function. I'll specify a sliding timeWindowAll of 300 seconds that emits its results every 5 seconds, and call .apply() to send the window contents to the new ranking function.

// Ordered topN list of most popular hashtags
DataStream<LinkedHashMap<String, Integer>> ds = jsonTweets.timeWindowAll(Time.seconds(300), Time.seconds(5))
                                                          .apply(new MostPopularTags());
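
A side note: the job log below shows SlidingProcessingTimeWindows with a ProcessingTimeTrigger, because timeWindowAll follows the environment's time characteristic, which defaults to processing time. If you wanted event-time windows instead, you'd flip the characteristic on the environment (and assign timestamps/watermarks upstream); a sketch:

// Assumption: not part of the original app - switches the job to event time,
// which also requires timestamp/watermark assignment on the source stream
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);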

Ready for takeoff

That’s it - to make it easy to debug, I’ll output everything to stdout, and then tell it to execute:

// Print to stdout
ds.print();

env.execute("Streaming Tweets");

At this point, we have a working Twitter-hashtag-ranking application!

If you execute it locally, you should see something like this:

[erik@erikthinkpad] ~/code/flink_twitter >> mvn exec:java
...
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1024616487] with leader session id d4c464da-873d-449b-aab7-dddba4c4653e.
12/11/2017 22:49:39	Job execution switched to status RUNNING.
12/11/2017 22:49:39	Source: Custom Source -> Flat Map(1/1) switched to SCHEDULED
12/11/2017 22:49:39	TriggerWindow(SlidingProcessingTimeWindows(300000, 5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@ac9223a1}, ProcessingTimeTrigger(), AllWindowedStream.main(FlinkTwitter.java:111)) -> (Sink: Unnamed, Sink: Kafka Sink)(1/1) switched to SCHEDULED
12/11/2017 22:49:39	Source: Custom Source -> Flat Map(1/1) switched to DEPLOYING
12/11/2017 22:49:39	TriggerWindow(SlidingProcessingTimeWindows(300000, 5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@ac9223a1}, ProcessingTimeTrigger(), AllWindowedStream.main(FlinkTwitter.java:111)) -> (Sink: Unnamed, Sink: Kafka Sink)(1/1) switched to DEPLOYING
12/11/2017 22:49:39	Source: Custom Source -> Flat Map(1/1) switched to RUNNING
12/11/2017 22:49:39	TriggerWindow(SlidingProcessingTimeWindows(300000, 5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@ac9223a1}, ProcessingTimeTrigger(), AllWindowedStream.main(FlinkTwitter.java:111)) -> (Sink: Unnamed, Sink: Kafka Sink)(1/1) switched to RUNNING
{#Mars=1, #Browns=1, #100andChange=1, #GoFundMe=1}
{#Mars=1, #Aliens=1, #Browns=1, #Jupiter=1, #100andChange=1, #GoFundMe=1}
{#100andChange=2, #GameOfThrones=1, #Mars=1, #Aliens=1, #Browns=1, #Jupiter=1, #Dragon=1, #GoFundMe=1}
{#100andChange=2, #GameOfThrones=1, #Mars=1, #Aliens=1, #Flashback=1, #Browns=1, #Jupiter=1, #cyberattacks=1, #Dragon=1, #GoFundMe=1}
{#USA=4, #Tshirt=4, #science=3, #NASA=3, #exploration=3, #TheMartian=3, #SPACE=3, #100andChange=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #FakeNew=1, #liemachine=1, #GoFundMe=1, #MoonLander=1, #Jupiter=1, #Dragon=1, #Space=1, #GameOfThrones=1, #Nasa=1}
{#USA=4, #Tshirt=4, #science=3, #NASA=3, #exploration=3, #TheMartian=3, #SPACE=3, #100andChange=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #GoFundMe=2, #Browns=2, #FakeNew=1, #liemachine=1, #MoonLander=1, #Jupiter=1, #Dragon=1, #Space=1, #GameOfThrones=1}
{#NASA=5, #USA=4, #Tshirt=4, #100andChange=4, #science=3, #exploration=3, #Jupiter=3, #TheMartian=3, #SPACE=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #GoFundMe=2, #Browns=2, #FakeNew=1, #AGU17=1, #liemachine=1, #ISS=1, #MoonLander=1, #scifi=1}

You can see the list grow and the counts increase as the hashtag collection starts to populate.

Persistence

So that works great - but what if I want to persist this back to Kafka? Simple: I'll just add a Kafka sink to the stream:

// Write to Kafka
ds.addSink(new FlinkKafkaProducer010<>(
            params.getRequired("topic"),
            new SerializationSchema<LinkedHashMap<String, Integer>>() {
                @Override
                public byte[] serialize(LinkedHashMap<String, Integer> element) {
                    // Serialize the map as its String representation, explicitly in UTF-8
                    return element.toString().getBytes(StandardCharsets.UTF_8);
                }
            },
            params.getProperties())
    ).name("Kafka Sink");
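
The params object used above isn't shown being constructed; it's presumably Flink's ParameterTool, parsed from the command-line arguments in main():

// Assumed setup for `params` (not shown above): parse --topic and any Kafka
// producer settings straight from the command line
ParameterTool params = ParameterTool.fromArgs(args);

Since params.getProperties() is handed directly to the producer config, you'd run the job with something like --topic hashtags --bootstrap.servers <broker:9092>.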

This will write to the topic I specified on the command line (--topic), serialized as a simple String representation of the LinkedHashMap. You can use kafkacat (or any consumer, like kafka-console-consumer) to check that the topic is receiving data:

[erik@erikthinkpad] ~ >> kafkacat -C -b $BROKERS -t hashtags -o end
% Reached end of topic hashtags [0] at offset 14367
{#NASA=7, #100andChange=5, #USA=4, #Jupiter=4, #Tshirt=4, #science=3, #exploration=3, #TheMartian=3, #SPACE=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #GoFundMe=2, #Juno=2, #GreatRedSpot=2, #SpaceProbe=2, #Browns=2, #NASAFilm=2, #FakeNew=1, #AGU17=1}
{#NASA=7, #100andChange=5, #USA=4, #Jupiter=4, #Tshirt=4, #science=3, #exploration=3, #TheMartian=3, #SPACE=3, #ASTRONAUT=3, #SPACESHUTTLE=3, #Interstellar=2, #GoFundMe=2, #Juno=2, #GreatRedSpot=2, #SpaceProbe=2, #Browns=2, #NASAFilm=2, #FakeNew=1, #AGU17=1}

Looks good!

Conclusion

Thanks for reading! I hope this walk-through helps you get started with your own application. The complete source code is available at:

Flink_Twitter_TopN GitHub repository

As always, if you have questions, feel free to ping us at support@eventador.io, and don't forget to join us on the Streamprocessing Slack too!