Contents

TermDefinition
Log-End Offset (LEO)The offset of the next message to be written to a partition. The "tip" of the log.
Current OffsetThe offset of the last message the consumer group has committed for a partition.
Consumer LagLEO − Current Offset per partition. Zero means fully caught up.
records-lag-maxJMX / Micrometer metric: maximum lag across all partitions assigned to a consumer instance.
Sum LagTotal lag across all partitions of a topic for a group — used for scaling decisions.

Spring Kafka automatically registers consumer metrics in Micrometer when spring-boot-starter-actuator is on the classpath. The key metric is kafka.consumer.records-lag-max.

# application.yml management: endpoints: web: exposure: include: health,info,prometheus metrics: tags: application: ${spring.application.name} // Programmatic lag gauge — exposes per-partition lag as a custom metric @Component public class LagMetricsPublisher { private final MeterRegistry registry; private final KafkaConsumer<?, ?> consumer; // injected or created // Call periodically or from a listener to publish current lag public void publishLag(Map<TopicPartition, Long> endOffsets) { for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) { TopicPartition tp = entry.getKey(); long committed = getCommittedOffset(tp); long lag = entry.getValue() - committed; Gauge.builder("kafka.consumer.partition.lag", () -> lag) .tag("topic", tp.topic()) .tag("partition", String.valueOf(tp.partition())) .tag("group", "my-group") .register(registry); } } }

Expose metrics at /actuator/prometheus and scrape with Prometheus:

# Check via curl curl http://localhost:8080/actuator/prometheus | grep kafka_consumer_records_lag # kafka_consumer_records_lag_max{client_id="...", group_id="payment-service", ...} 0.0

The JMX Exporter runs as a Java agent on each broker and exposes all Kafka JMX MBeans as Prometheus metrics. This gives broker-level and topic-level consumer lag without requiring application code changes.

# kafka-jmx-exporter.yml — minimal config for consumer group metrics lowercaseOutputName: true rules: # Consumer group lag per partition - pattern: "kafka.server<>Value" name: kafka_server_fetcher_lag labels: client_id: "$1" topic: "$2" partition: "$3" # Under-replicated partitions — broker health indicator - pattern: "kafka.server<>Value" name: kafka_server_under_replicated_partitions # Log end offset per topic/partition — used to compute lag externally - pattern: "kafka.log<>Value" name: kafka_log_log_end_offset labels: topic: "$1" partition: "$2" # docker-compose.yml broker snippet — attach JMX Exporter as Java agent services: kafka: image: confluentinc/cp-kafka:7.6.0 environment: KAFKA_JMX_PORT: 9999 KAFKA_JMX_HOSTNAME: kafka EXTRA_ARGS: "-javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=7072:/opt/jmx-exporter/kafka-jmx-exporter.yml" volumes: - ./jmx-exporter:/opt/jmx-exporter ports: - "7072:7072" # Prometheus scrape port

Kafka Exporter connects to the broker (not via JMX) and publishes consumer group lag per partition — the easiest way to get all groups and all topics in one place.

# docker-compose.yml services: kafka-exporter: image: danielqsj/kafka-exporter:latest command: - "--kafka.server=kafka:9092" - "--group.filter=.*" # monitor all consumer groups - "--topic.filter=.*" # monitor all topics ports: - "9308:9308"

Key metrics produced by Kafka Exporter:

MetricDescription
kafka_consumer_group_lagLag per consumer group, topic, partition
kafka_consumer_group_current_offsetCommitted offset per group/topic/partition
kafka_topic_partition_current_offsetLog-end offset per topic/partition
kafka_consumer_group_membersNumber of active consumers in a group
kafka_brokersNumber of brokers in the cluster

The YAML below shows the complete configuration for this feature. Adjust the values to match your environment.

# prometheus.yml scrape_configs: - job_name: kafka-exporter static_configs: - targets: ["kafka-exporter:9308"] scrape_interval: 15s - job_name: spring-app static_configs: - targets: ["payment-service:8080"] metrics_path: /actuator/prometheus scrape_interval: 15s # kafka-alerts.yml — Prometheus alerting rules groups: - name: kafka-consumer-lag rules: # Alert when any consumer group is more than 10k messages behind - alert: KafkaConsumerHighLag expr: kafka_consumer_group_lag > 10000 for: 5m labels: severity: warning annotations: summary: "Consumer group {{ $labels.consumergroup }} is lagging on {{ $labels.topic }}" description: "Lag is {{ $value }} messages on partition {{ $labels.partition }}" # Alert when lag is growing (not just high but increasing) - alert: KafkaConsumerLagGrowing expr: | rate(kafka_consumer_group_lag[10m]) > 100 for: 5m labels: severity: critical annotations: summary: "Consumer lag is growing for group {{ $labels.consumergroup }}" # Alert when consumer group has no active members (group is dead) - alert: KafkaConsumerGroupDead expr: kafka_consumer_group_members == 0 for: 2m labels: severity: critical annotations: summary: "Consumer group {{ $labels.consumergroup }} has no active members"

Key PromQL queries for a Kafka consumer lag dashboard:

# Total lag across all partitions for a consumer group (sum-lag) sum by (consumergroup, topic) (kafka_consumer_group_lag) # Lag per partition — heatmap panel kafka_consumer_group_lag{consumergroup="payment-service"} # Consumer throughput — messages consumed per second rate(kafka_consumer_group_current_offset[1m]) # Estimated time-to-catch-up (seconds) given current consume rate ( sum by (consumergroup, topic) (kafka_consumer_group_lag) ) / ( sum by (consumergroup, topic) (rate(kafka_consumer_group_current_offset[5m])) ) # Under-replicated partitions — cluster health kafka_server_under_replicated_partitions Import the community Kafka dashboard (Grafana ID 7589 — Kafka Exporter Overview) as a starting point, then add custom panels for your application-specific groups and SLA thresholds.

Use AdminClient to query consumer group lag from Java — useful for custom alerting, auto-scaling triggers, or exposing lag via a REST endpoint.

import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import java.util.*; import java.util.concurrent.ExecutionException; @Service public class LagQueryService { private final AdminClient adminClient; public Map<TopicPartition, Long> getGroupLag(String groupId) throws ExecutionException, InterruptedException { // 1. Get committed offsets for the group Map<TopicPartition, OffsetAndMetadata> committed = adminClient.listConsumerGroupOffsets(groupId) .partitionsToOffsetAndMetadata() .get(); // 2. Get end offsets for those partitions Map<TopicPartition, Long> endOffsets = adminClient.listOffsets( committed.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, e -> OffsetSpec.latest())) ).all().get() .entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, e -> e.getValue().offset())); // 3. Compute lag per partition Map<TopicPartition, Long> lag = new LinkedHashMap<>(); for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) { TopicPartition tp = entry.getKey(); long committedOffset = entry.getValue().offset(); long endOffset = endOffsets.getOrDefault(tp, committedOffset); lag.put(tp, Math.max(0, endOffset - committedOffset)); } return lag; } public long getTotalLag(String groupId) throws ExecutionException, InterruptedException { return getGroupLag(groupId).values().stream().mapToLong(Long::longValue).sum(); } } // Expose via REST — useful for KEDA or custom auto-scalers @RestController @RequestMapping("/internal/kafka") public class LagController { private final LagQueryService lagService; @GetMapping("/lag/{groupId}") public Map<String, Long> getLag(@PathVariable String groupId) throws Exception { return Map.of("totalLag", lagService.getTotalLag(groupId)); } }
# KEDA ScaledObject — scale consumer deployment based on Kafka lag apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: payment-consumer-scaler spec: scaleTargetRef: name: payment-service minReplicaCount: 1 maxReplicaCount: 10 # must not exceed partition count triggers: - type: kafka metadata: bootstrapServers: kafka:9092 consumerGroup: payment-service topic: payment.requested lagThreshold: "1000" # scale up when lag > 1000 per partition