Contents
- Creating Futures
- Chaining: thenApply, thenCompose, thenAccept
- Combining Futures: thenCombine, allOf, anyOf
- Exception Handling
- Custom Executors and Async Variants
There are several ways to create a CompletableFuture: run a task asynchronously, supply a value, or complete it manually.
import java.util.concurrent.CompletableFuture;
// 1. Already-completed future (useful in tests / short-circuit paths)
CompletableFuture<String> done = CompletableFuture.completedFuture("hello");
// 2. Run a Runnable asynchronously (no return value)
CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
    System.out.println("Running in: " + Thread.currentThread().getName());
});
// 3. Supply a value asynchronously (Supplier<T>)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // Runs in ForkJoinPool.commonPool() by default
    return fetchUserName(42);
});
// 4. Manually completing a future (acts as a promise)
CompletableFuture<String> promise = new CompletableFuture<>();
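// On another thread or callback, complete it once; only the first completion takes effect
promise.complete("result"); // normal completion, returns true
promise.completeExceptionally(new RuntimeException("oops")); // failure path; ignored here (returns false) because the future is already completed
// Blocking retrieval (both calls block the caller; inside a pipeline prefer thenAccept and other callbacks)
String value = future.get(); // throws checked InterruptedException / ExecutionException
String value2 = future.join(); // throws unchecked CompletionException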
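To make the promise-style usage above more concrete, here is a minimal sketch that adapts a callback API to a CompletableFuture and checks that a second completion attempt is ignored. The class PromiseSketch and the method fetchWithCallback are invented for illustration; they stand in for whatever callback-based client you actually have.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class PromiseSketch {

    // Hypothetical callback-style API, invented for this example.
    static void fetchWithCallback(String key,
                                  Consumer<String> onSuccess,
                                  Consumer<Throwable> onError) {
        if (key.isEmpty()) {
            onError.accept(new IllegalArgumentException("empty key"));
        } else {
            onSuccess.accept("value-for-" + key);
        }
    }

    // Adapts the callback API: the future is completed from inside the callbacks.
    static CompletableFuture<String> fetchAsync(String key) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        fetchWithCallback(key, promise::complete, promise::completeExceptionally);
        return promise;
    }

    // Returns true only if the second completion attempt was ignored.
    static boolean secondCompletionIsIgnored() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        boolean first = promise.complete("result");
        boolean second = promise.completeExceptionally(new RuntimeException("oops"));
        return first && !second && promise.join().equals("result");
    }

    public static void main(String[] args) {
        System.out.println(fetchAsync("alice").join());                // value-for-alice
        System.out.println(fetchAsync("").isCompletedExceptionally()); // true
        System.out.println(secondCompletionIsIgnored());               // true
    }
}
```

Callers of fetchAsync never see the callbacks; they chain on the returned future like on any other stage.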
Chaining transforms the result of one stage into the next. Choose the right operator based on whether the next step is synchronous or itself async:
CompletableFuture<String> pipeline =
    CompletableFuture.supplyAsync(() -> fetchUserId("alice")) // supplies a Long
        .thenApply(id -> fetchUserName(id)) // Long → String (sync)
        .thenApply(String::toUpperCase); // String → String (sync)
// thenCompose — when the next step itself returns a CompletableFuture
// (avoids CompletableFuture<CompletableFuture<String>>)
CompletableFuture<Order> orderPipeline =
    CompletableFuture.supplyAsync(() -> fetchUserId("alice"))
        .thenCompose(id -> fetchUserAsync(id)) // returns CF<User>
        .thenCompose(user -> fetchOrderAsync(user.id())); // returns CF<Order>
// thenAccept — consume the result (terminal, returns CF<Void>)
pipeline.thenAccept(name -> System.out.println("Hello, " + name));
// thenRun — execute a Runnable after completion (no access to result)
pipeline.thenRun(() -> System.out.println("Pipeline done"));
// Async variants (thenApplyAsync, thenComposeAsync, thenAcceptAsync) submit the function
// to an executor instead of running it on whichever thread completes the previous stage
pipeline.thenApplyAsync(s -> heavyTransform(s)); // no executor given, so it runs on commonPool
thenApply is like map (synchronous transform), thenCompose is like flatMap (async transform). Using thenApply when the function returns a CompletableFuture gives you a nested CompletableFuture<CompletableFuture<T>> — use thenCompose instead.
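The difference shows up directly in the return types. The sketch below assumes a hypothetical lookupNameAsync method (not part of any library) that already returns a CompletableFuture, and applies both operators to it.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeSketch {

    // Hypothetical async lookup; stands in for a remote call.
    static CompletableFuture<String> lookupNameAsync(long id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // thenApply wraps the returned future again, producing a nested type.
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.completedFuture(42L)
                .thenApply(ComposeSketch::lookupNameAsync);
    }

    // thenCompose flattens the inner future into a single stage.
    static CompletableFuture<String> flat() {
        return CompletableFuture.completedFuture(42L)
                .thenCompose(ComposeSketch::lookupNameAsync);
    }

    public static void main(String[] args) {
        System.out.println(flat().join());          // user-42
        System.out.println(nested().join().join()); // user-42, but needs two joins
    }
}
```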
When you need results from multiple independent futures, three combinators cover the common cases. thenCombine waits for exactly two futures and merges their results with a BiFunction; both futures run concurrently, and the function runs only once both are done. CompletableFuture.allOf() accepts any number of futures and returns a CompletableFuture<Void> that completes when every input is done (and completes exceptionally if any input fails); because the return type is Void, you must call join() on each original future to retrieve its result. CompletableFuture.anyOf() completes as soon as the first future finishes, returning its result as a CompletableFuture<Object>; if that first future fails, anyOf fails with the same exception. The remaining futures keep running, but their results are ignored. anyOf suits the hedged-request pattern: fan out to multiple services and use whichever responds first.
// thenCombine — combine the results of two independent futures
CompletableFuture<String> nameF = CompletableFuture.supplyAsync(() -> fetchName(1));
CompletableFuture<Integer> scoreF = CompletableFuture.supplyAsync(() -> fetchScore(1));
CompletableFuture<String> combined = nameF.thenCombine(scoreF,
    (name, score) -> name + " scored " + score);
System.out.println(combined.join()); // e.g., "Alice scored 95"
// allOf — wait for ALL futures to complete (returns CF<Void>)
List<CompletableFuture<String>> futures = List.of(
    CompletableFuture.supplyAsync(() -> fetchPage("A")),
    CompletableFuture.supplyAsync(() -> fetchPage("B")),
    CompletableFuture.supplyAsync(() -> fetchPage("C"))
);
CompletableFuture<Void> all = CompletableFuture.allOf(
    futures.toArray(CompletableFuture[]::new));
// Collect all results once they're done
CompletableFuture<List<String>> allResults = all.thenApply(v ->
    futures.stream().map(CompletableFuture::join).toList());
List<String> pages = allResults.join();
// anyOf — complete as soon as ANY future finishes (returns CF<Object>)
CompletableFuture<Object> fastest = CompletableFuture.anyOf(
    CompletableFuture.supplyAsync(() -> callServer1()),
    CompletableFuture.supplyAsync(() -> callServer2()),
    CompletableFuture.supplyAsync(() -> callServer3())
);
System.out.println("Fastest result: " + fastest.join());
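Because allOf returns CompletableFuture<Void>, the collect-after-allOf step tends to get repeated. One way to package it is a small generic helper; the sketch below is an illustration under that assumption, with the names CombineSketch and allAsList made up for this example. It also demonstrates that anyOf fails when the first future to finish has failed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class CombineSketch {

    // Waits for every future, then gathers the results in input order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Every input is already complete here, so join() does not block.
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    static List<String> demoAll() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "A"),
                CompletableFuture.supplyAsync(() -> "B"),
                CompletableFuture.supplyAsync(() -> "C"));
        return allAsList(futures).join();
    }

    // anyOf mirrors the first future to finish, including a failure.
    static boolean anyOfFailsWhenFirstFinisherFailed() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("server down"));
        CompletableFuture<String> stillPending = new CompletableFuture<>();
        return CompletableFuture.anyOf(failed, stillPending).isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(demoAll());                           // [A, B, C]
        System.out.println(anyOfFailsWhenFirstFinisherFailed()); // true
    }
}
```

If a fast failure should not win a hedged request, one option is to attach a fallback to each input before passing it to anyOf.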
An exception thrown inside a stage reaches dependent stages wrapped in CompletionException, so use getCause() when you need the original. Use exceptionally, handle, or whenComplete to intercept and recover:
// exceptionally — provide a fallback value when an exception occurs
CompletableFuture<String> safe = CompletableFuture
    .supplyAsync(() -> riskyFetch()) // may throw
    .exceptionally(ex -> {
        log.warn("Fetch failed: " + ex.getMessage());
        return "default"; // fallback
    });
// handle: called for both success AND failure (like finally with a value)
CompletableFuture<String> result = CompletableFuture
    .supplyAsync(() -> riskyFetch())
    .handle((value, ex) -> {
        if (ex != null) return "fallback for " + ex.getMessage();
        return value.toUpperCase();
    });
// whenComplete: observe result/exception without transforming
CompletableFuture<String> logged = CompletableFuture
    .supplyAsync(() -> riskyFetch())
    .whenComplete((value, ex) -> {
        if (ex != null) log.error("Failed", ex);
        else log.info("Success: " + value);
    }); // passes through value or exception unchanged
// Wrapping exceptions properly
CompletableFuture<String> future = new CompletableFuture<>();
executor.submit(() -> {
    try {
        future.complete(compute());
    } catch (Exception e) {
        future.completeExceptionally(e); // propagate to pipeline
    }
});
If you call join() or get() on a failed future without an exception handler earlier in the chain, the exception is re-thrown wrapped in CompletionException (for join) or ExecutionException (for get). Prefer handling exceptions inside the pipeline (with exceptionally or handle) so that the terminal call does not need its own try/catch.
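The wrapping described above can be observed directly. The following sketch uses an illustrative helper named unwrap (not a JDK method) to peel off the wrapper, and shows which wrapper join() and get() each throw for the same failed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class UnwrapSketch {

    // Illustrative helper: returns the original cause if ex is a wrapper.
    static Throwable unwrap(Throwable ex) {
        boolean wrapper = ex instanceof CompletionException
                || ex instanceof ExecutionException;
        return (wrapper && ex.getCause() != null) ? ex.getCause() : ex;
    }

    // The handler receives a CompletionException; unwrap exposes the real cause.
    static String describeFailure() {
        return CompletableFuture
                .<String>supplyAsync(() -> { throw new IllegalStateException("boom"); })
                .exceptionally(ex -> {
                    Throwable cause = unwrap(ex);
                    return cause.getClass().getSimpleName() + ": " + cause.getMessage();
                })
                .join();
    }

    // Reports the wrapper type thrown by join() for a failed future.
    static String wrapperFromJoin(CompletableFuture<?> failed) {
        try {
            failed.join();
            return "none";
        } catch (CompletionException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Reports the wrapper type thrown by get() for a failed future.
    static String wrapperFromGet(CompletableFuture<?> failed) {
        try {
            failed.get();
            return "none";
        } catch (ExecutionException e) {
            return e.getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static CompletableFuture<String> failedFuture() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());               // IllegalStateException: boom
        System.out.println(wrapperFromJoin(failedFuture())); // CompletionException
        System.out.println(wrapperFromGet(failedFuture()));  // ExecutionException
    }
}
```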
By default, async stages run on ForkJoinPool.commonPool(). Provide a custom Executor for I/O-bound tasks to avoid starving the common pool:
// Dedicated executor for I/O-bound tasks
ExecutorService ioPool = Executors.newVirtualThreadPerTaskExecutor(); // Java 21
// or: Executors.newFixedThreadPool(20) for older Java
// Pass executor to *Async variants
// thenAcceptAsync returns CompletableFuture<Void>, so the pipeline's type is Void
CompletableFuture<Void> future = CompletableFuture
    .supplyAsync(() -> fetchFromDatabase(), ioPool)
    .thenApplyAsync(data -> transform(data), ioPool)
    .thenAcceptAsync(result -> sendResponse(result), ioPool);
// Timeout (Java 9+)
CompletableFuture<String> withTimeout = CompletableFuture
    .supplyAsync(() -> slowFetch())
    .orTimeout(5, TimeUnit.SECONDS); // completes exceptionally with TimeoutException if not done within 5s
// Completes with default value instead of exception on timeout
CompletableFuture<String> withDefault = CompletableFuture
    .supplyAsync(() -> slowFetch())
    .completeOnTimeout("default", 5, TimeUnit.SECONDS);
// Parallel fan-out pattern
record UserProfile(User user, List<Order> orders, List<Review> reviews) {}
CompletableFuture<UserProfile> profile(long userId) {
    var userF = CompletableFuture.supplyAsync(() -> fetchUser(userId), ioPool);
    var ordersF = CompletableFuture.supplyAsync(() -> fetchOrders(userId), ioPool);
    var reviewsF = CompletableFuture.supplyAsync(() -> fetchReviews(userId), ioPool);
    return CompletableFuture.allOf(userF, ordersF, reviewsF)
        .thenApply(v -> new UserProfile(
            userF.join(), ordersF.join(), reviewsF.join()));
}
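With Java 21 virtual threads, Executors.newVirtualThreadPerTaskExecutor() is a good fit for I/O-bound CompletableFuture pipelines. Each task gets its own lightweight virtual thread, so there is no pool size to tune, although you may still need to cap concurrency (for example with a Semaphore) to protect downstream resources such as database connections.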
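For runtimes older than Java 21, the fixed-pool alternative mentioned earlier can be sketched as follows. It uses a named, daemon-thread pool so that async stages are easy to spot in thread dumps, and pairs it with completeOnTimeout. The class name, the io-pool- thread prefix, the pool sizes, and the sleep durations are all arbitrary choices for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorSketch {

    // Fixed pool with named daemon threads; the size is an arbitrary example value.
    static ExecutorService newIoPool(int threads) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r, "io-pool-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
    }

    // Shows that a stage given an executor runs on that executor's thread.
    static String threadNameOfAsyncStage() {
        ExecutorService ioPool = newIoPool(4);
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), ioPool)
                    .join();
        } finally {
            ioPool.shutdown();
        }
    }

    // The simulated slow call takes 2s; the future gives up after 100ms.
    static String slowCallWithDefault() {
        ExecutorService ioPool = newIoPool(1);
        try {
            return CompletableFuture
                    .supplyAsync(() -> { sleep(2000); return "slow"; }, ioPool)
                    .completeOnTimeout("default", 100, TimeUnit.MILLISECONDS)
                    .join();
        } finally {
            // The timeout does not stop the task itself; interrupt it explicitly.
            ioPool.shutdownNow();
        }
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNameOfAsyncStage()); // io-pool-1
        System.out.println(slowCallWithDefault());    // default
    }
}
```

Note that orTimeout and completeOnTimeout only complete the future; the underlying task keeps running unless something else stops it, which is why the sketch shuts the pool down explicitly.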