by José Paumard
New, elegant ways to process data asynchronously
Java SE 8 brought so many new things to the Java platform that some of them have probably been left in the shadows. Not all applications are using the java.util.concurrent package, even though the primitives provided in this package are extremely useful for writing correct concurrent code.
This package saw several very nice additions in Java 8. The ones we discuss in this article are the CompletionStage interface and the CompletableFuture implementing class. Along with the Future interface, they provide very nice patterns for building asynchronous systems.
Problem Statement
Let's start with the following piece of code. This is not meant to be Java code, but merely metalanguage code. We do not care about which API provides the methods written here nor about the classes used.
queryEngine.select("select user from User user")
.forEach(user -> System.out.println(user));
We have here a query engine that launches a Java Persistence Query Language (JPQL) type of request on a database. Once we have the result of this query, we would like to simply print the result. The querying of the database might be slow, so we would like to execute this code in a separate thread and trigger the printing of the result when it is available. Once we have launched this task, we do not really want to take care of it anymore.
What tools do we have in Java 7 to create an API that would implement this code? We can wrap the task to be executed in a Callable, and submit this object to an ExecutorService. This pattern was introduced in Java 5 and is now well known.
Callable<String> task = () -> "select user from User";
Future<String> future = executorService.submit(task);
The only way to get the result from a future object is to call its get() method in the thread that submitted the task. This method is blocking, so this call will block the thread until the result is available to print.
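To make the blocking behavior described above concrete, here is a minimal sketch. The slowQuery() method is a hypothetical stand-in for a slow database call, and the class name is ours; neither comes from a real API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingFutureDemo {

    // Hypothetical stand-in for a slow database query.
    static String slowQuery() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "user-1";
    }

    // Submits the task, then blocks the calling thread until the result is ready.
    static String submitAndWait() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = BlockingFutureDemo::slowQuery;
            Future<String> future = executorService.submit(task);
            return future.get(); // the calling thread is stuck here until the task ends
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndWait());
    }
}
```

Note the two checked exceptions that get() forces the caller to handle, in addition to the blocking itself.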
This is exactly where the CompletionStage comes to the rescue.
First Chaining Pattern
Let's rewrite the submission of the task using one of the CompletionStage patterns.
This pattern:
executor.submit(() -> dbEngine.query("select user from User"));
Becomes the following:
CompletableFuture<List<User>> completableFuture =
    CompletableFuture.supplyAsync(() -> dbEngine.query("select user from User"), executor);
Instead of passing a Callable to the submit() method of an ExecutorService, we pass a Supplier to the static supplyAsync() method of CompletableFuture. This method can also take an Executor as a second parameter, giving the client a choice for the pool of threads that is going to execute the Supplier.
It returns an instance of CompletableFuture, a new class from Java 8. On this object, we can call the following:
completableFuture.thenAccept(System.out::println);
The consumer passed to the thenAccept() method will be called automatically, without any client intervention, when the result is available. No more thread blocking as in the previous case.
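The following sketch puts supplyAsync() and thenAccept() together in a runnable form. The query() method is a hypothetical replacement for dbEngine.query(), and the final join() is there only so that this small demo does not exit before the callback has run; a real application would not need it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FirstChainDemo {

    // Hypothetical stand-in for dbEngine.query(...).
    static List<String> query(String jpql) {
        return Arrays.asList("alice", "bob");
    }

    // Runs the query asynchronously and consumes the result in a callback.
    static List<String> runChain() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<List<String>> completableFuture =
                CompletableFuture.supplyAsync(() -> query("select user from User user"), executor);

            // The consumer is invoked once the list is available.
            CompletableFuture<Void> printed = completableFuture.thenAccept(users ->
                users.forEach(user -> {
                    System.out.println(user);
                    seen.add(user);
                }));

            printed.join(); // demo only: wait for the callback before returning
            return seen;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        runChain();
    }
}
```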
What Is a CompletionStage?
In a nutshell, a CompletionStage is a model that carries a task. We will see in the following sections that a task can be an instance of Runnable, Consumer, or Function. The task is an element of a chain. CompletionStage elements are linked together in different ways along the chain. An "upstream" element is a CompletionStage that is executed before the element we are considering. Consequently, a "downstream" element is a CompletionStage that is executed after the element we are considering.
The execution of a CompletionStage is triggered upon the completion of one or more upstream CompletionStages. Those CompletionStages might return values, and these values can be fed to this CompletionStage. The completion of this CompletionStage can also produce a result and trigger other downstream CompletionStages.
So a CompletionStage is an element of a chain.
The CompletionStage interface has an implementation called CompletableFuture. Note that CompletableFuture is also an implementation of the Future interface. CompletionStage does not extend Future.
A task has a state:
- It might be running.
- It might be completed normally and might have produced a result.
- It might be completed exceptionally and might have produced an exception.
More Methods from Future
Future defines five methods in three categories:
- cancel(), which is meant to cancel a running task
- isCancelled() and isDone(), to check whether the task is still running
- get(), which comes in two flavors, the second one taking a timeout
CompletableFuture adds six new Future-like methods.
The first two methods are join() and getNow(value). The first, join(), blocks until the CompletableFuture is completed, just like the old get() method does. The main difference is that the join() method does not throw a checked exception, leading to simpler patterns. The getNow(value) method is similar, except that it returns immediately, with the value provided in case this CompletableFuture has not been completed yet. Note that this call does not force the CompletableFuture to complete.
The four remaining methods force the future to complete, either with a value or exceptionally. Two of them can even override the result of a future that has already completed.
- The complete(value) method completes the CompletableFuture if it has not been completed, and it sets its value to the passed value. If the CompletableFuture has already completed, its return value is not changed. If you need to change this value, the method you want to call is the obtrudeValue(value) method. This method does change the value of the CompletableFuture, even if it has already completed. This last method should be used with care and only in error recovery situations.
- Another pair of methods works the same way, but they force the CompletableFuture to complete exceptionally: completeExceptionally(throwable) and obtrudeException(throwable). The first one completes the CompletableFuture with the given exception only if it has not completed yet, and the second one forces the CompletableFuture to change its state even if it has already completed.
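The behavior of these six methods can be checked with a short sketch. It assumes we create an incomplete CompletableFuture with its public no-argument constructor, which this article does not otherwise use; the class name and the recorded log format are ours.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CompletionMethodsDemo {

    // Records what each call returns, separated by semicolons.
    static String demo() {
        StringBuilder log = new StringBuilder();

        CompletableFuture<String> cf = new CompletableFuture<>();
        log.append(cf.getNow("default")).append(';');  // not completed yet: fallback value
        log.append(cf.complete("first")).append(';');  // true: this call completed the future
        log.append(cf.complete("second")).append(';'); // false: already completed, value unchanged
        log.append(cf.join()).append(';');             // still "first"

        cf.obtrudeValue("forced");                     // overrides the existing value
        log.append(cf.join()).append(';');

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            failed.join();
        } catch (CompletionException e) {
            // join() wraps the original exception in an unchecked CompletionException
            log.append(e.getCause().getMessage());
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```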
How to Create a CompletableFuture
There are several patterns for creating CompletableFutures.
Creating a Completed CompletableFuture
The first pattern presented here creates a CompletableFuture that is already completed. It might seem odd to create such a Future, but it can be very useful in a testing environment.
CompletableFuture<String> cf =
CompletableFuture.completedFuture("I'm done!");
cf.isDone(); // returns true
cf.join(); // returns "I'm done!"
Creating a CompletableFuture from a Task
Beyond this first pattern, a CompletableFuture can be built on two kinds of tasks: a Runnable, which does not take any argument and does not return anything, and a Supplier, which also takes no argument and which produces an object. In both cases, it is possible to pass an Executor to set the pool of threads that will execute this task.
There are two patterns for each task:
CompletableFuture<Void> cf1 =
CompletableFuture.runAsync(Runnable runnable);
CompletableFuture<T> cf2 =
CompletableFuture.supplyAsync(Supplier<T> supplier);
If no Executor is supplied, the tasks will be executed in the common fork/join pool, the same pool that is used for the parallel execution of streams.
Runnable runnable = () -> {
System.out.println("Executing in " +
Thread.currentThread().getName());
};
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> cf =
CompletableFuture.runAsync(runnable, executor);
cf.thenRun(() -> System.out.println("I'm done"));
executor.shutdown();
The result of the execution of the code above is the following:
Executing in pool-1-thread-1
I'm done
In this case, the Runnable is executed in the single-thread executor we created.
Building a CompletableFuture Chain
As we saw in the introduction of this article, a CompletableFuture is an element of a chain. We saw in the previous section how to create the first element of such a chain from a task (a Runnable or a Supplier). Now let's see how we can chain other tasks to this one. In fact, we already saw a first hint of this in our previous example.
Tasks for an Element of the Chain
This first task is modeled by a Runnable or a Supplier, two functional interfaces (you could say functions), which do not take any argument and which might or might not return something.
The second element of the chain and the further ones could take the result of the previous element, if there is one. So we need different functions on which to build those elements. Let's try to understand the ones we need.
The previous element of the chain might or might not produce a result. So our functions should take one object or no object. This element might or might not produce a result. So our functions should return one object or no object. That makes four possible cases. Among those four possible functions, the one that does not take any result and produces a value should be discarded, because it is not an element of a chain; it is the starting point of a chain, which we already saw in the previous section.
Table 1 shows the result, with the name of the functions used in the CompletableFuture API.
Table 1. Four possible functions
| Takes a Parameter? | Returns void | Returns R |
|---|---|---|
| Takes T | Consumer<T> | Function<T, R> |
| Does not take anything | Runnable | Not an element of a chain |
Types of Chaining
Now that we have a good idea of the tasks the API supports, let's examine what chaining means. We assumed so far that chaining is about triggering a task on the result of another task, passing the result of the first one as a parameter to the second one. This is the basic one-to-one chaining.
We can also compose the elements instead of chaining them. This makes sense only for tasks that take the result of the previous task and provide an object wrapped in another CompletableFuture. This is once more a one-to-one relation (not chaining, because this is composition).
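In the CompletableFuture API, composition is available through the thenCompose() method. The sketch below assumes two hypothetical asynchronous lookups, findUserId() and findEmail(), each of which already returns a CompletableFuture. Chaining them with thenApply() would produce a nested CompletableFuture<CompletableFuture<String>>, whereas thenCompose() flattens the result.

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {

    // Hypothetical asynchronous lookup: derives a fake ID from the name.
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical asynchronous lookup that depends on the previous result.
    static CompletableFuture<String> findEmail(int userId) {
        return CompletableFuture.supplyAsync(() -> "user" + userId + "@example.com");
    }

    static String lookup(String name) {
        // thenCompose() unwraps the CompletableFuture returned by findEmail().
        CompletableFuture<String> email =
            findUserId(name).thenCompose(ComposeDemo::findEmail);
        return email.join(); // demo only: wait for the final result
    }

    public static void main(String[] args) {
        System.out.println(lookup("alice"));
    }
}
```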
But we can also build a tree-like structure, where a task is triggered on two upstream tasks instead of one. We can imagine a combination of the two provided results, or a situation in which the current element is triggered by whichever of the two upstream elements completes first and provides a result. Both cases make sense, and we will see examples for them.
Choosing an ExecutorService
Finally, we want to be able to decide what ExecutorService (that is, pool of threads) is going to execute our tasks. There are many cases in which we want to have a say on this:
- One of our tasks might be updating a graphical user interface. In that case, we want it to be run in the human-machine interface (HMI) thread. This is the case in Swing, JavaFX, and Android.
- We might have I/O tasks or computation tasks that need to be executed in specialized pools of threads.
- We might have visibility issues in our variables that need a further task to be executed in the same thread as the first one.
- We might want to execute this task asynchronously, in the default fork/join pool.
In all these cases, we might have to pass an ExecutorService as a parameter.
Lots of Methods to Implement
That makes a lot of methods in the CompletableFuture class! Three types of tasks, times four types of chaining and composition, times three ways of specifying which ExecutorService should run the task: we have 36 methods to chain tasks. This is probably one of the elements that make this class complex: the high number of available methods.
Seeing them one by one would be extremely tedious, so let's have a look at a selection of them.
Selection of Patterns
Here are descriptions of some of the available patterns.
Some One-to-One Patterns
In this case, from the first CompletableFuture, we create a CompletableFuture that will execute its task when the first one is completed.
CompletableFuture<String> cf1 =
CompletableFuture.supplyAsync(() -> "Hello world");
CompletableFuture<String> cf2 =
cf1.thenApply(s -> s + " from the Future!");
There are three "then-apply" methods. All of them take a function as a parameter, which takes the result of the upstream element of the chain, and produces a new object from it.
We can add one step to our chain. This time, our call takes a Consumer<String> as a parameter and does not produce any result.
CompletableFuture<Void> cf3 =
cf2.thenAccept(System.out::println);
Let's add one last step to this chain. This last call takes a Runnable as a parameter (a function that does not take any parameter) and produces no result.
CompletableFuture<Void> cf4 =
cf3.thenRun(() -> System.out.println("Done processing this chain"));
The way the names of these methods are built is very clear: then, followed by the name of the method of the function (functional interface) we take as a parameter (run for Runnable, accept for Consumer, and apply for Function). All these methods execute their tasks in the same pool of threads as the upstream task (more precisely, in the thread that completes the upstream task, or in the calling thread if that task has already completed).
Then, these methods can take a further suffix: Async. An Async method executes its task in the default fork/join pool, unless it takes an Executor, in which case, the task will be executed in this Executor.
We could have written cf4 in this way:
CompletableFuture<Void> cf4 =
cf3.thenRunAsync(() -> ...);
In this case, the provided Runnable would have been executed in the common fork/join pool.
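The variant that takes an Executor can be observed with the following sketch. The thread name custom-pool-thread and the class name are our own choices for illustration; the point is only that the Async step runs on the executor we pass in, whichever thread ran the upstream task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncSuffixDemo {

    // Returns the name of the thread that executed the thenApplyAsync() step.
    static String threadOfAsyncStep() {
        // Single-thread executor whose thread carries a recognizable name.
        ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-pool-thread"));
        try {
            return CompletableFuture
                .supplyAsync(() -> "Hello world")          // runs in the common fork/join pool
                .thenApplyAsync(s -> Thread.currentThread().getName(), executor)
                .join();                                   // demo only: wait for the result
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfAsyncStep());
    }
}
```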
Some Two-to-One Combining Patterns
Combining patterns are patterns in which the task we write takes the results of two upstream tasks. Two functions can be used in this case: BiFunction and BiConsumer. It is also possible to execute a Runnable in these patterns. Table 2 shows the three base methods.
Table 2. Three base methods for two-to-one combining patterns
| Method | Description |
|---|---|
| <U, R> CompletionStage<R> thenCombine(CompletionStage<U> other, BiFunction<T, U, R> action) | Combines the result of this and other in one, using a BiFunction |
| <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> action) | Consumes the result of this and other, using a BiConsumer |
| CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action) | Triggers the execution of a Runnable on the completion of this and other |
These methods can also take an Async suffix, which has the same semantics as the set of methods of the previous section.
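Here is a minimal sketch of the three combining methods. The values and the class name are arbitrary; each join() is there only to wait for the result in this demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class CombiningDemo {

    // thenCombine(): merges two results into a third value with a BiFunction.
    static String combine() {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Duke");
        CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 20);
        return name.thenCombine(age, (n, a) -> n + " is " + a).join();
    }

    // thenAcceptBoth(): consumes both results with a BiConsumer, produces nothing.
    static int acceptBoth() {
        AtomicInteger sum = new AtomicInteger();
        CompletableFuture<Integer> left = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> right = CompletableFuture.supplyAsync(() -> 2);
        left.thenAcceptBoth(right, (x, y) -> sum.set(x + y)).join();
        return sum.get();
    }

    // runAfterBoth(): runs a Runnable once both are done, ignoring their results.
    static boolean afterBoth() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<String> left = CompletableFuture.supplyAsync(() -> "a");
        CompletableFuture<Integer> right = CompletableFuture.supplyAsync(() -> 1);
        left.runAfterBoth(right, () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(combine());
        System.out.println(acceptBoth());
        System.out.println(afterBoth());
    }
}
```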
Some Two-to-One Selecting Patterns
This last category of patterns also contains two-to-one patterns. But this time, instead of executing the downstream element once the two upstream elements are completed, the downstream element is executed when one of the two upstream elements is completed. This might prove very useful when we want to resolve a domain name, for instance. Instead of querying only one domain name server, we might find it more efficient to query a group of domain name servers. We do not expect to have different results from the different servers, so we do not need more answers than the first we get. All the other queries can be safely canceled.
This time, the patterns are built on one result from the upstream element, because we do not need more. These methods have the either keyword in their names. The combined elements should produce the same types of result, because only one of them will be selected.
Table 3. Three base methods for two-to-one selecting patterns
| Method | Description |
|---|---|
| <R> CompletionStage<R> applyToEither(CompletionStage<T> other, Function<T, R> function) | Selects the first available result from this and other, and applies the function to it |
| CompletionStage<Void> acceptEither(CompletionStage<T> other, Consumer<T> consumer) | Selects the first available result from this and other, and passes it to the consumer |
| CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action) | Runs the provided action after the first result from this and other has been made available |
These methods can also take an Async suffix, which has the same semantics as the set of methods from the previous section.
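The domain name scenario can be sketched as follows. To keep the outcome deterministic, we assume one server that answers and one that never does, modeled by a CompletableFuture that is never completed; the address and the names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

class EitherDemo {

    static String resolve() {
        // Hypothetical server that answers.
        CompletableFuture<String> fastServer =
            CompletableFuture.supplyAsync(() -> "192.0.2.1");
        // Hypothetical server that never answers: this future is never completed.
        CompletableFuture<String> silentServer = new CompletableFuture<>();

        // The function is applied to whichever result arrives first.
        return fastServer
            .applyToEither(silentServer, ip -> "resolved to " + ip)
            .join(); // demo only: wait for the selected result
    }

    public static void main(String[] args) {
        System.out.println(resolve());
    }
}
```

The downstream task completes even though one of its two upstream tasks never does, which is exactly what distinguishes the selecting patterns from the combining ones.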
Examples
Let's look at a couple of examples.
Testing a Long-Running Call in Jersey
Let's consider the following code, extracted from the Jersey documentation.
@Path("/resource")
public class AsyncResource {
@Inject
private Executor executor;
@GET
public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
executor.execute(() -> {
String result = longOperation();
asyncResponse.resume(result);
});
}
}
This code is a basic REST service, which is calling an expensive operation. The classical way to deal with this is to call this long operation in another thread asynchronously. This method does not explicitly generate a response; it is the Jersey implementation that does that.
The problem we face here is the following: how can we unit-test this method? Testing the longOperation() call in itself is not an issue: we can unit-test this method separately. What we need to test here is that the result object is correctly passed to the resume() method of the asyncResponse object. This can easily be done with a mock object framework, such as Mockito, for instance. The problems we are facing are the following:
- The executor.execute() call is executed in the "main" thread.
- But the asyncResponse.resume() call is executed asynchronously in another thread, at a time in the future that we do not know.
What we need in our test is some kind of a callback that is going to be called once the asyncResponse.resume() is called, so that we can test our mocks. The test of our mocks looks like the following:
Mockito.verify(mockAsyncResponse).resume(result);
We need to run this simple code:
- Once the resume() method has been called
- If possible, in the same thread as the one that executed the resume() call; that way, we are sure that we will not have any concurrency issues (especially visibility) in our mocks
This is where the CompletionStage framework comes to the rescue! Instead of passing the Runnable to the executor.execute() method, we create a CompletionStage with it.
This pattern:
executor.execute(() -> {
String result = longOperation();
asyncResponse.resume(result);
});
Becomes the following:
CompletableFuture<Void> completableFuture =
CompletableFuture.runAsync(() -> {
String result = longOperation();
asyncResponse.resume(result);
}, executor);
And because a CompletionStage can trigger other tasks, we can add this code to our test:
completableFuture
.thenRun(() -> {
Mockito.verify(mockAsyncResponse).resume(result);
}
);
This code does exactly what we need:
- It is triggered by the completion of the Runnable of the previous CompletionStage.
- It is executed in the same thread.
To implement this solution, we need to create a second public method in our Jersey class, which returns this CompletableFuture. We cannot simply change the return type of the existing Jersey method: Jersey would try to build a response with this return type, converting it to XML or JSON, and this would not work very well with a CompletableFuture.
The complete testing pattern is thus the following:
1. Create and train our mocks:
String result = "result"; // a plain value: Mockito cannot mock the final String class
AsyncResponse response = Mockito.mock(AsyncResponse.class);
Runnable train = () -> {
    Mockito.doReturn(result).when(response).longOperation();
};
Runnable verify = () -> Mockito.verify(response).resume(result);
2. Create the call and verify task:
Runnable callAndVerify = () -> {
    asyncResource.executeAsync(response).thenRun(verify);
};
3. Then create the task to be tested:
ExecutorService executor = Executors.newSingleThreadExecutor();
AsyncResource asyncResource = new AsyncResource();
asyncResource.setExecutorService(executor);
CompletableFuture
.runAsync(train, executor)
.thenRun(callAndVerify);
Because this is a unit test, we might want to make it fail if the response is not seen after a given amount of time. We can do that with the get() method from Future implemented in CompletableFuture, in its version that takes a timeout.
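A minimal sketch of this timeout check follows. The helper name completesWithin() is ours, and a real test would typically let the TimeoutException fail the test instead of turning it into a boolean.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutDemo {

    // Returns true if the future completes normally within the given delay.
    static boolean completesWithin(CompletableFuture<?> cf, long millis) {
        try {
            cf.get(millis, TimeUnit.MILLISECONDS); // blocks for at most 'millis'
            return true;
        } catch (TimeoutException e) {
            return false; // nothing was produced in time
        } catch (ExecutionException e) {
            return false; // the task itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(completesWithin(CompletableFuture.completedFuture("ok"), 100));
        System.out.println(completesWithin(new CompletableFuture<String>(), 50));
    }
}
```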
Analyzing the Links of a Web Page Asynchronously
Let's write some asynchronous code that will automatically analyze the links of a web page and display them in a Swing panel.
We want to do the following:
- Read the content of the web page.
- Then get the links from this page.
- Then display them in a Swing panel.
Remember that modifying a Swing component should be done from the proper thread, and we certainly do not want to run long tasks in this thread.
The complete pattern is simple:
CompletableFuture.supplyAsync(
    () -> readPage("http://whatever.com/")
)
.thenApply(page -> linkParser.getLinks(page))
.thenAcceptAsync(
links -> displayPanel.display(links),
executor
);
The first step is to create a Supplier that is executed asynchronously. It returns the content of the web page as a String (for instance).
Then the second step gets this page and passes it to the link parser. It is a function that returns a List<String> (for instance). These first two tasks are executed in the same thread. We could change that in case we have a first pool of threads dedicated to I/O operations and a second one for CPU operations.
Then, the last step just takes the list of links and displays it. Now this task accesses a Swing component, so it should be executed in the Swing thread. So we pass the right executor as a parameter to do that.
The nice thing is this: the Executor interface is a functional interface. We can implement it with a lambda:
Executor executor = runnable -> SwingUtilities.invokeLater(runnable);
We can leverage the method-reference syntax to write the final version of this pattern:
CompletableFuture.supplyAsync(
    () -> readPage("http://whatever.com/")
)
.thenApply(linkParser::getLinks)
.thenAcceptAsync(
    displayPanel::display,
    SwingUtilities::invokeLater
);
CompletableFutures along with lambdas and method references allow for the writing of very elegant patterns.
Exception Handling
The CompletionStage API also exposes exception handling patterns. Let's see that in an example.
Suppose we have the processing chain shown in Figure 1.
Figure 1. Processing chain
All these CompletableFutures are chained together using the patterns we have seen in the previous sections.
Suppose now that CF21 raises an exception. If nothing has been written to handle this exception, all the downstream CompletableFutures are in error. This means two things:
- The call to isCompletedExceptionally() returns true on the CF21, CF31, and CF41 CompletableFutures.
- The call to get() on these objects throws an ExecutionException, whose cause is the root exception raised by CF21.
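This propagation can be reproduced with a three-element chain. The variable names follow the figure; the tasks themselves are placeholders we made up for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class PropagationDemo {

    static String demo() {
        // CF21 fails; nothing downstream handles the exception.
        CompletableFuture<String> cf21 = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("CF21 failed");
        });
        CompletableFuture<String> cf31 = cf21.thenApply(String::toUpperCase);
        CompletableFuture<Void> cf41 = cf31.thenAccept(s -> { });

        try {
            cf41.get();
            return "no exception";
        } catch (ExecutionException e) {
            // The cause is the root exception raised by CF21.
            return cf41.isCompletedExceptionally() + ":" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```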
We can handle exceptions in CompletableFuture chains using the pattern shown in Figure 2.
cf30 = cf21.exceptionally(throwable -> fallbackValue); // fallbackValue is a placeholder
Figure 2. Pattern to handle exceptions
This pattern creates a CompletableFuture that has the following properties:
- If CF21 completes normally, then CF30 returns the same value as CF21, transparently.
- If CF21 raises an exception, then CF30 is able to catch it, and can transmit a normal value to CF31.
There are several methods to do that, with different ways of accepting the exception.
The exceptionally(Function<Throwable, T> function) call is the simplest one. It returns a CompletionStage that will complete normally if the upstream CompletionStage also completes normally. The result returned is the same as the result of the upstream CompletionStage. On the other hand, if this upstream CompletionStage raises an exception, this exception is passed to the provided function. The returned CompletionStage then completes normally, returning the result of the provided function. In Java 8, there is no asynchronous version of this method.
The handle(BiFunction<T, Throwable, R> bifunction) call has similar semantics. It returns a CompletionStage that completes normally with the result of the provided bifunction. This bifunction is called with a null exception if the upstream CompletionStage completes normally, or it is called with a null result if it completes exceptionally. In both cases, the returned CompletionStage completes normally. This method has two sister methods called handleAsync(). These two methods work the same way, but asynchronously, in another executor. This executor can be provided as a parameter. If it isn't, then the common fork/join pool is used.
The third method that can handle exceptions is whenComplete(BiConsumer<T, Throwable> biconsumer). Whereas handle() returns a CompletionStage that completes normally, the CompletionStage returned by whenComplete() does not. It follows the behavior of the CompletionStage it is built on. So if the upstream CompletionStage completes exceptionally, the CompletionStage returned by whenComplete() also completes exceptionally. The provided biconsumer is called with the returned value of the upstream CompletionStage and its exception. As in the handle() case, one of these two objects is null. The biconsumer is just there to consume those values; it does not return anything. So it is merely a callback that does not interfere in the processing pipeline of CompletionStages. As for the handle() method, this method also has two sister methods called whenCompleteAsync(). These two methods work asynchronously, either in the common fork/join pool or in the provided executor.
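The three methods can be compared side by side on a future that has failed. In this sketch, the failing() helper and the fallback strings are our own; the future is failed directly with completeExceptionally() so that the callbacks receive the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class RecoveryDemo {

    // Hypothetical upstream element that has completed exceptionally.
    static CompletableFuture<String> failing() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        cf.completeExceptionally(new IllegalStateException("boom"));
        return cf;
    }

    // exceptionally(): the function is called only on failure.
    static String withExceptionally() {
        return failing().exceptionally(t -> "fallback").join();
    }

    // handle(): always called; exactly one of (value, t) is meaningful.
    static String withHandle() {
        return failing()
            .handle((value, t) -> t == null ? value : "handled " + t.getMessage())
            .join();
    }

    // whenComplete(): a pure callback; the returned stage stays in error.
    static boolean whenCompleteStaysFailed() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture<String> cf = failing().whenComplete((value, t) -> seen.set(t));
        return cf.isCompletedExceptionally() && seen.get() != null;
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally());
        System.out.println(withHandle());
        System.out.println(whenCompleteStaysFailed());
    }
}
```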
Conclusion
The CompletionStage interface and CompletableFuture class bring new ways of processing data asynchronously. This API is quite complex, mainly due to the number of methods exposed by the new interface and the new class, but that makes this API very rich with lots of opportunities for finely tuning asynchronous data processing pipelines to perfectly suit the needs of your applications.
This API is built on lambda expressions, leading to very clean and very elegant patterns. It gives fine control over which thread should execute each task. It also allows the chaining and composition of tasks in a very rich way, and it has a very clean way of handling exceptions.
About the Author
José Paumard is an assistant professor at the Institut Galilée (Université Paris 13), and has a PhD in applied mathematics from the ENS de Cachan. He has been teaching about Java technologies at the university since 1998. He has also worked as an independent consultant for twenty years and is a well-known expert Java/Java EE/software craftsman and trainer. Paumard gives talks at conferences, including JavaOne and Devoxx. He also writes technical articles for various media including Java Magazine and Oracle Technology Network. Passionate about education, he publishes massive open online courses (MOOC) for several companies, for example, for Oracle Virtual Technology Summit, PluralSight, Microsoft Virtual Academy, and Voxxed. He also is a member of the Java community in Paris, has been one of the lead members of the Paris JUG for six years, and is cofounder of Devoxx France. Follow him @JosePaumard.