Contents
- Lag Concepts — Offset, Log-End, Lag
- Consumer-Side Metrics (Micrometer)
- Broker JMX Exporter
- Kafka Exporter (Cluster-Wide Lag)
- Prometheus Scrape Config & Alert Rules
- Grafana Dashboard Queries
- Programmatic Lag with AdminClient
- Lag Remediation Strategies
## Lag Concepts — Offset, Log-End, Lag

| Term | Definition |
| --- | --- |
| Log-End Offset (LEO) | The offset of the next message to be written to a partition. The "tip" of the log. |
| Current Offset | The offset of the last message the consumer group has committed for a partition. |
| Consumer Lag | LEO − Current Offset per partition. Zero means fully caught up. |
| records-lag-max | JMX / Micrometer metric: maximum lag across all partitions assigned to a consumer instance. |
| Sum Lag | Total lag across all partitions of a topic for a group — used for scaling decisions. |
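The lag formula in the table can be sketched in plain Java using maps keyed by partition (the partition names and offset values here are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Per-partition lag = log-end offset (LEO) minus the group's committed offset
public class LagMath {

    static Map<String, Long> computeLag(Map<String, Long> logEndOffsets,
                                        Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : logEndOffsets.entrySet()) {
            // A group that has never committed is treated as fully behind (committed = 0)
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0, e.getValue() - committed));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Long> leo = Map.of("orders-0", 120L, "orders-1", 300L);
        Map<String, Long> committed = Map.of("orders-0", 120L, "orders-1", 250L);
        // orders-0 is fully caught up (lag 0); orders-1 is 50 messages behind
        System.out.println(computeLag(leo, committed));
    }
}
```

The `Math.max(0, …)` clamp matters in practice: a committed offset can briefly exceed a stale end-offset reading, and negative lag would confuse alerts.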
## Consumer-Side Metrics (Micrometer)

Spring Kafka automatically registers consumer client metrics with Micrometer when spring-boot-starter-actuator is on the classpath (add micrometer-registry-prometheus to expose them in Prometheus format). The key metric is kafka.consumer.records-lag-max.
```yaml
# application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus
  metrics:
    tags:
      application: ${spring.application.name}
```
```java
// Programmatic lag gauge — exposes per-partition lag as a custom metric
@Component
public class LagMetricsPublisher {

    private final MeterRegistry registry;
    private final KafkaConsumer<?, ?> consumer; // injected or created
    // One mutable value per partition — the registered gauges sample these
    private final Map<TopicPartition, AtomicLong> lagValues = new ConcurrentHashMap<>();

    public LagMetricsPublisher(MeterRegistry registry, KafkaConsumer<?, ?> consumer) {
        this.registry = registry;
        this.consumer = consumer;
    }

    // Call periodically or from a listener to publish current lag
    public void publishLag(Map<TopicPartition, Long> endOffsets) {
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            long committed = getCommittedOffset(tp);
            long lag = Math.max(0, entry.getValue() - committed);
            // Register the gauge once per partition, then update its backing value.
            // Rebuilding a Gauge on every call would capture the first lag value
            // forever, since Micrometer returns the existing meter for the same tags.
            lagValues.computeIfAbsent(tp, key -> {
                AtomicLong value = new AtomicLong();
                Gauge.builder("kafka.consumer.partition.lag", value, AtomicLong::get)
                    .tag("topic", key.topic())
                    .tag("partition", String.valueOf(key.partition()))
                    .tag("group", "my-group")
                    .register(registry);
                return value;
            }).set(lag);
        }
    }
}
```
Expose metrics at /actuator/prometheus and scrape with Prometheus:
```bash
# Check via curl
curl http://localhost:8080/actuator/prometheus | grep kafka_consumer_records_lag
# kafka_consumer_records_lag_max{client_id="...", group_id="payment-service", ...} 0.0
```
## Broker JMX Exporter

The JMX Exporter runs as a Java agent on each broker and exposes all Kafka JMX MBeans as Prometheus metrics. This gives broker-level and topic-level lag visibility without requiring application code changes.
```yaml
# kafka-jmx-exporter.yml — minimal config for lag-related broker metrics
lowercaseOutputName: true
rules:
  # Fetcher lag per partition (broker-side replica fetcher metric)
  - pattern: "kafka.server<type=FetcherLagMetrics, name=ConsumerLag, clientId=(.+), topic=(.+), partition=(.+)><>Value"
    name: kafka_server_fetcher_lag
    labels:
      client_id: "$1"
      topic: "$2"
      partition: "$3"
  # Under-replicated partitions — broker health indicator
  - pattern: "kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value"
    name: kafka_server_under_replicated_partitions
  # Log end offset per topic/partition — used to compute lag externally
  - pattern: "kafka.log<type=Log, name=LogEndOffset, topic=(.+), partition=(.+)><>Value"
    name: kafka_log_log_end_offset
    labels:
      topic: "$1"
      partition: "$2"
```
```yaml
# docker-compose.yml broker snippet — attach JMX Exporter as Java agent
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    environment:
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: kafka
      EXTRA_ARGS: "-javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=7072:/opt/jmx-exporter/kafka-jmx-exporter.yml"
    volumes:
      - ./jmx-exporter:/opt/jmx-exporter
    ports:
      - "7072:7072"   # Prometheus scrape port
```
## Kafka Exporter (Cluster-Wide Lag)

Kafka Exporter connects to the broker over the Kafka protocol (not via JMX) and publishes consumer group lag per partition — the easiest way to get all groups and all topics in one place.
```yaml
# docker-compose.yml
services:
  kafka-exporter:
    image: danielqsj/kafka-exporter:latest
    command:
      - "--kafka.server=kafka:9092"
      - "--group.filter=.*"   # monitor all consumer groups
      - "--topic.filter=.*"   # monitor all topics
    ports:
      - "9308:9308"
```
Key metrics produced by Kafka Exporter:
| Metric | Description |
| --- | --- |
| kafka_consumergroup_lag | Lag per consumer group, topic, partition |
| kafka_consumergroup_current_offset | Committed offset per group/topic/partition |
| kafka_topic_partition_current_offset | Log-end offset per topic/partition |
| kafka_consumergroup_members | Number of active consumers in a group |
| kafka_brokers | Number of brokers in the cluster |
## Prometheus Scrape Config & Alert Rules

Scrape both the Kafka Exporter and the Spring application, then layer alerting rules on top of the lag metrics. Adjust targets and thresholds to match your environment.
```yaml
# prometheus.yml
scrape_configs:
  - job_name: kafka-exporter
    static_configs:
      - targets: ["kafka-exporter:9308"]
    scrape_interval: 15s
  - job_name: spring-app
    static_configs:
      - targets: ["payment-service:8080"]
    metrics_path: /actuator/prometheus
    scrape_interval: 15s
```
```yaml
# kafka-alerts.yml — Prometheus alerting rules
groups:
  - name: kafka-consumer-lag
    rules:
      # Alert when any consumer group is more than 10k messages behind
      - alert: KafkaConsumerHighLag
        expr: kafka_consumergroup_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer group {{ $labels.consumergroup }} is lagging on {{ $labels.topic }}"
          description: "Lag is {{ $value }} messages on partition {{ $labels.partition }}"
      # Alert when lag is growing (not just high but increasing).
      # Lag is a gauge, so use deriv() rather than rate(), which is for counters.
      - alert: KafkaConsumerLagGrowing
        expr: |
          deriv(kafka_consumergroup_lag[10m]) > 100
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Consumer lag is growing for group {{ $labels.consumergroup }}"
      # Alert when consumer group has no active members (group is dead)
      - alert: KafkaConsumerGroupDead
        expr: kafka_consumergroup_members == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Consumer group {{ $labels.consumergroup }} has no active members"
```
## Grafana Dashboard Queries

Key PromQL queries for a Kafka consumer lag dashboard:
```promql
# Total lag across all partitions for a consumer group (sum-lag)
sum by (consumergroup, topic) (kafka_consumergroup_lag)

# Lag per partition — heatmap panel
kafka_consumergroup_lag{consumergroup="payment-service"}

# Consumer throughput — messages consumed per second
rate(kafka_consumergroup_current_offset[1m])

# Estimated time-to-catch-up (seconds) given current consume rate
(
  sum by (consumergroup, topic) (kafka_consumergroup_lag)
)
/
(
  sum by (consumergroup, topic) (rate(kafka_consumergroup_current_offset[5m]))
)

# Under-replicated partitions — cluster health
kafka_server_under_replicated_partitions
```
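The time-to-catch-up expression is simply total lag divided by consume rate. As a sanity check, the arithmetic can be sketched in plain Java (the class name and values are illustrative):

```java
// Estimate catch-up time from total lag and consume rate (illustrative values)
public class CatchUpEstimate {

    static double secondsToCatchUp(long totalLag, double consumeRatePerSec) {
        if (consumeRatePerSec <= 0) {
            return Double.POSITIVE_INFINITY; // not consuming: lag will never drain
        }
        return totalLag / consumeRatePerSec;
    }

    public static void main(String[] args) {
        // 50,000 messages behind, consuming 200 msg/s -> 250 seconds to catch up.
        // Note this assumes producers have stopped; while they keep writing, the
        // effective drain rate is consume rate minus produce rate.
        System.out.println(secondsToCatchUp(50_000, 200.0));
    }
}
```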
Import the community Kafka dashboard (Grafana ID 7589 — Kafka Exporter Overview) as a starting point, then add custom panels for your application-specific groups and SLA thresholds.
## Programmatic Lag with AdminClient

Use AdminClient to query consumer group lag from Java — useful for custom alerting, auto-scaling triggers, or exposing lag via a REST endpoint.
```java
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

@Service
public class LagQueryService {

    private final AdminClient adminClient;

    public LagQueryService(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    public Map<TopicPartition, Long> getGroupLag(String groupId)
            throws ExecutionException, InterruptedException {
        // 1. Get committed offsets for the group
        Map<TopicPartition, OffsetAndMetadata> committed =
                adminClient.listConsumerGroupOffsets(groupId)
                        .partitionsToOffsetAndMetadata()
                        .get();

        // 2. Get end offsets for those partitions
        Map<TopicPartition, Long> endOffsets =
                adminClient.listOffsets(
                        committed.keySet().stream()
                                .collect(Collectors.toMap(
                                        tp -> tp,
                                        tp -> OffsetSpec.latest()))
                ).all().get()
                        .entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                e -> e.getValue().offset()));

        // 3. Compute lag per partition
        Map<TopicPartition, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
            TopicPartition tp = entry.getKey();
            long committedOffset = entry.getValue().offset();
            long endOffset = endOffsets.getOrDefault(tp, committedOffset);
            lag.put(tp, Math.max(0, endOffset - committedOffset));
        }
        return lag;
    }

    public long getTotalLag(String groupId) throws ExecutionException, InterruptedException {
        return getGroupLag(groupId).values().stream().mapToLong(Long::longValue).sum();
    }
}
```
```java
// Expose via REST — useful for KEDA or custom auto-scalers
@RestController
@RequestMapping("/internal/kafka")
public class LagController {

    private final LagQueryService lagService;

    public LagController(LagQueryService lagService) {
        this.lagService = lagService;
    }

    @GetMapping("/lag/{groupId}")
    public Map<String, Long> getLag(@PathVariable String groupId) throws Exception {
        return Map.of("totalLag", lagService.getTotalLag(groupId));
    }
}
```
## Lag Remediation Strategies

- Scale out consumers — add more consumer instances (up to the number of partitions). If fully scaled, increase partition count.
- Increase concurrency — set spring.kafka.listener.concurrency to process multiple partitions per instance in parallel.
- Optimise consumer processing — make handlers async, batch downstream writes, eliminate blocking I/O inside listener methods.
- Batch consumption — use @KafkaListener(batch = "true") to process a list of records per poll, reducing per-message overhead.
- Increase max.poll.records — default is 500. Increase if processing is fast and fetch size is the bottleneck.
- KEDA auto-scaling — use KEDA's Kafka scaler in Kubernetes to automatically add consumer pods when lag exceeds a threshold.
```yaml
# KEDA ScaledObject — scale consumer deployment based on Kafka lag
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: payment-consumer-scaler
spec:
  scaleTargetRef:
    name: payment-service
  minReplicaCount: 1
  maxReplicaCount: 10   # must not exceed partition count
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: kafka:9092
        consumerGroup: payment-service
        topic: payment.requested
        lagThreshold: "1000"   # target average lag per replica before scaling up
```
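KEDA's Kafka scaler effectively targets an average lag of lagThreshold per replica. A simplified sketch of that sizing decision (an illustration of the idea, not KEDA's actual implementation) looks like:

```java
// Simplified KEDA-style replica target: ceil(totalLag / lagThreshold),
// clamped to [minReplicas, maxReplicas]
public class LagScaler {

    static int desiredReplicas(long totalLag, long lagThreshold,
                               int minReplicas, int maxReplicas) {
        int target = (int) Math.ceil((double) totalLag / lagThreshold);
        return Math.max(minReplicas, Math.min(maxReplicas, target));
    }

    public static void main(String[] args) {
        // 4,500 messages of lag with a 1,000-message threshold -> 5 replicas
        System.out.println(desiredReplicas(4_500, 1_000, 1, 10));
        // Huge lag is still capped at maxReplicaCount (the partition-count bound)
        System.out.println(desiredReplicas(500_000, 1_000, 1, 10));
    }
}
```

The clamp to maxReplicas mirrors the manifest above: replicas beyond the partition count would sit idle, so the cap should track the topic's partition count.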