Introduction
CompletableFuture
was one of the "small gifts" in Java 8. It is a clever class but not well-integrated into the rest of the JDK. Particularly, ExecutorService
still returns Future
s rather than CompletableFuture
s. No class in the JDK references completable futures.
The other odd thing about CompletableFuture
is that methods such as get()
declare throwing InterruptedException
but do not do so except under narrow circumstances: tasks which are interrupted and themselves throw InterruptedException
have those exceptions wrapped by ExecutionException
, making is difficult to handle interrupts in a general way. This is "baked into" the API, which provides only static factory methods accepting Runnable
or Supplier
(e.g., supplyAsync
), and clashes with standard ExecutorService
implementations.
Oddly the source for CompletableFuture
shows interrupts could have been addressed in a straight-forward way:
public T get() throws InterruptedException, ExecutionException { Object r; Throwable ex, cause; if ((r = result) == null && (r = waitingGet(true)) == null) throw new InterruptedException(); if (!(r instanceof AltResult)) { @SuppressWarnings("unchecked") T tr = (T) r; return tr; } if ((ex = ((AltResult)r).ex) == null) return null; if (ex instanceof CancellationException) throw (CancellationException)ex; // Hypothetical approach to exposing interrupts, NOT in the JDK: // if (ex instanceof InterruptedException) // throw (InterruptedException)ex; if ((ex instanceof CompletionException) && (cause = ex.getCause()) != null) ex = cause; throw new ExecutionException(ex); }
I suspect there is some deeper interaction I am missing that such an easy solution was avoided. (This also shows off nicely the new ability in Java 8 to annotate assignments.)
That I can tell CompletableFuture
was modeled on other libraries and languages, especially Guava's SettableFuture
and Akka's Promise
(formerly named CompletableFuture
). Tomasz Nurkiewicz points out the considerable value-add in the Java 8 variant. Koji Lin provides the slides.
Solution
Let's integrate CompletableFuture
into ExecutorService
.
The natural approach is to extend ExecutorService
, overriding methods which return Future
to return CompletableFuture
(covariant return from Java 5). This means updating methods which construct or return ExecutorService
to return, say, CompletableExecutorService
. My ideal solution uses a non-existent Java language feature, assignment to this
for delegation (alas not in this timeline). A practical solution is mixins. So let's write that:
public interface CompletableExecutorService extends ExecutorService { /** * @return a completable future representing pending completion of the * task, never missing */ @Nonnull @Override <T> CompletableFuture<T> submit(@Nonnull final Callable<T> task); /** * @return a completable future representing pending completion of the * task, never missing */ @Nonnull @Override <T> CompletableFuture<T> submit(@Nonnull final Runnable task, @Nullable final T result); /** * @return a completable future representing pending completion of the * task, never missing */ @Nonnull @Override CompletableFuture<?> submit(@Nonnull final Runnable task); }
A static factory method turns any ExecutorService
into a CompletableExecutorService
:
@Nonnull public static CompletableExecutorService completable( @Nonnull final ExecutorService threads) { return newMixin(CompletableExecutorService.class, new Overrides(threads), threads); }
The grunt work is in Overrides
:
public static final class Overrides { private final ExecutorService threads; private Overrides(final ExecutorService threads) { this.threads = threads; } @Nonnull public <T> CompletableFuture<T> submit( @Nonnull final Callable<T> task) { final CompletableFuture<T> cf = new UnwrappedCompletableFuture<>(); threads.submit(() -> { try { cf.complete(task.call()); } catch (final CancellationException e) { cf.cancel(true); } catch (final Exception e) { cf.completeExceptionally(e); } }); return cf; } @Nonnull public <T> CompletableFuture<T> submit(@Nonnull final Runnable task, @Nullable final T result) { return submit(callable(task, result)); } @Nonnull public CompletableFuture<?> submit(@Nonnull final Runnable task) { return submit(callable(task)); } }
What is UnwrappedCompletableFuture
? It handles the pesky issue mentioned above with interrupts:
private static final class UnwrappedCompletableFuture<T> extends CompletableFuture<T> { @Override public T get() throws InterruptedException, ExecutionException { return UnwrappedInterrupts.<T, RuntimeException>unwrap(super::get); } @Override public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return UnwrappedInterrupts.<T, TimeoutException>unwrap( () -> super.get(timeout, unit)); } @FunctionalInterface private interface UnwrappedInterrupts<T, E extends Exception> { T get() throws InterruptedException, ExecutionException, E; static <T, E extends Exception> T unwrap( final UnwrappedInterrupts<T, E> wrapped) throws InterruptedException, ExecutionException, E { try { return wrapped.get(); } catch (final ExecutionException e) { final Throwable cause = e.getCause(); if (cause instanceof InterruptedException) throw (InterruptedException) cause; throw e; } } } }