Contents
- Basic Splitter
- Parallel Splitter
- Aggregating Split Results
- Streaming Split (Large Payloads)
- Split Index & Size Headers
- Scatter-Gather with recipientList
- Multicast — Same Message, Multiple Targets
- Error Handling in Splits
Split a list body or a tokenised string into individual messages:
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class OrderSplitRoute extends RouteBuilder {

    @Override
    public void configure() {
        // Split a List<OrderLine> body into individual OrderLine messages
        from("direct:orderWithLines")
            .split(body()) // iterates over List, array, or Iterator
            .to("direct:processOrderLine");

        // Split a CSV line by comma, trimming whitespace around each token
        from("direct:csvLine")
            .split(body().tokenize(","))
                .transform(simple("${body.trim()}")) // simple's OGNL support calls String.trim()
                .to("direct:processToken");

        // Split an XML document — one message per <item> element
        from("direct:xmlBatch")
            .split().xpath("//item")
            .to("direct:processItem");
    }
}
Process split sub-messages in parallel using a thread pool. The route waits for all sub-messages to finish before continuing past the split block; this holds even with streaming(true), which only changes how sub-messages are produced (lazily, on demand), not whether the splitter waits.
from("direct:bulkOrder")
    .split(body())
        .parallelProcessing() // process each item concurrently
        .executorService(Executors.newFixedThreadPool(8))
        .timeout(10_000) // total timeout: give up on sub-messages still running after 10s
        .stopOnException() // stop all on first failure
        .to("direct:enrichOrderLine")
    .end() // marks the end of the split block; the route continues here after all sub-messages complete
    .log("All order lines processed");
Combine all sub-message results back into one aggregated exchange using an AggregationStrategy:
// AggregationStrategy to collect all enriched lines back into a list
AggregationStrategy collectLines = (oldExchange, newExchange) -> {
    List<OrderLine> lines = oldExchange == null
            ? new ArrayList<>()
            : oldExchange.getIn().getBody(List.class);
    lines.add(newExchange.getIn().getBody(OrderLine.class));
    Exchange result = oldExchange != null ? oldExchange : newExchange;
    result.getIn().setBody(lines);
    return result;
};

from("direct:bulkOrder")
    .split(body(), collectLines) // pass strategy as second arg
        .parallelProcessing()
        .to("direct:enrichOrderLine")
    .end()
    // Body is now List<OrderLine> — all enriched results
    .to("direct:saveEnrichedBatch");
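Camel invokes the strategy as a left fold: the first call receives null as oldExchange, and every later call receives the previous result. Stripped of Camel types, the same contract can be sketched in plain Java (the fold helper and names here are illustrative, not Camel API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class FoldDemo {

    // Mimics how Camel feeds sub-results to an AggregationStrategy:
    // pairwise calls, with a null accumulator on the first invocation
    static <A, T> A fold(List<T> results, BiFunction<A, T, A> aggregate) {
        A acc = null;
        for (T next : results) {
            acc = aggregate.apply(acc, next);
        }
        return acc;
    }

    // Collect-into-a-list aggregation, analogous to the collectLines strategy above
    static List<String> collectDemo(List<String> subResults) {
        return fold(subResults, (old, next) -> {
            List<String> acc = (old == null) ? new ArrayList<>() : old;
            acc.add(next);
            return acc;
        });
    }

    public static void main(String[] args) {
        System.out.println(collectDemo(List.of("line1", "line2", "line3")));
        // prints [line1, line2, line3]
    }
}
```

The null-check on the first call is why the collectLines lambda above creates its ArrayList only when oldExchange is null.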
For very large files or result sets, streaming(true) processes sub-messages one at a time without loading the entire split into memory first. Essential for CSV files with millions of rows.
from("file:inbox?noop=true")
    .split(body().tokenize("\n"))
        .streaming() // read one line at a time — constant memory usage
        .filter(body().isNotEqualTo("")) // skip blank lines
        .unmarshal().csv()
        .to("direct:processRow")
    .end()
    .log("File processing complete");
streaming(true) can be combined with parallelProcessing() and an AggregationStrategy, but sub-results then reach the strategy in completion order, not input order; streaming does not buffer the whole split to restore ordering. If ordering matters, aggregate into an order-independent structure, or collect results externally and sort afterwards.
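One way to picture collecting results externally: fan the sub-messages out to a thread pool, let each worker drop its result into a thread-safe sink in completion order, and sort afterwards if order matters. A plain-Java sketch of that pattern (not Camel API; names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ExternalAggregation {

    // Process every item in parallel; collect results into an external thread-safe sink
    static List<String> processAll(List<String> lines) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ConcurrentLinkedQueue<String> sink = new ConcurrentLinkedQueue<>(); // arrival order = completion order
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String line : lines) {
                futures.add(pool.submit(() -> sink.add(line.toUpperCase()))); // "process" one sub-message
            }
            for (Future<?> f : futures) {
                f.get(); // wait for all workers, like the end of a split block
            }
        } finally {
            pool.shutdown();
        }
        // Completion order is arbitrary, so restore a deterministic order afterwards
        return sink.stream().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processAll(List.of("gamma", "alpha", "beta")));
        // prints [ALPHA, BETA, GAMMA]
    }
}
```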
Send the same message to multiple recipients in parallel and aggregate all their responses. The recipient list is dynamic — determined at runtime from a header or bean.
from("direct:priceRequest")
    // recipientList fans out to all suppliers listed in the header
    .recipientList(header("supplierEndpoints")) // e.g., "direct:supplier1,direct:supplier2,direct:supplier3"
        .parallelProcessing()
        .timeout(5_000) // total timeout: stop waiting for slow suppliers after 5s
        .aggregationStrategy(new BestPriceAggregator())
        .ignoreInvalidEndpoints()
    .end()
    // Body is now the best price found across all suppliers
    .to("direct:confirmBestPrice");
import java.math.BigDecimal;

import org.apache.camel.AggregationStrategy; // org.apache.camel.processor.aggregate.AggregationStrategy on Camel 2.x
import org.apache.camel.Exchange;

// BestPriceAggregator — keep the lowest price quote
public class BestPriceAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange; // first reply, nothing to compare against yet
        }
        BigDecimal oldPrice = oldExchange.getIn().getBody(PriceQuote.class).getPrice();
        BigDecimal newPrice = newExchange.getIn().getBody(PriceQuote.class).getPrice();
        return newPrice.compareTo(oldPrice) < 0 ? newExchange : oldExchange;
    }
}
Unlike recipientList (dynamic targets), multicast sends to a fixed set of endpoints defined at route-build time:
from("direct:orderCreated")
    .multicast()
        .parallelProcessing()
        .aggregationStrategy(AggregationStrategies.useOriginal())
        .to("direct:sendConfirmationEmail",
            "direct:updateInventory",
            "direct:notifyWarehouse")
    .end()
    .log("Order ${header.orderId} processed by all systems");
The route below shows error handling inside a split; key points are highlighted in the inline comments.
from("direct:bulkOrder")
    .split(body())
        .parallelProcessing()
        .stopOnException() // stop remaining sub-messages on the first *uncaught* exception
        // (without stopOnException(), the splitter's default is to keep going past failed sub-messages)
        .doTry()
            .to("direct:processOrderLine")
        .doCatch(Exception.class)
            // the exception is handled here, so stopOnException() never fires for this line;
            // the failed line is diverted to a dead-letter channel and the rest keep processing
            .log(LoggingLevel.ERROR, "Failed to process line: ${exception.message}")
            .to("direct:deadLetter")
        .endDoTry()
    .end();
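A "process everything and collect the failures" policy amounts to a try/catch per item plus some bookkeeping. A plain-Java sketch of that idea (the SplitOutcome helper is hypothetical, not a Camel class):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SplitOutcome {

    final List<String> succeeded = new ArrayList<>();
    final Map<String, Exception> failed = new LinkedHashMap<>();

    // Apply the processor to every item; record failures instead of aborting
    static SplitOutcome processAll(List<String> items, Function<String, String> processor) {
        SplitOutcome out = new SplitOutcome();
        for (String item : items) {
            try {
                out.succeeded.add(processor.apply(item));
            } catch (Exception e) {
                out.failed.put(item, e); // dead-letter candidate
            }
        }
        return out;
    }

    public static void main(String[] args) {
        SplitOutcome out = processAll(
                List.of("10", "oops", "20"),
                s -> String.valueOf(Integer.parseInt(s) * 2)); // throws on non-numeric lines
        System.out.println(out.succeeded);      // prints [20, 40]
        System.out.println(out.failed.keySet()); // prints [oops]
    }
}
```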