RxJava – WordCount Example

Counting words in large files is a “hello world” of the big data space. I wanted to share some thoughts on sustainable processing of large files using a Reactive Streams approach, namely with RxJava. This post outlines how to read and process the contents of a large text file without overwhelming the system, by using back pressure.

The Problem

How do you safely process a not-so-small file, let’s say a few hundred megabytes or larger? I’ve tested with enwik9 (the link leads to a landing page, it does not actually start a download;-), which holds a gigabyte’s worth of content.

Reading File Contents

This piece of code, taken from Tomasz Nurkiewicz’s blog, works like a charm.

Flowable.using(
  () -> new BufferedReader(new FileReader(filePath)),                // resource supplier, created lazily
  reader -> Flowable.fromIterable(() -> reader.lines().iterator()),  // the lines as a lazy stream
  reader -> reader.close()                                           // disposer, called on completion or cancellation
);

Flowable.using creates a disposable resource whose lifetime is bound to the lifetime of an observable sequence (file contents, i.e. lines in the file). This is rather convenient, because we can safely assume that once all of the lines have been read, the resource is disposed of.

The Flowable.using operator relies on back pressure support in the producer, i.e. the second argument – Flowable.fromIterable – which in turn relies upon the subscriber to control the data intake (Reactive Streams’ Subscription.request). More on that later.
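
To see the back pressure in action, a quick check like the one below can help. This is my own sketch, assuming RxJava 2’s test support and the enwik9 file from above: requesting only five lines means only five lines are delivered downstream.

import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

import java.io.BufferedReader;
import java.io.FileReader;

Flowable<String> lines = Flowable.using(
  () -> new BufferedReader(new FileReader("enwik9")),
  reader -> Flowable.fromIterable(() -> reader.lines().iterator()),
  reader -> reader.close()
);

// Request only five lines; the producer honours the request
// and emits no more than that.
TestSubscriber<String> probe = lines.test(5);
probe.assertValueCount(5);   // exactly five lines delivered
probe.assertNotComplete();   // the stream is far from finished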

Creating a Subscriber

So far, we have a means of reading the file. Since the read is lazily evaluated, nothing happens until we subscribe and start requesting data.

The logic we are looking to build is a bit more involved, so creating a custom subscriber sounds like a good idea. Here is a skeleton:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class WordCountSubscriber implements Subscriber<String> {

  private final int maxLines;
  private Subscription subscription;

  WordCountSubscriber(int maxLines) {
    this.maxLines = maxLines;
  }

  @Override
  public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    this.subscription.request(maxLines);
  }

  @Override
  public void onNext(String line) {
    // TODO: Count words

    // Move on
    subscription.request(maxLines);
  }

  @Override
  public void onError(Throwable t) {
    t.printStackTrace();
  }

  @Override
  public void onComplete() {
    // Present results - Top 50 of the most frequent words
  }
}

Processing is triggered in onSubscribe by requesting a limited number of lines. The producer adheres to this limit and emits at most as many lines from the text file as requested via the maxLines instance variable. While onSubscribe stores the subscription and triggers the pipeline, it’s the onNext callback that keeps the whole process going by requesting a new batch of lines once it’s done processing the current one.
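
For completeness, wiring it all together could look roughly like this. The WordCountApp class, the command-line argument and the batch size of 1000 are my own assumptions, not taken from the original project:

import io.reactivex.Flowable;

import java.io.BufferedReader;
import java.io.FileReader;

public class WordCountApp {

  public static void main(String[] args) {
    // The lazy source from the "Reading File Contents" section
    Flowable<String> lines = Flowable.using(
      () -> new BufferedReader(new FileReader(args[0])),
      reader -> Flowable.fromIterable(() -> reader.lines().iterator()),
      reader -> reader.close()
    );

    // Nothing is read until the subscriber requests data;
    // here it asks for 1000 lines per batch.
    lines.subscribe(new WordCountSubscriber(1000));
  }
}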

Counting Words

The core logic is the least exciting bit, but I’ll include it for completeness. I use the common approach with stop words and data cleansing. Each line is split on whitespace and punctuation. For a chunk of text to count as a word, it must be longer than two characters. Stop words (articles, prepositions etc.) are filtered out. There are certainly other ways to go about the problem, but I’d consider this good enough.

private static final String WHITE_SPACE = "[\\p{Punct}\\s]+"; // split on whitespace and punctuation
private final Map<String, Integer> wordCountMap = new HashMap<>();
private final List<String> stopWords; // Init-ed in the constructor

@Override
public void onNext(String line) {

  // Extract words
  final List<String> words = Stream.of(line.toLowerCase().split(WHITE_SPACE))
    .filter(word -> word.length() > 2)
    .filter(word -> !stopWords.contains(word))
    .collect(Collectors.toList());

  // Count them
  words.forEach(word -> wordCountMap.compute(word,
    (key, value) -> value == null ? 1 : value + 1));

  // Show progress
  trackProgress(words.size());

  // Move on
  subscription.request(maxLines);
}
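
As for the stop words themselves, a minimal sketch of the constructor might look like the following. The word list here is purely illustrative and much shorter than a real one; the actual project ships its own set:

import java.util.Arrays;
import java.util.List;

WordCountSubscriber(int maxLines) {
  this.maxLines = maxLines;
  // A tiny illustrative set; a real stop-word list is considerably longer
  this.stopWords = Arrays.asList(
    "the", "and", "for", "that", "with", "this", "from", "are", "was", "not");
}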

Showing Progress

Crunching large files takes time. It’s good practice to give an indication of progress:

Processed words: 2088887, Elapsed: 14.19 seconds

The point is that I don’t want to print out a zillion lines just to give an update. Instead, the progress gets updated within a single line (thanks to the carriage return \r in the format string).

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

private static final String MESSAGE = "\rProcessed words: %d, Elapsed: %.2f seconds";
private final AtomicInteger wordCount = new AtomicInteger();
private final Instant start = Instant.now();

private void trackProgress(int count) {
  final long millisElapsed = Duration.between(start, Instant.now()).toMillis();
  final double secondsElapsed = millisElapsed / 1000f;
  System.out.print(String.format(MESSAGE, wordCount.addAndGet(count), secondsElapsed));
}

Top 50 Words

Finally, once the processing is over, it’s cool to get a gist of what the document or book is about. In any large document, the complete list of counted words would run way too long. I feel it’s better to show the top X (50 in this case) most frequent words.

Given the sample file I mentioned above, the one-gigabyte enwik9, the results look as follows:

quot: 1768532
amp: 910220
title: 541474
page: 524688
text: 514243
revision: 489258
contributor: 487657
timestamp: 486945
census: 425447
category: 420425
username: 411017
http: 403432

# etc. all the way down to 50 words in total

As you can see, there is still some room for improvement when it comes to data cleansing, but I believe it suffices as an illustration.
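
One obvious refinement would be to treat the wiki-markup tokens that dominate the list (quot, amp, http and friends) as stop words too. A rough sketch follows; the extractWords helper and the MARKUP_TOKENS list are purely illustrative and not part of the original code:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Tokens coming from the XML/wiki markup rather than the actual prose
private static final List<String> MARKUP_TOKENS = Arrays.asList(
  "quot", "amp", "title", "page", "text", "revision",
  "contributor", "timestamp", "username", "http");

private List<String> extractWords(String line) {
  return Stream.of(line.toLowerCase().split(WHITE_SPACE))
    .filter(word -> word.length() > 2)
    .filter(word -> !stopWords.contains(word))
    .filter(word -> !MARKUP_TOKENS.contains(word)) // additional cleansing step
    .collect(Collectors.toList());
}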

Obviously, the results are populated within the onComplete callback. I hope the code speaks for itself:

@Override
public void onComplete() {
  System.out.println("\n\n== Top 50 ==");

  // Top 50 entries
  final Stream<Map.Entry<String, Integer>> entryStream = 
    wordCountMap
      .entrySet()
      .stream()
      .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
      .limit(50);

  // Collect into a map
  final LinkedHashMap<String, Integer> entryMap = 
    entryStream
      .collect(
        Collectors.toMap(
          Map.Entry::getKey, Map.Entry::getValue, 
          (e1, e2) -> e1, LinkedHashMap::new
        )
      );
    
  // Print results
  entryMap.forEach(
    (key, value) -> System.out.println(key + ": " + value)
  );
}

Conclusion

I’ve shown how to process a large data stream even with limited resources. The key takeaway is to ensure a cold data source (a lazy stream) and to control the throughput via back pressure from a custom subscriber. I’ve used word count as a prototype of a more realistic project with an actual processing pipeline (counting words, summarising the results, tracking progress).

The source code is available on GitLab.

Thanks for reading to the end. Please share your thoughts in the comments section below and spread the word, if you found my advice useful. Thank you.
