Contents

Split a list body or a tokenised string into individual messages:

@Component public class OrderSplitRoute extends RouteBuilder { @Override public void configure() { // Split a List<OrderLine> body into individual OrderLine messages from("direct:orderWithLines") .split(body()) // iterates over List, array, or Iterator .to("direct:processOrderLine"); // Split a CSV line by comma from("direct:csvLine") .split(body().tokenize(",")) .trim() .to("direct:processToken"); // Split an XML document — one message per element from("direct:xmlBatch") .split().xpath("//item") .to("direct:processItem"); } }

Process split sub-messages in parallel using a thread pool. The route waits for all sub-messages to finish before continuing (default) unless you set streaming(true).

from("direct:bulkOrder") .split(body()) .parallelProcessing() // process each item concurrently .executorService(Executors.newFixedThreadPool(8)) .timeout(10_000) // cancel remaining if any takes > 10s .stopOnException() // stop all on first failure .to("direct:enrichOrderLine") .end() // ← marks end of the split block; route continues here after all sub-msgs complete .log("All order lines processed");

Combine all sub-message results back into one aggregated exchange using an AggregationStrategy:

// AggregationStrategy to collect all enriched lines back into a list AggregationStrategy collectLines = (oldExchange, newExchange) -> { List<OrderLine> lines = oldExchange == null ? new ArrayList<>() : oldExchange.getIn().getBody(List.class); lines.add(newExchange.getIn().getBody(OrderLine.class)); Exchange result = oldExchange != null ? oldExchange : newExchange; result.getIn().setBody(lines); return result; }; from("direct:bulkOrder") .split(body(), collectLines) // pass strategy as second arg .parallelProcessing() .to("direct:enrichOrderLine") .end() // Body is now List<OrderLine> — all enriched results .to("direct:saveEnrichedBatch");

For very large files or result sets, streaming(true) processes sub-messages one at a time without loading the entire split into memory first. Essential for CSV files with millions of rows.

from("file:inbox?noop=true") .split(body().tokenize("\n")) .streaming() // read one line at a time — constant memory usage .filter(body().isNotNull().and(body().isNotEqualTo(""))) .unmarshal().csv() .to("direct:processRow") .end() .log("File processing complete"); When using streaming(true) you cannot use parallelProcessing() and an aggregating strategy together — streaming does not buffer all split results. For parallel streaming, process each sub-message independently and aggregate externally.

Camel automatically adds headers to each sub-message to identify its position in the split:

from("direct:orderLines") .split(body()) .process(exchange -> { int index = exchange.getProperty(ExchangePropertyKey.SPLIT_INDEX, Integer.class); int size = exchange.getProperty(ExchangePropertyKey.SPLIT_SIZE, Integer.class); boolean isLast = exchange.getProperty(ExchangePropertyKey.SPLIT_COMPLETE, Boolean.class); log.info("Processing item {} of {} (last={})", index + 1, size, isLast); }) .to("direct:processItem");

Send the same message to multiple recipients in parallel and aggregate all their responses. The recipient list is dynamic — determined at runtime from a header or bean.

from("direct:priceRequest") // recipientList fans out to all suppliers listed in the header .recipientList(header("supplierEndpoints")) // e.g., "direct:supplier1,direct:supplier2,direct:supplier3" .parallelProcessing() .timeout(5_000) // don't wait more than 5s per supplier .aggregationStrategy(new BestPriceAggregator()) .ignoreInvalidEndpoints() .end() // Body is now the best price found across all suppliers .to("direct:confirmBestPrice"); // BestPriceAggregator — keep the lowest price quote public class BestPriceAggregator implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) return newExchange; BigDecimal oldPrice = oldExchange.getIn().getBody(PriceQuote.class).getPrice(); BigDecimal newPrice = newExchange.getIn().getBody(PriceQuote.class).getPrice(); return newPrice.compareTo(oldPrice) < 0 ? newExchange : oldExchange; } }

Unlike recipientList (dynamic targets), multicast sends to a fixed set of endpoints defined at route-build time:

from("direct:orderCreated") .multicast() .parallelProcessing() .aggregationStrategy(AggregationStrategies.useOriginal()) .to("direct:sendConfirmationEmail", "direct:updateInventory", "direct:notifyWarehouse") .end() .log("Order ${header.orderId} processed by all systems");

The class below shows the implementation. Key points are highlighted in the inline comments.

from("direct:bulkOrder") .split(body()) .parallelProcessing() .stopOnException() // stop remaining sub-messages on first exception // OR use .continueOnException() to process all and collect failures .doTry() .to("direct:processOrderLine") .doCatch(Exception.class) .log(LoggingLevel.ERROR, "Failed to process line: ${exception.message}") .to("direct:deadLetter") .endDoTry() .end();