Contents

Kafka sits in the critical path of most event-driven architectures. Without monitoring you are flying blind — problems surface only when downstream systems break or users complain. Three categories of metrics matter most:

What can go wrong without monitoring:

Kafka metrics are JMX-based under the hood. Spring Boot with Micrometer bridges these JMX MBeans into a unified metrics facade so you never have to deal with JMX directly. // A simple example: without monitoring, this silent failure goes unnoticed @Service public class OrderConsumer { @KafkaListener(topics = "orders", groupId = "order-service") public void consume(String payload) { try { orderService.process(payload); } catch (Exception e) { // Swallowed exception — no metric, no alert, no visibility log.error("Failed to process order", e); } } }

By the end of this guide every failure like the one above will increment a counter, appear on a dashboard, and fire an alert.

Spring Boot Actuator exposes application metrics through the /actuator/metrics endpoint. Micrometer acts as the metrics facade — add the Prometheus registry to export metrics in a format Prometheus can scrape.

<dependencies> <!-- Spring Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- Actuator for /actuator endpoints --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- Micrometer Prometheus registry --> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency> </dependencies>

Enable the Prometheus endpoint and Kafka metrics in application.properties:

# Expose health, metrics, and prometheus endpoints management.endpoints.web.exposure.include=health,metrics,prometheus # Enable Kafka consumer and producer metrics via Micrometer spring.kafka.consumer.properties[metrics.recording.level]=INFO spring.kafka.producer.properties[metrics.recording.level]=INFO # Kafka connection spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=order-service spring.kafka.consumer.auto-offset-reset=earliest Setting metrics.recording.level to DEBUG captures per-broker and per-topic metrics. This generates a large number of time series — use INFO in production unless you need fine-grained breakdowns.

Once the application starts, verify that Kafka metrics are available:

# Check available Kafka metrics curl http://localhost:8080/actuator/metrics | jq '.names[]' | grep kafka # Fetch a specific metric curl http://localhost:8080/actuator/metrics/kafka.consumer.fetch.manager.records.consumed.rate # Prometheus format (for scraping) curl http://localhost:8080/actuator/prometheus | grep kafka_consumer

Spring Boot auto-registers Kafka client metrics with Micrometer when Actuator is on the classpath. Here are the key metrics grouped by role.

Consumer Metrics
MetricDescription
kafka.consumer.fetch.manager.records.consumed.rateRecords consumed per second across all topics
kafka.consumer.fetch.manager.records.lagNumber of records the consumer is behind the broker on a partition
kafka.consumer.fetch.manager.records.lag.maxMaximum lag across all assigned partitions
kafka.consumer.fetch.manager.fetch.latency.avgAverage time for a fetch request round-trip
kafka.consumer.coordinator.commit.rateOffset commits per second
kafka.consumer.coordinator.rebalance.rate.per.hourConsumer rebalances per hour — high values indicate instability
Producer Metrics
MetricDescription
kafka.producer.record.send.rateRecords sent per second
kafka.producer.record.error.rateRecords that failed to send per second
kafka.producer.request.latency.avgAverage latency for produce requests
kafka.producer.batch.size.avgAverage number of bytes per batch
kafka.producer.buffer.available.bytesAvailable send buffer — drops to zero under backpressure
import io.micrometer.core.instrument.MeterRegistry; import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MetricsConfig { // Add common tags to every metric for easier filtering in Grafana @Bean public MeterRegistryCustomizer<MeterRegistry> commonTags() { return registry -> registry.config() .commonTags("application", "order-service") .commonTags("environment", "production"); } } Common tags like application and environment are attached to every metric. This lets you filter dashboards by service or environment without changing individual metric registrations.

Consumer lag is the single most important Kafka metric. It measures how far behind a consumer group is from the latest produced offset. A lag of zero means the consumer is fully caught up. Growing lag means the consumer cannot process records fast enough — eventually records may fall off the retention window and be lost.

The built-in metric kafka.consumer.fetch.manager.records.lag reports per-partition lag. Spring Boot tags it with client.id and topic so you can drill down.

# Query lag for a specific topic via Actuator curl "http://localhost:8080/actuator/metrics/kafka.consumer.fetch.manager.records.lag?tag=topic:orders"

For more control, implement a ConsumerAwareRebalanceListener to track which partitions are assigned to each consumer instance:

import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.stereotype.Component; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; import java.util.Collection; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Component public class LagTrackingRebalanceListener implements ConsumerAwareRebalanceListener { private final MeterRegistry meterRegistry; private final Set<TopicPartition> assignedPartitions = ConcurrentHashMap.newKeySet(); public LagTrackingRebalanceListener(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; } @Override public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { assignedPartitions.addAll(partitions); partitions.forEach(tp -> meterRegistry.gauge("kafka.consumer.partition.assigned", Tags.of("topic", tp.topic(), "partition", String.valueOf(tp.partition())), 1) ); log.info("Partitions assigned: {}", partitions); } @Override public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { assignedPartitions.removeAll(partitions); log.warn("Partitions revoked: {}", partitions); } public Set<TopicPartition> getAssignedPartitions() { return Set.copyOf(assignedPartitions); } }

Register the listener with your container factory:

import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; @Configuration public class KafkaConsumerConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> consumerFactory, LagTrackingRebalanceListener rebalanceListener) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.getContainerProperties().setConsumerRebalanceListener(rebalanceListener); return factory; } } Consumer lag reported by the Kafka client is based on the last fetch response. If a consumer is paused or stuck, the lag metric may not update. Use an external tool like Burrow or the Kafka Admin API for broker-side lag checks in critical systems.

Built-in metrics cover Kafka client internals. Application-level metrics — how many orders were processed, how many failed, how long processing took — require custom instrumentation. Inject MeterRegistry and create counters, timers, and gauges.

import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class OrderConsumer { private final OrderService orderService; private final Counter processedCounter; private final Counter failedCounter; private final Timer processingTimer; public OrderConsumer(OrderService orderService, MeterRegistry meterRegistry) { this.orderService = orderService; this.processedCounter = Counter.builder("kafka.orders.processed") .description("Total orders successfully processed") .tag("topic", "orders") .register(meterRegistry); this.failedCounter = Counter.builder("kafka.orders.failed") .description("Total orders that failed processing") .tag("topic", "orders") .register(meterRegistry); this.processingTimer = Timer.builder("kafka.orders.processing.duration") .description("Time taken to process each order") .tag("topic", "orders") .publishPercentileHistogram() .register(meterRegistry); } @KafkaListener(topics = "orders", groupId = "order-service") public void consume(String payload) { processingTimer.record(() -> { try { orderService.process(payload); processedCounter.increment(); } catch (Exception e) { failedCounter.increment(); throw e; // Let the error handler deal with retries } }); } }

For a gauge that tracks a dynamic value such as an internal queue depth:

import io.micrometer.core.instrument.MeterRegistry; import org.springframework.stereotype.Component; import jakarta.annotation.PostConstruct; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @Component public class OrderBuffer { private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10_000); private final MeterRegistry meterRegistry; public OrderBuffer(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; } @PostConstruct public void registerMetrics() { // Gauge automatically reads queue.size() on each scrape meterRegistry.gauge("kafka.orders.buffer.size", queue, BlockingQueue::size); meterRegistry.gauge("kafka.orders.buffer.remaining.capacity", queue, BlockingQueue::remainingCapacity); } public void enqueue(String order) { queue.offer(order); } public String dequeue() throws InterruptedException { return queue.take(); } } Use publishPercentileHistogram() on timers to get p50, p95, and p99 latency buckets in Prometheus. This adds histogram buckets to the metric but increases cardinality — use it selectively on high-value metrics.

With Micrometer exporting metrics in Prometheus format at /actuator/prometheus, configure Prometheus to scrape your application.

# prometheus.yml — scrape config for Spring Boot app scrape_configs: - job_name: 'order-service' metrics_path: '/actuator/prometheus' scrape_interval: 15s static_configs: - targets: ['order-service:8080'] labels: environment: 'production'
Useful PromQL Queries

These queries power most Kafka dashboards. Copy them into Grafana panel queries.

Consumer lag per partition:

kafka_consumer_fetch_manager_records_lag{topic="orders"}

Total consumer lag across all partitions (sum by consumer group):

sum by (client_id) (kafka_consumer_fetch_manager_records_lag{topic="orders"})

Records consumed per second (rate over 5 minutes):

rate(kafka_consumer_fetch_manager_records_consumed_total{topic="orders"}[5m])

Order processing failure rate:

rate(kafka_orders_failed_total{topic="orders"}[5m])

P99 processing latency:

histogram_quantile(0.99, rate(kafka_orders_processing_duration_seconds_bucket{topic="orders"}[5m]))

Producer error rate:

rate(kafka_producer_record_error_total[5m])
Sample Grafana Panel — Consumer Lag

Import this JSON into a Grafana dashboard to create a consumer lag time-series panel:

{ "type": "timeseries", "title": "Kafka Consumer Lag — orders", "datasource": "Prometheus", "targets": [ { "expr": "sum by (client_id, partition) (kafka_consumer_fetch_manager_records_lag{topic=\"orders\"})", "legendFormat": "{{client_id}} — partition {{partition}}" } ], "fieldConfig": { "defaults": { "unit": "short", "thresholds": { "steps": [ { "value": 0, "color": "green" }, { "value": 1000, "color": "yellow" }, { "value": 10000, "color": "red" } ] } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "table", "placement": "bottom" } } } Set threshold colors in Grafana to highlight lag severity — green below 1,000, yellow up to 10,000, red above. Adjust thresholds based on your throughput and SLA requirements.

Spring Boot Actuator includes a health endpoint at /actuator/health. When spring-kafka is on the classpath, Spring Boot automatically registers a KafkaHealthIndicator that checks broker connectivity.

# Show full health details (UP/DOWN with component breakdown) management.endpoint.health.show-details=always # Include Kafka health indicator management.health.kafka.enabled=true

The default health indicator verifies that the application can describe the Kafka cluster. For deeper checks — like verifying that a consumer group is active and partitions are assigned — create a custom health indicator:

import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; import org.apache.kafka.common.ConsumerGroupState; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.HealthIndicator; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.stereotype.Component; import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; @Component("kafkaConsumerGroupHealth") public class KafkaConsumerGroupHealthIndicator implements HealthIndicator { private final KafkaAdmin kafkaAdmin; private final String consumerGroup = "order-service"; public KafkaConsumerGroupHealthIndicator(KafkaAdmin kafkaAdmin) { this.kafkaAdmin = kafkaAdmin; } @Override public Health health() { try (AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties())) { DescribeConsumerGroupsResult result = client.describeConsumerGroups(Collections.singletonList(consumerGroup)); Map<String, ConsumerGroupDescription> groups = result.all().get(5, TimeUnit.SECONDS); ConsumerGroupDescription group = groups.get(consumerGroup); if (group == null) { return Health.down() .withDetail("consumerGroup", consumerGroup) .withDetail("reason", "Consumer group not found") .build(); } ConsumerGroupState state = group.state(); int memberCount = group.members().size(); if (state == ConsumerGroupState.STABLE && memberCount > 0) { return Health.up() .withDetail("consumerGroup", consumerGroup) .withDetail("state", state.toString()) .withDetail("members", memberCount) .build(); } return Health.down() .withDetail("consumerGroup", consumerGroup) .withDetail("state", state.toString()) .withDetail("members", memberCount) .build(); } catch (Exception e) { return Health.down(e) .withDetail("consumerGroup", consumerGroup) .build(); } } }

A custom health check for broker connectivity that validates topic metadata exists:

import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeClusterResult; import org.apache.kafka.common.Node; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.HealthIndicator; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.stereotype.Component; import java.util.Collection; import java.util.concurrent.TimeUnit; @Component("kafkaBrokerHealth") public class KafkaBrokerHealthIndicator implements HealthIndicator { private final KafkaAdmin kafkaAdmin; private final int expectedBrokers; public KafkaBrokerHealthIndicator(KafkaAdmin kafkaAdmin) { this.kafkaAdmin = kafkaAdmin; this.expectedBrokers = 3; // Adjust for your cluster } @Override public Health health() { try (AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties())) { DescribeClusterResult cluster = client.describeCluster(); Collection<Node> nodes = cluster.nodes().get(5, TimeUnit.SECONDS); String clusterId = cluster.clusterId().get(5, TimeUnit.SECONDS); Health.Builder builder = (nodes.size() >= expectedBrokers) ? Health.up() : Health.down().withDetail("reason", "Expected " + expectedBrokers + " brokers but found " + nodes.size()); return builder .withDetail("clusterId", clusterId) .withDetail("brokerCount", nodes.size()) .withDetail("brokers", nodes.stream() .map(n -> n.host() + ":" + n.port()) .toList()) .build(); } catch (Exception e) { return Health.down(e).build(); } } } Custom health indicators that open an AdminClient on every health check can be expensive. Use the management.endpoint.health.cache.time-to-live property (e.g., 10s) to cache the health result and avoid hammering the Kafka cluster. # Cache health check results for 10 seconds management.endpoint.health.cache.time-to-live=10s

Dashboards show what is happening; alerts tell you when something needs attention. Focus alerts on actionable conditions and avoid noise.

What to Alert On
Prometheus Alert Rules
# prometheus-alerts.yml groups: - name: kafka-alerts rules: # Consumer lag too high for more than 5 minutes - alert: KafkaConsumerLagHigh expr: sum by (client_id) (kafka_consumer_fetch_manager_records_lag{topic="orders"}) > 10000 for: 5m labels: severity: warning annotations: summary: "Kafka consumer lag is high" description: "Consumer {{ $labels.client_id }} has lag {{ $value }} on topic orders for more than 5 minutes." # Error rate exceeds 1 per second - alert: KafkaProcessingErrorRateHigh expr: rate(kafka_orders_failed_total[5m]) > 1 for: 2m labels: severity: critical annotations: summary: "Kafka order processing error rate is high" description: "Error rate is {{ $value }}/s — check application logs for details." # Consumer rebalance storm - alert: KafkaRebalanceStorm expr: kafka_consumer_coordinator_rebalance_rate_per_hour > 10 for: 10m labels: severity: warning annotations: summary: "Kafka consumer group is rebalancing excessively" description: "Consumer {{ $labels.client_id }} is rebalancing {{ $value }} times/hour." # Zero consumer throughput on a normally active topic - alert: KafkaConsumerThroughputZero expr: rate(kafka_consumer_fetch_manager_records_consumed_total{topic="orders"}[10m]) == 0 for: 10m labels: severity: critical annotations: summary: "Kafka consumer has stopped consuming" description: "No records consumed from topic orders for 10 minutes."
Log Correlation with MDC and TraceId

When an alert fires, you need to trace the problem back to specific records. Use MDC (Mapped Diagnostic Context) to inject Kafka metadata into every log line produced during message processing:

import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; import java.util.UUID; @Service public class OrderConsumer { private static final Logger log = LoggerFactory.getLogger(OrderConsumer.class); private final OrderService orderService; public OrderConsumer(OrderService orderService) { this.orderService = orderService; } @KafkaListener(topics = "orders", groupId = "order-service") public void consume(ConsumerRecord<String, String> record) { // Set MDC fields for log correlation MDC.put("traceId", UUID.randomUUID().toString()); MDC.put("kafkaTopic", record.topic()); MDC.put("kafkaPartition", String.valueOf(record.partition())); MDC.put("kafkaOffset", String.valueOf(record.offset())); MDC.put("kafkaKey", record.key()); try { log.info("Processing order"); orderService.process(record.value()); log.info("Order processed successfully"); } catch (Exception e) { log.error("Failed to process order", e); throw e; } finally { MDC.clear(); } } }

Configure the log pattern to include MDC fields:

# application.properties — include Kafka context in log lines logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} [traceId=%X{traceId}, topic=%X{kafkaTopic}, partition=%X{kafkaPartition}, offset=%X{kafkaOffset}] - %msg%n

With this configuration every log line produced during Kafka message processing includes the topic, partition, offset, and a unique trace ID. When an alert fires for a high error rate, search logs by traceId to see the full processing context for any failed record.

If you use Spring Cloud Sleuth or Micrometer Tracing, the traceId is set automatically. You can skip the manual UUID.randomUUID() and let the tracing library propagate trace context from Kafka headers using a ProducerInterceptor and ConsumerInterceptor.
Putting It All Together

A production monitoring setup for Spring Kafka follows this pipeline:

  1. Spring Boot exposes Kafka and custom metrics via /actuator/prometheus.
  2. Prometheus scrapes the endpoint every 15 seconds.
  3. Grafana dashboards display consumer lag, throughput, error rates, and processing latency.
  4. Prometheus alert rules fire on sustained lag, error spikes, rebalance storms, and zero throughput.
  5. Alertmanager routes alerts to Slack, PagerDuty, or email based on severity labels.
  6. MDC-enriched logs let you correlate any alert back to specific Kafka records.