Rx Observable Contract
The Observable contract describes what assumptions are safe to make about interactions between an Observable and an Observer. Originally presented by Microsoft as a set of Rx design principles, it’s been formalised and captured in the Rx documentation. This post highlights the essential parts of the contract and provides code examples for better understanding.
The Contract
In my previous post I established that Rx builds upon the Observer pattern. Here is a brief summary of the API:
Observable (notifications it sends to its Observers):
- onNext
- onCompleted / onError
- onSubscribe
Observer (Subscriber) (notifications it sends to the Observable):
- subscribe / unsubscribe
- request (only when back pressure is supported, see Observable vs Flowable)
Assuming familiarity with the API, let me explain key features of the contract through a range of examples.
Suppose we have a custom observer that logs API calls and, optionally, sets a threshold on the number of emitted items. Once the threshold is met, the subscription is explicitly disposed of.
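The observer class itself is not listed in this post, so here is a minimal sketch of what it might look like. The `Observer` and `Disposable` interfaces below are local stand-ins modelled on the RxJava types of the same name, declared only so the sketch compiles without the library. The field names, the `log()` accessor and the "negative means no threshold" convention are my assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins that mirror the shape of RxJava's Disposable and Observer.
interface Disposable {
    void dispose();
    boolean isDisposed();
}

interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T item);
    void onError(Throwable e);
    void onComplete();
}

// Hypothetical reconstruction: logs every call and disposes once the threshold is met.
final class CustomObserver<T> implements Observer<T> {
    private final long threshold;               // assumption: negative means "no threshold"
    private final List<String> log = new ArrayList<>();
    private Disposable upstream;
    private long received;

    CustomObserver() { this(-1); }

    CustomObserver(long threshold) { this.threshold = threshold; }

    @Override public void onSubscribe(Disposable d) {
        upstream = d;
        record("onSubscribe");
    }

    @Override public void onNext(T item) {
        if (upstream != null && upstream.isDisposed()) {
            record("Unsubscribed");             // items arriving after disposal are not processed
            return;
        }
        record("onNext: " + item);
        received++;
        if (threshold > 0 && received >= threshold && upstream != null) {
            upstream.dispose();                 // explicitly end the subscription
        }
    }

    @Override public void onError(Throwable e) { record("onError: " + e.getMessage()); }

    @Override public void onComplete() { record("onComplete"); }

    // Exposed so the logged calls can be inspected.
    List<String> log() { return log; }

    private void record(String line) {
        log.add(line);
        System.out.println(line);
    }
}
```

With RxJava on the classpath, the two stand-in interfaces would be dropped in favour of the library's own.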
onNext is Optional
There is no guarantee an Observable will emit any items at all. The subscription is bound to end with either an error or completion, but no items need to be emitted.
Observable
    .fromIterable(List.of())
    .subscribe(new CustomObserver<>());
Ends normally, despite not having emitted any items:
onSubscribe
onComplete
Completion or Error terminate Subscription
Once a Publisher (Observable) signals completion or error, the subscription is over: the Publisher must not send any further notifications (such as onNext) to its Subscriber(s). The Publisher’s resources are supposed to be reclaimed and processing terminates.
Observable.fromCallable(
    () -> { throw new RuntimeException("Runtime failure"); }
).subscribe(new CustomObserver<>());
Subscribes and fails with the expected exception:
onSubscribe
onError: Runtime failure
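To make the "terminal means final" rule concrete, here is a minimal sketch of a guard that forwards events until the first onError or onComplete and silently drops everything afterwards. It is plain Java, not RxJava code: `EventSink`, `TerminalGuard` and `RecordingSink` are hypothetical names introduced for illustration, and the guard assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for an observer's three event callbacks (not RxJava's interface).
interface EventSink<T> {
    void onNext(T item);
    void onError(Throwable e);
    void onComplete();
}

// Hypothetical wrapper: after the first terminal signal nothing else gets through.
final class TerminalGuard<T> implements EventSink<T> {
    private final EventSink<T> downstream;
    private boolean terminated;

    TerminalGuard(EventSink<T> downstream) { this.downstream = downstream; }

    @Override public void onNext(T item) {
        if (!terminated) {
            downstream.onNext(item);
        }
    }

    @Override public void onError(Throwable e) {
        if (terminated) {
            return;                 // a second terminal signal is dropped
        }
        terminated = true;
        downstream.onError(e);
    }

    @Override public void onComplete() {
        if (terminated) {
            return;
        }
        terminated = true;
        downstream.onComplete();
    }
}

// Records events as text so the behaviour is easy to inspect.
final class RecordingSink<T> implements EventSink<T> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(T item) { events.add("onNext: " + item); }
    @Override public void onError(Throwable e) { events.add("onError: " + e.getMessage()); }
    @Override public void onComplete() { events.add("onComplete"); }
}
```

Feeding the guard `onNext(1)`, then an error, then `onNext(2)` and `onComplete()` leaves only the first two events in the recording sink.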
Serial Notifications
An Observable must notify its Observers in a serial fashion. Even if the notifications come from different threads, there must be a happens-before relationship between notifications.
Observable.range(0, 10)
    .observeOn(Schedulers.newThread())
    .map(logEventContents())
    .observeOn(Schedulers.newThread())
    .map(logEventContents())
    .observeOn(Schedulers.newThread())
    .map(logEventContents())
    .subscribe(new CustomObserver<>());

// Give the scheduler threads time to finish before the main thread exits
try {
    Thread.sleep(200);
} catch (InterruptedException e) {
    e.printStackTrace();
}
The subscribed observer receives asynchronous notifications that have passed through three different scheduler threads. As you can see, the notifications are received in their original order, despite the concurrency.
# Concurrent notifications as sent by the scheduler threads
Scheduler thread: RxNewThreadScheduler-1, value: 0
Scheduler thread: RxNewThreadScheduler-1, value: 1
Scheduler thread: RxNewThreadScheduler-2, value: 0
Scheduler thread: RxNewThreadScheduler-1, value: 2
Scheduler thread: RxNewThreadScheduler-1, value: 3
Scheduler thread: RxNewThreadScheduler-2, value: 1
Scheduler thread: RxNewThreadScheduler-3, value: 0
...
# The Observer receives messages in their original order
RxNewThreadScheduler-3: onNext: 0
RxNewThreadScheduler-3: onNext: 1
RxNewThreadScheduler-3: onNext: 2
RxNewThreadScheduler-3: onNext: 3
...
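When several threads feed the same Observer directly, something has to enforce the serial rule. The sketch below shows one simple way to do it in plain Java: a lock around the downstream call, which prevents overlapping notifications and also gives the happens-before relationship between consecutive ones. `SerializedEmitter` and the demo class are hypothetical names, and this is only an illustration of the idea, not how RxJava implements it (in RxJava itself the `serialize()` operator is the tool to reach for).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper: many threads may call emit, the downstream sees one call at a time.
final class SerializedEmitter<T> {
    private final Consumer<T> downstream;
    private final Object lock = new Object();

    SerializedEmitter(Consumer<T> downstream) { this.downstream = downstream; }

    void emit(T item) {
        synchronized (lock) {       // no two notifications overlap
            downstream.accept(item);
        }
    }
}

final class SerializedEmitterDemo {
    // Returns { number of delivered items, highest number of overlapping downstream calls }.
    static int[] run(int threads, int perThread) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        AtomicInteger delivered = new AtomicInteger();

        SerializedEmitter<Integer> emitter = new SerializedEmitter<>(item -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            delivered.incrementAndGet();
            inFlight.decrementAndGet();
        });

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    emitter.emit(i);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { delivered.get(), maxInFlight.get() };
    }
}
```

Calling `SerializedEmitterDemo.run(4, 1000)` delivers all 4000 items while the overlap counter never rises above one.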
Subscription triggers Notifications
An Observable can start emitting notifications as soon as an Observer subscribes.
Observable
    .range(0, 10)
    .subscribe(new CustomObserver<>());
Subscription precedes incoming notifications:
onSubscribe
onNext: 0
onNext: 1
onNext: 2
...
onNext: 9
onComplete
Graceful termination upon unsubscribe
When an Observer signals to unsubscribe, the Observable will try to stop sending it notifications. This is not instantaneous: work in progress is completed, so some notifications might still be signalled shortly after the call to unsubscribe, and the unsubscribed Observer should simply ignore them.
// The subscriber unsubscribes after having received the 10th item
final CustomObserver<Integer> subscriber = new CustomObserver<>(10);
Observable.range(0, 100).subscribe(subscriber);

// Push one more item directly to the subscriber.
// It has already unsubscribed at this point, so the item shouldn't have any effect.
subscriber.onNext(10);
As expected, the extra item has no effect, since the Observer has unsubscribed:
onSubscribe
onNext: 0
onNext: 1
...
onNext: 9
Unsubscribed
Replay is not guaranteed
Suppose an Observable is emitting events to a single Observer. If another Observer subscribes, it’s up to the Observable whether it will replay events that have already been sent to the original Observer.
PublishSubject<Integer> source = PublishSubject.create();
source.observeOn(Schedulers.newThread()).subscribe(new CustomObserver<>());

source.onNext(1);
source.onNext(2);
source.onNext(3);

// A new subscriber joins
source.observeOn(Schedulers.newThread()).subscribe(new CustomObserver<>());
source.onNext(4);

// Terminate
// Published events won't be replayed to new subscribers
source.onComplete();
A Subject is a special construct, an Observable and an Observer at the same time. PublishSubject won’t replay published events to a new subscriber (RxNewThreadScheduler-5):
main: onSubscribe
RxNewThreadScheduler-4: onNext: 1
RxNewThreadScheduler-4: onNext: 2
RxNewThreadScheduler-4: onNext: 3
main: onSubscribe
RxNewThreadScheduler-4: onNext: 4
RxNewThreadScheduler-5: onNext: 4
RxNewThreadScheduler-4: onComplete
RxNewThreadScheduler-5: onComplete
ReplaySubject, as the name suggests, is designed to replay published events to new subscribers.
ReplaySubject<Integer> source = ReplaySubject.create();
...
As expected, switching to ReplaySubject results in historical events being replayed to the new subscriber (RxNewThreadScheduler-7):
main: onSubscribe
main: onSubscribe
RxNewThreadScheduler-6: onNext: 1
RxNewThreadScheduler-6: onNext: 2
RxNewThreadScheduler-6: onNext: 3
RxNewThreadScheduler-7: onNext: 1
RxNewThreadScheduler-7: onNext: 2
RxNewThreadScheduler-7: onNext: 3
RxNewThreadScheduler-6: onNext: 4
RxNewThreadScheduler-7: onNext: 4
RxNewThreadScheduler-6: onComplete
RxNewThreadScheduler-7: onComplete
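The difference between the two behaviours boils down to whether the source keeps a history. Here is a deliberately tiny, single-threaded model in plain Java that makes this visible. `MiniSubject` is a hypothetical class written for this post; it ignores completion, errors and thread safety and is not how RxJava's subjects are implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of a subject: either forwards live events only, or also keeps a history.
final class MiniSubject<T> {
    private final boolean replay;
    private final List<T> history = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    private MiniSubject(boolean replay) { this.replay = replay; }

    // Late subscribers only see events published after they joined.
    static <T> MiniSubject<T> publish() { return new MiniSubject<>(false); }

    // Late subscribers first receive everything published so far.
    static <T> MiniSubject<T> replay() { return new MiniSubject<>(true); }

    void subscribe(Consumer<T> subscriber) {
        if (replay) {
            history.forEach(subscriber);    // catch the newcomer up
        }
        subscribers.add(subscriber);
    }

    void onNext(T item) {
        if (replay) {
            history.add(item);
        }
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }
}
```

A subscriber joining after items 1, 2 and 3 sees only 4 from `MiniSubject.publish()`, but 1, 2, 3 and 4 from `MiniSubject.replay()`, which matches the two logs above.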
Back Pressure is Optional
No back pressure should be assumed by default. See this post for details.
Observables such as the ReplaySubject shown in the previous section don’t support back pressure, but there are other ways to defend against excessive production: Rx ships with operators for various kinds of throttling and buffering.