Contents
- Core Concepts — Messages, Channels & Endpoints
- Message Channels
- MessagingGateway — Clean Entry Points
- Service Activator & Transformer
- Message Router & Filter
- Splitter & Aggregator
- Integration Flow DSL
- Channel Adapters — File, HTTP & JMS
Spring Integration models everything as Messages flowing through Channels and being processed by Endpoints:
- Message<T> — immutable wrapper: payload (the data) + headers (metadata map)
- MessageChannel — the pipe between components. Point-to-point (DirectChannel) or publish-subscribe (PublishSubscribeChannel)
- Endpoints — the nodes that consume and produce messages: ServiceActivator, Transformer, Router, Filter, Splitter, Aggregator, Gateway
<!-- Maven dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
import org.springframework.integration.channel.*;
import org.springframework.messaging.MessageChannel;
@Configuration
public class ChannelConfig {
// DirectChannel — synchronous, point-to-point (default)
// The sender's thread executes the handler directly
@Bean
public MessageChannel orderChannel() {
return new DirectChannel();
}
// QueueChannel — buffered, async. Sender puts, handler polls
@Bean
public MessageChannel asyncOrderChannel() {
return new QueueChannel(100); // capacity 100
}
// PublishSubscribeChannel — fan-out to all subscribers
// Optionally async with a TaskExecutor
@Bean
public MessageChannel eventBusChannel() {
PublishSubscribeChannel ch = new PublishSubscribeChannel(Executors.newCachedThreadPool());
return ch;
}
// PriorityChannel — poll by message priority header
@Bean
public MessageChannel priorityChannel() {
return new PriorityChannel(50,
Comparator.comparingInt(m -> (int) m.getHeaders()
.getOrDefault("priority", 0)));
}
}
// Sending a message manually
@Autowired @Qualifier("orderChannel") MessageChannel orderChannel;
Message<Order> msg = MessageBuilder
.withPayload(order)
.setHeader("source", "web")
.setHeader("priority", 3)
.build();
orderChannel.send(msg);
A @MessagingGateway interface hides the messaging infrastructure — callers invoke a plain Java method and Spring Integration handles message creation and channel routing transparently.
// Define the gateway interface
@MessagingGateway
public interface OrderGateway {
// Send to "orderChannel", receive reply from "orderReplyChannel"
@Gateway(requestChannel = "orderChannel", replyChannel = "orderReplyChannel")
OrderConfirmation placeOrder(Order order);
// Fire-and-forget (no reply)
@Gateway(requestChannel = "notificationChannel")
void notifyUser(Notification notification);
// With timeout
@Gateway(requestChannel = "externalApiChannel", replyTimeout = 5000)
ApiResponse callExternalApi(ApiRequest request);
}
// Use it like a plain service — no messaging API visible to the caller
@Service
@RequiredArgsConstructor
public class CheckoutService {
private final OrderGateway orderGateway;
public OrderConfirmation checkout(Cart cart) {
Order order = buildOrder(cart);
return orderGateway.placeOrder(order); // internally sends a Message
}
}
// @ServiceActivator — calls a method when a message arrives on the input channel
@MessageEndpoint
public class OrderProcessor {
@ServiceActivator(inputChannel = "orderChannel", outputChannel = "orderReplyChannel")
public OrderConfirmation process(Order order) {
// 'order' is the message payload — Spring unwraps it automatically
return OrderConfirmation.of(order.id(), "CONFIRMED", Instant.now());
}
// Access headers alongside payload
@ServiceActivator(inputChannel = "enrichedOrderChannel")
public void processWithHeaders(
@Payload Order order,
@Header("source") String source,
@Header(MessageHeaders.CORRELATION_ID) String correlationId) {
log.info("Processing order {} from {} (correlation: {})",
order.id(), source, correlationId);
}
}
// @Transformer — converts the payload type
@MessageEndpoint
public class OrderTransformer {
@Transformer(inputChannel = "rawOrderChannel", outputChannel = "orderChannel")
public Order transformRawToOrder(String rawJson) {
return objectMapper.readValue(rawJson, Order.class);
}
// Header enricher — add/modify headers without changing payload
@Transformer(inputChannel = "orderChannel", outputChannel = "enrichedOrderChannel")
public Message<Order> enrichHeaders(Message<Order> message) {
return MessageBuilder.fromMessage(message)
.setHeader("processedAt", Instant.now().toString())
.setHeader("region", resolveRegion(message.getPayload()))
.build();
}
}
// @Router — routes a message to different channels based on content
@MessageEndpoint
public class OrderRouter {
@Router(inputChannel = "orderChannel")
public String routeByType(Order order) {
return switch (order.type()) {
case DIGITAL -> "digitalOrderChannel";
case PHYSICAL -> "physicalOrderChannel";
case SERVICE -> "serviceOrderChannel";
};
}
// Route to multiple channels (returns Collection)
@Router(inputChannel = "broadcastChannel")
public List<String> multicast(Event event) {
List<String> targets = new ArrayList<>();
targets.add("auditChannel");
if (event.requiresNotification()) targets.add("notificationChannel");
if (event.isHighPriority()) targets.add("alertChannel");
return targets;
}
}
// @Filter — drop messages that don't match a condition
@MessageEndpoint
public class OrderFilter {
@Filter(inputChannel = "orderChannel", outputChannel = "validOrderChannel",
discardChannel = "invalidOrderChannel")
public boolean isValid(Order order) {
return order.total().compareTo(BigDecimal.ZERO) > 0
&& order.customerId() != null;
}
}
The Splitter/Aggregator pair handles batch processing: split a message into N individual messages, process each independently in parallel, then aggregate the N results back into one.
// @Splitter — break one message into many
@MessageEndpoint
public class OrderLineSplitter {
@Splitter(inputChannel = "orderChannel", outputChannel = "orderLineChannel")
public List<OrderLine> split(Order order) {
return order.lines(); // each OrderLine becomes a separate message
// Spring automatically sets CORRELATION_ID and SEQUENCE_SIZE headers
}
}
// @Aggregator — collect N messages back into one
@MessageEndpoint
public class OrderLineAggregator {
@Aggregator(inputChannel = "processedLineChannel", outputChannel = "completedOrderChannel")
public OrderResult aggregate(List<LineResult> results) {
// Called when all lines from the same correlation group have arrived
return OrderResult.of(
results.stream().mapToDouble(LineResult::subtotal).sum(),
results.stream().allMatch(LineResult::success));
}
// Custom release strategy — release when all expected messages arrive
@ReleaseStrategy
public boolean isComplete(List<Message<?>> messages) {
int expected = (int) messages.get(0).getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE);
return messages.size() == expected;
}
// Custom correlation strategy — group by order ID
@CorrelationStrategy
public String correlate(LineResult result) {
return result.orderId();
}
}
The Java DSL (IntegrationFlow) is the modern, fluent way to define flows — more readable and refactor-friendly than annotations.
import org.springframework.integration.dsl.*;
@Configuration
public class OrderFlowConfig {
@Bean
public IntegrationFlow orderProcessingFlow() {
return IntegrationFlow
.from("rawOrderChannel") // entry channel
.transform(String.class, json ->
objectMapper.readValue(json, Order.class)) // String → Order
.filter(Order.class, o -> o.total().signum() > 0,
f -> f.discardChannel("invalidOrderChannel"))
.enrichHeaders(h -> h
.header("processedAt", Instant.now().toString())
.headerFunction("region", m -> resolveRegion(m.getPayload())))
.route(Order.class, o -> o.type().name().toLowerCase() + "OrderChannel")
.get();
}
@Bean
public IntegrationFlow digitalOrderFlow() {
return IntegrationFlow
.from("digitalOrderChannel")
.handle(OrderService.class, "processDigital") // bean method
.channel("orderReplyChannel")
.get();
}
// Poller-based flow — poll a QueueChannel on a schedule
@Bean
public IntegrationFlow asyncOrderFlow() {
return IntegrationFlow
.from("asyncOrderChannel",
c -> c.poller(Pollers.fixedDelay(Duration.ofMillis(200))))
.handle(OrderService.class, "processAsync")
.get();
}
}
// File inbound adapter — poll a directory and emit one Message per file
@Bean
public IntegrationFlow fileImportFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File("/data/imports"))
.patternFilter("*.csv")
.preventDuplicates(true),
c -> c.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
.transform(Files.toStringTransformer()) // File → String
.channel("csvProcessingChannel")
.get();
}
// File outbound adapter — write messages to files
@Bean
public IntegrationFlow fileExportFlow() {
return IntegrationFlow
.from("exportChannel")
.handle(Files.outboundAdapter(new File("/data/exports"))
.fileNameGenerator(msg ->
"order-" + msg.getHeaders().get("orderId") + ".json")
.autoCreateDirectory(true))
.get();
}
// HTTP outbound gateway — call a REST endpoint
@Bean
public IntegrationFlow httpCallFlow() {
return IntegrationFlow
.from("httpRequestChannel")
.handle(Http.outboundGateway("https://api.example.com/orders/{id}")
.uriVariable("id", "payload.id")
.httpMethod(HttpMethod.GET)
.expectedResponseType(OrderDto.class))
.channel("httpResponseChannel")
.get();
}
// JMS inbound adapter
@Bean
public IntegrationFlow jmsInboundFlow(ConnectionFactory cf) {
return IntegrationFlow
.from(Jms.inboundAdapter(cf).destination("orders.queue"),
c -> c.poller(Pollers.fixedDelay(Duration.ofMillis(500))))
.transform(String.class, json -> objectMapper.readValue(json, Order.class))
.channel("orderChannel")
.get();
}
Spring Integration has adapters for 30+ technologies out of the box: AMQP, Apache Kafka, AWS S3/SQS/SNS, Azure Service Bus, FTP/SFTP, JDBC, LDAP, MongoDB, Redis, TCP/UDP, WebSocket, and more. Each follows the same channel adapter pattern.