
The aggregate() DSL takes two mandatory arguments: a correlation expression (how to group messages) and an AggregationStrategy (how to combine them). At least one completion condition must also be set; it determines when the group is released downstream.

    import org.apache.camel.builder.RouteBuilder;
    import org.springframework.stereotype.Component;

    @Component
    public class OrderBatchRoute extends RouteBuilder {

        @Override
        public void configure() {
            from("direct:orderLine")
                // Group messages by orderId header
                .aggregate(header("orderId"), new OrderLineAggregator())
                    .completionSize(10)       // release when 10 lines collected
                    .completionTimeout(5000)  // or after 5 seconds without a new line, whichever comes first
                .to("direct:processOrderBatch");
        }
    }
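To make the grouping and release behaviour concrete, here is a minimal plain-Java sketch of what correlation plus a size-based completion amounts to. It is not Camel code: `SizeBatcher`, `onMessage`, and `openGroupCount` are hypothetical names invented for illustration, and the sketch ignores timeouts entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for an aggregator: groups by key, releases on size.
class SizeBatcher {
    private final int completionSize;
    private final BiConsumer<String, List<String>> downstream;
    private final Map<String, List<String>> openGroups = new HashMap<>();

    SizeBatcher(int completionSize, BiConsumer<String, List<String>> downstream) {
        this.completionSize = completionSize;
        this.downstream = downstream;
    }

    // Add one message to the group identified by its correlation key.
    void onMessage(String correlationKey, String body) {
        List<String> group = openGroups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(body);
        if (group.size() >= completionSize) {
            openGroups.remove(correlationKey);        // the group is complete: close it
            downstream.accept(correlationKey, group); // and hand it downstream
        }
    }

    int openGroupCount() {
        return openGroups.size();
    }

    public static void main(String[] args) {
        SizeBatcher batcher = new SizeBatcher(2,
                (key, lines) -> System.out.println(key + " -> " + lines));
        batcher.onMessage("order-1", "line A");
        batcher.onMessage("order-2", "line X");
        batcher.onMessage("order-1", "line B"); // prints: order-1 -> [line A, line B]
    }
}
```

Note that order-2 stays open after the run. Without a second completion condition such as a timeout, a group that never reaches the size would never be released.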

The strategy receives the current aggregate and the new incoming message, and returns the updated aggregate. On the first message, oldExchange is null.

    import org.apache.camel.AggregationStrategy;
    import org.apache.camel.Exchange;
    import java.util.*;

    public class OrderLineAggregator implements AggregationStrategy {

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            List<OrderLine> lines;
            if (oldExchange == null) {
                // First message in the group: initialise the list
                lines = new ArrayList<>();
            } else {
                lines = oldExchange.getIn().getBody(List.class);
            }

            OrderLine line = newExchange.getIn().getBody(OrderLine.class);
            lines.add(line);

            // Return oldExchange updated (or newExchange on first call)
            Exchange result = oldExchange != null ? oldExchange : newExchange;
            result.getIn().setBody(lines);
            return result;
        }
    }

    // Simpler: using a lambda (Java 8+)
    .aggregate(header("orderId"), (oldExchange, newExchange) -> {
        List<OrderLine> lines = oldExchange == null
            ? new ArrayList<>()
            : oldExchange.getIn().getBody(List.class);
        lines.add(newExchange.getIn().getBody(OrderLine.class));
        Exchange result = oldExchange != null ? oldExchange : newExchange;
        result.getIn().setBody(lines);
        return result;
    })

| Condition | DSL | Use When |
|---|---|---|
| Fixed size | .completionSize(N) | Batch exactly N messages (e.g., bulk DB insert of 100) |
| Timeout | .completionTimeout(ms) | Flush incomplete groups after a period of inactivity |
| Predicate | .completionPredicate(pred) | Custom logic, e.g., last message has a "done" flag |
| All from batch | .completionFromBatchConsumer() | Collect all messages from one poll of a batch consumer (e.g., file or FTP) |
| Size expression | .completionSize(expression) | Dynamic batch size from a header or bean |

    // Multiple conditions: release on whichever fires first
    .aggregate(header("customerId"), new CustomerEventAggregator())
        .completionSize(50)
        .completionTimeout(10_000)
        .completionPredicate(body().contains("FINAL"))  // or if payload contains marker
        .eagerCheckCompletion()  // check predicate on each individual message, not just aggregate

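completionTimeout is an inactivity timeout, tracked per correlation group: the timer restarts every time a new message joins the group. If no message arrives for the configured period, the group is released with whatever has been collected so far, which prevents groups from waiting forever on a missing message. For a fixed periodic flush regardless of activity, use completionInterval instead.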

    from("kafka:sensor.readings")
        .aggregate(header("deviceId"), new SensorReadingAggregator())
            .completionSize(100)
            .completionTimeout(30_000)                // flush after 30s without a new reading, even if under 100
            .completionTimeoutCheckerInterval(1_000)  // how often timeouts are checked (default 1s)
            .discardOnAggregationFailure()            // discard group on strategy exception
        .to("direct:persistSensorBatch");

Each correlation group has its own independent timeout timer. A group for device "A" timing out does not affect the timer for device "B". Camel tracks these timers in an in-memory map (or in a persistent repository when crash recovery is configured).
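The per-group timer behaviour can be sketched in plain Java with a simulated clock. This assumes, as the Camel documentation describes, that completionTimeout is an inactivity timeout that restarts whenever a message joins the group. `InactivityTimeouts`, `touch`, and `sweep` are hypothetical names for this illustration, not Camel classes or methods.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-group inactivity timers driven by an explicit clock.
class InactivityTimeouts {
    private final long timeoutMillis;
    private final Map<String, Long> lastActivity = new HashMap<>();

    InactivityTimeouts(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // A message arrived for this group: restart that group's timer only.
    void touch(String key, long nowMillis) {
        lastActivity.put(key, nowMillis);
    }

    // Periodic check (the role of the checker interval): remove and return
    // every group that has been idle for at least the timeout.
    List<String> sweep(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastActivity.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        InactivityTimeouts timers = new InactivityTimeouts(30_000);
        timers.touch("A", 0);
        timers.touch("B", 20_000);
        timers.touch("A", 25_000);                 // restarts A's timer, B is unaffected
        System.out.println(timers.sweep(35_000));  // []  (A idle 10s, B idle 15s)
        System.out.println(timers.sweep(50_000));  // [B] (B idle 30s, A idle 25s)
        System.out.println(timers.sweep(55_000));  // [A] (A idle 30s)
    }
}
```

A device that keeps sending at least one reading inside every 30-second window never times out in this model; only the size condition would release its group.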

A completion predicate releases a group as soon as a condition holds. The route below closes an order group when an ORDER_CLOSED event arrives, with a timeout as a fallback.

    // Release the group when an ORDER_CLOSED event arrives
    AggregationStrategy strategy = (old, nw) -> {
        // ... aggregate into a Map
        return old != null ? old : nw;
    };

    from("direct:orderEvent")
        .aggregate(header("orderId"), strategy)
            .completionPredicate(
                // Group complete when header "eventType" == "ORDER_CLOSED"
                header("eventType").isEqualTo("ORDER_CLOSED")
            )
            .eagerCheckCompletion()     // evaluate the predicate against EACH incoming message, not the aggregate
            .completionTimeout(60_000)  // safety net: release after 60s of inactivity
        .to("direct:closeOrder");
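Why eager checking matters here can be shown with a small plain-Java model. The strategy above returns the first exchange, so a predicate evaluated against the aggregate would only ever see the first message. The sketch below uses strings in place of exchanges; `CompletionCheck` and `completesAt` are hypothetical names, and the model only approximates how an aggregator orders the check and the merge.

```java
import java.util.function.BinaryOperator;
import java.util.function.Predicate;

// Hypothetical model: where is the completion predicate evaluated?
class CompletionCheck {

    // Returns the 1-based index of the message that completes the group, or -1 if none does.
    static int completesAt(String[] messages, BinaryOperator<String> strategy,
                           Predicate<String> completion, boolean eager) {
        String aggregate = null;
        for (int i = 0; i < messages.length; i++) {
            String incoming = messages[i];
            // Eager mode: test the incoming message itself.
            boolean done = eager && completion.test(incoming);
            // The message is merged into the aggregate in both modes.
            aggregate = (aggregate == null) ? incoming : strategy.apply(aggregate, incoming);
            // Default mode: test the merged aggregate instead.
            if (!eager) {
                done = completion.test(aggregate);
            }
            if (done) {
                return i + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String[] events = {"ORDER_CREATED", "ORDER_CLOSED", "ORDER_AUDITED"};
        BinaryOperator<String> keepFirst = (old, nw) -> old; // like returning the old exchange
        Predicate<String> closed = s -> s.equals("ORDER_CLOSED");

        System.out.println(completesAt(events, keepFirst, closed, true));  // 2
        System.out.println(completesAt(events, keepFirst, closed, false)); // -1
    }
}
```

With a strategy that carries the closing marker into the aggregate, both modes would complete on the second message. The difference only shows when the aggregate does not reflect the latest message.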

By default, Camel stores in-progress aggregations in memory — lost on restart. For crash recovery, use a JdbcAggregationRepository backed by a database table.

    <!-- pom.xml -->
    <dependency>
        <groupId>org.apache.camel.springboot</groupId>
        <artifactId>camel-sql-starter</artifactId>
    </dependency>

    -- Required schema for JdbcAggregationRepository (PostgreSQL types).
    -- The completed table must be named <repositoryName>_completed.
    -- By default Camel reads and writes only these columns.
    CREATE TABLE camel_aggregation (
        id       VARCHAR(255) NOT NULL,
        exchange BYTEA        NOT NULL,
        version  BIGINT       NOT NULL,
        PRIMARY KEY (id)
    );

    CREATE TABLE camel_aggregation_completed (
        id       VARCHAR(255) NOT NULL,
        exchange BYTEA        NOT NULL,
        version  BIGINT       NOT NULL,
        PRIMARY KEY (id)
    );

    import javax.sql.DataSource;
    import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.transaction.PlatformTransactionManager;

    @Configuration
    public class AggregationConfig {

        @Bean
        public JdbcAggregationRepository jdbcAggregationRepository(
                PlatformTransactionManager transactionManager, DataSource dataSource) {
            // Argument order: transaction manager, repository (table) name, data source
            JdbcAggregationRepository repo =
                new JdbcAggregationRepository(transactionManager, "camel_aggregation", dataSource);
            repo.setReturnOldExchange(false);
            return repo;
        }
    }

    // Use in route (jdbcAggregationRepository is the injected bean)
    from("direct:orderLine")
        .aggregate(header("orderId"), new OrderLineAggregator())
            .aggregationRepository(jdbcAggregationRepository)
            .completionSize(10)
            .completionTimeout(5000)
        .to("direct:processOrderBatch");
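The idea behind a persistent repository can be illustrated without a database. In the sketch below, a map of byte arrays stands in for the table (correlation id mapped to a serialized blob, similar in spirit to the exchange column), and a "restart" is simulated by creating a fresh object over the same map. `PersistentGroups`, `add`, and `load` are hypothetical names; this is not how JdbcAggregationRepository is implemented, only a model of why externally stored state survives a crash.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: group state is written to an external "table" after every message.
class PersistentGroups {
    private final Map<String, byte[]> table; // stand-in for the database table

    PersistentGroups(Map<String, byte[]> table) {
        this.table = table;
    }

    // Load the stored group (if any), append the new line, and write it back.
    void add(String correlationId, String line) {
        ArrayList<String> group = load(correlationId);
        group.add(line);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(group);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        table.put(correlationId, bytes.toByteArray());
    }

    @SuppressWarnings("unchecked")
    ArrayList<String> load(String correlationId) {
        byte[] blob = table.get(correlationId);
        if (blob == null) {
            return new ArrayList<>();
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(blob))) {
            return (ArrayList<String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> table = new HashMap<>();

        PersistentGroups beforeCrash = new PersistentGroups(table);
        beforeCrash.add("order-1", "line A");
        beforeCrash.add("order-1", "line B");

        // Simulated restart: a fresh instance over the same underlying table.
        PersistentGroups afterRestart = new PersistentGroups(table);
        afterRestart.add("order-1", "line C");
        System.out.println(afterRestart.load("order-1")); // [line A, line B, line C]
    }
}
```

An in-memory repository corresponds to keeping the map inside the aggregator object itself, in which case the first two lines would be gone after the restart.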

Released groups can be processed in parallel by enabling parallelProcessing() and, optionally, supplying a dedicated thread pool, as in the route below.

    // parallelProcessing: downstream route runs in parallel after aggregation
    from("direct:orderLine")
        .aggregate(header("orderId"), new OrderLineAggregator())
            .completionSize(50)
            .completionTimeout(10_000)
            .parallelProcessing()                              // process released groups in parallel
            .executorService(Executors.newFixedThreadPool(4))
        .to("direct:bulkInsert");
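As a rough illustration of what handing released groups to a fixed pool means, the following plain-Java sketch submits each completed batch to a four-thread executor and waits for the results. `ParallelRelease`, `processAll`, and `bulkInsert` are hypothetical names, and `bulkInsert` merely returns the batch size in place of real database work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: each released group becomes one task on a fixed thread pool.
class ParallelRelease {

    // Placeholder for the downstream step; returns the number of "rows" written.
    static int bulkInsert(List<String> batch) {
        return batch.size();
    }

    static int processAll(List<List<String>> releasedGroups, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (List<String> group : releasedGroups) {
                pending.add(pool.submit(() -> bulkInsert(group))); // groups run concurrently
            }
            int total = 0;
            for (Future<Integer> result : pending) {
                total += result.get(); // wait for each batch to finish
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // release the threads once all batches are done
        }
    }

    public static void main(String[] args) {
        List<List<String>> released = List.of(
                List.of("a1", "a2"), List.of("b1"), List.of("c1", "c2", "c3"));
        System.out.println(processAll(released, 4)); // 6
    }
}
```

Because batches for different correlation keys can finish in any order, downstream steps should not rely on groups completing in release order.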

Camel also ships ready-made aggregation strategies for common cases, so a custom class is often unnecessary.

    import org.apache.camel.builder.AggregationStrategies;
    import org.apache.camel.processor.aggregate.*;

    // Collect all bodies into a List
    .aggregate(header("batchId"), new GroupedBodyAggregationStrategy())

    // Collect all exchanges (full exchange list)
    .aggregate(header("batchId"), new GroupedExchangeAggregationStrategy())

    // String concatenation with a separator
    .aggregate(header("batchId"), AggregationStrategies.string(","))

    // Use the latest message body (last wins)
    .aggregate(header("batchId"), AggregationStrategies.useLatest())

    // Use the original (first) message body (first wins)
    .aggregate(header("batchId"), AggregationStrategies.useOriginal())
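Each of these strategies is, conceptually, a pairwise merge applied message by message. The plain-Java sketch below models three of them as functions over strings to show the resulting aggregate. `StrategyFolds`, `fold`, and the three constants are hypothetical names; they are analogues for illustration, not the Camel implementations, and the first message is taken as the initial aggregate rather than being passed in with a null predecessor.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical analogues of the built-in strategies, expressed as pairwise merges.
class StrategyFolds {
    static final BinaryOperator<String> JOIN_WITH_COMMA = (old, nw) -> old + "," + nw;
    static final BinaryOperator<String> USE_LATEST = (old, nw) -> nw;
    static final BinaryOperator<String> USE_ORIGINAL = (old, nw) -> old;

    // Apply a pairwise strategy across the bodies in arrival order.
    static String fold(List<String> bodies, BinaryOperator<String> strategy) {
        String aggregate = null;
        for (String body : bodies) {
            aggregate = (aggregate == null) ? body : strategy.apply(aggregate, body);
        }
        return aggregate;
    }

    public static void main(String[] args) {
        List<String> bodies = List.of("a", "b", "c");
        System.out.println(fold(bodies, JOIN_WITH_COMMA)); // a,b,c
        System.out.println(fold(bodies, USE_LATEST));      // c
        System.out.println(fold(bodies, USE_ORIGINAL));    // a
    }
}
```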