Reactive programming is about processing an asynchronous stream of data items, where applications react to the data items as they occur. This article by Rahul Srivastava presents an example using the JDK 9 Flow API.
By Rahul Srivastava
What is Reactive Programming ?
Reactive programming is about processing an asynchronous stream of data items, where applications react to the data items as they occur. A stream of data is essentially a sequence of data items occurring over time. This model is more memory efficient because the data is processed as streams, as compared to iterating over the in-memory data.
In the Reactive Programming model, there is a Publisher and a Subscriber. The Publisher publishes a stream of data, to which the Subscriber is asynchronously subscribed.
The model also provides a mechanism to introduce higher order functions to operate on the stream by means of Processors. Processors transform the data stream without the need for changing the Publisher or the Subscriber. The Processor (or a chain of Processors) sit between the Publisher and the Subscriber to transform one stream of data to another. The Publisher and the Subscriber are independent of the transformation that happen to the stream of data.
Why Reactive Programming ?
- Simpler code, making it more readable.
- Abstracts away from boiler plate code to focus on business logic.
- Abstracts away from low-level threading, synchronization, and concurrency issues.
- Stream processing implies memory efficient
- The model can be applied almost everywhere to solve almost any kind of problem.
JDK 9 Flow API
The Flow APIs in JDK 9 correspond to the Reactive Streams Specification, which is a defacto standard. The Reactive Streams Specification is one of the initiatives to standardize Reactive Programming. Several implementations already support the Reactive Streams Specification.
The Flow API (and the Reactive Streams API), in some ways, is a combination of ideas from Iterator and Observer patterns. The Iterator is a pull model, where the application pulls items from the source. The Observer is a push model, where the items from the source are pushed to the application. Using the Flow API, the application initially requests for N items, and then the publisher pushes at most N items to the Subscriber. So its a mix of Pull and Push programming models.
The Flow API Interfaces (At a glance)
@FunctionalInterface
public static interface Flow.Publisher<T> {
public void subscribe(Flow.Subscriber\<? super T> subscriber);
}
public static interface Flow.Subscriber<T> {
public void onSubscribe(Flow.Subscription subscription);
public void onNext(T item) ;
public void onError(Throwable throwable) ;
public void onComplete() ;
}
public static interface Flow.Subscription {
public void request(long n);
public void cancel() ;
}
public static interface Flow.Processor<T,R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
}
The Subscriber
The Subscriber subscribes to the Publisher for the callbacks. Data items are not pushed to the Subscriber unless requested, but multiple items may be requested. Subscriber method invocations for a given Subscription are strictly ordered. The application can react to the following callbacks, which are available on the subscriber.
|
Callback
|
Description
|
|
onSubscribe
|
Method invoked prior to invoking any other Subscriber methods for the given Subscription.
|
|
onNext
|
Method invoked with a Subscription's next item.
|
|
onError
|
Method invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription.
If a Publisher encounters an error that does not allow items to be issued to a Subscriber, that Subscriber receives onError, and then receives no further messages.
|
|
onComplete
|
Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription.
When it is known that no further messages will be issued to it, a subscriber receives onComplete.
|
Sample Subscriber
import java.util.concurrent.Flow.*;
...
public class MySubscriber<T> implements Subscriber<T> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); //a value of ` Long.MAX_VALUE` may be considered as effectively unbounded
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1); //a value of ` Long.MAX_VALUE` may be considered as effectively unbounded
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
The Publisher
The publisher publishes the stream of data items to the registered subscribers. It publishes items to the subscriber asynchronously, normally using an Executor. Publishers ensure that Subscriber method invocations for each subscription are strictly ordered.
Example publishing a stream of data items to Subscribers using JDK's SubmissionPublisher
import java.util.concurrent.SubmissionPublisher;
...
//Create Publisher
SubmissionPublisher\<String> publisher = new SubmissionPublisher\<>();
//Register Subscriber
MySubscriber\<String> subscriber = new MySubscriber\<>();
publisher.subscribe(subscriber);
//Publish items
System.out.println("Publishing Items...");
String\[\] items = {"1", "x", "2", "x", "3", "x"};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
The Subscription
Links a Flow.Publisher and Flow.Subscriber. Subscribers receive items only when requested, and may cancel at any time, via the Subscription.
|
Method
|
Description
|
|
request
|
Adds the given number of n items to the current unfulfilled demand for this subscription.
|
|
cancel
|
Causes the Subscriber to (eventually) stop receiving messages.
|
The Processor
A component that acts as both a Subscriber and Publisher. The processor sits between the Publisher and Subscriber, and transforms one stream to another. There could be one or more processor chained together, and the result of the final processor in the chain, is processed by the Subscriber. The JDK does not provide any concrete Processors so it is left upto the individual to write whatever processor one requires.
Sample Processor to transform String to Integer
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
...
public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> {
private Function function;
private Subscription subscription;
public MyTransformProcessor(Function<? super T, ? extends R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit((R) function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
close();
}
}
Sample code to transform data stream using processor
import java.util.concurrent.SubmissionPublisher;
...
//Create Publisher
SubmissionPublisher\<String> publisher = new SubmissionPublisher\<>();
//Create Processor and Subscriber
MyFilterProcessor\<String, String> filterProcessor = new MyFilterProcessor\<>(s -> s.equals("x"));
MyTransformProcessor\<String, Integer> transformProcessor = new MyTransformProcessor\<>(s -> Integer.parseInt(s));
MySubscriber\<Integer> subscriber = new MySubscriber\<>();
//Chain Processor and Subscriber
publisher.subscribe(filterProcessor);
filterProcessor.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
System.out.println("Publishing Items...");
String\[\] items = {"1", "x", "2", "x", "3", "x"};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
Back pressure
Back pressure is built when Publishers are producing at a much faster rate than the rate at which the data items are consumed by the Subscribers. The size of the buffer where the unprocessed items are being buffered might be restricted. The Flow API does not provide any APIs to signal or deal with back pressure as such, but there could be various strategies one could implement by oneself to deal with back pressure. See how RxJava deals with back pressure.
Summary
Adding Reactive Programming API to JDK 9 is a good start. Many other products have also started to offer Reactive Progamming API to access their functionality. Though the Flow API allows programmers to start writing reactive programs, the eco system still has to evolve.
For example, a reactive program may still end up accessing DB using traditional APIs, maybe because not all DBs support API for Reactive Programming. – i.e. the APIs a reactive program may depend on, might not support reactive programming model yet.
References
About the Author
Rahul Srivastava is an ex-committer for the Xerces2-J project at Apache. He is currently Principal Member of the Technical Staff for the Application Server development team at Oracle.