The Fork/Join Framework splits a large task into smaller subtasks recursively, executes them in parallel, and merges results. The key innovation is work stealing: idle threads steal tasks from the tail of busy threads' deques, keeping all cores saturated with minimal synchronization overhead.
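Work stealing can be observed directly: `ForkJoinPool.getStealCount()` reports an approximate count of tasks taken from other workers' deques. A deliberately fine-grained sketch (the class name `StealDemo` is illustrative, not part of the framework):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative only: a task that splits into many tiny subtasks so steals happen.
class StealDemo extends RecursiveTask<Long> {
    private final int depth;
    StealDemo(int depth) { this.depth = depth; }

    @Override
    protected Long compute() {
        if (depth <= 1) return 1L;
        StealDemo left = new StealDemo(depth - 1);
        left.fork();                                 // pushed onto this worker's deque
        long right = new StealDemo(depth - 1).compute();
        return left.join() + right;                  // idle workers may have stolen 'left'
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        long leaves = pool.invoke(new StealDemo(18));
        // getStealCount() is an approximate, monotonically increasing counter
        System.out.println("leaves=" + leaves + ", steals=" + pool.getStealCount());
        pool.shutdown();
    }
}
```

The steal count varies from run to run; what matters is that it is nonzero, showing idle workers pulling queued subtasks off busy workers' deques.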

The two base classes are RecursiveTask&lt;V&gt;, whose compute() method returns a result, and RecursiveAction, whose compute() returns nothing.
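A minimal skeleton of each subclass, just to show the shape of the API (the class names `AnswerTask` and `NoopAction` are illustrative):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

// RecursiveTask<V>: compute() returns a value the caller can join on
class AnswerTask extends RecursiveTask<Long> {
    @Override
    protected Long compute() {
        return 42L; // real tasks split, fork, and merge here
    }
}

// RecursiveAction: compute() returns nothing; suited to in-place mutation
class NoopAction extends RecursiveAction {
    @Override
    protected void compute() {
        // real actions would mutate a shared array region here
    }
}

class BaseClassDemo {
    public static void main(String[] args) {
        System.out.println(ForkJoinPool.commonPool().invoke(new AnswerTask())); // prints 42
        ForkJoinPool.commonPool().invoke(new NoopAction()); // completes, no result
    }
}
```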

Fork/Join works best when subtasks are independent (no shared mutable state) and there is enough work to justify the splitting overhead. A threshold of roughly 1,000–10,000 units of work per leaf task is a common starting point.

Classic example: parallel array sum. Below the threshold the task computes the sum directly; above it, the range is halved, one half is forked, the other is computed in the current thread, and the forked half is joined.

```java
import java.util.concurrent.RecursiveTask;

public class ParallelSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 5_000;
    private final long[] data;
    private final int lo, hi;

    public ParallelSum(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        int size = hi - lo;
        if (size <= THRESHOLD) {
            // Base case — sequential sum
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        // Recursive case — split in half
        int mid = lo + size / 2;
        ParallelSum left = new ParallelSum(data, lo, mid);
        ParallelSum right = new ParallelSum(data, mid, hi);
        left.fork();                        // submit left to pool asynchronously
        long rightResult = right.compute(); // compute right in current thread
        long leftResult = left.join();      // wait for left
        return leftResult + rightResult;
    }
}
```

Always fork() one subtask and compute() the other directly in the current thread. Forking both and joining both wastes a thread context.

```java
import java.util.concurrent.ForkJoinPool;

long[] data = new long[1_000_000];
// … fill data …
ForkJoinPool pool = ForkJoinPool.commonPool();
long total = pool.invoke(new ParallelSum(data, 0, data.length));
System.out.println("Sum: " + total);
```

RecursiveAction is used when the parallel work mutates an array in-place (sorting, normalizing, etc.) rather than returning a value.

```java
import java.util.concurrent.RecursiveAction;

public class ParallelNormalizer extends RecursiveAction {
    private static final int THRESHOLD = 2_000;
    private final double[] data;
    private final int lo, hi;
    private final double max;

    public ParallelNormalizer(double[] data, int lo, int hi, double max) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
        this.max = max;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++) data[i] /= max;
            return;
        }
        int mid = lo + (hi - lo) / 2;
        invokeAll(
            new ParallelNormalizer(data, lo, mid, max),
            new ParallelNormalizer(data, mid, hi, max)
        );
    }
}
```

invokeAll() is a convenience that forks the given tasks and joins them all, computing one in the calling thread rather than letting it idle. It is equivalent to the fork + compute + join pattern but less error-prone when splitting into more than two subtasks.
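ForkJoinTask.invokeAll also has a varargs overload, so an uneven three-way split stays readable. A sketch using the same threshold idea as above (`ThreeWaySum` is an illustrative name, not part of the framework):

```java
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Illustrative three-way split using the varargs form of invokeAll.
class ThreeWaySum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final long[] a;
    private final int lo, hi;

    ThreeWaySum(long[] a, int lo, int hi) {
        this.a = a;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long s = 0;
            for (int i = lo; i < hi; i++) s += a[i];
            return s;
        }
        int third = (hi - lo) / 3;
        ThreeWaySum t1 = new ThreeWaySum(a, lo, lo + third);
        ThreeWaySum t2 = new ThreeWaySum(a, lo + third, lo + 2 * third);
        ThreeWaySum t3 = new ThreeWaySum(a, lo + 2 * third, hi);
        ForkJoinTask.invokeAll(t1, t2, t3); // forks and joins all three
        return t1.join() + t2.join() + t3.join();
    }
}
```

After invokeAll returns, the subtasks are complete, so the trailing join() calls just read back the results.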

You can use the shared common pool or create a dedicated pool with a specific parallelism level.

```java
// Shared common pool — parallelism = Runtime.getRuntime().availableProcessors() - 1
ForkJoinPool common = ForkJoinPool.commonPool();
System.out.println("Parallelism: " + common.getParallelism());

// Dedicated pool — useful for isolating latency-sensitive workloads
ForkJoinPool custom = new ForkJoinPool(
    4,                                               // parallelism
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    null,                                            // uncaught exception handler
    false                                            // asyncMode (LIFO vs FIFO)
);

try {
    Long result = custom.invoke(new ParallelSum(data, 0, data.length));
    System.out.println("Result: " + result);
} finally {
    custom.shutdown();
}

// Three submission methods:
// pool.invoke(task)  — blocks until done, returns result
// pool.submit(task)  — returns Future<T>, non-blocking
// pool.execute(task) — fire-and-forget (RecursiveAction)
```

Do not use the common pool for tasks that block on I/O. Blocking worker threads reduces the available parallelism for all other users of the common pool (including parallel streams). Use a dedicated pool or ManagedBlocker instead.
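Of the three submission methods, submit() is the one that returns a handle you can wait on later; a small sketch (`Square` and `SubmitDemo` are illustrative names):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

class Square extends RecursiveTask<Integer> {
    private final int n;
    Square(int n) { this.n = n; }

    @Override
    protected Integer compute() { return n * n; }
}

class SubmitDemo {
    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Future<Integer> f = pool.submit(new Square(7)); // returns immediately
            System.out.println(f.get());                    // blocks until done, prints 49
        } finally {
            pool.shutdown();
        }
    }
}
```

submit() actually returns a ForkJoinTask, which implements Future, so you can also call join() on it to avoid the checked exceptions of get().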

Two idiomatic patterns for splitting work:

```java
// Pattern 1 — fork/compute/join (most efficient for 2 subtasks)
left.fork();
long r2 = right.compute();
long r1 = left.join();
return r1 + r2;

// Pattern 2 — invokeAll (cleaner for N subtasks)
List<ParallelSum> subtasks = new ArrayList<>();
for (int[] range : splitRanges(lo, hi, 4)) {
    subtasks.add(new ParallelSum(data, range[0], range[1]));
}
invokeAll(subtasks); // forks all, then joins all
return subtasks.stream().mapToLong(RecursiveTask::join).sum();
```

The threshold controls the granularity of parallelism. Too small → massive overhead from task creation. Too large → poor core utilization.

```java
// Rule of thumb: aim for leaf tasks that take ~100µs–1ms each
// For CPU-bound work on an 8-core machine processing 1M elements:
//   1_000_000 / 8 = 125_000 elements/core at minimum
// A threshold of 10_000–50_000 usually works well

// Adaptive threshold based on available parallelism:
int threshold = Math.max(1000,
    data.length / (ForkJoinPool.commonPool().getParallelism() * 4));
```

Profile first. Use JFR (Java Flight Recorder) or JMH microbenchmarks to measure actual speedup. Fork/Join adds overhead — for small datasets sequential code is faster.
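A crude single-run timing can catch gross regressions, though it is no substitute for JMH. Everything below is a sketch (`NaiveTiming` is an illustrative name, and the numbers will vary by machine):

```java
import java.util.stream.LongStream;

class NaiveTiming {
    public static void main(String[] args) {
        long[] data = LongStream.rangeClosed(1, 2_000_000).toArray();

        long t0 = System.nanoTime();
        long seq = 0;
        for (long v : data) seq += v;              // plain sequential loop
        long seqNs = System.nanoTime() - t0;

        t0 = System.nanoTime();
        long par = LongStream.of(data).parallel().sum(); // uses the common pool
        long parNs = System.nanoTime() - t0;

        // JIT warmup, GC, and pool startup all skew a single run; trust JMH, not this
        System.out.printf("seq=%d (%.2f ms), par=%d (%.2f ms)%n",
                seq, seqNs / 1e6, par, parNs / 1e6);
    }
}
```

On a single cold run the parallel version may well be slower than the loop; that is exactly the overhead the threshold discussion above is about.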

Parallel streams and Arrays.parallelSort both use the common ForkJoinPool. You can submit your own tasks to the common pool while a parallel stream is running; both draw on the same worker threads and work-stealing deques.

```java
// Parallel stream internally uses ForkJoinPool.commonPool()
long sum = LongStream.rangeClosed(1, 1_000_000)
                     .parallel()
                     .sum();

// Running a parallel stream inside a custom ForkJoinPool scopes it to that pool
ForkJoinPool custom = new ForkJoinPool(2); // limit to 2 threads
long result = custom.submit(() ->
    LongStream.rangeClosed(1, 1_000_000).parallel().sum()
).get();
custom.shutdown();
```

Running a parallel stream inside a custom ForkJoinPool is an undocumented but widely-used trick to control parallelism per operation. It works reliably in Java 8–21.
Finally, some common pitfalls and their fixes:

```java
// ❌ WRONG — forking both subtasks and joining both
left.fork();
right.fork();
return left.join() + right.join(); // current thread waits instead of computing a subtask

// ✅ CORRECT — fork one, compute the other in current thread
left.fork();
long r = right.compute();
return left.join() + r;
```

```java
// ❌ WRONG — blocking inside a task (starves the pool)
@Override
protected Long compute() {
    try {
        Thread.sleep(1000); // blocks a ForkJoin worker thread!
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return 42L;
}
```

```java
// ✅ CORRECT — use ManagedBlocker for blocking operations
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    volatile boolean done = false;

    @Override
    public boolean block() throws InterruptedException {
        Thread.sleep(1000);
        done = true;
        return true;
    }

    @Override
    public boolean isReleasable() { return done; }
});
```

```java
// ❌ WRONG — shared mutable state across subtasks (data race)
long[] shared = new long[1]; // shared across tasks
@Override
protected Long compute() {
    shared[0] += localSum;   // unsynchronized read-modify-write
    return shared[0];
}
// ✅ CORRECT — each task returns its own value; combine in parent
```

Fork/Join tasks should never perform blocking I/O, acquire locks, or share mutable state. They are designed for pure CPU-bound, independent divide-and-conquer work.