CompletableFuture Overview

Sofiya
Khomyn

Software Engineer
@EPAM

CompletableFuture Overview


As the technologies around us develop at breakneck speed, we need to keep up. This article will give you insight into how to use Java 8’s CompletableFuture (CF) to facilitate writing multithreaded code. A single class that not only provides you with a strong API, but also introduces a whole new concept to Java. So, let’s delve into the world of CompletableFuture together!

Plain old Java, or Preconditions

For starters, let’s refresh our memories regarding what was difficult about previous Java versions.

Over the past few years, the world of programming has changed dramatically, pushing us to grow and broaden our outlook on how software should be written.

Multicore processors

Firstly, the use of multicore processors allows us to speed up our applications significantly. All we have to do is write software that runs in parallel. For instance, we can split a large task into smaller subtasks and make these run on different cores, processors or even machines. However, this requires putting substantial effort into writing applications that will run in parallel.

The diagrams below are an example of how the code would run.

p 1

Nowadays it can be done in more effective ways than working directly with threads, because these complicate our life a lot by:

  • increasing the code complexity;
  • causing troubles with synchronization;
  • allowing deadlocks;
  • complicating debugging and testing.

As an alternative, the fork/join framework (since Java 7) and parallel streams (since Java 8) can be used to simplify development.

Multiple data sources

Secondly, nowadays most applications are mash-ups. In other words, they use data from multiple sources and then aggregate it to simplify your users’ life. Obviously, here you want to get that response from the remote service ASAP. You don’t want to block.

Previously, we could have used a Callable interface, introduced the Future and submitted the computation to the executor. Unfortunately, this way there is a pretty good chance that our code will block for God knows how long. When what we really want here is a non-blocking approach.

Non-blocking fashion or the key to the concept

From the moment you make a request to that remote service until you get a response, your code has nothing to do. This is blocking – just waiting for the response. All the time the program spends in this state is completely wasted. But what if you could do other work instead of waiting?

This is where the asynchronous comes in – the method returns immediately, or at least before the computation is done, delegating it to another thread, which runs in parallel. This way, the program can move on to running some other tasks while the previous is still being executed.

 

p 3

This is the principle that CompletableFuture works on. It brings a whole new concept to Java – a concept of promises.

Concept

CF gives us the promise that a computation will eventually be completed. The result of the computation will be provided, even if the operation fails. The callback is attached and when the computation is ready, you get notified and can perform other actions. Meanwhile, your code is not stuck – it continues working on other tasks while the computation is still running.

Fascinating, isn’t it?

API

Besides a non-blocking fashion, CF has much more to offer. Let’s look at the API to get a better picture.

So, CF is a single class, added in Java 8 to the java.util.concurrent package. CF implements two interfaces: Future and CompletionStage. We will focus on the latter. CompletionStage offers nearly sixty methods, which allow it to perform more advanced operations.

Pitfalls

Note: All the examples log the thread name so that you can run the code and check many pitfalls by yourself.

  1. Simplest asynchronous computation
private static ExecutorService executor =
       Executors.newCachedThreadPool();
public static void main(String[] args)
       throws ExecutionException, InterruptedException {
   CompletableFuture promise = CompletableFuture
           .supplyAsync(() -> {
               System.out.println("Action ran in: " +
                       Thread.currentThread().getName());
               return 12;
           }, executor);
   int completedFuture = promise.get();
   System.out.println("The new CompletableFuture: " +
           completedFuture);
   executor.shutdown();
}

Output:

Action ran in: pool-1-thread-1
The new CompletableFuture: 12

This is the canonical way of creating a new CF. As a result you get an actual type (Integer), wrapped into CF: CompletableFuture<Integer>.

Also, take into consideration that the custom executor is specified here. Not specifying it is rather tricky. And for a good reason – because the new CF will be created in the common pool of  ForkJoinPool (for more details, see https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html#commonPool– ). Why is that bad? Well, we don’t have any control over this pool. Moreover, you may have to wait for the execution, because the pool is filled with a lot of other tasks. That’s why the better approach is to specify it. This way, we get something like submit() in Callable.

So, specify the custom executor.

  1. Chaining callbacks
CompletableFuture
       .supplyAsync(() -> {
           LOGGER.info("CF created in: " +
                   Thread.currentThread().getName());
           return quoteUtil.getQuote();
       }, executor)
       .thenApply(result -> {
           LOGGER.info("Action on CF ran in: " +
                   Thread.currentThread().getName());
           return result.toString().concat(MOVIE_NAME);
       })
       .thenApplyAsync(result -> {
           LOGGER.info("Another action ran in: " +
                   Thread.currentThread().getName());
           return result.length();
       }, executor)
       .thenAccept(System.out::println);

long timeout = 2000;
executor.awaitTermination(timeout, TimeUnit.MILLISECONDS);
executor.shutdown();

Output:

2017-01-23 22:53:39 INFO  ThenApplyAsync:29 - CF created in: pool-1-thread-1

2017-01-23 22:53:39 INFO  ThenApplyAsync:34 - Action on CF ran in: pool-1-thread-1

2017-01-23 22:53:39 INFO  ThenApplyAsync:39 - Another action ran in: pool-1-thread-2

86

Seems straightforward, doesn’t it?

Well, by using the thenApply method we can chain the callbacks and modify the CF as it accepts and returns the Future of a predefined type.

Also, you can use thenApplyAsync for the same purposes. Here is where it gets more interesting.

Almost all the CF methods have async variants. They make the computation execute in a different thread. It is a bit of a pitfall, as there may be no thread available and you have to wait. This way, the reaction on the completion of computation won’t be as fast as you might expect. Moreover, some extra time will be spent on the context switch. So, simply avoid overusing the asynchronous.

Finally, you can use thenAccept to attach a final callback. It takes a Consumer, which is responsible for handling the result when it is ready.

At this point, we’ve seen how thenApply works. Can you see the next pitfall coming?

Well, let’s look at it.

CompletableFuture<CompletableFuture> promise = CompletableFuture
        .supplyAsync(() -> quoteUtil.getQuote(), executor)
        .thenApply(result ->
                CompletableFuture.supplyAsync(() ->
                        quoteUtil.appendQuote(result)));

In this case, the result type of this computation includes two CFs, nested in each other. Unfortunately, thenApply doesn’t get it that we don’t need those nestings.

Here is where thenCompose comes in.

 CompletableFuture promise = CompletableFuture
       .supplyAsync(() ->
               quoteUtil.getQuote(), executor)
       .thenCompose(result ->
               CompletableFuture.supplyAsync(() ->
                       quoteUtil.appendQuote(result)));

Here we’ve successfully merged the results of two dependent computations into one as a solution.

So, be careful with nestings.

  1. Working with parallel computations
CompletableFuture quote = CompletableFuture
       .supplyAsync(() ->
               quoteUtil.getQuote(), executor);
CompletableFuture releaseYear = CompletableFuture
       .supplyAsync(() -> "\n1985");
quote
       .thenCombine(releaseYear,
               (String quoteResult, String releaseYearResult) ->
                       quoteResult.concat(releaseYearResult))
       .thenAccept(System.out::println);

In this example we successfully merged the results of two independent computations into one. By independent, I mean running in parallel threads.

Keep in mind that thenCombine works only when the two computations have already completed. Also, the order is not guaranteed – the second computation may start (and finish) executing before the first one.

So, everything is done in a non-blocking fashion.

  1. Working with a few CF instances

Up to this point we’ve been working with just two instances of CFs. What if we need more?

Let’s introduce three different CFs and wait for the completion of:

CompletableFuture quote = CompletableFuture
       .supplyAsync(() ->
               quoteUtil.getQuote());

CompletableFuture<List> quotes = CompletableFuture
       .supplyAsync(() ->
               quoteUtil.getQuotes());
CompletableFuture releaseYear = CompletableFuture
       .supplyAsync(() ->
               "1985");

all of them:

CompletableFuture promise = CompletableFuture.
       allOf(quote, quotes, releaseYear);

or only the fastest of them:

CompletableFuture<Object> anyOf = CompletableFuture
       .anyOf(quote, quotes, releaseYear)

At first glance, these methods look pretty simple, without any pitfalls. But, look closely…

  • when waiting for the result of execution of all futures, we get CompletableFuture<Void>. Isn’t that weird? Why not return the List of some type… Hmmm, which type should it be…

Frankly speaking, CompletableFuture<Void> seems like a good solution, as the types of Futures we are waiting for are not obliged to be the same.

  • when waiting for the result of execution of the fastest future, we get CompletableFuture<Object>. All other computations continue running in background.

The rule here is that the one who is the fastest always wins. Thus, if the fastest computation fails with the exception, this exception (wrapped into CompletionException) will be the final result. If the fastest is OK and some other computation fails, the exception gets ignored as we’ve already got the desired result.

So, keep track of the return types.

  1. Exceptions

How do you think exceptions are handled when working with CF-specific methods? Can you predict what’s going to happen as a result of the following computation?

CompletableFuture quoteLength = CompletableFuture
       .supplyAsync(() ->
               quote.length(), executor);
quoteLength
       .thenApply(result -> result + 12)
       .thenAccept(System.out::println)

Output:

Process finished with exit code 0

Shouldn’t a NullPointerException be here instead? Unfortunately, the exception simply gets lost, swallowed along the way.

Let’s try adding: quoteLength.get();

Output:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.NullPointerException

Finally, we get notified about the exception that’s occurred. Again, in the wrong way.

The CF policy about exceptions is that we should get an original exception, wrapped into CompletionException.

Obviously, thenApply doesn’t know how to handle them correctly. That’s why CF introduces specific methods for exception handling:

  • handle:
String quote = null;
CompletableFuture quoteLength = CompletableFuture
       .supplyAsync(() ->
               quote.length(), executor);
CompletableFuture recovered =
       quoteLength
               .handle((result, throwable) -> {
                   if (throwable != null) {
                       return "No quote available: "
                               + throwable;
                   } else {
                       return result.toString();
                   }
               });

recovered.thenAccept(System.out::println);
Output:

No quote available

This method takes only one parameter and gives the possibility of logging the error.

It can be used on each chain of the callbacks. The reason is that it will be executed only if the computation fails.

So, use exceptionally() and handle() for exception handling with CF.

Takeaways

Putting everything in a nutshell, I hope that after reading this article you:

  • understand the input CF makes;
  • can avoid the pitfalls when using it;
  • remember to specify the custom executor when working with CF;
  • know when nestings can occur & how to solve this;
  • keep track of the return types;
  • handle exceptions in the right way;
  • are looking forward for my next article. ☺