RxJava – Leveraging Back Pressure
In this post I look into practical applications of back pressure when building data-intensive pipelines with RxJava. I explain what a hot source is and how to handle large data streams without overwhelming the system. As usual, there are trade-offs to consider. Please read on if that sounds interesting to you.
Cold vs Hot Source
Cold sources, or rather value generators, are demand-driven. An infinite stream is a good example: nothing happens unless the client explicitly asks for the next value.
Hot sources emit events autonomously, and subscribers (observers) are forced to keep up with whatever the data rate is. This can cause performance issues and high resource consumption in general.
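To make the distinction concrete, here is a minimal plain-JDK sketch (no RxJava involved). The class and method names are made up for illustration: `pull` models a cold, demand-driven generator, while `push` models a hot producer that emits regardless of whether the listener can keep up.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

// Hypothetical illustration, not part of RxJava.
class ColdVsHotSketch {

    // Cold: an infinite generator. No value is computed until next() is called.
    static List<Integer> pull(int demand) {
        Iterator<Integer> naturals = IntStream.iterate(0, i -> i + 1).iterator();
        List<Integer> received = new ArrayList<>();
        for (int i = 0; i < demand; i++) {
            received.add(naturals.next()); // the consumer sets the pace
        }
        return received;
    }

    // Hot: the producer pushes every value, ready or not.
    static void push(int count, IntConsumer listener) {
        for (int i = 0; i < count; i++) {
            listener.accept(i); // the producer sets the pace
        }
    }

    public static void main(String[] args) {
        System.out.println(pull(3)); // prints [0, 1, 2]

        // A listener that can only queue ends up holding everything.
        List<Integer> backlog = new ArrayList<>();
        push(10_000, backlog::add);
        System.out.println(backlog.size()); // prints 10000
    }
}
```

In the pull case the infinite stream is harmless because only three values are ever produced. In the push case the only thing the listener can do is absorb the whole burst.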
A source is represented by an Observable, which monitors the data flowing from a source and makes it accessible to subscribers.
A Flowable is the back-pressure-aware counterpart of an Observable: it adds a back pressure mechanism (strategy). Whether to choose one or the other depends on how “bursty” your data source is. Let’s have a look at a few examples.
The code snippet below is an example of a cold source. Observable.range is lazy: the demand is driven by subscribers (pull approach), and therefore no back pressure needs to be applied.
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

Observable.range(0, 10_000)
    .observeOn(Schedulers.computation())
    .subscribe(System.out::println, Throwable::printStackTrace);
PublishProcessor, on the other hand, is considered a hot source. The code below is therefore prone to a MissingBackpressureException. I will look into resolving this issue in a minute.
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.stream.IntStream;

PublishProcessor<Integer> source = PublishProcessor.create();

source
    .observeOn(Schedulers.computation())
    .subscribe(System.out::println, Throwable::printStackTrace);

IntStream.range(0, 10_000).forEach(source::onNext);
Okay, so what actually is back pressure and how is it implemented?
Back pressure gives subscribers control over the data flow. It is the ability to slow down or throttle data intake. When it comes to implementation, the most straightforward approach is buffering.
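The idea of a subscriber controlling the flow can be sketched with the JDK's own Reactive Streams interfaces (java.util.concurrent.Flow, Java 9+) instead of RxJava. This is only an illustration under that assumption: the class name `OneAtATimeSubscriber` and its `run` helper are made up, and RxJava's operators do considerably more work internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical subscriber that signals demand one item at a time.
class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // initial demand: exactly one item
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
        subscription.request(1); // ask for more only once this item is handled
    }

    @Override
    public void onError(Throwable t) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    // Publishes 0..count-1 and returns what the subscriber received.
    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
        } // close() completes the stream after buffered items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }
}
```

Calling `OneAtATimeSubscriber.run(1000)` returns all thousand items in order. The producer is slowed down by `submit` rather than the subscriber being flooded.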
Buffering
To reduce the likelihood of a MissingBackpressureException, data can be batched by size or by time. The code below adds size-based back pressure by slicing the incoming data flow into batches of a thousand items each.
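import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.stream.IntStream;

PublishProcessor<Integer> source = PublishProcessor.create();

source
    .buffer(1000)                              // collect items into lists of 1000
    .observeOn(Schedulers.computation())
    .flatMap(x -> Flowable.fromIterable(x))    // unpack each batch downstream
    .subscribe(System.out::println, Throwable::printStackTrace);

IntStream.range(0, 10_000).forEach(source::onNext);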
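What size-based batching does to the data can be shown without any reactive machinery. The helper below is a plain-JDK sketch with made-up names (`BatchingSketch`, `batches`). It only illustrates the slicing itself, not how RxJava's buffer operator is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: slices a list into consecutive batches.
class BatchingSketch {

    static <T> List<List<T>> batches(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            // Copy the slice so each batch is independent of the input list.
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(batches(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

The downstream consumer now sees one event per batch instead of one per item, which is where the relief comes from: ten thousand items become ten hand-offs.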
Version 2 of RxJava introduces Flowable, a reactive data flow handler with a default internal buffer of 128 items. The example below combines two data sources and uses a queue as temporary data storage.
The first implementation is done using a plain Observable. It’s obvious there is no back pressure, since all of the items are eagerly queued up.
import java.util.concurrent.ConcurrentLinkedQueue;

final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

final Observable<Integer> o1 = Observable.range(0, 10_000).doOnNext(queue::add);
final Observable<Integer> o2 = Observable.fromIterable(queue).doOnNext(queue::remove);

Observable
    .zip(
        o1,
        o2,
        (x, y) -> String.format("[%d, %d], backlog: %d", x, y, queue.size()))
    .observeOn(Schedulers.computation())
    .subscribe(System.out::println, Throwable::printStackTrace);
Output:
[0, 0], backlog: 9999
[1, 1], backlog: 9998
[2, 2], backlog: 9997
[3, 3], backlog: 9996
[4, 4], backlog: 9995
[5, 5], backlog: 9994
[6, 6], backlog: 9993
[7, 7], backlog: 9992
...
A safer approach is to enforce buffering. As you can see the implementation is almost identical. A mere switch to a Flowable leverages the aforementioned internal buffer of 128 elements, which is visible from the output.
import io.reactivex.Flowable;
import java.util.concurrent.ConcurrentLinkedQueue;

final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

final Flowable<Integer> f1 = Flowable.range(0, 10_000).doOnNext(queue::add);
final Flowable<Integer> f2 = Flowable.fromIterable(queue).doOnNext(queue::remove);

Flowable
    .zip(
        f1,
        f2,
        // same format as in the Observable version, matching the output shown
        (x, y) -> String.format("[%d, %d], backlog: %d", x, y, queue.size()))
    .subscribe(System.out::println, Throwable::printStackTrace);
Output:
[0, 0], backlog: 127
[1, 1], backlog: 126
[2, 2], backlog: 125
[3, 3], backlog: 124
[4, 4], backlog: 123
[5, 5], backlog: 122
[6, 6], backlog: 121
[7, 7], backlog: 120
...
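The effect of a bounded buffer on the backlog can be reproduced with a plain-JDK stand-in. The sketch below is an analogy only: it uses a blocking ArrayBlockingQueue, whereas Flowable coordinates demand without blocking, and the names `BoundedBacklogSketch` and `run` are made up for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical blocking analogy of a bounded internal buffer.
class BoundedBacklogSketch {

    // Pushes `count` items through a queue of the given capacity and
    // returns the largest backlog the producer observed.
    static int run(int count, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.take(); // consume at the consumer's own pace
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        int maxBacklog = 0;
        try {
            for (int i = 0; i < count; i++) {
                queue.put(i); // waits whenever the buffer is full
                maxBacklog = Math.max(maxBacklog, queue.size());
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxBacklog;
    }
}
```

Whatever the timing between the two threads, `run(10_000, 128)` never reports a backlog above 128, which mirrors the cap visible in the Flowable output above.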
Skipping Values
Sampling is another effective way to preserve resources. It is a lossy operation that reduces throughput by letting through only the most recent item in each period of time.
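import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

PublishProcessor<Integer> source = PublishProcessor.create();

source
    .sample(1, TimeUnit.MILLISECONDS)    // keep one item per millisecond
    .observeOn(Schedulers.computation())
    .subscribe(System.out::println, Throwable::printStackTrace);

IntStream
    .range(0, 10_000)
    .forEach(source::onNext);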
Yields a somewhat trimmed output:
263
533
747
1211
1746
2097
2782
3655
4562
4612
5351
5354
6388
7054
9999
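The logic behind time-based sampling can be sketched deterministically by attaching explicit timestamps to the events. Everything here is a simplification for illustration: the `Event` type and the `sample` method are made up, the input is assumed to be sorted by time, and the final window is flushed at the end, which is a choice of this sketch rather than a statement about RxJava's operator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, timer-free model of "keep the latest item per period".
class SamplingSketch {

    // Made-up event type: a value plus the time (in ms) it was produced.
    static final class Event {
        final long timeMillis;
        final int value;

        Event(long timeMillis, int value) {
            this.timeMillis = timeMillis;
            this.value = value;
        }
    }

    // Assumes events are sorted by time and timestamps are non-negative.
    static List<Integer> sample(List<Event> events, long periodMillis) {
        List<Integer> out = new ArrayList<>();
        long currentWindow = -1;
        Integer latest = null;
        for (Event e : events) {
            long window = e.timeMillis / periodMillis;
            if (latest != null && window != currentWindow) {
                out.add(latest); // the previous window closed: emit its last item
            }
            currentWindow = window;
            latest = e.value;    // newer items overwrite older ones in a window
        }
        if (latest != null) {
            out.add(latest);     // flush the final window
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(0, 10), new Event(5, 11),   // window 0
            new Event(12, 12), new Event(19, 13), // window 1
            new Event(31, 14));                   // window 3
        System.out.println(sample(events, 10));   // prints [11, 13, 14]
    }
}
```

Items 10 and 12 are lost because a later item arrived within the same window. That is the data loss sampling trades for a bounded output rate.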
Another way to reduce data inflow is to apply a BackpressureOverflowStrategy:
- ERROR: The default strategy, where the sequence is terminated with an exception whenever the buffer fills up (the enum's documentation names MissingBackpressureException).
- DROP_OLDEST: Adds the latest value onto the buffer at the cost of dropping the oldest value.
- DROP_LATEST: Replaces the latest buffered value with a new one.
Here is an example of how to apply an explicit back pressure strategy.
import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

Flowable
    .range(0, 10_000)
    .onBackpressureBuffer(
        1000,                                       // buffer capacity
        () -> {},                                   // overflow callback (no-op)
        BackpressureOverflowStrategy.DROP_OLDEST)
    .observeOn(Schedulers.computation())
    .subscribe(System.out::println, Throwable::printStackTrace);
Inspecting the output reveals an expected data loss:
...
219
220
221
222
223
3206
3207
3215
...
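The three overflow strategies described above can be modelled with a small bounded deque. This is a plain-JDK sketch with made-up names (`OverflowSketch`, `BoundedBuffer`, `fill`), and it throws an IllegalStateException as a stand-in for the error signal. It is not RxJava's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the overflow strategies on a bounded buffer.
class OverflowSketch {

    enum Strategy { ERROR, DROP_OLDEST, DROP_LATEST }

    static final class BoundedBuffer<T> {
        private final Deque<T> items = new ArrayDeque<>();
        private final int capacity;
        private final Strategy strategy;

        BoundedBuffer(int capacity, Strategy strategy) {
            if (capacity <= 0) {
                throw new IllegalArgumentException("capacity must be positive");
            }
            this.capacity = capacity;
            this.strategy = strategy;
        }

        void offer(T item) {
            if (items.size() < capacity) {
                items.addLast(item);
                return;
            }
            switch (strategy) {
                case DROP_OLDEST:
                    items.pollFirst(); // evict the oldest buffered value
                    items.addLast(item);
                    break;
                case DROP_LATEST:
                    items.pollLast();  // replace the newest buffered value
                    items.addLast(item);
                    break;
                default:
                    // Stand-in for the error signal of the ERROR strategy.
                    throw new IllegalStateException("buffer is full");
            }
        }

        List<T> snapshot() {
            return new ArrayList<>(items);
        }
    }

    // Offers 0..count-1 with no consumer draining the buffer.
    static List<Integer> fill(int capacity, Strategy strategy, int count) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(capacity, strategy);
        for (int i = 0; i < count; i++) {
            buffer.offer(i);
        }
        return buffer.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(fill(3, Strategy.DROP_OLDEST, 5)); // prints [2, 3, 4]
        System.out.println(fill(3, Strategy.DROP_LATEST, 5)); // prints [0, 1, 4]
    }
}
```

DROP_OLDEST favours fresh data, while DROP_LATEST preserves the head of the backlog and sacrifices only the most recently buffered value. Which loss is acceptable depends on whether stale or recent items matter more to the consumer.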
Best Practices
The best approach always depends on the use case. The library provides tools for controlling the volume of the data flow, and each approach has its own advantages and shortcomings. I hope the summary below helps you decide what to do in your particular situation.
Observable vs Flowable
Observable imposes a lower overhead in comparison with Flowable, but presents a risk of running out of memory or a failure to handle an overwhelming data stream.
Observable is safe to use when there is a modest data load (thousands of items at most). This applies to capturing GUI interactions, such as mouse moves or touch events. In any case, should the amount of data grow beyond these limits, consider the use of sampling.
Flowable comes with built-in back pressure and covers all kinds of data-intensive scenarios dealing with tens of thousands of events. Blocking I/O operations, such as reading from a file or pulling data from a database, are good candidates for batched processing. The same goes for network streaming, whenever the protocol allows a threshold to be set. In these cases, apply sampling or an appropriate back pressure strategy.
Back Pressure
The Reactive Streams specification mandates that operators support non-blocking back pressure. This is to guarantee that consumers are not overwhelmed when receiving data from hot sources. In ReactiveX, Flowable ensures proper handling of downstream data. Although back pressure is built in, an OutOfMemoryError or a MissingBackpressureException can still occur. At the very least, there is a guarantee that in case of problems onNext is not called on the consumer and an error is signalled instead.
ReactiveX project’s wiki talks about back pressure concepts in detail.
Thanks for reading to the end. Let me know your feedback in the comments section below and please do share this post if you found it useful.