Contents
- Basic Consumer Example
- Consumer Groups and Partition Assignment
- Offsets and Commit Strategies
- Manual Offset Commit
- Rebalancing
- Key Configuration Summary

Basic Consumer Example

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
// Start from the beginning of each partition if the group has no committed offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("my-topic")); // subscribe to one or more topics
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        // With enable.auto.commit=true (the default), offsets are committed from inside poll(),
        // at most once per auto.commit.interval.ms (default: 5 seconds)
    }
} finally {
    consumer.close(); // leaves the group and releases network resources
}

Consumer Groups and Partition Assignment

All consumers sharing the same group.id form a consumer group.
Kafka assigns partitions such that each partition is consumed by exactly one consumer in the group.
- 3 partitions, 1 consumer → consumer reads all 3 partitions.
- 3 partitions, 3 consumers → each consumer reads 1 partition (full parallelism).
- 3 partitions, 4 consumers → one consumer is idle (a group may contain more consumers than partitions, but the extra consumers are assigned nothing).
- Different group.id → independent consumption; each group reads all messages (broadcast).
Use multiple consumer groups to have the same events consumed independently by different services
(e.g., one group for analytics, another for real-time notifications).
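
The partition-to-consumer cases listed above can be reproduced with a small simulation. This is only a toy model: it deals partitions out in round-robin order, whereas Kafka's real assignors (range, round-robin, sticky, cooperative sticky) use their own rules. The class name `PartitionAssignmentSketch` and the consumer names `c1` to `c4` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of partition assignment inside one consumer group (not Kafka's actual assignor). */
final class PartitionAssignmentSketch {

    /** Deals partitions 0..partitionCount-1 to the given consumers in round-robin order. */
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, List.of("c1")));                   // {c1=[0, 1, 2]}
        System.out.println(assign(3, List.of("c1", "c2", "c3")));       // {c1=[0], c2=[1], c3=[2]}
        System.out.println(assign(3, List.of("c1", "c2", "c3", "c4"))); // {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```

In the last case `c4` owns no partition, which is the idle consumer from the list. A second group would run the same assignment independently over the same partitions.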

Offsets and Commit Strategies

Each consumer tracks its position (offset) per partition. On restart, it resumes from the last committed offset of each partition it is assigned.
| Strategy | Config / API | Trade-off |
|---|---|---|
| Auto-commit | enable.auto.commit=true, auto.commit.interval.ms=5000 | Simple; commits on a timer rather than after processing, so records may be reprocessed after a crash |
| Manual sync commit | consumer.commitSync() | Safe; blocks until the commit succeeds or fails unrecoverably; lower throughput |
| Manual async commit | consumer.commitAsync() | Non-blocking; a failed commit is not retried, so records may be reprocessed after a restart |
| Exact offset commit | commitSync(offsets) | Precise control; commit after processing each batch or record |
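
When committing exact offsets, the value to commit for a partition is the offset of the next record to read, that is, the last processed offset plus one. The sketch below computes that map in plain Java. `ProcessedRecord` is a hypothetical stand-in for `ConsumerRecord`; in a real consumer the result would be expressed as a `Map<TopicPartition, OffsetAndMetadata>` and passed to `commitSync(offsets)`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch: deriving per-partition offsets to commit from the records processed so far. */
final class OffsetTrackingSketch {

    /** Hypothetical stand-in for ConsumerRecord, holding only the fields needed here. */
    static final class ProcessedRecord {
        final int partition;
        final long offset;

        ProcessedRecord(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** For each partition, returns the highest processed offset plus one. */
    static Map<Integer, Long> offsetsToCommit(List<ProcessedRecord> processed) {
        Map<Integer, Long> next = new TreeMap<>();
        for (ProcessedRecord r : processed) {
            // Keep the largest "next offset" seen for this partition
            next.merge(r.partition, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<ProcessedRecord> batch = List.of(
            new ProcessedRecord(0, 41),
            new ProcessedRecord(0, 42),
            new ProcessedRecord(1, 7));
        System.out.println(offsetsToCommit(batch)); // {0=43, 1=8}
    }
}
```

Committing 43 for partition 0 means a restarted consumer resumes at offset 43, directly after the last record that was fully processed.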

Manual Offset Commit

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // disable auto-commit
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record); // business logic (application-defined)
        }
        // Option 1: blocking commit of the offsets returned by the last poll(),
        // issued only after the whole batch has been processed
        consumer.commitSync();
        // Option 2 (use instead of option 1, not in addition): async commit with callback
        // consumer.commitAsync((offsets, exception) -> {
        //     if (exception != null) {
        //         log.error("Commit failed for offsets {}", offsets, exception);
        //     }
        // });
    }
} catch (Exception e) {
    // Do not call commitSync() here: it commits the position of the last poll(),
    // which would mark records that failed or were never processed as consumed.
    log.error("Consumer loop failed", e); // log is assumed to be an SLF4J-style logger
} finally {
    consumer.close();
}
Disabling auto-commit and committing after processing ensures at-least-once delivery (no message is lost, but may be reprocessed on crash). Make your consumer logic idempotent to handle reprocessing.
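
One common way to make a handler idempotent is to remember which records have already been applied and skip redelivered ones. The sketch below illustrates the idea with an in-memory set keyed by topic, partition, and offset. All names are hypothetical, and the in-memory set is an assumption made for brevity: a real consumer would need durable storage, updated atomically with the side effect, for the guarantee to survive a crash.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch: a handler that applies each record at most once, even if it is delivered again. */
final class IdempotentHandlerSketch {
    private final Set<String> applied = new HashSet<>();         // IDs of records already applied
    private final Map<String, Integer> totals = new HashMap<>(); // the side effect: a running total per key

    /** Applies the record once; returns false when it is a redelivered duplicate. */
    boolean handle(String topic, int partition, long offset, String key, int amount) {
        String recordId = topic + "-" + partition + "@" + offset;
        if (!applied.add(recordId)) {
            return false; // already applied, skip the side effect
        }
        totals.merge(key, amount, Integer::sum);
        return true;
    }

    int total(String key) {
        return totals.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        handler.handle("orders", 0, 10, "alice", 5);
        handler.handle("orders", 0, 11, "alice", 7);
        // Simulated redelivery after a crash that happened before the commit
        boolean reapplied = handler.handle("orders", 0, 10, "alice", 5);
        System.out.println(reapplied);              // false
        System.out.println(handler.total("alice")); // 12
    }
}
```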
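
Rebalancing

A rebalance occurs when a consumer joins or leaves the group, or when a consumer is considered dead (no heartbeat within session.timeout.ms, or no poll() within max.poll.interval.ms). With the eager rebalance protocol, every consumer in the group stops consuming while partitions are reassigned; cooperative assignors such as CooperativeStickyAssignor pause only the partitions that change owner.
- Commit offsets before a rebalance using ConsumerRebalanceListener.onPartitionsRevoked().
- Tune session.timeout.ms (how long the coordinator waits without a heartbeat before it considers a consumer dead) and heartbeat.interval.ms.
- Keep the processing time of each batch below max.poll.interval.ms: if poll() is not called within this interval, the consumer is removed from the group.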
// Requires: import org.apache.kafka.common.TopicPartition;
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called from within poll() before ownership is lost: commit what has been processed
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Optionally seek to a specific offset
    }
});
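
The timeouts discussed above relate to each other: the Kafka documentation recommends a heartbeat.interval.ms of no more than one third of session.timeout.ms. The following sketch builds the three settings with plain `java.util.Properties` and checks that rule of thumb. The class and method names are hypothetical, and 300000 ms is assumed as the value for max.poll.interval.ms (Kafka's documented default).

```java
import java.util.Properties;

/** Sketch: consumer timeout settings plus a sanity check on how they relate. */
final class RebalanceTimeoutSketch {

    /** Builds consumer properties for the three timeouts that influence rebalancing. */
    static Properties timeouts(int sessionTimeoutMs, int heartbeatIntervalMs, int maxPollIntervalMs) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        props.setProperty("heartbeat.interval.ms", Integer.toString(heartbeatIntervalMs));
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        return props;
    }

    /** True when the heartbeat interval is at most one third of the session timeout. */
    static boolean heartbeatFitsSession(Properties props) {
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties defaults = timeouts(45_000, 3_000, 300_000);
        System.out.println(heartbeatFitsSession(defaults));                         // true
        System.out.println(heartbeatFitsSession(timeouts(10_000, 5_000, 300_000))); // false
    }
}
```

The resulting `Properties` can be merged into the consumer configuration with `props.putAll(...)` before the `KafkaConsumer` is constructed.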

Key Configuration Summary

| Config | Default | Purpose |
|---|---|---|
| group.id | none | Consumer group identifier |
| auto.offset.reset | latest | Where to start when the group has no committed offset: earliest, latest, or none |
| enable.auto.commit | true | Auto-commit offsets periodically |
| auto.commit.interval.ms | 5000 | Auto-commit interval |
| max.poll.records | 500 | Max records returned per poll() |
| session.timeout.ms | 45000 (10000 before Kafka 3.0) | Time without a heartbeat before the coordinator removes the consumer |
| heartbeat.interval.ms | 3000 | Frequency of heartbeats to group coordinator |
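
These settings also bound how long the application may spend on each record. As a rough worked example, assuming every poll() returns a full batch and max.poll.interval.ms is at its default of 300000 ms (a value not listed in the table above), the average budget per record is the interval divided by max.poll.records. The class name is hypothetical.

```java
/** Sketch: average processing time available per record between two poll() calls. */
final class PollBudgetSketch {

    /** Interval between polls divided by the maximum batch size, in milliseconds. */
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        // Assumed max.poll.interval.ms of 300000 with the default max.poll.records of 500
        System.out.println(perRecordBudgetMs(300_000, 500)); // 600
    }
}
```

If processing regularly takes longer than this budget, lowering max.poll.records or raising max.poll.interval.ms keeps the consumer in the group.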