
```java
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.*;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from beginning if no committed offset

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("my-topic")); // subscribe to one or more topics

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
        }
        // auto-commit runs in the background (default: every 5 seconds)
    }
} finally {
    consumer.close();
}
```

All consumers sharing the same group.id form a consumer group. Kafka assigns partitions such that each partition is consumed by exactly one consumer in the group.

Use multiple consumer groups to have the same events consumed independently by different services (e.g., one group for analytics, another for real-time notifications).
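As a sketch of that setup (plain `Properties` with string config keys so no Kafka dependency is needed; the group and topic names are illustrative), two services reading the same topic independently differ only in their `group.id`:

```java
import java.util.Properties;

public class GroupConfigs {
    // Shared base config for both services (illustrative values).
    static Properties baseConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties analytics = baseConfig();
        analytics.setProperty("group.id", "analytics"); // this group receives every event

        Properties notifications = baseConfig();
        notifications.setProperty("group.id", "notifications"); // so does this one, independently

        System.out.println(analytics.getProperty("group.id"));
        System.out.println(notifications.getProperty("group.id"));
    }
}
```

Within each group, Kafka still spreads the topic's partitions across that group's members; across groups, every event is delivered once per group.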

Each consumer tracks its position (offset) per partition. On restart, it resumes from the last committed offset.
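The bookkeeping behind this can be sketched in plain Java (this is an illustration of the idea, not the Kafka API): the committed position per partition is the offset of the next record to read, so committing "last processed offset + 1" makes a restart resume exactly where processing left off.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTrackingSketch {
    // Last committed position per partition, as the group coordinator would store it.
    private final Map<Integer, Long> committed = new HashMap<>();

    // Called once a record (or batch ending at this offset) is fully processed.
    void commit(int partition, long lastProcessedOffset) {
        committed.put(partition, lastProcessedOffset + 1); // next offset to read
    }

    // Where a restarted consumer resumes; 0 here plays the role of auto.offset.reset=earliest.
    long resumePosition(int partition) {
        return committed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetTrackingSketch tracker = new OffsetTrackingSketch();
        tracker.commit(0, 41);                         // processed up to offset 41 on partition 0
        System.out.println(tracker.resumePosition(0)); // resumes at 42
        System.out.println(tracker.resumePosition(1)); // no commit yet: starts from the reset position
    }
}
```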

| Strategy | Config / API | Trade-off |
| --- | --- | --- |
| Auto-commit | `enable.auto.commit=true`, `auto.commit.interval.ms=5000` | Simple; may reprocess on crash (commits periodically, not after processing) |
| Manual sync commit | `consumer.commitSync()` | Safe; blocks until the commit succeeds; lower throughput |
| Manual async commit | `consumer.commitAsync()` | Non-blocking; possible duplicates if an async commit fails |
| Exact offset commit | `commitSync(offsets)` | Precise control; commit after processing each batch or record |
```java
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // disable auto-commit

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record); // business logic
        }
        // Commit all partitions' offsets after processing the batch.
        consumer.commitSync();

        // OR, for higher throughput, commit asynchronously with a callback instead:
        // consumer.commitAsync((offsets, exception) -> {
        //     if (exception != null) {
        //         log.error("Commit failed for offsets {}", offsets, exception);
        //     }
        // });
    }
} finally {
    try {
        consumer.commitSync(); // final synchronous commit before closing
    } finally {
        consumer.close();
    }
}
```

Disabling auto-commit and committing only after processing gives at-least-once delivery: no message is lost, but messages may be reprocessed after a crash. Make your consumer logic idempotent to tolerate that reprocessing.

A rebalance occurs when a consumer joins or leaves the group, or when a consumer crashes (detected via session timeout). During a rebalance with the default eager protocol, all consumption in the group is paused while partitions are reassigned.

```java
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit offsets before losing partition ownership
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Optionally seek to a specific offset
    }
});
```
| Config | Default | Purpose |
| --- | --- | --- |
| `group.id` | (none) | Consumer group identifier |
| `auto.offset.reset` | `latest` | Where to start when no committed offset exists: `earliest`/`latest` |
| `enable.auto.commit` | `true` | Auto-commit offsets periodically |
| `auto.commit.interval.ms` | `5000` | Auto-commit interval |
| `max.poll.records` | `500` | Max records returned per `poll()` |
| `session.timeout.ms` | `45000` | How long a consumer may be inactive before eviction from the group |
| `heartbeat.interval.ms` | `3000` | Frequency of heartbeats to the group coordinator |
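To make these concrete, here is a sketch that overrides the settings most often tuned (values are illustrative; plain string keys are used instead of `ConsumerConfig` constants so the snippet needs no Kafka dependency):

```java
import java.util.Properties;

public class TunedConsumerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "orders-service");    // required: has no default
        props.setProperty("auto.offset.reset", "earliest"); // default is latest
        props.setProperty("enable.auto.commit", "false");   // default is true
        props.setProperty("max.poll.records", "200");       // default is 500
        // session.timeout.ms (45000) and heartbeat.interval.ms (3000) left at their defaults

        System.out.println(props.getProperty("group.id"));
    }
}
```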