Contents
- throttle() — Requests per Period
- Async Throttling
- Dynamic Rate Limits
- delay() — Fixed & Expression-Based Pauses
- Per-Correlation Throttling
- Circuit Breaker with Resilience4j
- Back Pressure Patterns
- Throttler vs Delay vs Circuit Breaker
throttle(N) allows at most N messages to pass through per time period (default 1 second). Excess messages are held and released in order when capacity is available.
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class ThrottledApiRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:callExternalApi")
            // Allow at most 10 messages per second to the external API
            .throttle(10)
            .timePeriodMillis(1_000) // default is 1000 ms
            .to("http:api.example.com/endpoint");

        // Per-minute rate limit
        from("direct:sendEmail")
            .throttle(100)
            .timePeriodMillis(60_000) // 100 emails per minute
            .to("smtp:mail.example.com");
    }
}
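Conceptually, the throttler counts permits per time window and holds messages once the window's quota is spent. The plain-Java sketch below illustrates only that fixed-window semantics; Camel's actual Throttler is more sophisticated (it queues and releases held messages rather than rejecting them), and the class name FixedWindowLimiter is invented for this example.

```java
// Conceptual fixed-window rate limiter: at most maxPerPeriod permits per window.
// This is NOT Camel API; it only illustrates what throttle(N).timePeriodMillis(p) enforces.
public class FixedWindowLimiter {

    private final int maxPerPeriod;
    private final long periodMillis;
    private long windowStart;
    private int count;

    public FixedWindowLimiter(int maxPerPeriod, long periodMillis) {
        this.maxPerPeriod = maxPerPeriod;
        this.periodMillis = periodMillis;
        this.windowStart = System.currentTimeMillis();
    }

    // Returns true if a message may pass now; false if it must wait for the next window.
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        if (now - windowStart >= periodMillis) {
            windowStart = now; // new window: reset the counter
            count = 0;
        }
        if (count < maxPerPeriod) {
            count++;
            return true;
        }
        return false;
    }
}
```

Where the sketch returns false, the Camel throttler instead parks the exchange and delivers it once the next window opens, which is why no messages are dropped.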
By default, throttling blocks the calling thread. Use asyncDelayed(true) to release the calling thread immediately and schedule delivery asynchronously — preventing thread starvation under high load.
from("kafka:api.requests")
    .throttle(50)
    .timePeriodMillis(1_000)
    .asyncDelayed() // non-blocking; releases the Kafka consumer thread
    .executorService(Executors.newFixedThreadPool(10))
    .to("http:downstream-service/api");
The throttle rate can be an expression evaluated per message — useful for tenant-based rate limits or header-driven throttling.
// Rate limit from a header (set by upstream auth/routing)
from("direct:tenantRequest")
    .throttle(header("X-Rate-Limit")) // header contains the allowed rate
    .timePeriodMillis(1_000)
    .to("direct:processRequest");

// Rate limit from a bean (e.g., read from a database or config service)
from("direct:apiCall")
    .throttle(method("rateLimitService", "getLimit")) // dynamic bean lookup
    .timePeriodMillis(1_000)
    .to("http:external-api");
import org.apache.camel.Exchange;
import org.springframework.stereotype.Component;

@Component("rateLimitService")
public class RateLimitService {

    private final RateLimitRepository repo;

    public RateLimitService(RateLimitRepository repo) {
        this.repo = repo;
    }

    // Called per message to determine the current allowed rate
    public long getLimit(Exchange exchange) {
        String tenantId = exchange.getIn().getHeader("tenantId", String.class);
        return repo.findByTenantId(tenantId)
                .map(RateLimit::getRequestsPerSecond)
                .orElse(10L); // default: 10 requests per second
    }
}
The delay() EIP introduces a wait, either fixed or computed per message, before forwarding the message. It is useful for polling intervals, retry back-off, or pacing batch jobs.
// Fixed 500 ms delay between messages
from("direct:batchProcessor")
    .delay(500)
    .to("direct:callApi");

// Expression-based delay: linear back-off driven by a header
from("direct:retryWithBackoff")
    .delay(simple("${header.retryCount} * 1000")) // 1s, 2s, 3s, ...
    .to("direct:callFlakyService");

// Async delay: don't block the calling thread
from("direct:scheduledTask")
    .delay(2_000)
    .asyncDelayed()
    .to("direct:processTask");
Throttle independently per key — e.g., limit each tenant to 5 requests/second without affecting other tenants.
from("direct:multiTenantApi")
    .throttle(5)
    .timePeriodMillis(1_000)
    .correlationExpression(header("tenantId")) // separate quota per tenant
    .to("http:backend-service/api");
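Conceptually, per-correlation throttling keeps an independent window counter per key. The plain-Java sketch below (the PerKeyLimiter class is invented for illustration, it is not Camel API) shows that idea with one fixed-window counter per tenant.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Conceptual per-key rate limiting, as correlationExpression(header("tenantId")) provides:
// each key gets its own window counter, so one tenant cannot exhaust another's quota.
public class PerKeyLimiter {

    private static final class Window {
        long start;
        int count;
    }

    private final int maxPerPeriod;
    private final long periodMillis;
    private final ConcurrentMap<String, Window> windows = new ConcurrentHashMap<>();

    public PerKeyLimiter(int maxPerPeriod, long periodMillis) {
        this.maxPerPeriod = maxPerPeriod;
        this.periodMillis = periodMillis;
    }

    public boolean tryAcquire(String key) {
        Window w = windows.computeIfAbsent(key, k -> new Window());
        synchronized (w) {
            long now = System.currentTimeMillis();
            if (now - w.start >= periodMillis) {
                w.start = now; // new window for this key only
                w.count = 0;
            }
            if (w.count < maxPerPeriod) {
                w.count++;
                return true;
            }
            return false;
        }
    }
}
```

As with the plain throttler, Camel holds the excess exchanges for a key rather than dropping them; only that key's messages wait.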
The circuit breaker complements throttling: while the throttler controls rate, the circuit breaker stops sending entirely when a downstream service is failing.
<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-resilience4j-starter</artifactId>
</dependency>
from("direct:callPaymentGateway")
    .throttle(20).timePeriodMillis(1_000) // max 20 requests per second
    .circuitBreaker()
        .resilience4jConfiguration()
            .slidingWindowSize(10)
            .failureRateThreshold(50)    // open the circuit if 50% of calls fail
            .waitDurationInOpenState(10) // wait 10s before half-open (value is in seconds)
        .end()
        .to("http:payment-gateway/charge")
    .onFallback()
        .log(LoggingLevel.WARN, "Circuit open; payment gateway unavailable")
        .setBody(constant("{\"status\":\"PENDING\",\"message\":\"Payment deferred\"}"))
    .end();
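The state machine behind the circuit breaker (CLOSED, then OPEN once failures breach the threshold, then HALF_OPEN after the wait, then back to CLOSED on a successful probe) can be sketched in plain Java. This is a deliberately simplified illustration, not the Resilience4j implementation: the SimpleCircuitBreaker class is invented here, and it uses a consecutive-failure count where Resilience4j evaluates a failure rate over a sliding window of calls.

```java
// Simplified circuit-breaker state machine for illustration only.
public class SimpleCircuitBreaker {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;    // consecutive failures before opening
    private final long openDurationMillis; // how long to stay open before probing
    private State state = State.CLOSED;
    private int failures;
    private long openedAt;

    public SimpleCircuitBreaker(int failureThreshold, long openDurationMillis) {
        this.failureThreshold = failureThreshold;
        this.openDurationMillis = openDurationMillis;
    }

    // Should the next call be attempted, or should the fallback run immediately?
    public synchronized boolean allowRequest() {
        if (state == State.OPEN
                && System.currentTimeMillis() - openedAt >= openDurationMillis) {
            state = State.HALF_OPEN; // let one probe request through
        }
        return state != State.OPEN;
    }

    public synchronized void recordSuccess() {
        failures = 0;
        state = State.CLOSED; // probe succeeded (or normal call): close the circuit
    }

    public synchronized void recordFailure() {
        failures++;
        if (state == State.HALF_OPEN || failures >= failureThreshold) {
            state = State.OPEN; // trip the breaker and start the wait period
            openedAt = System.currentTimeMillis();
        }
    }
}
```

In the Camel route above, "allowRequest() == false" corresponds to the exchange being routed straight to the onFallback() branch without calling the payment gateway.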
When producers outpace consumers, combine throttling with an explicit overflow strategy. Two common back-pressure patterns follow; key points are highlighted in the inline comments.
// Pattern: throttle + reject on overflow + dead letter for rejected messages
from("kafka:order.events")
    // Route-scoped error handling must be declared right after from(), closed with end()
    .onException(RejectedExecutionException.class)
        .handled(true)
        .to("kafka:order.events.overflow") // push back for later retry
    .end()
    .throttle(100)
    .timePeriodMillis(1_000)
    .asyncDelayed()
    .rejectExecution(true) // throw (instead of queueing) when the rate limit is exceeded
    .to("direct:processOrder");
// Pattern: sample, i.e. pass only one message per period (lossy, but adds no latency)
from("kafka:sensor.highfreq")
    .sample() // default sample period: at most 1 message per second
    .to("direct:aggregateSensor");
| EIP | Purpose | Messages Dropped? | Use When |
| --- | --- | --- | --- |
| throttle() | Limit messages per time window | No (queued) | Downstream has a known rate limit (API quota) |
| delay() | Fixed pause before forwarding | No (delayed) | Pacing, retry back-off, polling intervals |
| sample() | Lossy rate reduction | Yes (sampled) | High-frequency sensor data where loss is acceptable |
| circuitBreaker() | Stop sending on failure | Yes (fallback) | Downstream is unreliable; prevent cascade failure |