Contents
- Why Monitor Kafka
- Micrometer & Actuator Setup
- Built-in Kafka Metrics
- Consumer Lag Monitoring
- Custom Metrics with MeterRegistry
- Prometheus & Grafana
- Health Indicators
- Alerting Best Practices
Why Monitor Kafka

Kafka sits in the critical path of most event-driven architectures. Without monitoring you are flying blind — problems surface only when downstream systems break or users complain. Three categories of metrics matter most:
- Consumer lag — the gap between the latest offset produced and the last offset committed by a consumer group. Growing lag means consumers cannot keep up.
- Throughput — records produced and consumed per second. A sudden drop signals a failed producer or a stuck consumer.
- Error rates — serialization failures, authorization errors, and network timeouts. Even a small spike can cascade.
What can go wrong without monitoring:
- Consumer lag grows silently until the retention window expires and messages are lost forever.
- A poison-pill record causes a consumer to enter an infinite retry loop while the rest of the partition stalls.
- A rebalance storm (consumers repeatedly joining and leaving the group) kills throughput but produces no application-level exception.
- Producer batches fail due to a broker leadership change — records pile up in the send buffer until memory runs out.
Kafka clients expose their metrics through an internal metrics registry (also published as JMX MBeans). Spring Boot with Micrometer binds these client metrics into a unified metrics facade so you never have to deal with JMX directly.
// A simple example: without monitoring, this silent failure goes unnoticed
@Service
public class OrderConsumer {

    private static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    private final OrderService orderService;

    public OrderConsumer(OrderService orderService) {
        this.orderService = orderService;
    }

    @KafkaListener(topics = "orders", groupId = "order-service")
    public void consume(String payload) {
        try {
            orderService.process(payload);
        } catch (Exception e) {
            // Swallowed exception — no metric, no alert, no visibility
            log.error("Failed to process order", e);
        }
    }
}
By the end of this guide every failure like the one above will increment a counter, appear on a dashboard, and fire an alert.
Micrometer & Actuator Setup

Spring Boot Actuator exposes application metrics through the /actuator/metrics endpoint. Micrometer acts as the metrics facade — add the Prometheus registry to export metrics in a format Prometheus can scrape.
<dependencies>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Actuator for /actuator endpoints -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- Micrometer Prometheus registry -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
</dependencies>
Enable the Prometheus endpoint and Kafka metrics in application.properties:
# Expose health, metrics, and prometheus endpoints
management.endpoints.web.exposure.include=health,metrics,prometheus
# Enable Kafka consumer and producer metrics via Micrometer
spring.kafka.consumer.properties[metrics.recording.level]=INFO
spring.kafka.producer.properties[metrics.recording.level]=INFO
# Kafka connection
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=order-service
spring.kafka.consumer.auto-offset-reset=earliest
Setting metrics.recording.level to DEBUG captures per-broker and per-topic metrics. This generates a large number of time series — use INFO in production unless you need fine-grained breakdowns.
Once the application starts, verify that Kafka metrics are available:
# Check available Kafka metrics
curl http://localhost:8080/actuator/metrics | jq '.names[]' | grep kafka
# Fetch a specific metric
curl http://localhost:8080/actuator/metrics/kafka.consumer.fetch.manager.records.consumed.rate
# Prometheus format (for scraping)
curl http://localhost:8080/actuator/prometheus | grep kafka_consumer
Built-in Kafka Metrics

Spring Boot auto-registers Kafka client metrics with Micrometer when Actuator is on the classpath. Here are the key metrics, grouped by role.
Consumer Metrics
| Metric | Description |
| --- | --- |
| kafka.consumer.fetch.manager.records.consumed.rate | Records consumed per second across all topics |
| kafka.consumer.fetch.manager.records.lag | Number of records the consumer is behind the broker on a partition |
| kafka.consumer.fetch.manager.records.lag.max | Maximum lag across all assigned partitions |
| kafka.consumer.fetch.manager.fetch.latency.avg | Average time for a fetch request round-trip |
| kafka.consumer.coordinator.commit.rate | Offset commits per second |
| kafka.consumer.coordinator.rebalance.rate.per.hour | Consumer rebalances per hour — high values indicate instability |
Producer Metrics
| Metric | Description |
| --- | --- |
| kafka.producer.record.send.rate | Records sent per second |
| kafka.producer.record.error.rate | Records that failed to send per second |
| kafka.producer.request.latency.avg | Average latency for produce requests |
| kafka.producer.batch.size.avg | Average number of bytes per batch |
| kafka.producer.buffer.available.bytes | Available send buffer — drops to zero under backpressure |
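Spring Boot's auto-configuration binds these client metrics to Micrometer for the factories it creates. If you construct a ProducerFactory yourself, the binding can be added with spring-kafka's MicrometerProducerListener. A minimal sketch, assuming a local broker and the hypothetical class name ProducerMetricsConfig:

```java
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.MicrometerProducerListener;
import org.springframework.kafka.core.ProducerFactory;

import java.util.Map;

@Configuration
public class ProducerMetricsConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory(MeterRegistry meterRegistry) {
        Map<String, Object> props = Map.of(
                "bootstrap.servers", "localhost:9092",
                "key.serializer", StringSerializer.class,
                "value.serializer", StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(props);
        // Binds each created producer's internal metrics to Micrometer
        // as kafka.producer.* meters
        factory.addListener(new MicrometerProducerListener<>(meterRegistry));
        return factory;
    }
}
```

With an auto-configured factory this wiring already happens for you; the sketch is only relevant when you opt out of auto-configuration.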
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MetricsConfig {

    // Add common tags to every metric for easier filtering in Grafana
    @Bean
    public MeterRegistryCustomizer<MeterRegistry> commonTags() {
        return registry -> registry.config()
                .commonTags("application", "order-service")
                .commonTags("environment", "production");
    }
}
Common tags like application and environment are attached to every metric. This lets you filter dashboards by service or environment without changing individual metric registrations.
Consumer Lag Monitoring

Consumer lag is the single most important Kafka metric. It measures how far behind a consumer group is from the latest produced offset. A lag of zero means the consumer is fully caught up. Growing lag means the consumer cannot process records fast enough — eventually records may fall off the retention window and be lost.
The built-in metric kafka.consumer.fetch.manager.records.lag reports per-partition lag. Spring Boot tags it with client.id and topic so you can drill down.
# Query lag for a specific topic via Actuator
curl "http://localhost:8080/actuator/metrics/kafka.consumer.fetch.manager.records.lag?tag=topic:orders"
For more control, implement a ConsumerAwareRebalanceListener to track which partitions are assigned to each consumer instance:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.stereotype.Component;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class LagTrackingRebalanceListener implements ConsumerAwareRebalanceListener {

    private static final Logger log =
            LoggerFactory.getLogger(LagTrackingRebalanceListener.class);

    private final MeterRegistry meterRegistry;
    private final Set<TopicPartition> assignedPartitions = ConcurrentHashMap.newKeySet();

    public LagTrackingRebalanceListener(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer,
                                     Collection<TopicPartition> partitions) {
        assignedPartitions.addAll(partitions);
        partitions.forEach(tp ->
                meterRegistry.gauge("kafka.consumer.partition.assigned",
                        Tags.of("topic", tp.topic(), "partition", String.valueOf(tp.partition())),
                        1)
        );
        log.info("Partitions assigned: {}", partitions);
    }

    @Override
    public void onPartitionsRevoked(Consumer<?, ?> consumer,
                                    Collection<TopicPartition> partitions) {
        assignedPartitions.removeAll(partitions);
        log.warn("Partitions revoked: {}", partitions);
    }

    public Set<TopicPartition> getAssignedPartitions() {
        return Set.copyOf(assignedPartitions);
    }
}
Register the listener with your container factory:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            LagTrackingRebalanceListener rebalanceListener) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setConsumerRebalanceListener(rebalanceListener);
        return factory;
    }
}
Consumer lag reported by the Kafka client is based on the last fetch response. If a consumer is paused or stuck, the lag metric may not update. Use an external tool like Burrow or the Kafka Admin API for broker-side lag checks in critical systems.
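A broker-side check with the Admin API compares each partition's committed offset against its log-end offset. A sketch, assuming the hypothetical class name BrokerSideLagChecker; the Admin calls block, so run this off the hot path (e.g. on a scheduler):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class BrokerSideLagChecker {

    private final AdminClient admin;

    public BrokerSideLagChecker(AdminClient admin) {
        this.admin = admin;
    }

    // Pure helper so the arithmetic is easy to test in isolation
    static long lagOf(long endOffset, long committedOffset) {
        return Math.max(0, endOffset - committedOffset);
    }

    public Map<TopicPartition, Long> lagForGroup(String groupId) throws Exception {
        // Committed offsets per partition for the group
        Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(groupId)
                     .partitionsToOffsetAndMetadata().get();
        // Latest (log-end) offsets for the same partitions
        Map<TopicPartition, OffsetSpec> latestSpec = committed.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                admin.listOffsets(latestSpec).all().get();
        Map<TopicPartition, Long> lag = new HashMap<>();
        committed.forEach((tp, om) ->
                lag.put(tp, lagOf(ends.get(tp).offset(), om.offset())));
        return lag;
    }
}
```

Unlike the client-side metric, this reflects lag even for a paused or crashed consumer, because both offsets come straight from the brokers.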
Custom Metrics with MeterRegistry

Built-in metrics cover Kafka client internals. Application-level metrics — how many orders were processed, how many failed, how long processing took — require custom instrumentation. Inject MeterRegistry and create counters, timers, and gauges.
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class OrderConsumer {

    private final OrderService orderService;
    private final Counter processedCounter;
    private final Counter failedCounter;
    private final Timer processingTimer;

    public OrderConsumer(OrderService orderService, MeterRegistry meterRegistry) {
        this.orderService = orderService;
        this.processedCounter = Counter.builder("kafka.orders.processed")
                .description("Total orders successfully processed")
                .tag("topic", "orders")
                .register(meterRegistry);
        this.failedCounter = Counter.builder("kafka.orders.failed")
                .description("Total orders that failed processing")
                .tag("topic", "orders")
                .register(meterRegistry);
        this.processingTimer = Timer.builder("kafka.orders.processing.duration")
                .description("Time taken to process each order")
                .tag("topic", "orders")
                .publishPercentileHistogram()
                .register(meterRegistry);
    }

    @KafkaListener(topics = "orders", groupId = "order-service")
    public void consume(String payload) {
        processingTimer.record(() -> {
            try {
                orderService.process(payload);
                processedCounter.increment();
            } catch (RuntimeException e) { // unchecked, so the lambda can rethrow it
                failedCounter.increment();
                throw e; // Let the error handler deal with retries
            }
        });
    }
}
For a gauge that tracks a dynamic value such as an internal queue depth:
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Component
public class OrderBuffer {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10_000);
    private final MeterRegistry meterRegistry;

    public OrderBuffer(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @PostConstruct
    public void registerMetrics() {
        // Gauge automatically reads queue.size() on each scrape
        meterRegistry.gauge("kafka.orders.buffer.size",
                queue, BlockingQueue::size);
        meterRegistry.gauge("kafka.orders.buffer.remaining.capacity",
                queue, BlockingQueue::remainingCapacity);
    }

    public void enqueue(String order) {
        queue.offer(order);
    }

    public String dequeue() throws InterruptedException {
        return queue.take();
    }
}
Use publishPercentileHistogram() on timers to get p50, p95, and p99 latency buckets in Prometheus. This adds histogram buckets to the metric but increases cardinality — use it selectively on high-value metrics.
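To keep the added cardinality in check, Micrometer lets you bound the histogram's bucket range and add explicit buckets at the latencies your SLA cares about. A sketch, with the bounds and SLO values as illustrative assumptions:

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.time.Duration;

public class TimerBucketExample {

    public static Timer boundedTimer(MeterRegistry registry) {
        return Timer.builder("kafka.orders.processing.duration")
                .publishPercentileHistogram()
                // Cap the bucket range so Prometheus does not receive
                // buckets for latencies that can never occur
                .minimumExpectedValue(Duration.ofMillis(1))
                .maximumExpectedValue(Duration.ofSeconds(10))
                // Extra buckets exactly at the SLA boundaries
                .serviceLevelObjectives(Duration.ofMillis(100), Duration.ofMillis(500))
                .register(registry);
    }

    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();
        Timer timer = boundedTimer(registry);
        timer.record(Duration.ofMillis(42));
        System.out.println(timer.count()); // one recorded sample
    }
}
```

The serviceLevelObjectives buckets also make histogram_quantile estimates more accurate around the thresholds you alert on.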
Prometheus & Grafana

With Micrometer exporting metrics in Prometheus format at /actuator/prometheus, configure Prometheus to scrape your application.
# prometheus.yml — scrape config for Spring Boot app
scrape_configs:
  - job_name: 'order-service'
    metrics_path: '/actuator/prometheus'
    scrape_interval: 15s
    static_configs:
      - targets: ['order-service:8080']
        labels:
          environment: 'production'
Useful PromQL Queries
These queries power most Kafka dashboards. Copy them into Grafana panel queries.
Consumer lag per partition:
kafka_consumer_fetch_manager_records_lag{topic="orders"}
Total consumer lag across all partitions (sum by consumer group):
sum by (client_id) (kafka_consumer_fetch_manager_records_lag{topic="orders"})
Records consumed per second (rate over 5 minutes):
rate(kafka_consumer_fetch_manager_records_consumed_total{topic="orders"}[5m])
Order processing failure rate:
rate(kafka_orders_failed_total{topic="orders"}[5m])
P99 processing latency:
histogram_quantile(0.99, rate(kafka_orders_processing_duration_seconds_bucket{topic="orders"}[5m]))
Producer error rate:
rate(kafka_producer_record_error_total[5m])
Sample Grafana Panel — Consumer Lag
Import this JSON into a Grafana dashboard to create a consumer lag time-series panel:
{
  "type": "timeseries",
  "title": "Kafka Consumer Lag — orders",
  "datasource": "Prometheus",
  "targets": [
    {
      "expr": "sum by (client_id, partition) (kafka_consumer_fetch_manager_records_lag{topic=\"orders\"})",
      "legendFormat": "{{client_id}} — partition {{partition}}"
    }
  ],
  "fieldConfig": {
    "defaults": {
      "unit": "short",
      "thresholds": {
        "steps": [
          { "value": 0, "color": "green" },
          { "value": 1000, "color": "yellow" },
          { "value": 10000, "color": "red" }
        ]
      }
    }
  },
  "options": {
    "tooltip": { "mode": "multi" },
    "legend": { "displayMode": "table", "placement": "bottom" }
  }
}
Set threshold colors in Grafana to highlight lag severity — green below 1,000, yellow up to 10,000, red above. Adjust thresholds based on your throughput and SLA requirements.
Health Indicators

Spring Boot Actuator exposes a health endpoint at /actuator/health. Note that Spring Boot does not ship a built-in Kafka health indicator (only Spring Cloud Stream's Kafka binder provides one), so checking broker connectivity requires a custom HealthIndicator.
# Show full health details (UP/DOWN with component breakdown)
management.endpoint.health.show-details=always
A basic connectivity check verifies that the application can describe the Kafka cluster. For deeper checks — like verifying that a consumer group is active and partitions are assigned — create a custom health indicator:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.common.ConsumerGroupState;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Component("kafkaConsumerGroupHealth")
public class KafkaConsumerGroupHealthIndicator implements HealthIndicator {

    private final KafkaAdmin kafkaAdmin;
    private final String consumerGroup = "order-service";

    public KafkaConsumerGroupHealthIndicator(KafkaAdmin kafkaAdmin) {
        this.kafkaAdmin = kafkaAdmin;
    }

    @Override
    public Health health() {
        try (AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            DescribeConsumerGroupsResult result =
                    client.describeConsumerGroups(Collections.singletonList(consumerGroup));
            Map<String, ConsumerGroupDescription> groups =
                    result.all().get(5, TimeUnit.SECONDS);
            ConsumerGroupDescription group = groups.get(consumerGroup);
            if (group == null) {
                return Health.down()
                        .withDetail("consumerGroup", consumerGroup)
                        .withDetail("reason", "Consumer group not found")
                        .build();
            }
            ConsumerGroupState state = group.state();
            int memberCount = group.members().size();
            if (state == ConsumerGroupState.STABLE && memberCount > 0) {
                return Health.up()
                        .withDetail("consumerGroup", consumerGroup)
                        .withDetail("state", state.toString())
                        .withDetail("members", memberCount)
                        .build();
            }
            return Health.down()
                    .withDetail("consumerGroup", consumerGroup)
                    .withDetail("state", state.toString())
                    .withDetail("members", memberCount)
                    .build();
        } catch (Exception e) {
            return Health.down(e)
                    .withDetail("consumerGroup", consumerGroup)
                    .build();
        }
    }
}
A custom health check for broker connectivity that validates topic metadata exists:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

@Component("kafkaBrokerHealth")
public class KafkaBrokerHealthIndicator implements HealthIndicator {

    private final KafkaAdmin kafkaAdmin;
    private final int expectedBrokers;

    public KafkaBrokerHealthIndicator(KafkaAdmin kafkaAdmin) {
        this.kafkaAdmin = kafkaAdmin;
        this.expectedBrokers = 3; // Adjust for your cluster
    }

    @Override
    public Health health() {
        try (AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            DescribeClusterResult cluster = client.describeCluster();
            Collection<Node> nodes = cluster.nodes().get(5, TimeUnit.SECONDS);
            String clusterId = cluster.clusterId().get(5, TimeUnit.SECONDS);
            Health.Builder builder = (nodes.size() >= expectedBrokers)
                    ? Health.up()
                    : Health.down().withDetail("reason", "Expected " + expectedBrokers
                            + " brokers but found " + nodes.size());
            return builder
                    .withDetail("clusterId", clusterId)
                    .withDetail("brokerCount", nodes.size())
                    .withDetail("brokers", nodes.stream()
                            .map(n -> n.host() + ":" + n.port())
                            .toList())
                    .build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}
Custom health indicators that open an AdminClient on every health check can be expensive. Use the management.endpoint.health.cache.time-to-live property (e.g., 10s) to cache the health result and avoid hammering the Kafka cluster.
# Cache health check results for 10 seconds
management.endpoint.health.cache.time-to-live=10s
Alerting Best Practices

Dashboards show what is happening; alerts tell you when something needs attention. Focus alerts on actionable conditions and avoid noise.
What to Alert On
- Consumer lag threshold — lag above a sustained threshold means consumers are falling behind. Alert when total lag exceeds a value for more than 5 minutes.
- Error rate spike — a sudden increase in kafka.orders.failed or kafka.producer.record.error.rate indicates a systemic problem.
- Consumer rebalance storms — frequent rebalances (more than a few per hour) cause processing pauses and duplicate delivery.
- Zero throughput — if records.consumed.rate drops to zero for a topic that normally has traffic, the consumer is likely dead or stuck.
- Producer buffer exhaustion — buffer.available.bytes near zero means the producer is blocked waiting for space.
Prometheus Alert Rules
# prometheus-alerts.yml
groups:
  - name: kafka-alerts
    rules:
      # Consumer lag too high for more than 5 minutes
      - alert: KafkaConsumerLagHigh
        expr: sum by (client_id) (kafka_consumer_fetch_manager_records_lag{topic="orders"}) > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag is high"
          description: "Consumer {{ $labels.client_id }} has lag {{ $value }} on topic orders for more than 5 minutes."
      # Error rate exceeds 1 per second
      - alert: KafkaProcessingErrorRateHigh
        expr: rate(kafka_orders_failed_total[5m]) > 1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Kafka order processing error rate is high"
          description: "Error rate is {{ $value }}/s — check application logs for details."
      # Consumer rebalance storm
      - alert: KafkaRebalanceStorm
        expr: kafka_consumer_coordinator_rebalance_rate_per_hour > 10
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer group is rebalancing excessively"
          description: "Consumer {{ $labels.client_id }} is rebalancing {{ $value }} times/hour."
      # Zero consumer throughput on a normally active topic
      - alert: KafkaConsumerThroughputZero
        expr: rate(kafka_consumer_fetch_manager_records_consumed_total{topic="orders"}[10m]) == 0
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Kafka consumer has stopped consuming"
          description: "No records consumed from topic orders for 10 minutes."
Log Correlation with MDC and TraceId
When an alert fires, you need to trace the problem back to specific records. Use MDC (Mapped Diagnostic Context) to inject Kafka metadata into every log line produced during message processing:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.UUID;

@Service
public class OrderConsumer {

    private static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    private final OrderService orderService;

    public OrderConsumer(OrderService orderService) {
        this.orderService = orderService;
    }

    @KafkaListener(topics = "orders", groupId = "order-service")
    public void consume(ConsumerRecord<String, String> record) {
        // Set MDC fields for log correlation
        MDC.put("traceId", UUID.randomUUID().toString());
        MDC.put("kafkaTopic", record.topic());
        MDC.put("kafkaPartition", String.valueOf(record.partition()));
        MDC.put("kafkaOffset", String.valueOf(record.offset()));
        MDC.put("kafkaKey", String.valueOf(record.key())); // valueOf guards against a null key
        try {
            log.info("Processing order");
            orderService.process(record.value());
            log.info("Order processed successfully");
        } catch (RuntimeException e) {
            log.error("Failed to process order", e);
            throw e;
        } finally {
            MDC.clear();
        }
    }
}
Configure the log pattern to include MDC fields:
# application.properties — include Kafka context in log lines
logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} [traceId=%X{traceId}, topic=%X{kafkaTopic}, partition=%X{kafkaPartition}, offset=%X{kafkaOffset}] - %msg%n
With this configuration every log line produced during Kafka message processing includes the topic, partition, offset, and a unique trace ID. When an alert fires for a high error rate, search logs by traceId to see the full processing context for any failed record.
If you use Micrometer Tracing (or the older Spring Cloud Sleuth), the traceId is set automatically. You can skip the manual UUID.randomUUID() and let the tracing library propagate trace context through Kafka record headers; in Spring Kafka this means enabling observation on the KafkaTemplate and the listener container rather than wiring interceptors by hand.
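The per-listener MDC boilerplate above can also be centralized with Spring Kafka's RecordInterceptor, so every listener gets correlation fields without repeating the code. A sketch, assuming String records and the hypothetical class name MdcRecordInterceptor:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.MDC;
import org.springframework.kafka.listener.RecordInterceptor;

import java.util.UUID;

public class MdcRecordInterceptor implements RecordInterceptor<String, String> {

    @Override
    public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record,
                                                    Consumer<String, String> consumer) {
        // Runs before the listener method; every log line inside it sees these fields
        MDC.put("traceId", UUID.randomUUID().toString());
        MDC.put("kafkaTopic", record.topic());
        MDC.put("kafkaPartition", String.valueOf(record.partition()));
        MDC.put("kafkaOffset", String.valueOf(record.offset()));
        return record;
    }

    @Override
    public void afterRecord(ConsumerRecord<String, String> record,
                            Consumer<String, String> consumer) {
        // Runs after the listener, on success or failure; clear the MDC so
        // the consumer thread does not leak context into the next record
        MDC.clear();
    }
}
```

Register it on the container factory with factory.setRecordInterceptor(new MdcRecordInterceptor()); the listener methods then need no MDC code at all.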
Putting It All Together
A production monitoring setup for Spring Kafka follows this pipeline:
- Spring Boot exposes Kafka and custom metrics via /actuator/prometheus.
- Prometheus scrapes the endpoint every 15 seconds.
- Grafana dashboards display consumer lag, throughput, error rates, and processing latency.
- Prometheus alert rules fire on sustained lag, error spikes, rebalance storms, and zero throughput.
- Alertmanager routes alerts to Slack, PagerDuty, or email based on severity labels.
- MDC-enriched logs let you correlate any alert back to specific Kafka records.
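For local experimentation, the scrape-and-dashboard part of this pipeline can be stood up with a small Docker Compose file. A hypothetical sketch, assuming the prometheus.yml shown earlier sits next to it and lists prometheus-alerts.yml under rule_files:

```yaml
# docker-compose.yml (hypothetical local monitoring stack)
version: "3.8"
services:
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./prometheus-alerts.yml:/etc/prometheus/prometheus-alerts.yml
    ports:
      - "9090:9090"
  grafana:
    image: grafana/grafana:latest
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    ports:
      - "3000:3000"
    depends_on:
      - prometheus
```

When the Spring Boot app runs on the host rather than inside Compose, point the scrape target at host.docker.internal:8080 instead of order-service:8080.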