Rx Observable Contract

The Observable contract describes which assumptions are safe to make about the interaction between an Observable and an Observer. Originally presented by Microsoft as a set of Rx design principles, it has since been formalised and captured in the Rx documentation. This post highlights the essential parts of the contract and provides code examples for better understanding.

The Contract

In my previous post I established that Rx builds upon the Observer pattern. Here is a brief summary of the API:

Notifications sent by an Observable:

  • onNext
  • onCompleted / onError
  • onSubscribe

Notifications sent by an Observer (Subscriber):

  • subscribe / unsubscribe
  • request (only when back pressure is supported, see Observable vs Flowable)

Assuming familiarity with the API, let me explain key features of the contract through a range of examples.

Consider a custom observer that logs each API call and, optionally, takes a threshold for the number of emitted items. Once the threshold is reached, the observer explicitly disposes of the subscription.
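
The observer itself is not listed in this post, so here is a minimal, dependency-free sketch of what it might look like. The names LoggingObserver and SketchDisposable are hypothetical stand-ins, not the actual CustomObserver or an RxJava type. With RxJava, the same idea would implement the library's Observer interface and call dispose() on the Disposable received in onSubscribe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the handle an observer receives on subscription.
final class SketchDisposable {
    private volatile boolean disposed;
    void dispose() { disposed = true; }
    boolean isDisposed() { return disposed; }
}

// Hypothetical logging observer: records each callback and, optionally,
// disposes of the subscription after `threshold` items.
final class LoggingObserver<T> {
    private final List<String> log = new ArrayList<>();
    private final int threshold; // 0 or less means "no threshold"
    private int received;
    private SketchDisposable subscription;

    LoggingObserver() { this(0); }
    LoggingObserver(int threshold) { this.threshold = threshold; }

    void onSubscribe(SketchDisposable d) {
        subscription = d;
        record("onSubscribe");
    }

    void onNext(T item) {
        if (subscription != null && subscription.isDisposed()) {
            return; // already unsubscribed: ignore late items
        }
        record("onNext: " + item);
        received++;
        if (threshold > 0 && received >= threshold && subscription != null) {
            subscription.dispose();
            record("Unsubscribed");
        }
    }

    void onError(Throwable e) { record("onError: " + e.getMessage()); }
    void onComplete() { record("onComplete"); }

    List<String> log() { return log; }

    private void record(String line) {
        log.add(line);
        System.out.println(Thread.currentThread().getName() + ": " + line);
    }
}
```

Keeping the log in a list as well as printing it is a choice made for this sketch only; it makes the observer easy to check in a test.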

onNext is Optional

There is no guarantee that an Observable will emit any items at all. A finite sequence still ends with either an error or a completion, but no items need to be emitted before that.

Observable
  .fromIterable(List.of())
  .subscribe(new CustomObserver<>());

Ends normally, despite not having emitted any items:

onSubscribe
onComplete

Completion or Error terminate Subscription

Once an Observable (Publisher) signals completion or error, the subscription ends: it must not send any further notifications (onNext) to its Observers, and they should not expect any. The Publisher’s resources are supposed to be reclaimed and processing terminates.

Observable.fromCallable(
  () -> { throw new RuntimeException("Runtime failure"); }
).subscribe(new CustomObserver<>());

Subscribes and fails with the expected exception:

onSubscribe
onError: Runtime failure
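
To make the rule more tangible, the following is a small sketch of a guard that drops everything arriving after a terminal event. TerminalGuard is a hypothetical class written for this post, not part of RxJava, and it assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical guard that enforces the terminal rule on behalf of a source.
final class TerminalGuard<T> {
    private final List<String> delivered = new ArrayList<>();
    private boolean terminated;

    void onNext(T item) {
        if (terminated) return; // nothing may follow a terminal event
        delivered.add("onNext: " + item);
    }

    void onError(Throwable e) {
        if (terminated) return; // only one terminal event is allowed
        terminated = true;
        delivered.add("onError: " + e.getMessage());
    }

    void onComplete() {
        if (terminated) return;
        terminated = true;
        delivered.add("onComplete");
    }

    List<String> delivered() { return delivered; }
}

final class TerminalGuardDemo {
    public static void main(String[] args) {
        TerminalGuard<Integer> guard = new TerminalGuard<>();
        guard.onNext(1);
        guard.onError(new RuntimeException("Runtime failure"));
        guard.onNext(2);    // dropped: the subscription has ended
        guard.onComplete(); // dropped: error and completion are mutually exclusive
        System.out.println(guard.delivered()); // prints [onNext: 1, onError: Runtime failure]
    }
}
```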

Serial Notifications

An Observable must notify its Observers in a serial fashion. Even if the notifications come from different threads, there must be a happens-before relationship between notifications.

Observable.range(0, 10)
  .observeOn(Schedulers.newThread())
  .map(logEventContents())
  .observeOn(Schedulers.newThread())
  .map(logEventContents())
  .observeOn(Schedulers.newThread())
  .map(logEventContents())
  .subscribe(new CustomObserver<>());
try {
  Thread.sleep(200);
} catch (InterruptedException e) {
  e.printStackTrace();
}

The notifications pass through three different scheduler threads, and the subscribed observer receives them asynchronously on the last one. As you can see, the notifications arrive in their original order, despite the concurrency.

# Concurrent notifications as sent by the scheduler threads
Scheduler thread: RxNewThreadScheduler-1, value: 0
Scheduler thread: RxNewThreadScheduler-1, value: 1
Scheduler thread: RxNewThreadScheduler-2, value: 0
Scheduler thread: RxNewThreadScheduler-1, value: 2
Scheduler thread: RxNewThreadScheduler-1, value: 3
Scheduler thread: RxNewThreadScheduler-2, value: 1
Scheduler thread: RxNewThreadScheduler-3, value: 0
...
# The Observer receives messages in their original order
RxNewThreadScheduler-3: onNext: 0
RxNewThreadScheduler-3: onNext: 1
RxNewThreadScheduler-3: onNext: 2
RxNewThreadScheduler-3: onNext: 3
...
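
For intuition on what "serial" means when several threads are involved, here is a rough sketch that serializes concurrent producers with a plain lock. SerializingObserver and SerialDemo are hypothetical names. RxJava operators generally rely on queue-drain techniques rather than a lock, so treat this as an illustration of the guarantee, not of the library's internals.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper that serializes calls coming from several producer threads.
final class SerializingObserver {
    private final AtomicInteger inFlight = new AtomicInteger();
    private volatile boolean overlapDetected;
    private int count; // guarded by the monitor of `this`

    // `synchronized` gives mutual exclusion and a happens-before edge
    // between one notification and the next.
    synchronized void onNext(int value) {
        if (inFlight.incrementAndGet() > 1) {
            overlapDetected = true; // two notifications ran at the same time
        }
        count++;
        inFlight.decrementAndGet();
    }

    synchronized int count() { return count; }
    boolean overlapDetected() { return overlapDetected; }
}

final class SerialDemo {
    // Starts `threads` producers, each sending `perThread` notifications.
    static SerializingObserver run(int threads, int perThread) {
        SerializingObserver observer = new SerializingObserver();
        Thread[] producers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    observer.onNext(i);
                }
            });
            producers[t].start();
        }
        for (Thread p : producers) {
            try {
                p.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return observer;
    }
}
```

Calling SerialDemo.run(4, 1000) should report 4000 notifications and no overlap, because no two onNext calls can run at the same time.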

Subscription triggers Notifications

An Observable can start to emit notifications as soon as an Observer subscribes.

Observable
  .range(0, 10)
  .subscribe(new CustomObserver<>());

Subscription precedes incoming notifications:

onSubscribe
onNext: 0
onNext: 1
onNext: 2
...
onNext: 9
onComplete

Graceful termination upon unsubscribe

When an Observer signals to unsubscribe, the Observable will look to eventually stop sending notifications to the Observer. Work in progress is completed, i.e. some notifications still might be signalled shortly after the call to unsubscribe, but the unsubscribed Observer won’t be notified.

// The subscriber unsubscribes after having received the 10th item
final CustomObserver<Integer> subscriber = new CustomObserver<>(10);
Observable.range(0, 100).subscribe(subscriber);

// Push one more item directly to the subscriber.
// The subscriber is supposed to be unsubscribed at this point, so the extra notification shouldn't have any effect.
subscriber.onNext(10);

As expected, the extra notification has no effect, since the Observer has unsubscribed:

onSubscribe
onNext: 0
onNext: 1
...
onNext: 9
Unsubscribed
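
The source side of this behaviour can be sketched as a loop that checks a shared flag before each emission. DisposeSketch and rangeUntilDisposed are hypothetical names, and the observer is reduced to a list so the example stays dependency-free.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class DisposeSketch {
    // Hypothetical range source: checks the disposed flag before each emission
    // and returns the items the observer actually received.
    static List<Integer> rangeUntilDisposed(int start, int count, int disposeAfter) {
        AtomicBoolean disposed = new AtomicBoolean(false);
        List<Integer> received = new ArrayList<>();
        for (int i = start; i < start + count; i++) {
            if (disposed.get()) {
                break; // stop producing once the observer has left
            }
            received.add(i); // stands in for observer.onNext(i)
            if (received.size() >= disposeAfter) {
                disposed.set(true); // the observer disposes of the subscription
            }
        }
        return received;
    }
}
```

With a range of 100 items and a threshold of 10, the loop stops after the tenth item instead of running to the end.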

Replay is not guaranteed

Suppose an Observable is emitting events to a single Observer. If another Observer subscribes later, it is up to the Observable whether to replay events that have already been sent to the original Observer.

PublishSubject<Integer> source = PublishSubject.create();
source.observeOn(Schedulers.newThread()).subscribe(new CustomObserver<>());

source.onNext(1);
source.onNext(2);
source.onNext(3);

// A new subscriber joins
source.observeOn(Schedulers.newThread()).subscribe(new CustomObserver<>());
source.onNext(4);

// Terminate
// Published events won't be replayed to new subscribers
source.onComplete();

A Subject is a special construct, an Observable and an Observer at the same time. PublishSubject won’t replay published events to a new subscriber (RxNewThreadScheduler-5):

main: onSubscribe
RxNewThreadScheduler-4: onNext: 1
RxNewThreadScheduler-4: onNext: 2
RxNewThreadScheduler-4: onNext: 3
main: onSubscribe
RxNewThreadScheduler-4: onNext: 4
RxNewThreadScheduler-5: onNext: 4
RxNewThreadScheduler-4: onComplete
RxNewThreadScheduler-5: onComplete

ReplaySubject, as the name suggests, is designed to replay published events to new subscribers.

ReplaySubject<Integer> source = ReplaySubject.create();
...

As expected, switching to ReplaySubject results in historical events being replayed to the new subscriber (RxNewThreadScheduler-7):

main: onSubscribe
main: onSubscribe
RxNewThreadScheduler-6: onNext: 1
RxNewThreadScheduler-6: onNext: 2
RxNewThreadScheduler-6: onNext: 3
RxNewThreadScheduler-7: onNext: 1
RxNewThreadScheduler-7: onNext: 2
RxNewThreadScheduler-7: onNext: 3
RxNewThreadScheduler-6: onNext: 4
RxNewThreadScheduler-7: onNext: 4
RxNewThreadScheduler-6: onComplete
RxNewThreadScheduler-7: onComplete
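
The difference between the two subjects comes down to whether the source keeps a history. Below is a simplified, single-threaded sketch with hypothetical MiniPublishSubject and MiniReplaySubject classes. It ignores terminal events and keeps an unbounded history, which the real subjects handle with more care.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical "hot" source: late subscribers only see future items.
class MiniPublishSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    void subscribe(Consumer<T> observer) { observers.add(observer); }

    void onNext(T item) {
        for (Consumer<T> o : observers) {
            o.accept(item);
        }
    }
}

// Hypothetical replaying source: keeps a history and replays it on subscribe.
final class MiniReplaySubject<T> extends MiniPublishSubject<T> {
    private final List<T> history = new ArrayList<>();

    @Override
    void subscribe(Consumer<T> observer) {
        history.forEach(observer); // replay past items first
        super.subscribe(observer);
    }

    @Override
    void onNext(T item) {
        history.add(item);
        super.onNext(item);
    }
}

final class ReplayDemo {
    // Returns what a subscriber sees when it joins after items 1, 2 and 3.
    static List<Integer> lateSubscriber(MiniPublishSubject<Integer> source) {
        source.subscribe(x -> { }); // the original observer
        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
        List<Integer> seen = new ArrayList<>();
        source.subscribe(seen::add); // a new subscriber joins
        source.onNext(4);
        return seen;
    }
}
```

Passing a MiniPublishSubject yields only item 4 for the late subscriber, while a MiniReplaySubject yields 1, 2, 3 and 4.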

Back Pressure is Optional

No back pressure should be assumed by default. See this post for details.
Observables such as the ReplaySubject shown in the previous section don’t support back pressure, and there are other means of defending against excessive production. Rx ships with operators that allow various kinds of throttling or buffering.
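
As a rough idea of what a buffering strategy does, here is a sketch of a bounded buffer that evicts the oldest item when a fast producer outruns a slow consumer. DropOldestBuffer is a hypothetical class written for this post and does not mirror any specific Rx operator.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical bounded buffer between a fast producer and a slow consumer.
final class DropOldestBuffer<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private final int capacity;
    private long dropped;

    DropOldestBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Producer side: never blocks; evicts the oldest item when full.
    void offer(T item) {
        if (queue.size() == capacity) {
            queue.pollFirst();
            dropped++;
        }
        queue.addLast(item);
    }

    // Consumer side: takes at most `n` items, i.e. its current demand.
    List<T> drain(int n) {
        List<T> out = new ArrayList<>();
        while (n-- > 0 && !queue.isEmpty()) {
            out.add(queue.pollFirst());
        }
        return out;
    }

    long dropped() { return dropped; }
}
```

With a capacity of 3 and ten items offered before the consumer wakes up, only the three most recent items remain and seven are counted as dropped. The trade-off is bounded memory at the cost of losing data.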
