Contents

Spring Integration models everything as Messages flowing through Channels and being processed by Endpoints:

<!-- Maven dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
import org.springframework.integration.channel.*; import org.springframework.messaging.MessageChannel; @Configuration public class ChannelConfig { // DirectChannel — synchronous, point-to-point (default) // The sender's thread executes the handler directly @Bean public MessageChannel orderChannel() { return new DirectChannel(); } // QueueChannel — buffered, async. Sender puts, handler polls @Bean public MessageChannel asyncOrderChannel() { return new QueueChannel(100); // capacity 100 } // PublishSubscribeChannel — fan-out to all subscribers // Optionally async with a TaskExecutor @Bean public MessageChannel eventBusChannel() { PublishSubscribeChannel ch = new PublishSubscribeChannel(Executors.newCachedThreadPool()); return ch; } // PriorityChannel — poll by message priority header @Bean public MessageChannel priorityChannel() { return new PriorityChannel(50, Comparator.comparingInt(m -> (int) m.getHeaders() .getOrDefault("priority", 0))); } } // Sending a message manually @Autowired @Qualifier("orderChannel") MessageChannel orderChannel; Message<Order> msg = MessageBuilder .withPayload(order) .setHeader("source", "web") .setHeader("priority", 3) .build(); orderChannel.send(msg);

A @MessagingGateway interface hides the messaging infrastructure — callers invoke a plain Java method and Spring Integration handles message creation and channel routing transparently.

// Define the gateway interface @MessagingGateway public interface OrderGateway { // Send to "orderChannel", receive reply from "orderReplyChannel" @Gateway(requestChannel = "orderChannel", replyChannel = "orderReplyChannel") OrderConfirmation placeOrder(Order order); // Fire-and-forget (no reply) @Gateway(requestChannel = "notificationChannel") void notifyUser(Notification notification); // With timeout @Gateway(requestChannel = "externalApiChannel", replyTimeout = 5000) ApiResponse callExternalApi(ApiRequest request); } // Use it like a plain service — no messaging API visible to the caller @Service @RequiredArgsConstructor public class CheckoutService { private final OrderGateway orderGateway; public OrderConfirmation checkout(Cart cart) { Order order = buildOrder(cart); return orderGateway.placeOrder(order); // internally sends a Message } }
// @ServiceActivator — calls a method when a message arrives on the input channel @MessageEndpoint public class OrderProcessor { @ServiceActivator(inputChannel = "orderChannel", outputChannel = "orderReplyChannel") public OrderConfirmation process(Order order) { // 'order' is the message payload — Spring unwraps it automatically return OrderConfirmation.of(order.id(), "CONFIRMED", Instant.now()); } // Access headers alongside payload @ServiceActivator(inputChannel = "enrichedOrderChannel") public void processWithHeaders( @Payload Order order, @Header("source") String source, @Header(MessageHeaders.CORRELATION_ID) String correlationId) { log.info("Processing order {} from {} (correlation: {})", order.id(), source, correlationId); } } // @Transformer — converts the payload type @MessageEndpoint public class OrderTransformer { @Transformer(inputChannel = "rawOrderChannel", outputChannel = "orderChannel") public Order transformRawToOrder(String rawJson) { return objectMapper.readValue(rawJson, Order.class); } // Header enricher — add/modify headers without changing payload @Transformer(inputChannel = "orderChannel", outputChannel = "enrichedOrderChannel") public Message<Order> enrichHeaders(Message<Order> message) { return MessageBuilder.fromMessage(message) .setHeader("processedAt", Instant.now().toString()) .setHeader("region", resolveRegion(message.getPayload())) .build(); } } // @Router — routes a message to different channels based on content @MessageEndpoint public class OrderRouter { @Router(inputChannel = "orderChannel") public String routeByType(Order order) { return switch (order.type()) { case DIGITAL -> "digitalOrderChannel"; case PHYSICAL -> "physicalOrderChannel"; case SERVICE -> "serviceOrderChannel"; }; } // Route to multiple channels (returns Collection) @Router(inputChannel = "broadcastChannel") public List<String> multicast(Event event) { List<String> targets = new ArrayList<>(); targets.add("auditChannel"); if (event.requiresNotification()) 
targets.add("notificationChannel"); if (event.isHighPriority()) targets.add("alertChannel"); return targets; } } // @Filter — drop messages that don't match a condition @MessageEndpoint public class OrderFilter { @Filter(inputChannel = "orderChannel", outputChannel = "validOrderChannel", discardChannel = "invalidOrderChannel") public boolean isValid(Order order) { return order.total().compareTo(BigDecimal.ZERO) > 0 && order.customerId() != null; } }

The Splitter/Aggregator pair handles batch processing: split a message into N individual messages, process each one independently (in parallel when the downstream channel hands off to other threads, such as an executor or queue channel), then aggregate the N results back into one.

// @Splitter — break one message into many @MessageEndpoint public class OrderLineSplitter { @Splitter(inputChannel = "orderChannel", outputChannel = "orderLineChannel") public List<OrderLine> split(Order order) { return order.lines(); // each OrderLine becomes a separate message // Spring automatically sets CORRELATION_ID and SEQUENCE_SIZE headers } } // @Aggregator — collect N messages back into one @MessageEndpoint public class OrderLineAggregator { @Aggregator(inputChannel = "processedLineChannel", outputChannel = "completedOrderChannel") public OrderResult aggregate(List<LineResult> results) { // Called when all lines from the same correlation group have arrived return OrderResult.of( results.stream().mapToDouble(LineResult::subtotal).sum(), results.stream().allMatch(LineResult::success)); } // Custom release strategy — release when all expected messages arrive @ReleaseStrategy public boolean isComplete(List<Message<?>> messages) { int expected = (int) messages.get(0).getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE); return messages.size() == expected; } // Custom correlation strategy — group by order ID @CorrelationStrategy public String correlate(LineResult result) { return result.orderId(); } }

The Java DSL (IntegrationFlow) is the modern, fluent way to define flows — more readable and refactor-friendly than annotations.

import org.springframework.integration.dsl.*; @Configuration public class OrderFlowConfig { @Bean public IntegrationFlow orderProcessingFlow() { return IntegrationFlow .from("rawOrderChannel") // entry channel .transform(String.class, json -> objectMapper.readValue(json, Order.class)) // String → Order .filter(Order.class, o -> o.total().signum() > 0, f -> f.discardChannel("invalidOrderChannel")) .enrichHeaders(h -> h .header("processedAt", Instant.now().toString()) .headerFunction("region", m -> resolveRegion(m.getPayload()))) .route(Order.class, o -> o.type().name().toLowerCase() + "OrderChannel") .get(); } @Bean public IntegrationFlow digitalOrderFlow() { return IntegrationFlow .from("digitalOrderChannel") .handle(OrderService.class, "processDigital") // bean method .channel("orderReplyChannel") .get(); } // Poller-based flow — poll a QueueChannel on a schedule @Bean public IntegrationFlow asyncOrderFlow() { return IntegrationFlow .from("asyncOrderChannel", c -> c.poller(Pollers.fixedDelay(Duration.ofMillis(200)))) .handle(OrderService.class, "processAsync") .get(); } }
// File inbound adapter — poll a directory and emit one Message per file @Bean public IntegrationFlow fileImportFlow() { return IntegrationFlow .from(Files.inboundAdapter(new File("/data/imports")) .patternFilter("*.csv") .preventDuplicates(true), c -> c.poller(Pollers.fixedDelay(Duration.ofSeconds(5)))) .transform(Files.toStringTransformer()) // File → String .channel("csvProcessingChannel") .get(); } // File outbound adapter — write messages to files @Bean public IntegrationFlow fileExportFlow() { return IntegrationFlow .from("exportChannel") .handle(Files.outboundAdapter(new File("/data/exports")) .fileNameGenerator(msg -> "order-" + msg.getHeaders().get("orderId") + ".json") .autoCreateDirectory(true)) .get(); } // HTTP outbound gateway — call a REST endpoint @Bean public IntegrationFlow httpCallFlow() { return IntegrationFlow .from("httpRequestChannel") .handle(Http.outboundGateway("https://api.example.com/orders/{id}") .uriVariable("id", "payload.id") .httpMethod(HttpMethod.GET) .expectedResponseType(OrderDto.class)) .channel("httpResponseChannel") .get(); } // JMS inbound adapter @Bean public IntegrationFlow jmsInboundFlow(ConnectionFactory cf) { return IntegrationFlow .from(Jms.inboundAdapter(cf).destination("orders.queue"), c -> c.poller(Pollers.fixedDelay(Duration.ofMillis(500)))) .transform(String.class, json -> objectMapper.readValue(json, Order.class)) .channel("orderChannel") .get(); } Spring Integration has adapters for 30+ technologies out of the box: AMQP, Apache Kafka, AWS S3/SQS/SNS, Azure Service Bus, FTP/SFTP, JDBC, LDAP, MongoDB, Redis, TCP/UDP, WebSocket, and more. Each follows the same channel adapter pattern.