## Contents
- Core Concepts & Work Stealing
- RecursiveTask — Returning a Value
- RecursiveAction — Side-Effect Tasks
- ForkJoinPool — Configuration & Submission
- Fork/Join vs invoke Pattern
- Choosing the Right Threshold
- Common Pool & Parallel Streams
- Pitfalls & Best Practices
## Core Concepts & Work Stealing

The Fork/Join Framework recursively splits a large task into smaller subtasks, executes them in parallel, and merges the results. The key innovation is work stealing: each worker thread pushes and pops its own tasks at one end of its deque, while idle threads steal from the opposite end of busy threads' deques, keeping all cores saturated with minimal synchronization overhead.
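Work stealing can be observed indirectly through `ForkJoinPool.getStealCount()`, which reports (an estimate of) how many tasks workers have taken from other workers' queues. A rough sketch, assuming nothing from the text (the `Burn` task and its busy loop are purely illustrative, and counts vary from run to run):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class StealDemo {
    // Splittable dummy task: recursively halves n, then burns a little CPU
    static class Burn extends RecursiveAction {
        final int n;
        Burn(int n) { this.n = n; }
        @Override protected void compute() {
            if (n <= 1) {
                long x = 0;
                for (int i = 0; i < 100_000; i++) x += i; // real work to distribute
                if (x == -1) System.out.println(x);        // keep the loop from being elided
                return;
            }
            invokeAll(new Burn(n / 2), new Burn(n - n / 2));
        }
    }

    public static long run() {
        ForkJoinPool pool = new ForkJoinPool(); // parallelism defaults to #cores
        pool.invoke(new Burn(1_000));
        long steals = pool.getStealCount();     // tasks taken from other workers' deques
        pool.shutdown();
        return steals;
    }

    public static void main(String[] args) {
        System.out.println("Steals observed: " + run());
    }
}
```

On a multi-core machine the count is usually non-zero, showing that idle workers did take over queued subtasks.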
The two base classes are:
- RecursiveTask<V> — computes and returns a result
- RecursiveAction — computes without returning a value (side effects)
Fork/Join works best when subtasks are independent (no shared mutable state) and there is enough work to justify the splitting overhead. A threshold of roughly 1000–10 000 units of work per leaf task is a common starting point.
## RecursiveTask — Returning a Value

Classic example: parallel array sum. Below the threshold the task computes directly; above it, the range is halved, both halves forked, then joined.
```java
import java.util.concurrent.RecursiveTask;

public class ParallelSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 5_000;
    private final long[] data;
    private final int lo, hi;

    public ParallelSum(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        int size = hi - lo;
        if (size <= THRESHOLD) {
            // Base case — sequential sum
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        // Recursive case — split in half
        int mid = lo + size / 2;
        ParallelSum left = new ParallelSum(data, lo, mid);
        ParallelSum right = new ParallelSum(data, mid, hi);
        left.fork();                        // submit left to the pool asynchronously
        long rightResult = right.compute(); // compute right in the current thread
        long leftResult = left.join();      // wait for left
        return leftResult + rightResult;
    }
}
```
Always fork() one subtask and compute() the other directly in the current thread. Forking both and joining both wastes the current thread: it waits in join() instead of doing a share of the work itself.
```java
import java.util.concurrent.ForkJoinPool;

long[] data = new long[1_000_000];
// … fill data …
ForkJoinPool pool = ForkJoinPool.commonPool();
long total = pool.invoke(new ParallelSum(data, 0, data.length));
System.out.println("Sum: " + total);
```
## RecursiveAction — Side-Effect Tasks

RecursiveAction is used when the parallel work mutates an array in-place (sorting, normalizing, etc.) rather than returning a value.
```java
import java.util.concurrent.RecursiveAction;

public class ParallelNormalizer extends RecursiveAction {
    private static final int THRESHOLD = 2_000;
    private final double[] data;
    private final int lo, hi;
    private final double max;

    public ParallelNormalizer(double[] data, int lo, int hi, double max) {
        this.data = data; this.lo = lo; this.hi = hi; this.max = max;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++) data[i] /= max;
            return;
        }
        int mid = lo + (hi - lo) / 2;
        invokeAll(
            new ParallelNormalizer(data, lo, mid, max),
            new ParallelNormalizer(data, mid, hi, max)
        );
    }
}
```
invokeAll() is a convenience that forks all tasks and joins them — equivalent to fork + compute + join but less error-prone when splitting into more than two subtasks.
## ForkJoinPool — Configuration & Submission

You can use the shared common pool or create a dedicated pool with a specific parallelism level.
```java
// Shared common pool — parallelism defaults to Runtime.getRuntime().availableProcessors() - 1
ForkJoinPool common = ForkJoinPool.commonPool();
System.out.println("Parallelism: " + common.getParallelism());

// Dedicated pool — useful for isolating latency-sensitive workloads
ForkJoinPool custom = new ForkJoinPool(
    4,                                               // parallelism
    ForkJoinPool.defaultForkJoinWorkerThreadFactory, // worker thread factory
    null,                                            // uncaught exception handler
    false                                            // asyncMode: false = LIFO (default), true = FIFO
);
try {
    Long result = custom.invoke(new ParallelSum(data, 0, data.length));
    System.out.println("Result: " + result);
} finally {
    custom.shutdown();
}

// Three submission methods:
// pool.invoke(task)  — blocks until done, returns the result
// pool.submit(task)  — returns a Future<T> without blocking
// pool.execute(task) — fire-and-forget (typically a RecursiveAction)
```
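The three submission styles can be sketched against a trivial doubling task (the `SubmitStyles` class and `task()` helper are illustrative names, not from the text):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class SubmitStyles {
    // Tiny task that just doubles its input
    static RecursiveTask<Integer> task(int v) {
        return new RecursiveTask<Integer>() {
            @Override protected Integer compute() { return v * 2; }
        };
    }

    public static int demo() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            int a = pool.invoke(task(1));                   // blocks, returns 2
            ForkJoinTask<Integer> f = pool.submit(task(2)); // ForkJoinTask implements Future
            int b = f.get();                                // block later, only when needed
            pool.execute(task(3));                          // fire-and-forget; result ignored
            return a + b;                                   // 2 + 4 = 6
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints 6
    }
}
```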
Do not use the common pool for tasks that block on I/O. Blocking workers reduces available parallelism for all other users of the common pool (including parallel streams). Use a dedicated pool or ManagedBlocker instead.
## Fork/Join vs invoke Pattern

Two idiomatic patterns for splitting work:
```java
// Pattern 1 — fork/compute/join (most efficient for 2 subtasks)
left.fork();
long r2 = right.compute();
long r1 = left.join();
return r1 + r2;

// Pattern 2 — invokeAll (cleaner for N subtasks)
List<ParallelSum> subtasks = new ArrayList<>();
for (int[] range : splitRanges(lo, hi, 4)) {
    subtasks.add(new ParallelSum(data, range[0], range[1]));
}
invokeAll(subtasks); // forks all, then joins all
return subtasks.stream().mapToLong(RecursiveTask::join).sum();
```
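`splitRanges` is not defined above; a minimal sketch of what such a helper could look like (a hypothetical implementation that divides `[lo, hi)` into `n` near-equal consecutive pieces):

```java
import java.util.ArrayList;
import java.util.List;

public class Ranges {
    // Hypothetical helper assumed by Pattern 2: returns {start, end} pairs
    // covering [lo, hi) in n near-equal consecutive ranges.
    public static List<int[]> splitRanges(int lo, int hi, int n) {
        List<int[]> ranges = new ArrayList<>();
        int size = hi - lo;
        for (int i = 0; i < n; i++) {
            int start = lo + (int) ((long) size * i / n);       // proportional split
            int end   = lo + (int) ((long) size * (i + 1) / n); // avoids rounding gaps
            if (start < end) ranges.add(new int[] { start, end });
        }
        return ranges;
    }
}
```

The proportional arithmetic guarantees the ranges tile `[lo, hi)` exactly, with sizes differing by at most one element.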
## Choosing the Right Threshold

The threshold controls the granularity of parallelism. Too small → massive overhead from task creation. Too large → poor core utilization.
```java
// Rule of thumb: aim for leaf tasks that take ~100µs–1ms each.
// For CPU-bound work on an 8-core machine processing 1M elements:
//   1_000_000 / 8 = 125_000 elements per core at minimum;
//   a threshold of 10_000–50_000 usually works well.

// Adaptive threshold based on available parallelism:
int threshold = Math.max(1000,
        data.length / (ForkJoinPool.commonPool().getParallelism() * 4));
```
Profile first. Use JFR (Java Flight Recorder) or JMH microbenchmarks to measure actual speedup. Fork/Join adds overhead — for small datasets sequential code is faster.
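Before reaching for JFR or JMH, even a crude `System.nanoTime()` comparison hints at whether parallelism pays off. A rough single-shot sketch (JIT warm-up skews one-off timings, so treat the numbers as indicative only; the parallel side uses a parallel stream so the snippet stays self-contained):

```java
import java.util.Arrays;
import java.util.stream.LongStream;

public class QuickCompare {
    // Returns {sequentialNanos, parallelNanos} for summing the array both ways
    public static long[] timeBoth(long[] data) {
        long t0 = System.nanoTime();
        long seq = 0;
        for (long v : data) seq += v;               // plain sequential loop
        long seqNs = System.nanoTime() - t0;

        t0 = System.nanoTime();
        long par = Arrays.stream(data).parallel().sum(); // common ForkJoinPool underneath
        long parNs = System.nanoTime() - t0;

        if (seq != par) throw new AssertionError("results differ");
        return new long[] { seqNs, parNs };
    }

    public static void main(String[] args) {
        long[] data = LongStream.rangeClosed(1, 5_000_000).toArray();
        long[] ns = timeBoth(data);
        System.out.printf("sequential: %.1f ms, parallel: %.1f ms%n",
                ns[0] / 1e6, ns[1] / 1e6);
    }
}
```

For small arrays the sequential loop typically wins, which is exactly the point about splitting overhead.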
## Common Pool & Parallel Streams

Parallel streams and Arrays.parallelSort both use the common ForkJoinPool. You can submit your own tasks to the common pool while a parallel stream is running — they share the same pool of worker threads and its work-stealing scheduler.
```java
// A parallel stream internally uses ForkJoinPool.commonPool()
long sum = LongStream.rangeClosed(1, 1_000_000)
        .parallel()
        .sum();

// Running a parallel stream inside a custom ForkJoinPool scopes it to that pool
ForkJoinPool custom = new ForkJoinPool(2); // limit to 2 threads
long result = custom.submit(() ->
        LongStream.rangeClosed(1, 1_000_000).parallel().sum()
).get();
custom.shutdown();
```
Running a parallel stream inside a custom ForkJoinPool is an undocumented but widely used trick for controlling parallelism per operation. It has worked in practice from Java 8 through recent releases, but it relies on implementation details and is not guaranteed by the specification.
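One way to see the scoping in action is to record the worker-thread names a parallel stream runs on: common-pool workers are named like `ForkJoinPool.commonPool-worker-N`, while custom-pool workers are named `ForkJoinPool-N-worker-M`. A small sketch (the `PoolScoping` class is illustrative):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class PoolScoping {
    // Runs a parallel stream inside a 2-thread custom pool and collects
    // the names of every thread that processed an element.
    public static Set<String> workerNames() throws Exception {
        Set<String> names = new ConcurrentSkipListSet<>();
        ForkJoinPool custom = new ForkJoinPool(2);
        try {
            custom.submit(() ->
                LongStream.rangeClosed(1, 100_000)
                          .parallel()
                          .peek(i -> names.add(Thread.currentThread().getName()))
                          .sum()
            ).get();
        } finally {
            custom.shutdown();
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        workerNames().forEach(System.out::println);
    }
}
```

Every name printed should belong to the custom pool, with none containing `commonPool`.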
## Pitfalls & Best Practices

```java
// ❌ WRONG — forking both subtasks and joining both
left.fork();
right.fork();
return left.join() + right.join(); // the current thread waits in join() instead of computing a subtask

// ✅ CORRECT — fork one, compute the other in the current thread
left.fork();
long r = right.compute();
return left.join() + r;
```
```java
// ❌ WRONG — blocking inside a task (starves the pool)
@Override
protected Long compute() {
    try {
        Thread.sleep(1000); // blocks a ForkJoin worker thread!
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return 42L;
}
```
```java
// ✅ CORRECT — use ForkJoinPool.managedBlock for blocking operations
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    volatile boolean done = false;
    @Override public boolean block() throws InterruptedException {
        Thread.sleep(1000);
        done = true;
        return true;
    }
    @Override public boolean isReleasable() { return done; }
});
```
```java
// ❌ WRONG — shared mutable state across subtasks (data race)
long[] shared = new long[1]; // written by many tasks concurrently
@Override protected Long compute() { shared[0] += localSum; return shared[0]; }

// ✅ CORRECT — each task returns its own value; combine in the parent
```
Fork/Join tasks should never perform blocking I/O, acquire locks, or share mutable state. They are designed for pure CPU-bound, independent divide-and-conquer work.