Contents
- Aggregator Basics
- AggregationStrategy
- Completion Conditions
- Completion Timeout
- Completion Predicate
- Persistent Aggregation Repository
- Parallel Aggregation
- Built-in Aggregation Strategies
The aggregate() DSL takes two mandatory arguments: a correlation expression (how to group messages) and an AggregationStrategy (how to combine them). A completion condition determines when the group is released downstream.
@Component
public class OrderBatchRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:orderLine")
            // Group messages by the orderId header
            .aggregate(header("orderId"), new OrderLineAggregator())
                .completionSize(10)      // release when 10 lines collected
                .completionTimeout(5000) // or after 5 seconds, whichever fires first
            .to("direct:processOrderBatch");
    }
}
The strategy receives the current aggregate and the new incoming message, and returns the updated aggregate. On the first message, oldExchange is null.
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import java.util.*;
public class OrderLineAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        List<OrderLine> lines;
        if (oldExchange == null) {
            // First message in the group — initialise the list
            lines = new ArrayList<>();
        } else {
            lines = oldExchange.getIn().getBody(List.class);
        }
        OrderLine line = newExchange.getIn().getBody(OrderLine.class);
        lines.add(line);
        // Return oldExchange updated (or newExchange on the first call)
        Exchange result = oldExchange != null ? oldExchange : newExchange;
        result.getIn().setBody(lines);
        return result;
    }
}
// Simpler — using a lambda (Java 8+)
.aggregate(header("orderId"),
    (oldExchange, newExchange) -> {
        List<OrderLine> lines = oldExchange == null
                ? new ArrayList<>()
                : oldExchange.getIn().getBody(List.class);
        lines.add(newExchange.getIn().getBody(OrderLine.class));
        Exchange result = oldExchange != null ? oldExchange : newExchange;
        result.getIn().setBody(lines);
        return result;
    })
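How the runtime drives a strategy can be simulated in plain Java, without a Camel runtime: the strategy is invoked once per incoming message, receiving the previous aggregate (null on the first call) and returning the updated one. A minimal sketch of that fold (FoldDemo is a hypothetical name; plain strings stand in for exchanges):

```java
import java.util.ArrayList;
import java.util.List;

public class FoldDemo {
    // Stand-in for an AggregationStrategy: old aggregate + new item -> updated aggregate.
    // Mirrors the contract: the first call receives a null "old" aggregate.
    public static List<String> aggregate(List<String> old, String item) {
        List<String> lines = (old == null) ? new ArrayList<>() : old;
        lines.add(item);
        return lines;
    }

    public static void main(String[] args) {
        List<String> agg = null;
        for (String msg : List.of("line-1", "line-2", "line-3")) {
            agg = aggregate(agg, msg); // the aggregator invokes the strategy like this
        }
        System.out.println(agg); // [line-1, line-2, line-3]
    }
}
```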
| Condition | DSL | Use When |
|---|---|---|
| Fixed size | .completionSize(N) | Batch exactly N messages (e.g., bulk DB insert of 100) |
| Timeout | .completionTimeout(ms) | Flush incomplete groups after a time window |
| Predicate | .completionPredicate(pred) | Custom logic — e.g., last message has a "done" flag |
| All from split | .completionFromBatchConsumer() | Collect all messages produced by a Splitter |
| Size expression | .completionSizeExpression(expr) | Dynamic batch size from a header or bean |
// Multiple conditions — release on whichever fires first
.aggregate(header("customerId"), new CustomerEventAggregator())
    .completionSize(50)
    .completionTimeout(10_000)
    .completionPredicate(body().contains("FINAL")) // release early when the payload contains the marker
    .eagerCheckCompletion() // evaluate the predicate on each incoming message, not on the aggregate
The timeout fires a timer per correlation group. If a group hasn't reached its completionSize within the timeout, it is released with whatever messages have been collected — preventing groups from waiting forever on a missing message.
from("kafka:sensor.readings")
    .aggregate(header("deviceId"), new SensorReadingAggregator())
        .completionSize(100)
        .completionTimeout(30_000)               // flush every 30s even if under 100 readings
        .completionTimeoutCheckerInterval(1_000) // how often timeouts are checked (default 1s)
        .discardOnAggregationFailure()           // discard the group if the strategy throws
    .to("direct:persistSensorBatch");
Each correlation group has its own independent timeout timer. A group for device "A" timing out does not affect device "B"'s timer. Camel manages this with an in-memory map (or persistent repository for crash recovery).
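That per-group bookkeeping can be sketched in plain Java (DeadlineTracker is a hypothetical name, not a Camel class): each correlation key gets its own deadline, and expiring one key leaves the others untouched.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeadlineTracker {
    private final Map<String, Long> deadlines = new HashMap<>();
    private final long timeoutMillis;

    public DeadlineTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // The first message for a key starts that key's timer; later messages leave it alone.
    public void onMessage(String key, long nowMillis) {
        deadlines.putIfAbsent(key, nowMillis + timeoutMillis);
    }

    // Keys whose deadline has passed are released; the rest keep waiting independently.
    public List<String> expiredKeys(long nowMillis) {
        List<String> expired = new ArrayList<>();
        deadlines.forEach((key, deadline) -> {
            if (nowMillis >= deadline) {
                expired.add(key);
            }
        });
        expired.forEach(deadlines::remove);
        return expired;
    }
}
```

Device "A" arriving at t=0 with a 1s timeout expires at t=1000 even if device "B" arrived later and is still collecting.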
A completion predicate releases a group as soon as custom logic decides it is complete, for example when a closing event arrives for the order.
// Release the group when a closing event arrives
AggregationStrategy strategy = (oldExchange, newExchange) -> {
    // ... aggregate into a Map
    return oldExchange != null ? oldExchange : newExchange;
};

from("direct:orderEvent")
    .aggregate(header("orderId"), strategy)
        .completionPredicate(
            // Group is complete when header "eventType" == "ORDER_CLOSED"
            header("eventType").isEqualTo("ORDER_CLOSED")
        )
        .eagerCheckCompletion()    // evaluate on EACH new message before aggregating
        .completionTimeout(60_000) // safety net — release after 60s regardless
    .to("direct:closeOrder");
By default, Camel stores in-progress aggregations in memory — lost on restart. For crash recovery, use a JdbcAggregationRepository backed by a database table.
<!-- pom.xml -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-sql-starter</artifactId>
</dependency>
-- Schema for JdbcAggregationRepository (PostgreSQL shown; use BLOB on other databases).
-- Camel reads and writes only these three columns, so avoid extra NOT NULL columns.
CREATE TABLE camel_aggregation (
    id VARCHAR(255) NOT NULL,
    exchange BYTEA NOT NULL,
    version BIGINT NOT NULL,
    PRIMARY KEY (id)
);
CREATE TABLE camel_aggregation_completed (
    id VARCHAR(255) NOT NULL,
    exchange BYTEA NOT NULL,
    version BIGINT NOT NULL,
    PRIMARY KEY (id)
);
@Configuration
public class AggregationConfig {

    @Bean
    public JdbcAggregationRepository jdbcAggregationRepository(
            PlatformTransactionManager transactionManager, DataSource dataSource) {
        // The repository name must match the table name; the completed table
        // is resolved as "<repositoryName>_completed" by default
        JdbcAggregationRepository repo = new JdbcAggregationRepository(
                transactionManager, "camel_aggregation", dataSource);
        repo.setUseRecovery(true); // replay unconfirmed exchanges after a crash
        return repo;
    }
}
// Use in route
from("direct:orderLine")
    .aggregate(header("orderId"), new OrderLineAggregator())
        .aggregationRepository(jdbcAggregationRepository)
        .completionSize(10)
        .completionTimeout(5000)
    .to("direct:processOrderBatch");
By default, completed groups are handed to the downstream route one at a time. Enabling parallelProcessing lets Camel process released groups concurrently on a configurable thread pool.
// parallelProcessing — released groups are processed concurrently downstream
from("direct:orderLine")
    .aggregate(header("orderId"), new OrderLineAggregator())
        .completionSize(50)
        .completionTimeout(10_000)
        .parallelProcessing() // process released groups in parallel
        .executorService(Executors.newFixedThreadPool(4))
    .to("direct:bulkInsert");
Camel ships ready-made strategies for the common cases, so you often don't need a custom class.
import org.apache.camel.builder.AggregationStrategies;
import org.apache.camel.processor.aggregate.*;
// Collect all bodies into a List
.aggregate(header("batchId"), new GroupedBodyAggregationStrategy())
// Collect all exchanges (full exchange list)
.aggregate(header("batchId"), new GroupedExchangeAggregationStrategy())
// String concatenation with a separator
.aggregate(header("batchId"), AggregationStrategies.string(","))
// Use the latest message body (last wins)
.aggregate(header("batchId"), AggregationStrategies.useLatest())
// Use the original (first) message body (first wins)
.aggregate(header("batchId"), AggregationStrategies.useOriginal())
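In plain-Java terms, and ignoring exchange plumbing, these built-ins reduce to simple folds over the message bodies (a sketch for intuition, not Camel's actual implementation; BuiltinsDemo is a hypothetical name):

```java
import java.util.List;

public class BuiltinsDemo {
    // string(","): concatenate all bodies with the separator
    public static String joined(List<String> bodies) {
        return String.join(",", bodies);
    }

    // useLatest(): the last body wins
    public static String latest(List<String> bodies) {
        return bodies.get(bodies.size() - 1);
    }

    // useOriginal(): the first body wins
    public static String original(List<String> bodies) {
        return bodies.get(0);
    }

    public static void main(String[] args) {
        List<String> bodies = List.of("a", "b", "c");
        System.out.println(joined(bodies));   // a,b,c
        System.out.println(latest(bodies));   // c
        System.out.println(original(bodies)); // a
    }
}
```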