Skip to Main Content

Java APIs

Announcement

For appeals, questions and feedback about Oracle Forums, please email oracle-forums-moderators_us@oracle.com. Technical questions should be asked in the appropriate category. Thank you!

Interested in getting your voice heard by members of the Developer Marketing team at Oracle? Check out this post for AppDev or this post for AI focus group information.

Reactive Programming with JDK 9 Flow API

Bob Rhubart-OracleSep 26 2016 — edited Sep 26 2016

Reactive programming is about processing an asynchronous stream of data items, where applications react to the data items as they occur. This article by Rahul Srivastava presents an example using the JDK 9 Flow API.


By Rahul Srivastava

What is Reactive Programming ?

Reactive programming is about processing an asynchronous stream of data items, where applications react to the data items as they occur. A stream of data is essentially a sequence of data items occurring over time. This model is more memory efficient because the data is processed as streams, as compared to iterating over the in-memory data.

In the Reactive Programming model, there is a Publisher and a Subscriber. The Publisher publishes a stream of data, to which the Subscriber is asynchronously subscribed.

The model also provides a mechanism to introduce higher order functions to operate on the stream by means of Processors. Processors transform the data stream without the need for changing the Publisher or the Subscriber. The Processor (or a chain of Processors) sit between the Publisher and the Subscriber to transform one stream of data to another. The Publisher and the Subscriber are independent of the transformation that happen to the stream of data.

push.png

Why Reactive Programming ?

  • Simpler code, making it more readable.
  • Abstracts away from boiler plate code to focus on business logic.
  • Abstracts away from low-level threading, synchronization, and concurrency issues.
  • Stream processing implies memory efficient
  • The model can be applied almost everywhere to solve almost any kind of problem.

JDK 9 Flow API

The Flow APIs in JDK 9 correspond to the Reactive Streams Specification, which is a defacto standard. The Reactive Streams Specification is one of the initiatives to standardize Reactive Programming. Several implementations already support the Reactive Streams Specification.

The Flow API (and the Reactive Streams API), in some ways, is a combination of ideas from Iterator and Observer patterns. The Iterator is a pull model, where the application pulls items from the source. The Observer is a push model, where the items from the source are pushed to the application. Using the Flow API, the application initially requests for N items, and then the publisher pushes at most N items to the Subscriber. So its a mix of Pull and Push programming models.

pull-push.png

The Flow API Interfaces (At a glance)

@FunctionalInterface

public static interface Flow.Publisher<T> {

public void    subscribe(Flow.Subscriber\<? super T> subscriber);

}

public static interface Flow.Subscriber<T> {

public void    onSubscribe(Flow.Subscription subscription);

public void    onNext(T item) ;

public void    onError(Throwable throwable) ;

public void    onComplete() ;

}

public static interface Flow.Subscription {

public void    request(long n);

public void    cancel() ;

}

public static interface Flow.Processor<T,R> extends Flow.Subscriber<T>, Flow.Publisher<R> {

}

The Subscriber

The Subscriber subscribes to the Publisher for the callbacks. Data items are not pushed to the Subscriber unless requested, but multiple items may be requested. Subscriber method invocations for a given Subscription are strictly ordered. The application can react to the following callbacks, which are available on the subscriber.

|

Callback

|

Description

|
|

onSubscribe

|

Method invoked prior to invoking any other Subscriber methods for the given Subscription.

|
|

onNext

|

Method invoked with a Subscription's next item.

|
|

onError

|

Method invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription.

If a Publisher encounters an error that does not allow items to be issued to a Subscriber, that Subscriber receives onError, and then receives no further messages.

|
|

onComplete

|

Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription.

When it is known that no further messages will be issued to it, a subscriber receives onComplete.

|

Sample Subscriber

import java.util.concurrent.Flow.*;

...

public class MySubscriber<T> implements Subscriber<T> {

private Subscription subscription;

@Override

public void onSubscribe(Subscription subscription) {

this.subscription = subscription;

subscription.request(1); //a value of ` Long.MAX_VALUE` may be considered as effectively unbounded

}

@Override

public void onNext(T item) {

System.out.println("Got : " + item);

subscription.request(1); //a value of ` Long.MAX_VALUE` may be considered as effectively unbounded

}

@Override

public void onError(Throwable t) {

t.printStackTrace();

}

@Override

public void onComplete() {

System.out.println("Done");

}

}

The Publisher

The publisher publishes the stream of data items to the registered subscribers. It publishes items to the subscriber asynchronously, normally using an Executor. Publishers ensure that Subscriber method invocations for each subscription are strictly ordered.

Example publishing a stream of data items to Subscribers using JDK's SubmissionPublisher

import java.util.concurrent.SubmissionPublisher;

...

//Create Publisher

SubmissionPublisher\<String> publisher = new SubmissionPublisher\<>();

//Register Subscriber

MySubscriber\<String> subscriber = new MySubscriber\<>();

publisher.subscribe(subscriber);

//Publish items

System.out.println("Publishing Items...");

String\[\] items = {"1", "x", "2", "x", "3", "x"};

Arrays.asList(items).stream().forEach(i -> publisher.submit(i));

publisher.close();

The Subscription

Links a Flow.Publisher and Flow.Subscriber. Subscribers receive items only when requested, and may cancel at any time, via the Subscription.

|

Method

|

Description

|
|

request

|

Adds the given number of n items to the current unfulfilled demand for this subscription.

|
|

cancel

|

Causes the Subscriber to (eventually) stop receiving messages.

|

The Processor

A component that acts as both a Subscriber and Publisher. The processor sits between the Publisher and Subscriber, and transforms one stream to another. There could be one or more processor chained together, and the result of the final processor in the chain, is processed by the Subscriber. The JDK does not provide any concrete Processors so it is left upto the individual to write whatever processor one requires.

Sample Processor to transform String to Integer

import java.util.concurrent.Flow.*;

import java.util.concurrent.SubmissionPublisher;

...

public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> {

private Function function;

private Subscription subscription;

public MyTransformProcessor(Function<? super T, ? extends R> function) {

super();

this.function = function;

}

@Override

public void onSubscribe(Subscription subscription) {

this.subscription = subscription;

subscription.request(1);

}

@Override

public void onNext(T item) {

submit((R) function.apply(item));

subscription.request(1);

}

@Override

public void onError(Throwable t) {

t.printStackTrace();

}

@Override

public void onComplete() {

close();

}

}

Sample code to transform data stream using processor

import java.util.concurrent.SubmissionPublisher;

...

//Create Publisher

SubmissionPublisher\<String> publisher = new SubmissionPublisher\<>();

//Create Processor and Subscriber

MyFilterProcessor\<String, String> filterProcessor = new MyFilterProcessor\<>(s -> s.equals("x"));

MyTransformProcessor\<String, Integer> transformProcessor = new MyTransformProcessor\<>(s -> Integer.parseInt(s));

MySubscriber\<Integer> subscriber = new MySubscriber\<>();

//Chain Processor and Subscriber

publisher.subscribe(filterProcessor);

filterProcessor.subscribe(transformProcessor);

transformProcessor.subscribe(subscriber);

System.out.println("Publishing Items...");

String\[\] items = {"1", "x", "2", "x", "3", "x"};

Arrays.asList(items).stream().forEach(i -> publisher.submit(i));

publisher.close();

Back pressure

Back pressure is built when Publishers are producing at a much faster rate than the rate at which the data items are consumed by the Subscribers. The size of the buffer where the unprocessed items are being buffered might be restricted. The Flow API does not provide any APIs to signal or deal with back pressure as such, but there could be various strategies one could implement by oneself to deal with back pressure. See how RxJava deals with back pressure.

Summary

Adding Reactive Programming API to JDK 9 is a good start. Many other products have also started to offer Reactive Progamming API to access their functionality. Though the Flow API allows programmers to start writing reactive programs, the eco system still has to evolve.

For example, a reactive program may still end up accessing DB using traditional APIs, maybe because not all DBs support API for Reactive Programming. – i.e. the APIs a reactive program may depend on, might not support reactive programming model yet.

References

About the Author

Rahul Srivastava is an ex-committer for the Xerces2-J project at Apache. He is currently Principal Member of the Technical Staff for the Application Server development team at Oracle.

Comments
Post Details
Added on Sep 26 2016
2 comments
72,030 views