Java 9 Reactive Streams

Java 9 Reactive Streams is a set of interfaces that follow the Reactive Streams specification.
This article is part of a series of articles covering the features introduced in Java 9. Here is the list of changes:

  1. Java platform module system
  2. JShell – Interactive command line tool
  3. Reactive Streams
  4. Factory method for Collections
  5. Enhancements in Stream, CompletableFuture, java.util.Arrays, Stack walking API
  6. Project coin changes – e.g. private interface methods
  7. Jlink – tool to assemble modules
  8. Documentation updates, HTML5, javadoc search
  9. Process API changes, new internal representation for String
  10. Parser API for Nashorn
  11. MRJAR – Multi Release Jar

Before we describe the interfaces, let's look at some basic concepts:

What are reactive streams?

Reactive programming is based on the Reactive Manifesto and primarily deals with handling a stream of data asynchronously. To give you an example, consider a weather mobile app that obtains data from a RESTful API. When the user clicks on a link, a background process kicks off a call to the API. However, instead of waiting for the response, the process simply returns and another thread ‘subscribes’ to the API response. If the API returns a year's worth of temperature data for plotting on a graph, it makes sense to render the data that has been obtained so far and continue subscribing for more data. The advantage is that the screen does not become unresponsive and the app can load a huge volume of data incrementally.
Before we explain a few other concepts, let's look at how the Java 9 reactive API is different from other reactive APIs such as RxJava.

Java 9 Reactive API vs RxJava

To answer this question, we need a bit of background. Pioneers of reactive programming in Java, such as RxJava (1.x) and the Reactor project by Spring, each started developing their own implementation of the ideas in the Reactive Manifesto, and people soon realized that a common specification was needed. This gave birth to Reactive Streams. The Reactive Streams specification describes a set of interfaces that enable asynchronous stream processing with non-blocking backpressure (see below). Since RxJava 1.x could not be retrofitted to follow these interfaces, RxJava 2 was introduced. At the same time, the interfaces were incorporated into the JDK in Java 9.
So the difference between the Java 9 Reactive API and RxJava 2 is that the former only specifies interfaces that follow the Reactive Streams specification, whereas the latter implements those interfaces and adds scores of other methods that allow efficient handling of reactive streams.

What is asynchronous processing and non-blocking backpressure?

The whole idea of Reactive Streams is that the stream is handled asynchronously, i.e. the stream destination runs on a separate thread from the stream source. An obvious problem is: what happens if the source emits more items than the destination can process? In such cases, backpressure means that the destination does not accept more items than it can process and signals upstream that it cannot handle more items (or asks only for the quantity of items that it can process). The backpressure has to be non-blocking, since the act of passing the backpressure message to the source should itself be asynchronous.
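
To make the idea concrete, here is a minimal sketch of demand-driven backpressure. It uses the JDK's SubmissionPublisher and the Flow interfaces from java.util.concurrent (both are introduced later in this article); the class name BackpressureSketch and the collect helper are made up for illustration. The subscriber asks for exactly one item at a time, so the publisher never hands it more than it has asked for.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BackpressureSketch {

    // Hypothetical helper: publishes 1..count and returns what the subscriber received.
    public static List<Integer> collect(int count) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // signal demand for exactly one item
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // ready for the next item
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // buffered until the subscriber asks for it
            }
        } // close() signals onComplete after the buffered items are delivered

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Requesting one item per call is the most conservative choice; a real subscriber would usually request in batches to reduce signalling overhead.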

Java 9 Reactive Streams

There are four core interfaces in Java 9 Reactive Streams. They are declared as nested interfaces of the java.util.concurrent.Flow class, which is why they appear as ‘public static interface’ below. The idea is that instead of the source (publisher) pushing items to the destination (subscriber), the subscriber asks for a certain number of items, which the publisher sends as and when they are available. A subscriber subscribes to a publisher through the subscribe() method of the publisher. Here are the interfaces:

Java 9 Reactive Stream interfaces

public static interface Publisher<T> {
  public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
  public void onSubscribe(Subscription subscription);
  public void onNext(T item);
  public void onError(Throwable throwable);
  public void onComplete();
}

The interface that joins the Publisher and the Subscriber is called ‘Subscription’.

public static interface Subscription {
  public void request(long n);
  public void cancel();
}
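
As a rough illustration of how the two Subscription methods work together, the sketch below shows a subscriber that uses request() to pull items one by one and cancel() to stop once it has seen enough. The names CancelSketch, FirstN and firstTwo are hypothetical; only Flow and SubmissionPublisher come from the JDK.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class CancelSketch {

    // Hypothetical subscriber: keeps the first `limit` items (limit >= 1), then cancels.
    static class FirstN<T> implements Flow.Subscriber<T> {
        final List<T> taken = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private final int limit;
        private Flow.Subscription subscription;

        FirstN(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            taken.add(item);
            if (taken.size() == limit) {
                subscription.cancel(); // tell the publisher no further items are wanted
                finished.countDown();
            } else {
                subscription.request(1);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown(); // stream ended before the limit was reached
        }
    }

    public static List<String> firstTwo() {
        FirstN<String> subscriber = new FirstN<>(2);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String s : List.of("a", "b", "c", "d")) {
                publisher.submit(s);
            }
        }
        try {
            subscriber.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.taken;
    }

    public static void main(String[] args) {
        System.out.println(firstTwo()); // prints [a, b]
    }
}
```

Because the subscriber only ever has one outstanding request, nothing more is delivered after the cancel() call.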

The fourth interface is used to process items as they move from a Publisher to a Subscriber and consequently extends both of those interfaces.

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
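
One common way to write a Processor, sketched here under our own assumptions, is to extend SubmissionPublisher (which supplies the Publisher side) and implement the Subscriber methods by hand. MappingProcessor, ProcessorSketch and the temperature values are illustrative names and data, not part of the JDK.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ProcessorSketch {

    // Hypothetical processor: receives T upstream, applies `mapper`, republishes R downstream.
    static class MappingProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription upstream;

        MappingProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item)); // forward the transformed item downstream
            upstream.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable); // propagate the failure downstream
        }

        @Override
        public void onComplete() {
            close(); // propagate completion downstream
        }
    }

    public static List<String> run() {
        List<String> out = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Double> source = new SubmissionPublisher<>()) {
            MappingProcessor<Double, String> toLabel =
                    new MappingProcessor<Double, String>(t -> Math.round(t) + " C");
            source.subscribe(toLabel);
            toLabel.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // effectively unbounded demand
                }

                @Override
                public void onNext(String item) {
                    out.add(item);
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            source.submit(21.4);
            source.submit(18.6);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [21 C, 19 C]
    }
}
```

Both subscriptions are set up before anything is submitted, so the processor always has a downstream subscriber when it forwards an item.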

Overview of the reactive implementation

[Figure: Java 9 Reactive Stream]
The steps in a Reactive application are:

  1. A Subscriber is registered with the Publisher through the publisher’s ‘subscribe’ method, and the publisher hands it a Subscription via onSubscribe(). ((1) in diagram above)
  2. The subscriber requests ‘n’ items from the publisher by calling request(n) on the Subscription. ((2) in diagram above)
  3. The publisher sends items to the subscriber asynchronously, when it can, by calling onNext(). ((3) in diagram above). The publisher maintains a buffer so that if the subscriber cannot process an item yet, the publisher keeps it in its buffer. If the buffer fills up, the publisher can either try again or discard some items for that subscriber.
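
The buffering and discarding described in step 3 can be observed with the JDK's SubmissionPublisher and its offer() method, which accepts a drop handler. The sketch below is only an illustration: the tiny buffer capacity of 4 and a subscriber that never requests anything are assumptions chosen to force drops, and DropSketch and countDrops are made-up names.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class DropSketch {

    // Hypothetical helper: offers `count` items to a stalled subscriber, returns the number dropped.
    public static int countDrops(int count) {
        AtomicInteger dropped = new AtomicInteger();
        // second constructor argument: maximum buffer capacity per subscriber (assumed value)
        try (SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), 4)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    // deliberately no request(): the subscriber has zero demand
                }

                @Override
                public void onNext(Integer item) {
                }

                @Override
                public void onError(Throwable throwable) {
                }

                @Override
                public void onComplete() {
                }
            });
            for (int i = 0; i < count; i++) {
                // offer() does not block; when the buffer is full the handler is invoked
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false; // false = discard the item instead of retrying
                });
            }
        }
        return dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("Dropped " + countDrops(100) + " of 100 items");
    }
}
```

Returning true from the drop handler would make the publisher retry the offer once, which corresponds to the ‘try again’ option mentioned in step 3.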

The JDK provides an implementation of the Publisher interface called ‘SubmissionPublisher’.
Here’s an example that uses SubmissionPublisher to create a publisher that emits temperature values. We create a SubmissionPublisher that publishes 9 values to the subscriber. The subscriber requests 3 values initially and, once it has received them, asks for three more. In this example, we simulate a 0.5s processing time for each item that the subscriber receives. Since the publisher submits all items at once, it has to buffer some of them while the subscriber is still processing earlier ones.

Here are the classes:
The Publisher:

package com.st.java9;

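import java.text.DecimalFormat;
import java.util.concurrent.SubmissionPublisher;

public class Reactive {

	// formatter used by TemperatureSubscriber; the two-decimal pattern is an assumption
	static final DecimalFormat format = new DecimalFormat("#.##");

	public static void main(String[] args) throws InterruptedException {
		// create a submission publisher and register the subscriber
		SubmissionPublisher<Double> submissionPublisher = new SubmissionPublisher<>();
		submissionPublisher.subscribe(new TemperatureSubscriber());
		for (int i = 0; i < 9; i++) {
			// submit a randomly generated temperature value
			double value = Math.random() * 30;
			System.out.println("Submitting " + i + ":" + value);
			submissionPublisher.submit(value);
		}
		submissionPublisher.close();
		// the subscriber runs asynchronously, typically on daemon threads of the common
		// ForkJoinPool, so keep the JVM alive while it consumes all 9 items (9 x 0.5s)
		Thread.sleep(6000);
	}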
}

The Subscriber

package com.st.java9;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class TemperatureSubscriber implements Subscriber<Double> {

	private Subscription subscription;
	int pending = 0;
	int requests = 3;

	@Override
	public void onNext(Double item) {
		try {

			// simulate an operation that takes 0.5s
			Thread.sleep(500);
			System.out.println("Consuming " + Reactive.format.format(item));
			pending--;
			// request the next batch once every item from the previous request has arrived
			if (pending < 1) {
				System.out.println("Requesting " + requests);
				subscription.request(requests);
				pending = requests;
			}
		} catch (InterruptedException e) {
			// restore the interrupt flag so the calling thread can see the interruption
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
	}

	@Override
	public void onError(Throwable throwable) {
		System.out.println(throwable.getMessage());
	}

	@Override
	public void onComplete() {
		System.out.println("Done");
	}

	@Override
	public void onSubscribe(Subscription subscription) {
		this.subscription = subscription;
		// request the first batch
		subscription.request(requests);
		pending = requests;
	}
}

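When run, all nine ‘Submitting’ lines should appear first, since the publisher buffers the items. The ‘Consuming’ lines then follow at roughly half-second intervals, with a ‘Requesting 3’ line after every third item and ‘Done’ at the end.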

The SubmissionPublisher has a few other methods too which we will cover in a later tutorial.

Conclusion to Java 9 Reactive Streams

Java 9 introduced 4 new interfaces for Reactive Stream processing and an implementation of a Publisher. The idea is that more libraries will start adhering to the reactive standard specified by these interfaces. RxJava 2 (through its Flowable class) and the Reactor framework by Spring already implement the equivalent interfaces from the Reactive Streams specification.
