Contents
- Content-Based Router
- Splitter
- Aggregator
- Splitter + Aggregator Combined
- Multicast
- Recipient List
- Wire Tap
- Throttler
Content-Based Router
The Content-Based Router inspects the message and routes it to different endpoints based on its content.
In Camel this is the choice() / when() / otherwise() construct.
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class OrderRouter extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:orders")
            .choice()
                .when(jsonpath("$.type").isEqualTo("ELECTRONICS"))
                    .to("direct:electronics")
                .when(jsonpath("$.type").isEqualTo("GROCERY"))
                    .to("direct:grocery")
                .when(simple("${header.priority} == 'HIGH'"))
                    .to("direct:priority-processing")
                .otherwise()
                    .to("direct:general-processing")
            .end()
            .log("Order routed: ${header.orderId}");
    }
}
Each when() clause accepts a Predicate. You can use simple(),
jsonpath(), xpath(), or any custom predicate.
The first matching when() wins — subsequent conditions are not evaluated.
Always include an otherwise() clause to handle messages that don't match any condition. Without it, an unmatched message skips the choice() block and continues with whatever follows .end(), so unrouted messages are easy to miss.
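The first-match-wins rule can be illustrated outside Camel. The following is a minimal plain-Java sketch, not Camel's implementation: the class FirstMatchRouter and its methods are hypothetical names, and a message is modelled as a simple map of fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Plain-Java model of first-match-wins routing (hypothetical, not Camel code). */
final class FirstMatchRouter {
    private final List<Map.Entry<Predicate<Map<String, String>>, String>> rules = new ArrayList<>();
    private final String fallback;

    FirstMatchRouter(String fallback) {
        this.fallback = fallback; // plays the role of otherwise()
    }

    /** Registers a rule; rules are evaluated in registration order. */
    FirstMatchRouter when(Predicate<Map<String, String>> condition, String destination) {
        rules.add(Map.entry(condition, destination));
        return this;
    }

    /** Returns the destination of the first matching rule, or the fallback. */
    String route(Map<String, String> message) {
        for (Map.Entry<Predicate<Map<String, String>>, String> rule : rules) {
            if (rule.getKey().test(message)) {
                return rule.getValue(); // later rules are never evaluated
            }
        }
        return fallback;
    }

    /** Same conditions and order as the OrderRouter route. */
    static FirstMatchRouter orderRouter() {
        return new FirstMatchRouter("direct:general-processing")
            .when(m -> "ELECTRONICS".equals(m.get("type")), "direct:electronics")
            .when(m -> "GROCERY".equals(m.get("type")), "direct:grocery")
            .when(m -> "HIGH".equals(m.get("priority")), "direct:priority-processing");
    }

    public static void main(String[] args) {
        Map<String, String> order = Map.of("type", "ELECTRONICS", "priority", "HIGH");
        System.out.println(orderRouter().route(order)); // prints direct:electronics
    }
}
```

A high-priority electronics order matches two conditions but goes only to direct:electronics, so the order of the when() clauses is part of the routing logic.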
Splitter
The Splitter breaks a single message into multiple parts and processes each part independently.
A common use case is splitting a batch payload (JSON array, XML list, CSV lines) into individual items.
@Component
public class BatchOrderSplitter extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:batch-orders")
            // Split a JSON array into individual order objects
            .split(jsonpath("$.orders[*]"))
                .streaming() // process one item at a time (low memory)
                .log("Processing order: ${body}")
                .to("direct:process-single-order")
            .end()
            .log("Batch complete");
    }
}
By default, split items are processed sequentially. Add .parallelProcessing()
to process them concurrently on a thread pool.
from("direct:batch-orders")
    .split(jsonpath("$.orders[*]"))
        .streaming()
        .parallelProcessing()          // concurrent processing
        .executorService(myThreadPool) // optional: custom thread pool
        .to("direct:process-single-order")
    .end();
By default, an exception in one split item does not stop the remaining items, whether or not .parallelProcessing() is enabled. Use .stopOnException() if you need fail-fast behaviour.
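The difference between the default behaviour and fail-fast can be sketched in plain Java. This is a simplified model, not Camel code: SplitSketch and splitAndProcess are hypothetical names, items are processed sequentially, and failures are only recorded (Camel would also hand the exception to its error handling).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of split failure handling (hypothetical, not Camel code). */
final class SplitSketch {
    /**
     * Processes the items in order and returns the ones that failed.
     * With stopOnException set, the loop ends at the first failure.
     */
    static <T> List<T> splitAndProcess(List<T> items, Consumer<T> processor, boolean stopOnException) {
        List<T> failed = new ArrayList<>();
        for (T item : items) {
            try {
                processor.accept(item);
            } catch (RuntimeException e) {
                failed.add(item);
                if (stopOnException) {
                    break; // fail fast: remaining items are not processed
                }
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Consumer<String> processor = order -> {
            if (order.startsWith("bad")) {
                throw new IllegalStateException("cannot process " + order);
            }
        };
        List<String> orders = List.of("a", "bad1", "c", "bad2");
        System.out.println(splitAndProcess(orders, processor, false)); // prints [bad1, bad2]
        System.out.println(splitAndProcess(orders, processor, true));  // prints [bad1]
    }
}
```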
Aggregator
The Aggregator collects related messages over time and combines them into a single message.
You need three things: a correlation expression (which messages belong together),
an AggregationStrategy (how to combine them), and a completion condition
(when the group is done).
import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;

public class OrderAggregationStrategy implements AggregationStrategy {
    @Override
    @SuppressWarnings("unchecked") // the body is always the List<String> created below
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // First message in the group: wrap the body in a list
            List<String> orders = new ArrayList<>();
            orders.add(newExchange.getIn().getBody(String.class));
            newExchange.getIn().setBody(orders);
            return newExchange;
        }
        // Subsequent messages: add to the existing list
        List<String> orders = oldExchange.getIn().getBody(List.class);
        orders.add(newExchange.getIn().getBody(String.class));
        return oldExchange;
    }
}
@Component
public class OrderAggregatorRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:incoming-orders")
            .aggregate(header("customerId"), new OrderAggregationStrategy())
                .completionSize(10)      // complete after 10 messages
                .completionTimeout(5000) // or after 5 seconds of inactivity
                // completionInterval(...) is an alternative to completionTimeout(...);
                // Camel does not accept both on the same aggregator
            .log("Aggregated ${body.size()} orders for customer ${header.customerId}")
            .to("direct:process-batch");
    }
}
The correlation expression groups messages: here, all orders with the same customerId header
are aggregated together. The group is flushed as soon as either completion condition is met. To flush on a fixed schedule instead, replace completionTimeout() with completionInterval(); the two cannot be combined on one aggregator.
For production use, configure a persistent AggregationRepository (e.g., JDBC or Hazelcast) so in-flight groups survive application restarts. The default in-memory repository loses data on shutdown.
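How the correlation key, the grouping, and a size-based completion condition interact can be shown with a small plain-Java model. This is a sketch under simplifying assumptions, not Camel's aggregator: SizeBasedAggregator is a hypothetical class, it supports only completion by size, and its in-memory map loses open groups on shutdown, just like the default repository described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Plain-Java model of correlation plus size-based completion (hypothetical, not Camel code). */
final class SizeBasedAggregator<T> {
    private final Function<T, String> correlationKey;
    private final int completionSize;
    private final Map<String, List<T>> openGroups = new HashMap<>(); // in-memory only

    SizeBasedAggregator(Function<T, String> correlationKey, int completionSize) {
        this.correlationKey = correlationKey;
        this.completionSize = completionSize;
    }

    /** Adds a message to its group; returns the group once it reaches completionSize. */
    Optional<List<T>> add(T message) {
        String key = correlationKey.apply(message);
        List<T> group = openGroups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (group.size() >= completionSize) {
            openGroups.remove(key); // the group is flushed and starts again from empty
            return Optional.of(group);
        }
        return Optional.empty();
    }

    int openGroupCount() {
        return openGroups.size();
    }

    public static void main(String[] args) {
        // Messages are "customerId:order"; the part before ':' is the correlation key.
        SizeBasedAggregator<String> aggregator = new SizeBasedAggregator<>(m -> m.split(":")[0], 2);
        aggregator.add("c1:book");
        aggregator.add("c2:pen");
        System.out.println(aggregator.add("c1:lamp")); // prints Optional[[c1:book, c1:lamp]]
    }
}
```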
Splitter + Aggregator Combined
A common pattern is to split a message, process each fragment, then aggregate the results
back into a single response. The Splitter has a built-in aggregation feature for this.
@Component
public class EnrichmentRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:enrich-products")
            .split(jsonpath("$.products[*]"), new ProductAggregationStrategy())
                .parallelProcessing()
                // Enrich each product with pricing info
                .setHeader("productId", jsonpath("$.id"))
                .enrich("direct:fetch-price", this::mergePrice)
            .end()
            // After .end(), the body contains the aggregated result
            .log("All products enriched: ${body}")
            .to("direct:send-response");
    }

    private Exchange mergePrice(Exchange original, Exchange priceResponse) {
        // Merge the price data into the original product
        String product = original.getIn().getBody(String.class);
        String price = priceResponse.getIn().getBody(String.class);
        original.getIn().setBody(product + ", price=" + price);
        return original;
    }
}
Pass the AggregationStrategy as the second argument to split().
Camel calls it once for each fragment and returns the final aggregated exchange after .end().
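The per-fragment calls to the strategy amount to a fold over the processed fragments. The sketch below models that in plain Java; it is not Camel code, SplitAggregateSketch and joinStrategy are hypothetical names, and the null passed on the first call mirrors the oldExchange == null case of an AggregationStrategy.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

/** Plain-Java model of split with built-in aggregation (hypothetical, not Camel code). */
final class SplitAggregateSketch {
    /**
     * Processes each fragment, then folds the results with the strategy.
     * The strategy receives null as the "old" value on its first call.
     * Returns null when there are no fragments.
     */
    static <T> T splitProcessAggregate(List<T> fragments, UnaryOperator<T> processor, BinaryOperator<T> strategy) {
        T aggregated = null;
        for (T fragment : fragments) {
            T processed = processor.apply(fragment);
            aggregated = strategy.apply(aggregated, processed);
        }
        return aggregated;
    }

    /** Hypothetical strategy: joins the fragment bodies with "; ". */
    static String joinStrategy(String oldBody, String newBody) {
        return oldBody == null ? newBody : oldBody + "; " + newBody;
    }

    public static void main(String[] args) {
        List<String> products = List.of("tv", "radio");
        String result = splitProcessAggregate(
            products,
            product -> product + ", price=10", // stands in for the enrich step
            SplitAggregateSketch::joinStrategy);
        System.out.println(result); // prints tv, price=10; radio, price=10
    }
}
```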
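Multicast
Multicast sends the same message to multiple endpoints. By default the endpoints are called one after another; add .parallelProcessing() to call them concurrently.
Unlike choice(), which picks one destination, multicast fans out to all destinations.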
@Component
public class OrderMulticastRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:new-order")
            .multicast()
                .parallelProcessing() // send to all targets concurrently
                .to("direct:inventory-service",
                    "direct:payment-service",
                    "direct:notification-service")
            .end()
            .log("Order dispatched to all services");
    }
}
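Each endpoint receives its own copy of the exchange, so replacing the body or headers in one
branch does not affect the others. The copy is shallow: a mutable body object is still shared between branches unless you deep-clone it, for example in an onPrepare() processor.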
// Multicast with aggregation: combine the results from all endpoints
from("direct:new-order")
    .multicast(new OrderResultAggregationStrategy())
        .parallelProcessing()
        .stopOnException() // fail if any target throws
        .timeout(3000)     // timeout after 3 seconds
        .to("direct:inventory-service",
            "direct:payment-service",
            "direct:notification-service")
    .end()
    .log("Combined result: ${body}");
Without .stopOnException(), a failure in one multicast branch does not stop the others. If you need all-or-nothing semantics, combine .stopOnException() with a try/catch block or an error handler.
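The copy-per-branch behaviour can be modelled in plain Java. This is an illustrative sketch, not Camel code: MulticastSketch is a hypothetical class, the exchange is reduced to a map of headers, and branches run sequentially.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of multicast fan-out with one copy per branch (hypothetical, not Camel code). */
final class MulticastSketch {
    /** Gives every branch its own copy of the headers and collects the results in branch order. */
    static List<String> multicast(Map<String, String> headers,
                                  List<Function<Map<String, String>, String>> branches) {
        List<String> results = new ArrayList<>();
        for (Function<Map<String, String>, String> branch : branches) {
            Map<String, String> copy = new HashMap<>(headers); // shallow copy per branch
            results.add(branch.apply(copy));
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("orderId", "42");

        List<Function<Map<String, String>, String>> branches = new ArrayList<>();
        // The inventory branch changes its own copy only.
        branches.add(m -> { m.put("status", "reserved"); return "inventory:" + m.get("orderId"); });
        // The payment branch does not see that change.
        branches.add(m -> "payment sees status: " + m.containsKey("status"));

        System.out.println(multicast(headers, branches)); // prints [inventory:42, payment sees status: false]
    }
}
```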
Recipient List
The Recipient List is like Multicast, but the list of destinations is determined at runtime
from the message headers or body rather than being hardcoded in the route.
@Component
public class DynamicRoutingRoute extends RouteBuilder {
    @Override
    public void configure() {
        // Destinations come from a comma-separated header
        from("direct:dispatch")
            .recipientList(header("destinations"))
                .delimiter(",")
                .parallelProcessing()
            .end();
    }
}
// Set the header before calling the route
from("direct:order-entry")
    .process(exchange -> {
        String type = exchange.getIn().getHeader("orderType", String.class);
        List<String> targets = new ArrayList<>();
        targets.add("direct:audit-log"); // always audit
        if ("INTERNATIONAL".equals(type)) {
            targets.add("direct:customs-check");
            targets.add("direct:forex-service");
        } else {
            targets.add("direct:domestic-shipping");
        }
        exchange.getIn().setHeader("destinations", String.join(",", targets));
    })
    .to("direct:dispatch");
The Recipient List supports the same options as Multicast: .parallelProcessing(), .stopOnException(), .timeout(), and an optional AggregationStrategy to combine results.
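Keeping the destination logic in a pure function makes it easy to unit-test without starting a Camel context. The sketch below assumes that refactoring: RecipientListSketch, destinationsFor, and parseRecipients are hypothetical names, and parseRecipients only models how a delimited header breaks down into endpoint URIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Plain-Java model of building and parsing a recipient header (hypothetical, not Camel code). */
final class RecipientListSketch {
    /** Builds the comma-separated header value, following the processor in the order-entry route. */
    static String destinationsFor(String orderType) {
        List<String> targets = new ArrayList<>();
        targets.add("direct:audit-log"); // always audit
        if ("INTERNATIONAL".equals(orderType)) {
            targets.add("direct:customs-check");
            targets.add("direct:forex-service");
        } else {
            targets.add("direct:domestic-shipping");
        }
        return String.join(",", targets);
    }

    /** Splits the header value into endpoint URIs, skipping blank entries. */
    static List<String> parseRecipients(String header, String delimiter) {
        List<String> uris = new ArrayList<>();
        for (String part : header.split(Pattern.quote(delimiter))) {
            String uri = part.trim();
            if (!uri.isEmpty()) {
                uris.add(uri);
            }
        }
        return uris;
    }

    public static void main(String[] args) {
        String header = destinationsFor("INTERNATIONAL");
        System.out.println(parseRecipients(header, ","));
        // prints [direct:audit-log, direct:customs-check, direct:forex-service]
    }
}
```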
Wire Tap
Wire Tap sends a copy of the message to a secondary endpoint without affecting
the main route. It is fire-and-forget — the main route continues immediately without waiting for the tap.
@Component
public class AuditWireTapRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:payments")
            .wireTap("direct:audit-log")  // async copy to audit
            .to("direct:process-payment") // main flow continues immediately
            .log("Payment processed");

        // Audit route runs independently
        from("direct:audit-log")
            .log("Audit: ${body}")
            .to("jpa:com.example.AuditEntry");
    }
}
You can also transform the tapped message before sending it, which is useful for stripping
sensitive fields or enriching with metadata.
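from("direct:payments")
    .wireTap("direct:audit-log")
        // newExchangeBody() is not present in every Camel release; check the Wire Tap
        // documentation for your version (an onPrepare() processor is an alternative hook)
        .newExchangeBody(simple("Audit: orderId=${header.orderId}, timestamp=${date:now}"))
    .end()
    .to("direct:process-payment");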
Wire Tap is ideal for logging, monitoring, and analytics where the secondary processing must not slow down or disrupt the main message flow.
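The fire-and-forget behaviour can be modelled with an executor in plain Java. This is a sketch under stated assumptions, not Camel's implementation: WireTapSketch and its Result holder are hypothetical names, and the tapDone future is exposed only so that a caller or test can observe when the tap has finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java model of a fire-and-forget tap (hypothetical, not Camel code). */
final class WireTapSketch {
    static final class Result {
        final String mainResult;
        final CompletableFuture<Void> tapDone;

        Result(String mainResult, CompletableFuture<Void> tapDone) {
            this.mainResult = mainResult;
            this.tapDone = tapDone;
        }
    }

    /** Hands the message to the tap on another thread, then runs the main step without waiting. */
    static Result process(String message, Executor tapExecutor,
                          Consumer<String> tap, Function<String, String> mainStep) {
        CompletableFuture<Void> tapDone = CompletableFuture
            .runAsync(() -> tap.accept(message), tapExecutor)
            .exceptionally(error -> null); // a failing tap never reaches the main flow
        String mainResult = mainStep.apply(message);
        return new Result(mainResult, tapDone);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Result result = process("payment-1", pool,
            m -> System.out.println("Audit: " + m),
            m -> "processed " + m);
        System.out.println(result.mainResult);
        result.tapDone.join(); // only so the demo exits cleanly
        pool.shutdown();
    }
}
```

The order of the two printed lines is not guaranteed, which is the point: the main step does not depend on when, or whether, the tap completes.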
Throttler
The Throttler limits the number of messages that pass through a route within a given time window.
This is useful for protecting downstream systems from being overwhelmed by bursts of traffic.
@Component
public class RateLimitRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("seda:incoming-requests")
            .throttle(100) // max 100 messages per second
                .timePeriodMillis(1000)
            .to("direct:api-call")
            .log("Request sent within rate limit");
    }
}
The throttle rate can also be dynamic, driven by a header or expression.
from("seda:incoming-requests")
    .throttle(header("maxRate")) // rate from message header
        .timePeriodMillis(1000)
        .rejectExecution(true)   // throw exception instead of queuing
    .to("direct:api-call");
By default, excess messages are delayed until the next time window opens. Set
.rejectExecution(true) to throw a ThrottlerRejectedExecutionException instead,
which you can handle with an onException block.
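The idea of a limit per time window can be sketched as a fixed-window counter in plain Java. This is a simplified model, not Camel's throttling algorithm, which may differ in detail: FixedWindowThrottle is a hypothetical class, and the clock is injected so the behaviour can be checked without sleeping.

```java
import java.util.function.LongSupplier;

/** Plain-Java fixed-window rate limiter (hypothetical, not Camel code). */
final class FixedWindowThrottle {
    private final int maxPerWindow;
    private final long windowMillis;
    private final LongSupplier clock; // injectable time source, in milliseconds
    private long windowStart;
    private int used;

    FixedWindowThrottle(int maxPerWindow, long windowMillis, LongSupplier clock) {
        this.maxPerWindow = maxPerWindow;
        this.windowMillis = windowMillis;
        this.clock = clock;
        this.windowStart = clock.getAsLong();
    }

    /** Returns true if a message may pass now, false if the current window is full. */
    synchronized boolean tryAcquire() {
        long now = clock.getAsLong();
        if (now - windowStart >= windowMillis) {
            windowStart = now; // a new window opens and the counter resets
            used = 0;
        }
        if (used < maxPerWindow) {
            used++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FixedWindowThrottle throttle = new FixedWindowThrottle(2, 1000, System::currentTimeMillis);
        for (int i = 1; i <= 3; i++) {
            System.out.println("request " + i + " allowed: " + throttle.tryAcquire());
        }
    }
}
```

What happens on a false result is the caller's choice: waiting for the next window corresponds to the default delaying behaviour, while throwing an exception corresponds to rejectExecution(true).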
Summary of patterns
| Pattern | DSL | Use case |
| --- | --- | --- |
| Content-Based Router | choice().when().otherwise() | Route to different endpoints based on message content |
| Splitter | split(expression) | Break a batch into individual items for processing |
| Aggregator | aggregate(correlation, strategy) | Collect related messages into a single batch |
| Multicast | multicast().to(a, b, c) | Send the same message to multiple fixed endpoints |
| Recipient List | recipientList(expression) | Send to dynamic endpoints determined at runtime |
| Wire Tap | wireTap(endpoint) | Fire-and-forget copy for audit, logging, analytics |
| Throttler | throttle(rate) | Rate-limit messages to protect downstream systems |