Contents
- Key Broker Configuration Parameters
- Dynamic Config Changes (No Restart)
- AdminClient — Topic & Partition Management
- Partition Reassignment
- Preferred Leader Election
- Rolling Restart Procedure
- Log Retention & Compaction
- KRaft Mode vs ZooKeeper
## Key Broker Configuration Parameters

| Parameter | Default | Recommendation |
|---|---|---|
| num.network.threads | 3 | Set to CPU cores / 2 for I/O-heavy brokers |
| num.io.threads | 8 | Match to number of disks × 2 |
| log.dirs | /tmp/kafka-logs | Use multiple disks — one path per disk |
| num.replica.fetchers | 1 | Increase to 4–8 during large reassignments |
| log.retention.hours | 168 (7 days) | Set per-topic with retention.ms |
| log.segment.bytes | 1 GB | Reduce to 256 MB for faster log compaction |
| min.insync.replicas | 1 | Set to RF - 1 for durability (e.g., 2 for RF=3) |
| default.replication.factor | 1 | Set to 3 in production |
| auto.create.topics.enable | true | Set false in production — manage topics explicitly |
| unclean.leader.election.enable | false | Keep false — prevents data loss |
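A minimal server.properties sketch applying the recommendations above. The thread counts assume a hypothetical 8-core broker with 4 data disks; hostnames and paths are placeholders, not values from the text.

```properties
# Hypothetical server.properties applying the table's recommendations
num.network.threads=4                 # ~ CPU cores / 2 on an 8-core broker
num.io.threads=8                      # 4 disks x 2
log.dirs=/data/disk1/kafka,/data/disk2/kafka,/data/disk3/kafka,/data/disk4/kafka
num.replica.fetchers=4
log.segment.bytes=268435456           # 256 MB
min.insync.replicas=2                 # RF - 1 for RF=3
default.replication.factor=3
auto.create.topics.enable=false
unclean.leader.election.enable=false
```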
## Dynamic Config Changes (No Restart)

Many broker and topic configurations can be changed dynamically without restarting the broker, using the kafka-configs.sh CLI or AdminClient.
```shell
# Increase log retention for a specific topic (override cluster default)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name payments \
  --alter --add-config "retention.ms=604800000"   # 7 days

# Reduce retention for a high-volume topic
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name metrics.raw \
  --alter --add-config "retention.ms=3600000"     # 1 hour

# View current config for a topic
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name payments \
  --describe

# Change broker-level config dynamically (no restart)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --add-config "log.cleaner.threads=4"
```
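The retention.ms values above are plain millisecond counts, which are easy to get wrong by a factor of 60 or 1000. A quick sketch for deriving them (the helper names are ours, not part of Kafka):

```shell
# Hypothetical helpers: convert days / hours into the milliseconds
# that retention.ms expects
days_to_ms()  { echo $(( $1 * 24 * 60 * 60 * 1000 )); }
hours_to_ms() { echo $(( $1 * 60 * 60 * 1000 )); }

days_to_ms 7     # → 604800000 (the 7-day value used above)
hours_to_ms 1    # → 3600000   (the 1-hour value used above)
```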
```java
// Dynamic config change via AdminClient
// (incrementalAlterConfigs supersedes the deprecated alterConfigs)
try (AdminClient admin = AdminClient.create(Map.of(
        "bootstrap.servers", "kafka:9092"))) {
    ConfigResource topicResource =
            new ConfigResource(ConfigResource.Type.TOPIC, "payments");
    AlterConfigOp setRetention = new AlterConfigOp(
            new ConfigEntry("retention.ms", "604800000"),  // 7 days
            AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(
            Map.of(topicResource, List.of(setRetention))
    ).all().get();
}
```
## AdminClient — Topic & Partition Management

The class below wraps AdminClient for common topic and partition management tasks. Key points are highlighted in the inline comments.
```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class KafkaAdminService {

    private final AdminClient adminClient;

    public KafkaAdminService(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    // Create a topic with specific settings
    public void createTopic(String name, int partitions, short replicationFactor) throws Exception {
        NewTopic topic = new NewTopic(name, partitions, replicationFactor)
                .configs(Map.of(
                        "retention.ms", "604800000",   // 7 days
                        "min.insync.replicas", "2",
                        "compression.type", "lz4"
                ));
        adminClient.createTopics(List.of(topic)).all().get();
    }

    // Increase partition count (can only increase, never decrease)
    public void increasePartitions(String topicName, int newPartitionCount) throws Exception {
        adminClient.createPartitions(
                Map.of(topicName, NewPartitions.increaseTo(newPartitionCount))
        ).all().get();
    }

    // List consumer groups and their state
    public void listConsumerGroups() throws Exception {
        adminClient.listConsumerGroups().all().get()
                .forEach(group -> System.out.println(
                        group.groupId() + " — " + group.state().orElse(ConsumerGroupState.UNKNOWN)));
    }

    // Delete records up to a given offset (for GDPR / data cleanup)
    public void deleteRecordsBefore(String topic, int partition, long beforeOffset) throws Exception {
        TopicPartition tp = new TopicPartition(topic, partition);
        adminClient.deleteRecords(
                Map.of(tp, RecordsToDelete.beforeOffset(beforeOffset))
        ).all().get();
    }

    // Describe cluster: number of brokers and current controller
    public void describeCluster() throws Exception {
        DescribeClusterResult cluster = adminClient.describeCluster();
        System.out.println("Controller: " + cluster.controller().get().idString());
        System.out.println("Brokers: " + cluster.nodes().get().size());
    }
}
```
## Partition Reassignment

Partition reassignment moves partitions between brokers. It is needed when adding brokers, decommissioning a broker, or rebalancing storage.
```shell
# topics.json:
# { "version": 1, "topics": [{"topic": "payments"}, {"topic": "orders"}] }

# 1. Generate a reassignment plan for the topics listed in topics.json.
#    The broker list names the target brokers (include new broker 4).
kafka-reassign-partitions.sh \
  --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate
# Copy the "Proposed partition reassignment configuration" JSON from the
# output into reassignment-plan.json. (The output also prints the current
# assignment, so redirecting the whole thing to a file is not valid JSON.)

# 2. Review reassignment-plan.json, then execute with a replication
#    throttle (50 MB/s here)
kafka-reassign-partitions.sh \
  --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment-plan.json \
  --throttle 50000000 \
  --execute

# 3. Monitor progress; --verify also clears the throttle once the
#    reassignment has completed
kafka-reassign-partitions.sh \
  --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment-plan.json \
  --verify

# 4. If a throttle is left behind, remove it manually per broker
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --delete-config "leader.replication.throttled.rate,follower.replication.throttled.rate"
```
Always throttle reassignments in production with --throttle. An unthrottled reassignment can saturate your network and cause consumer lag or producer timeouts. 50–100 MB/s per broker is a safe starting point.
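When choosing a throttle, a back-of-envelope duration estimate helps. The numbers below are illustrative, not from the text:

```shell
# Hypothetical estimate: time to move 500 GB of partition data
# at a 50 MB/s replication throttle
data_mb=$((500 * 1024))          # 512000 MB
throttle_mb_s=50
secs=$((data_mb / throttle_mb_s))
echo "$secs seconds (~$((secs / 3600)) hours)"   # → 10240 seconds (~2 hours)
```

If the estimate is unacceptably long, raise the throttle in steps while watching consumer lag rather than removing it outright.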
## Preferred Leader Election

After a broker restart or reassignment, partitions may not be on their preferred (original) leader. Run a preferred leader election to restore the optimal distribution.
```shell
# Trigger preferred leader election for all partitions
kafka-leader-election.sh \
  --bootstrap-server kafka:9092 \
  --election-type PREFERRED \
  --all-topic-partitions

# Or for a specific topic
kafka-leader-election.sh \
  --bootstrap-server kafka:9092 \
  --election-type PREFERRED \
  --topic payments
```

```java
// Via AdminClient — elect leaders for specific partitions
Set<TopicPartition> partitions = Set.of(
        new TopicPartition("payments", 0),
        new TopicPartition("payments", 1)
);
adminClient.electLeaders(ElectionType.PREFERRED, partitions).all().get();
```
## Rolling Restart Procedure

Restart brokers one at a time without data loss. The key constraint: never take down more brokers at once than replication.factor minus min.insync.replicas allows; with RF=3 and min.insync.replicas=2, that means one broker at a time.
- Ensure all topics have min.insync.replicas ≤ replication.factor - 1 (i.e., can tolerate one broker down).
- Wait for all partitions to be fully replicated — no under-replicated partitions.
- Optionally move partition leadership off the broker before shutdown (kafka-leader-election.sh); a controlled shutdown migrates any remaining leadership.
- Issue a controlled shutdown: send SIGTERM and wait for the broker to cleanly transfer partition leadership before stopping.
- Restart the broker, wait for it to rejoin and fully sync all replicas.
- Verify no under-replicated partitions, then proceed to the next broker.
```shell
# Check for under-replicated partitions before proceeding
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --under-replicated-partitions

# Controlled shutdown — send SIGTERM (not SIGKILL!)
kill -15 $(cat /var/run/kafka/kafka.pid)

# Wait for broker to fully rejoin (watch under-replicated count drop to 0)
watch -n 5 'kafka-topics.sh --bootstrap-server kafka:9092 --describe --under-replicated-partitions | wc -l'
```
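The "wait until under-replicated partitions drop to zero" step can be scripted instead of watched by hand. A sketch with a hypothetical helper; the kafka-topics.sh command in the usage comment is a placeholder for the check shown above:

```shell
# Hypothetical helper: poll a command until it prints 0, up to max_tries
wait_until_zero() {
  local check_cmd=$1 max_tries=$2
  local i count
  for i in $(seq 1 "$max_tries"); do
    count=$(eval "$check_cmd")
    [ "$count" -eq 0 ] && return 0
    sleep 5
  done
  return 1    # still non-zero after max_tries polls
}

# Against a real cluster (120 tries x 5 s = up to 10 minutes):
# wait_until_zero 'kafka-topics.sh --bootstrap-server kafka:9092 --describe --under-replicated-partitions | wc -l' 120
```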
## Log Retention & Compaction

Retention deletes whole log segments once they exceed a time or size limit; compaction keeps only the latest record per key. Both are configured per topic via cleanup.policy.
```shell
# Configure compaction on a topic (keep only latest value per key)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name user.preferences \
  --alter --add-config "cleanup.policy=compact,min.cleanable.dirty.ratio=0.1,segment.ms=3600000"

# Configure delete + compact (compact, then delete old segments).
# Note the brackets: a value that itself contains a comma must be
# wrapped in [] so kafka-configs.sh does not split it.
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name orders \
  --alter --add-config "cleanup.policy=[compact,delete],retention.ms=2592000000"   # 30 days

# Delete a topic
kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic old-events

# Increase log cleaner threads if compaction is falling behind
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --add-config "log.cleaner.threads=4"
```
## KRaft Mode vs ZooKeeper

| Aspect | ZooKeeper Mode | KRaft Mode (Kafka 3.3+) |
|---|---|---|
| Metadata storage | ZooKeeper ensemble | Internal Kafka topic (__cluster_metadata) |
| Dependency | ZooKeeper cluster required | No ZooKeeper — self-contained |
| Controller | One elected Kafka broker | Dedicated controller nodes (quorum) |
| Partition limit | ~200k partitions | Millions of partitions |
| Recovery speed | Slower — ZK read on startup | Faster controller failover |
| Production ready | Deprecated (removed in Kafka 4.0) | GA since Kafka 3.3 |
```properties
# KRaft broker configuration (no ZooKeeper)

# Combined broker+controller mode for small clusters; note that .properties
# files do not support trailing comments, so comments must sit on their own lines
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER
log.dirs=/var/kafka/data
```
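For larger clusters, the controller quorum runs on dedicated nodes rather than in combined mode. A sketch of a dedicated controller's configuration; node IDs, hostnames, and paths are placeholders:

```properties
# Hypothetical dedicated-controller node (controller role only,
# serves no client traffic)
process.roles=controller
node.id=2
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=CONTROLLER://:9093
controller.listener.names=CONTROLLER
log.dirs=/var/kafka/controller-data
```

Before first startup in KRaft mode, every node's metadata log directory must be formatted with kafka-storage.sh (generate a cluster ID with `kafka-storage.sh random-uuid`, then `kafka-storage.sh format` with that ID and the node's config).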