Contents
- Creating Futures
- Chaining: thenApply, thenCompose, thenAccept
- Combining Futures: thenCombine, allOf, anyOf
- Exception Handling
- Custom Executors and Async Variants
There are several ways to create a CompletableFuture: run a task asynchronously, supply a value, or complete it manually.
import java.util.concurrent.CompletableFuture;
// 1. Already-completed future (useful in tests / short-circuit paths)
CompletableFuture<String> done = CompletableFuture.completedFuture("hello");
// 2. Run a Runnable asynchronously (no return value)
CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
System.out.println("Running in: " + Thread.currentThread().getName());
});
// 3. Supply a value asynchronously (Supplier<T>)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// Runs in ForkJoinPool.commonPool() by default
return fetchUserName(42);
});
// 4. Manually completing a future (acts as a promise)
CompletableFuture<String> promise = new CompletableFuture<>();
// On another thread or callback:
promise.complete("result"); // normal completion
promise.completeExceptionally(new RuntimeException("oops")); // failure
// Blocking get (avoid in production code — use thenAccept/join in pipelines)
String value = future.get(); // throws checked exceptions
String value2 = future.join(); // throws unchecked CompletionException
Chaining transforms the result of one stage into the next. Choose the right operator based on whether the next step is synchronous or itself async:
CompletableFuture<String> pipeline =
CompletableFuture.supplyAsync(() -> fetchUserId("alice")) // Long → Long
.thenApply(id -> fetchUserName(id)) // Long → String (sync)
.thenApply(String::toUpperCase); // String → String (sync)
// thenCompose — when the next step itself returns a CompletableFuture
// (avoids CompletableFuture<CompletableFuture<String>>)
CompletableFuture<Order> orderPipeline =
CompletableFuture.supplyAsync(() -> fetchUserId("alice"))
.thenCompose(id -> fetchUserAsync(id)) // returns CF<User>
.thenCompose(user -> fetchOrderAsync(user.id())); // returns CF<Order>
// thenAccept — consume the result (terminal, returns CF<Void>)
pipeline.thenAccept(name -> System.out.println("Hello, " + name));
// thenRun — execute a Runnable after completion (no access to result)
pipeline.thenRun(() -> System.out.println("Pipeline done"));
// Async variants run the function on a separate thread
// thenApplyAsync, thenComposeAsync, thenAcceptAsync
pipeline.thenApplyAsync(s -> heavyTransform(s)); // runs on commonPool
thenApply is like map (synchronous transform), thenCompose is like flatMap (async transform). Using thenApply when the function returns a CompletableFuture gives you a nested CompletableFuture<CompletableFuture<T>> — use thenCompose instead.
// thenCombine — combine the results of two independent futures
CompletableFuture<String> nameF = CompletableFuture.supplyAsync(() -> fetchName(1));
CompletableFuture<Integer> scoreF = CompletableFuture.supplyAsync(() -> fetchScore(1));
CompletableFuture<String> combined = nameF.thenCombine(scoreF,
(name, score) -> name + " scored " + score);
System.out.println(combined.join()); // e.g., "Alice scored 95"
// allOf — wait for ALL futures to complete (returns CF<Void>)
List<CompletableFuture<String>> futures = List.of(
CompletableFuture.supplyAsync(() -> fetchPage("A")),
CompletableFuture.supplyAsync(() -> fetchPage("B")),
CompletableFuture.supplyAsync(() -> fetchPage("C"))
);
CompletableFuture<Void> all = CompletableFuture.allOf(
futures.toArray(CompletableFuture[]::new));
// Collect all results once they're done
CompletableFuture<List<String>> allResults = all.thenApply(v ->
futures.stream().map(CompletableFuture::join).toList());
List<String> pages = allResults.join();
// anyOf — complete as soon as ANY future finishes (returns CF<Object>)
CompletableFuture<Object> fastest = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> callServer1()),
CompletableFuture.supplyAsync(() -> callServer2()),
CompletableFuture.supplyAsync(() -> callServer3())
);
System.out.println("Fastest result: " + fastest.join());
Exceptions in async pipelines are wrapped in CompletionException. Use exceptionally, handle, or whenComplete to intercept and recover:
// exceptionally — provide a fallback value when an exception occurs
CompletableFuture<String> safe = CompletableFuture
.supplyAsync(() -> riskyFetch()) // may throw
.exceptionally(ex -> {
log.warn("Fetch failed: " + ex.getMessage());
return "default"; // fallback
});
// handle — called for both success AND failure (like finally with a value)
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> riskyFetch())
.handle((value, ex) -> {
if (ex != null) return "fallback for " + ex.getMessage();
return value.toUpperCase();
});
// whenComplete — observe result/exception without transforming
CompletableFuture<String> logged = CompletableFuture
.supplyAsync(() -> riskyFetch())
.whenComplete((value, ex) -> {
if (ex != null) log.error("Failed", ex);
else log.info("Success: " + value);
}); // passes through value or exception unchanged
// Wrapping exceptions properly
CompletableFuture<String> future = new CompletableFuture<>();
executor.submit(() -> {
try {
future.complete(compute());
} catch (Exception e) {
future.completeExceptionally(e); // propagate to pipeline
}
});
If you call join() or get() on a failed future without an exception handler earlier in the chain, the exception is re-thrown wrapped in CompletionException (for join) or ExecutionException (for get). Always handle exceptions in the pipeline rather than at the terminal call.
By default, async stages run on ForkJoinPool.commonPool(). Provide a custom Executor for I/O-bound tasks to avoid starving the common pool:
// Dedicated executor for I/O-bound tasks
ExecutorService ioPool = Executors.newVirtualThreadPerTaskExecutor(); // Java 21
// or: Executors.newFixedThreadPool(20) for older Java
// Pass executor to *Async variants
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> fetchFromDatabase(), ioPool)
.thenApplyAsync(data -> transform(data), ioPool)
.thenAcceptAsync(result -> sendResponse(result), ioPool);
// Timeout (Java 9+)
CompletableFuture<String> withTimeout = CompletableFuture
.supplyAsync(() -> slowFetch())
.orTimeout(5, TimeUnit.SECONDS); // completes exceptionally after 5s
// Completes with default value instead of exception on timeout
CompletableFuture<String> withDefault = CompletableFuture
.supplyAsync(() -> slowFetch())
.completeOnTimeout("default", 5, TimeUnit.SECONDS);
// Parallel fan-out pattern
record UserProfile(User user, List<Order> orders, List<Review> reviews) {}
CompletableFuture<UserProfile> profile(long userId) {
var userF = CompletableFuture.supplyAsync(() -> fetchUser(userId), ioPool);
var ordersF = CompletableFuture.supplyAsync(() -> fetchOrders(userId), ioPool);
var reviewsF = CompletableFuture.supplyAsync(() -> fetchReviews(userId), ioPool);
return CompletableFuture.allOf(userF, ordersF, reviewsF)
.thenApply(v -> new UserProfile(
userF.join(), ordersF.join(), reviewsF.join()));
}
With Java 21 virtual threads, use Executors.newVirtualThreadPerTaskExecutor() for I/O-bound CompletableFuture pipelines. Each task gets its own lightweight virtual thread, eliminating thread-pool sizing concerns.