RxJava – WordCount Example
Counting words in large files is a “hello world” of the big data space. I wanted to share some thoughts on sustainable processing of large files using the Reactive Streams approach, namely with RxJava. This post outlines how to read and process the contents of a large text file without overwhelming the system, using back pressure.
The Problem
How do we safely process a not-so-small file, let’s say a few hundred megabytes or larger? I’ve tested with enwik9 (the link leads to a landing page, it does not actually start a download ;-), which holds a gigabyte worth of content.
Reading File Contents
This piece of code, taken from Tomasz Nurkiewicz’s blog, works like a charm.
Flowable.using(
    () -> new BufferedReader(new FileReader(filePath)),
    reader -> Flowable.fromIterable(() -> reader.lines().iterator()),
    reader -> reader.close()
);
Flowable.using creates a disposable resource whose lifetime is bound to the lifetime of an observable sequence (file contents, i.e. lines in the file). This is rather convenient, because we can safely assume that once all of the lines have been read, the resource is disposed of.
The Flowable.using operator relies on back pressure support in the producer, i.e. the second argument – Flowable.fromIterable – which in turn relies upon the subscriber to control the data intake (Reactive Streams’ Subscription.request). More on that later.
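To see the back pressure contract in action before we build the full subscriber, here is a minimal sketch – assuming RxJava 2 and that the Flowable above is assigned to a variable called lines. It requests five lines and nothing more, so only five lines are ever pulled from the file:

import io.reactivex.subscribers.DisposableSubscriber;

lines.subscribe(new DisposableSubscriber<String>() {
    @Override
    protected void onStart() {
        request(5); // ask for the first five lines only
    }

    @Override
    public void onNext(String line) {
        System.out.println(line); // the producer never emits beyond what was requested
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        // Only reached if the file has five lines or fewer
    }
});

Since this subscriber never requests more, the sequence doesn’t complete; calling dispose() on it cancels the subscription, at which point Flowable.using closes the reader.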
Creating a Subscription
So far, we have a means of reading a file. Since the read is lazily evaluated, nothing happens until we subscribe and start requesting data.
The logic we are looking to build is a bit more involved, so creating a custom subscriber sounds like a good idea. Here is a skeleton:
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class WordCountSubscriber implements Subscriber<String> {

    private final int maxLines;
    private Subscription subscription; // Assigned in onSubscribe, used to request further batches

    WordCountSubscriber(int maxLines) {
        this.maxLines = maxLines;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(maxLines);
    }

    @Override
    public void onNext(String line) {
        // TODO: Count words

        // Move on
        subscription.request(maxLines);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        // Present results - Top 50 of the most frequent words
    }
}
Processing is triggered in onSubscribe by requesting a limited number of lines. The producer adheres to this limit and provides at most as many lines from the text file as requested via the maxLines instance variable. While onSubscribe stores a reusable subscription and triggers the pipeline, it’s the onNext callback that keeps the whole process going by requesting a new batch of lines once it’s done processing the current batch.
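For completeness, here is a minimal sketch of how the pieces could be wired together; the file path and the batch size of 1,000 lines are just illustrative choices:

import io.reactivex.Flowable;

import java.io.BufferedReader;
import java.io.FileReader;

public class WordCountApp {

    public static void main(String[] args) {
        final String filePath = "enwik9"; // assumption: path to the sample file

        final Flowable<String> lines = Flowable.using(
                () -> new BufferedReader(new FileReader(filePath)),
                reader -> Flowable.fromIterable(() -> reader.lines().iterator()),
                BufferedReader::close
        );

        // Read and process the file in batches of 1,000 lines
        lines.subscribe(new WordCountSubscriber(1_000));
    }
}

With no subscribeOn/observeOn in the chain, everything runs synchronously on the calling thread, so main simply blocks until the whole file has been processed.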
Counting Words
The core logic is the least exciting bit, but it’s here for completeness. I use the common approach with stop words and data cleansing. Each line is split on white space and punctuation. For a chunk of text to count as a word, it must be longer than two characters. Stop words (articles, prepositions etc.) are filtered out. There are certainly other ways to go about the problem, but I’d consider this good enough.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

private static final String WHITE_SPACE = "[\\p{Punct}\\s]+";

private final List<String> stopWords; // Init-ed in the constructor
private final Map<String, Integer> wordCountMap = new HashMap<>();

@Override
public void onNext(String line) {
    // Extract words
    final List<String> words = Stream.of(line.toLowerCase().split(WHITE_SPACE))
            .filter(word -> word.length() > 2)
            .filter(word -> !stopWords.contains(word))
            .collect(Collectors.toList());

    // Count them
    words.forEach(word ->
            wordCountMap.compute(word, (key, value) -> value == null ? 1 : value + 1));

    // Show progress
    trackProgress(words.size());

    // Move on
    subscription.request(maxLines);
}
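I haven’t shown where the stop words come from. One hypothetical way to initialise the list is to read one word per line from a classpath resource – the stopwords.txt file name below is just an arbitrary choice:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stop-word loading; the actual list and its source are up to you
private static List<String> loadStopWords() {
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
            WordCountSubscriber.class.getResourceAsStream("/stopwords.txt")))) {
        return reader.lines()
                .map(String::trim)
                .map(String::toLowerCase)
                .collect(Collectors.toList());
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

One design note: stopWords.contains runs for every extracted word, so with a long stop-word list a HashSet (constant-time lookups) would serve better than a List, which scans linearly.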
Showing Progress
Crunching large files takes time. It’s good practice to give an indication of progress:
Processed words: 2088887, Elapsed: 14.19 seconds
The point being, I don’t want to print out a zillion lines just to give an update. Instead, the progress gets updated within a single line.
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

// The leading \r moves the cursor back to the start of the line,
// so each update overwrites the previous one
private static final String MESSAGE = "\rProcessed words: %d, Elapsed: %.2f seconds";

private final Instant start = Instant.now(); // Set when the subscriber is created
private final AtomicInteger wordCount = new AtomicInteger();

private void trackProgress(int count) {
    final long millisElapsed = Duration.between(start, Instant.now()).toMillis();
    final double secondsElapsed = millisElapsed / 1000f;
    System.out.print(String.format(MESSAGE, wordCount.addAndGet(count), secondsElapsed));
}
Top 50 Words
Finally, once the processing is over, it’s cool to get a gist of what the document or book is about. In any large document, the complete list of counted words would run way too long. I feel it’s better to show the top X (50 in this case) most frequent words.
Given the sample file I mentioned above, the one-gigabyte enwik9, the results look as follows:
quot: 1768532
amp: 910220
title: 541474
page: 524688
text: 514243
revision: 489258
contributor: 487657
timestamp: 486945
census: 425447
category: 420425
username: 411017
http: 403432
# etc. all the way down to 50 words in total
As you can see, there is still some room for improvement when it comes to data cleansing, but I believe it suffices as an illustration.
Obviously, the results are populated within the onComplete callback. I hope the code speaks for itself:
@Override
public void onComplete() {
    System.out.println("\n\n== Top 50 ==");

    // Top 50 entries
    final Stream<Map.Entry<String, Integer>> entryStream = wordCountMap
            .entrySet()
            .stream()
            .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
            .limit(50);

    // Collect into a map, preserving the sort order
    final LinkedHashMap<String, Integer> entryMap = entryStream
            .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    Map.Entry::getValue,
                    (e1, e2) -> e1,
                    LinkedHashMap::new
            ));

    // Print results
    entryMap.forEach((key, value) -> System.out.println(key + ": " + value));
}
Conclusion
I’ve shown how to process a large data stream even with limited resources. The key takeaway is to ensure a cold data source (a lazily evaluated stream) and to control the throughput via customised back pressure. I’ve used word count as a stand-in for a more realistic project with an actual processing pipeline (counting words, summarising the results, tracking progress).
The source code is available on GitLab.
Thanks for reading to the end. Please share your thoughts in the comments section below and spread the word if you found my advice useful. Thank you.