The Content-Based Router inspects the message and routes it to different endpoints based on its content. In Camel this is the choice() / when() / otherwise() construct.

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class OrderRouter extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:orders")
            .choice()
                .when(jsonpath("$.type").isEqualTo("ELECTRONICS"))
                    .to("direct:electronics")
                .when(jsonpath("$.type").isEqualTo("GROCERY"))
                    .to("direct:grocery")
                .when(simple("${header.priority} == 'HIGH'"))
                    .to("direct:priority-processing")
                .otherwise()
                    .to("direct:general-processing")
            .end()
            .log("Order routed: ${header.orderId}");
    }
}

Each when() clause accepts a Predicate. You can use simple(), jsonpath(), xpath(), or any custom predicate. The first matching when() wins — subsequent conditions are not evaluated.
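To make the first-match rule concrete, here is a minimal plain-Java model of the evaluation order. It does not use the Camel API: `FirstMatchRouter` and its methods are hypothetical names introduced for illustration, and the message is simplified to a `Map<String, String>`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of first-match-wins routing. Not the Camel API:
// FirstMatchRouter is a hypothetical class used only for illustration.
final class FirstMatchRouter {
    // LinkedHashMap keeps insertion order, so rules are tested in the order they were added.
    private final Map<String, Predicate<Map<String, String>>> rules = new LinkedHashMap<>();
    private final String fallback;

    FirstMatchRouter(String fallback) {
        this.fallback = fallback;
    }

    FirstMatchRouter when(Predicate<Map<String, String>> condition, String destination) {
        rules.put(destination, condition);
        return this;
    }

    String route(Map<String, String> message) {
        for (Map.Entry<String, Predicate<Map<String, String>>> rule : rules.entrySet()) {
            if (rule.getValue().test(message)) {
                return rule.getKey(); // first match wins; later rules are never tested
            }
        }
        return fallback; // plays the role of otherwise()
    }

    // Mirrors the order of the when() clauses in the OrderRouter route above.
    static FirstMatchRouter orderRouter() {
        return new FirstMatchRouter("direct:general-processing")
            .when(m -> "ELECTRONICS".equals(m.get("type")), "direct:electronics")
            .when(m -> "GROCERY".equals(m.get("type")), "direct:grocery")
            .when(m -> "HIGH".equals(m.get("priority")), "direct:priority-processing");
    }
}
```

In this model a high-priority electronics order goes to direct:electronics, because the type check is listed before the priority check. If priority should take precedence, that clause has to come first.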

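Always include an otherwise() clause to handle messages that don't match any condition. Without it, an unmatched message is not dropped: it skips the choice() block and continues with whatever follows .end(), so it can pass through the route without ever reaching one of the intended endpoints.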

The Splitter breaks a single message into multiple parts and processes each part independently. A common use case is splitting a batch payload (JSON array, XML list, CSV lines) into individual items.

@Component
public class BatchOrderSplitter extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:batch-orders")
            // Split a JSON array into individual order objects
            .split(jsonpath("$.orders[*]"))
                .streaming() // process one item at a time (low memory)
                .log("Processing order: ${body}")
                .to("direct:process-single-order")
            .end()
            .log("Batch complete");
    }
}

By default, split items are processed sequentially. Add .parallelProcessing() to process them concurrently on a thread pool.

from("direct:batch-orders")
    .split(jsonpath("$.orders[*]"))
        .streaming()
        .parallelProcessing()           // concurrent processing
        .executorService(myThreadPool)  // optional: custom thread pool (an ExecutorService defined elsewhere)
        .to("direct:process-single-order")
    .end();

Whether or not .parallelProcessing() is enabled, an exception in one split item does not stop the others by default. Use .stopOnException() if you need fail-fast behaviour.
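The difference between the two failure policies can be sketched in plain Java. This is a sequential model for determinism, not the Camel API; `SplitFailurePolicy` and `processAll` are hypothetical names, and a real router would typically also propagate the exception in fail-fast mode, which the sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of "continue on failure" versus "stop on exception".
// Not the Camel API: SplitFailurePolicy is a hypothetical class.
final class SplitFailurePolicy {
    // Runs the worker on every item and returns the items that were processed successfully.
    static List<String> processAll(List<String> items, Consumer<String> worker, boolean stopOnException) {
        List<String> succeeded = new ArrayList<>();
        for (String item : items) {
            try {
                worker.accept(item);
                succeeded.add(item);
            } catch (RuntimeException e) {
                if (stopOnException) {
                    break; // fail fast: the remaining items are not processed
                }
                // default policy: skip the failed item and carry on with the next one
            }
        }
        return succeeded;
    }
}
```

With the default policy a batch of three items where the second one fails still processes the first and third; with the fail-fast policy only the first is processed.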

The Aggregator collects related messages over time and combines them into a single message. You need three things: a correlation expression (which messages belong together), an AggregationStrategy (how to combine them), and a completion condition (when the group is done).

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;

public class OrderAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // First message in the group: wrap the body in a list
            List<String> orders = new ArrayList<>();
            orders.add(newExchange.getIn().getBody(String.class));
            newExchange.getIn().setBody(orders);
            return newExchange;
        }
        // Subsequent messages: add to the existing list
        @SuppressWarnings("unchecked")
        List<String> orders = oldExchange.getIn().getBody(List.class);
        orders.add(newExchange.getIn().getBody(String.class));
        return oldExchange;
    }
}

@Component
public class OrderAggregatorRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:incoming-orders")
            .aggregate(header("customerId"), new OrderAggregationStrategy())
                .completionSize(10)       // complete after 10 messages
                .completionTimeout(5000)  // or after 5 seconds of inactivity
                // .completionInterval(30000) would flush every 30 seconds regardless, but Camel
                // does not allow completionTimeout and completionInterval on the same aggregator
            .log("Aggregated ${body.size()} orders for customer ${header.customerId}")
            .to("direct:process-batch");
    }
}

The correlation expression groups messages — here, all orders with the same customerId header are aggregated together. The group is flushed when any completion condition is met.
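How the three ingredients interact can be sketched in plain Java. This is a simplified model, not the Camel API: `MiniAggregator` and `appendToList` are hypothetical names, only a size-based completion condition is modelled, and time-based completion is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of an aggregator. Not the Camel API:
// MiniAggregator is a hypothetical class used only for illustration.
final class MiniAggregator<M> {
    private final Function<M, String> correlation;           // which messages belong together
    private final BiFunction<List<M>, M, List<M>> strategy;  // how to combine them
    private final int completionSize;                        // when the group is done
    private final Map<String, List<M>> openGroups = new HashMap<>();

    MiniAggregator(Function<M, String> correlation,
                   BiFunction<List<M>, M, List<M>> strategy,
                   int completionSize) {
        this.correlation = correlation;
        this.strategy = strategy;
        this.completionSize = completionSize;
    }

    // Returns the completed group if this message completes it, otherwise an empty Optional.
    Optional<List<M>> add(M message) {
        String key = correlation.apply(message);
        // The old value is null for the first message of a group.
        List<M> merged = strategy.apply(openGroups.get(key), message);
        if (merged.size() >= completionSize) {
            openGroups.remove(key); // flush the group and start fresh next time
            return Optional.of(merged);
        }
        openGroups.put(key, merged);
        return Optional.empty();
    }

    // A strategy that collects messages into a list, creating the list on the first call.
    static <M> List<M> appendToList(List<M> old, M next) {
        List<M> result = (old == null) ? new ArrayList<>() : old;
        result.add(next);
        return result;
    }
}
```

Messages with different correlation keys accumulate in separate groups, and a group is only released once its own completion condition is met.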

For production use, configure a persistent AggregationRepository (e.g., JDBC or Hazelcast) so in-flight groups survive application restarts. The default in-memory repository loses data on shutdown.

A common pattern is to split a message, process each fragment, then aggregate the results back into a single response. The Splitter has a built-in aggregation feature for this.

import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class EnrichmentRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:enrich-products")
            // ProductAggregationStrategy is an AggregationStrategy defined elsewhere
            .split(jsonpath("$.products[*]"), new ProductAggregationStrategy())
                .parallelProcessing()
                // Enrich each product with pricing info
                .setHeader("productId", jsonpath("$.id"))
                .enrich("direct:fetch-price", this::mergePrice)
            .end()
            // After .end(), the body contains the aggregated result
            .log("All products enriched: ${body}")
            .to("direct:send-response");
    }

    private Exchange mergePrice(Exchange original, Exchange priceResponse) {
        // Merge the price data into the original product
        String product = original.getIn().getBody(String.class);
        String price = priceResponse.getIn().getBody(String.class);
        original.getIn().setBody(product + ", price=" + price);
        return original;
    }
}

Pass the AggregationStrategy as the second argument to split(). Camel calls it once for each fragment and returns the final aggregated exchange after .end().
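The split, enrich, aggregate shape can also be sketched with the JDK alone. The following is a plain-Java analogy using `CompletableFuture`, not the Camel API; `SplitEnrichAggregate`, `enrichAll`, and the `priceLookup` function are hypothetical names. The sketch collects results in input order, which is a property of this code and not a claim about Camel's ordering.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogy of split -> enrich in parallel -> aggregate.
// Not the Camel API: SplitEnrichAggregate is a hypothetical class.
final class SplitEnrichAggregate {
    static List<String> enrichAll(List<String> products, Function<String, String> priceLookup) {
        // Split + enrich: start one asynchronous task per product.
        List<CompletableFuture<String>> pending = products.stream()
            .map(p -> CompletableFuture.supplyAsync(() -> p + ", price=" + priceLookup.apply(p)))
            .collect(Collectors.toList());
        // Aggregate: wait for every task and collect the results in input order.
        return pending.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }
}
```

Collecting the futures into a list before joining matters: it starts all tasks first, so the lookups can overlap instead of running one after another.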

Multicast sends a copy of the same message to multiple endpoints. Unlike choice(), which picks one destination, multicast fans out to all destinations. Delivery is sequential by default; add .parallelProcessing() to send to all endpoints concurrently.

@Component
public class OrderMulticastRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:new-order")
            .multicast()
                .parallelProcessing() // send to all targets concurrently
                .to("direct:inventory-service",
                    "direct:payment-service",
                    "direct:notification-service")
            .end()
            .log("Order dispatched to all services");
    }
}

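Each endpoint receives its own copy of the exchange, so replacing the body or setting headers in one branch does not affect the others. The copy is shallow, however: if the body is a mutable object, in-place changes made by one branch are visible to the others unless you deep-clone it, for example in an onPrepare processor.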
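One caveat is worth illustrating: Camel's Multicast documentation describes the per-endpoint copy as a shallow copy. The plain-Java sketch below shows what that means for a mutable body. It does not use the Camel API; the exchange is modelled as a `Map<String, Object>` and `CopySemantics` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of shallow versus deep copies of a message.
// Not the Camel API: CopySemantics is a hypothetical class.
final class CopySemantics {
    @SuppressWarnings("unchecked")
    static List<String> bodyOf(Map<String, Object> exchange) {
        return (List<String>) exchange.get("body");
    }

    // Shallow copy: a new map, but the body list inside it is the same object.
    static Map<String, Object> shallowCopy(Map<String, Object> exchange) {
        return new HashMap<>(exchange);
    }

    // Deep copy for this simple shape: the body list is duplicated as well.
    static Map<String, Object> deepCopy(Map<String, Object> exchange) {
        Map<String, Object> copy = new HashMap<>(exchange);
        copy.put("body", new ArrayList<>(bodyOf(exchange)));
        return copy;
    }
}
```

Adding a header to a shallow copy leaves the original untouched, but appending to the body list of a shallow copy changes the list every other copy sees. Immutable bodies such as `String` avoid the issue entirely.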

// Multicast with aggregation: combine the results from all endpoints
from("direct:new-order")
    .multicast(new OrderResultAggregationStrategy())
        .parallelProcessing()
        .stopOnException()  // fail if any target throws
        .timeout(3000)      // timeout after 3 seconds
        .to("direct:inventory-service",
            "direct:payment-service",
            "direct:notification-service")
    .end()
    .log("Combined result: ${body}");

Without .stopOnException(), a failure in one multicast branch does not stop the others. If you need all-or-nothing semantics, combine .stopOnException() with a try/catch block or an error handler.

The Recipient List is like Multicast, but the list of destinations is determined at runtime from the message headers or body rather than being hardcoded in the route.

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class DynamicRoutingRoute extends RouteBuilder {

    @Override
    public void configure() {
        // Destinations come from a comma-separated header
        from("direct:dispatch")
            .recipientList(header("destinations"))
                .delimiter(",")
                .parallelProcessing()
            .end();

        // Set the header before calling the dispatch route
        from("direct:order-entry")
            .process(exchange -> {
                String type = exchange.getIn().getHeader("orderType", String.class);
                List<String> targets = new ArrayList<>();
                targets.add("direct:audit-log"); // always audit
                if ("INTERNATIONAL".equals(type)) {
                    targets.add("direct:customs-check");
                    targets.add("direct:forex-service");
                } else {
                    targets.add("direct:domestic-shipping");
                }
                exchange.getIn().setHeader("destinations", String.join(",", targets));
            })
            .to("direct:dispatch");
    }
}

The Recipient List supports the same options as Multicast: .parallelProcessing(), .stopOnException(), .timeout(), and an optional AggregationStrategy to combine results.
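Because the destination list is ordinary data, the logic that builds it can live in a plain method and be unit-tested without starting a Camel context. A minimal sketch, reusing the endpoint names from the route above; `DestinationResolver` and `destinationsFor` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// The header-building logic from the processor, extracted into a pure function.
// DestinationResolver is a hypothetical helper, not part of Camel.
final class DestinationResolver {
    // Returns the comma-separated endpoint list for the given order type.
    static String destinationsFor(String orderType) {
        List<String> targets = new ArrayList<>();
        targets.add("direct:audit-log"); // always audit
        if ("INTERNATIONAL".equals(orderType)) {
            targets.add("direct:customs-check");
            targets.add("direct:forex-service");
        } else {
            // Any other value, including a missing header (null), is treated as domestic.
            targets.add("direct:domestic-shipping");
        }
        return String.join(",", targets);
    }
}
```

The processor in the route would then shrink to a single call that sets the destinations header from `destinationsFor(...)`.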

Wire Tap sends a copy of the message to a secondary endpoint without affecting the main route. It is fire-and-forget — the main route continues immediately without waiting for the tap.

@Component
public class AuditWireTapRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:payments")
            .wireTap("direct:audit-log")   // async copy to audit
            .to("direct:process-payment")  // main flow continues immediately
            .log("Payment processed");

        // Audit route runs independently
        from("direct:audit-log")
            .log("Audit: ${body}")
            .to("jpa:com.example.AuditEntry");
    }
}

You can also transform the tapped message before sending it, which is useful for stripping sensitive fields or enriching with metadata.

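from("direct:payments")
    .wireTap("direct:audit-log")
        .newExchangeBody(simple("Audit: orderId=${header.orderId}, timestamp=${date:now}"))
    .end()
    .to("direct:process-payment");

Note that newExchangeBody() belongs to the older Wire Tap DSL and is not available in every Camel release; newer versions point to an onPrepare processor for this kind of transformation, so check the API of the version you use.

Wire Tap is ideal for logging, monitoring, and analytics where the secondary processing must not slow down or disrupt the main message flow.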

The Throttler limits the number of messages that pass through a route within a given time window. This is useful for protecting downstream systems from being overwhelmed by bursts of traffic.

@Component
public class RateLimitRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("seda:incoming-requests")
            .throttle(100) // max 100 messages per second
                .timePeriodMillis(1000)
            .to("direct:api-call")
            .log("Request sent within rate limit");
    }
}

The throttle rate can also be dynamic, driven by a header or expression.

from("seda:incoming-requests")
    .throttle(header("maxRate"))  // rate from message header
        .timePeriodMillis(1000)
        .rejectExecution(true)    // throw exception instead of queuing
    .to("direct:api-call");

By default, excess messages are delayed until the next time window opens. Set .rejectExecution(true) to throw a ThrottlerRejectedExecutionException instead, which you can handle with an onException block.
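The choice between delaying and rejecting can be sketched with a simplified fixed-window counter in plain Java. This is a model for illustration only and makes no claim about how Camel implements its throttler; `FixedWindowThrottle` and its methods are hypothetical names, and the current time is passed in explicitly so the behaviour is deterministic.

```java
// Simplified fixed-window rate limiter. Not the Camel API and not Camel's algorithm:
// FixedWindowThrottle is a hypothetical class used only for illustration.
final class FixedWindowThrottle {
    private final int maxPerPeriod;
    private final long periodMillis;
    private long windowStart = 0L;
    private int usedInWindow = 0;

    FixedWindowThrottle(int maxPerPeriod, long periodMillis) {
        this.maxPerPeriod = maxPerPeriod;
        this.periodMillis = periodMillis;
    }

    // Returns true and consumes a permit if a message may pass at time nowMillis.
    boolean tryAcquire(long nowMillis) {
        if (nowMillis - windowStart >= periodMillis) {
            windowStart = nowMillis; // a new window opens
            usedInWindow = 0;
        }
        if (usedInWindow < maxPerPeriod) {
            usedInWindow++;
            return true;
        }
        return false; // over the limit: the caller decides whether to wait or reject
    }

    // Milliseconds until the current window ends; a delaying throttler would wait this long.
    long millisUntilNextWindow(long nowMillis) {
        return Math.max(0L, windowStart + periodMillis - nowMillis);
    }
}
```

When `tryAcquire` returns false, a delaying policy would wait for `millisUntilNextWindow` and retry, while a rejecting policy would throw immediately and leave recovery to the caller.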

Summary of patterns
| Pattern | DSL | Use case |
| --- | --- | --- |
| Content-Based Router | choice().when().otherwise() | Route to different endpoints based on message content |
| Splitter | split(expression) | Break a batch into individual items for processing |
| Aggregator | aggregate(correlation, strategy) | Collect related messages into a single batch |
| Multicast | multicast().to(a, b, c) | Send the same message to multiple fixed endpoints |
| Recipient List | recipientList(expression) | Send to dynamic endpoints determined at runtime |
| Wire Tap | wireTap(endpoint) | Fire-and-forget copy for audit, logging, analytics |
| Throttler | throttle(rate) | Rate-limit messages to protect downstream systems |