
| Parameter | Default | Recommendation |
| --- | --- | --- |
| num.network.threads | 3 | Set to CPU cores / 2 for I/O-heavy brokers |
| num.io.threads | 8 | Match to number of disks × 2 |
| log.dirs | /tmp/kafka-logs | Use multiple disks — one path per disk |
| num.replica.fetchers | 1 | Increase to 4–8 during large reassignments |
| log.retention.hours | 168 (7 days) | Set per-topic with retention.ms |
| log.segment.bytes | 1 GB | Reduce to 256 MB for faster log compaction |
| min.insync.replicas | 1 | Set to RF − 1 for durability (e.g., 2 for RF=3) |
| default.replication.factor | 1 | Set to 3 in production |
| auto.create.topics.enable | true | Set false in production — manage topics explicitly |
| unclean.leader.election.enable | false | Keep false — prevents data loss |
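As a quick sanity check, the table's rules of thumb can be expressed in code. This is an illustrative sketch, not a Kafka API — the helper names and the `Math.max` floors against the defaults are assumptions:

```java
// Sketch: derive broker settings from the rules of thumb above.
// Floors at the Kafka defaults (3, 8, 1) are an assumption for small machines.
public class BrokerSizing {
    static int networkThreads(int cpuCores)   { return Math.max(3, cpuCores / 2); }
    static int ioThreads(int dataDisks)       { return Math.max(8, dataDisks * 2); }
    static int minInsyncReplicas(int rf)      { return Math.max(1, rf - 1); }

    public static void main(String[] args) {
        System.out.println("num.network.threads=" + networkThreads(16));   // 8
        System.out.println("num.io.threads=" + ioThreads(6));              // 12
        System.out.println("min.insync.replicas=" + minInsyncReplicas(3)); // 2
    }
}
```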

Many broker and topic configurations can be changed dynamically without restarting the broker, using the kafka-configs.sh CLI or AdminClient.

```bash
# Increase log retention for a specific topic (override cluster default)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name payments \
  --alter --add-config "retention.ms=604800000"   # 7 days

# Reduce retention for a high-volume topic
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name metrics.raw \
  --alter --add-config "retention.ms=3600000"     # 1 hour

# View current config for a topic
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name payments \
  --describe

# Change broker-level config dynamically (no restart)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --add-config "log.cleaner.threads=4"
```

```java
// Dynamic config change via AdminClient
try (AdminClient admin = AdminClient.create(Map.of(
        "bootstrap.servers", "kafka:9092"))) {
    ConfigResource topicResource =
        new ConfigResource(ConfigResource.Type.TOPIC, "payments");
    ConfigEntry retentionEntry = new ConfigEntry("retention.ms", "604800000");
    // incrementalAlterConfigs replaces the deprecated alterConfigs
    // and only touches the listed entries
    admin.incrementalAlterConfigs(Map.of(
        topicResource,
        List.of(new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET))
    )).all().get();
}
```

The class below wraps the most common administrative operations in a single service. Key points are called out in the inline comments.

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class KafkaAdminService {

    private final AdminClient adminClient;

    public KafkaAdminService(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    // Create a topic with specific settings
    public void createTopic(String name, int partitions, short replicationFactor)
            throws Exception {
        NewTopic topic = new NewTopic(name, partitions, replicationFactor)
            .configs(Map.of(
                "retention.ms", "604800000",   // 7 days
                "min.insync.replicas", "2",
                "compression.type", "lz4"
            ));
        adminClient.createTopics(List.of(topic)).all().get();
    }

    // Increase partition count (can only increase, never decrease)
    public void increasePartitions(String topicName, int newPartitionCount)
            throws Exception {
        adminClient.createPartitions(
            Map.of(topicName, NewPartitions.increaseTo(newPartitionCount))
        ).all().get();
    }

    // List consumer groups and their state
    public void listConsumerGroups() throws Exception {
        adminClient.listConsumerGroups().all().get()
            .forEach(group -> System.out.println(group.groupId() + " — "
                + group.state().orElse(ConsumerGroupState.UNKNOWN)));
    }

    // Delete records up to a given offset (for GDPR / data cleanup)
    public void deleteRecordsBefore(String topic, int partition, long beforeOffset)
            throws Exception {
        TopicPartition tp = new TopicPartition(topic, partition);
        adminClient.deleteRecords(
            Map.of(tp, RecordsToDelete.beforeOffset(beforeOffset))
        ).all().get();
    }

    // Describe cluster — number of brokers and controller
    public void describeCluster() throws Exception {
        DescribeClusterResult cluster = adminClient.describeCluster();
        System.out.println("Controller: " + cluster.controller().get().idString());
        System.out.println("Brokers: " + cluster.nodes().get().size());
    }
}
```

Move partitions between brokers — needed when adding brokers, decommissioning a broker, or rebalancing storage.

```bash
# topics.json:
# { "version": 1, "topics": [{"topic": "payments"}, {"topic": "orders"}] }

# 1. Generate a reassignment plan for topics listed in topics.json.
#    --broker-list names the target brokers (include new broker 4).
kafka-reassign-partitions.sh \
  --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate > reassignment-plan.json

# 2. Review the generated plan (reassignment-plan.json), then execute.
#    --throttle 50000000 caps replication I/O at 50 MB/s.
kafka-reassign-partitions.sh \
  --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment-plan.json \
  --throttle 50000000 \
  --execute

# 3. Monitor progress
kafka-reassign-partitions.sh \
  --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment-plan.json \
  --verify

# 4. Remove the throttle after completion (repeat for each involved broker)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --delete-config "leader.replication.throttled.rate,follower.replication.throttled.rate"
```

Always throttle reassignments in production with --throttle. An unthrottled reassignment can saturate your network and cause consumer lag or producer timeouts; 50–100 MB/s per broker is a safe starting point.
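To judge whether a given throttle is acceptable, estimate how long the move will take. A back-of-envelope sketch (illustrative names; it assumes the throttle, not disk or network, is the bottleneck):

```java
// Sketch: estimated duration of a throttled partition reassignment.
public class ReassignmentEstimate {
    static long secondsToMove(long bytesToMove, long throttleBytesPerSec) {
        return bytesToMove / throttleBytesPerSec;
    }

    public static void main(String[] args) {
        long bytes = 500L * 1_000_000_000L; // 500 GB to move off a broker
        long throttle = 50_000_000L;        // 50 MB/s, as in the example above
        System.out.println(secondsToMove(bytes, throttle) + " s"); // 10000 s, ~2.8 h
    }
}
```

If the estimate is too long for your maintenance window, raise the throttle in steps while watching consumer lag, rather than removing it outright.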

After a broker restart or reassignment, partitions may not be on their preferred (original) leader. Run a preferred leader election to restore the optimal distribution.

```bash
# Trigger preferred leader election for all partitions
kafka-leader-election.sh \
  --bootstrap-server kafka:9092 \
  --election-type PREFERRED \
  --all-topic-partitions

# Or for a single partition (--topic must be paired with --partition)
kafka-leader-election.sh \
  --bootstrap-server kafka:9092 \
  --election-type PREFERRED \
  --topic payments --partition 0
```

```java
// Via AdminClient — elect leaders for specific partitions
Set<TopicPartition> partitions = Set.of(
    new TopicPartition("payments", 0),
    new TopicPartition("payments", 1)
);
adminClient.electLeaders(ElectionType.PREFERRED, partitions)
    .partitions().get();
```

Restart brokers one at a time without data loss. The key constraint: never restart more brokers simultaneously than the topic's min.insync.replicas allows.

  1. Ensure all topics have min.insync.replicas ≤ replication.factor - 1 (i.e., can tolerate one broker down).
  2. Wait for all partitions to be fully replicated — no under-replicated partitions.
  3. Optionally move partition leadership off the broker before shutdown so that it leads as few partitions as possible (kafka-leader-election.sh).
  4. Issue a controlled shutdown: send SIGTERM and wait for the broker to cleanly transfer partition leadership before stopping.
  5. Restart the broker, wait for it to rejoin and fully sync all replicas.
  6. Verify no under-replicated partitions, then proceed to the next broker.
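The constraint in step 1 can be expressed as a small calculation. This is an illustrative sketch (the class and method names are not a Kafka API):

```java
// Sketch: how many brokers can be down at once without stalling
// producers that use acks=all? This follows from the checklist's
// constraint: in-sync replicas must stay >= min.insync.replicas.
public class RollingRestartSafety {
    static int maxBrokersDown(int replicationFactor, int minInsyncReplicas) {
        return Math.max(0, replicationFactor - minInsyncReplicas);
    }

    public static void main(String[] args) {
        // RF=3, min.insync.replicas=2 -> restart one broker at a time
        System.out.println(maxBrokersDown(3, 2)); // 1
        // RF=3, min.insync.replicas=3 -> no safe restart window
        System.out.println(maxBrokersDown(3, 3)); // 0
    }
}
```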
```bash
# Check for under-replicated partitions before proceeding
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --under-replicated-partitions

# Controlled shutdown — send SIGTERM (not SIGKILL!)
kill -15 $(cat /var/run/kafka/kafka.pid)

# Wait for broker to fully rejoin (watch under-replicated count drop to 0)
watch -n 5 'kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --under-replicated-partitions | wc -l'
```

The following commands configure log compaction and handle topic cleanup; run them against any broker in the cluster.

```bash
# Configure compaction on a topic (keep only latest value per key)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name user.preferences \
  --alter --add-config "cleanup.policy=compact,min.cleanable.dirty.ratio=0.1,segment.ms=3600000"

# Configure compact + delete (compact, then delete old segments).
# List values must be wrapped in brackets so the comma is not parsed
# as a separator between configs.
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name orders \
  --alter --add-config "cleanup.policy=[compact,delete],retention.ms=2592000000"   # 30 days

# Delete a topic
kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic old-events

# Increase log cleaner threads if compaction is falling behind
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --add-config "log.cleaner.threads=4"
```
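min.cleanable.dirty.ratio=0.1 means the cleaner considers a partition once uncompacted ("dirty") bytes make up at least 10% of the log. A simplified sketch of that check — illustrative names, and the broker's real accounting also factors in segment rolls and compaction lag settings:

```java
// Sketch: when does the log cleaner pick up a compacted partition?
public class DirtyRatio {
    // dirty ratio = bytes written since the last clean point / total log bytes
    static double dirtyRatio(long cleanBytes, long dirtyBytes) {
        return (double) dirtyBytes / (cleanBytes + dirtyBytes);
    }

    static boolean cleanable(long cleanBytes, long dirtyBytes, double minDirtyRatio) {
        return dirtyRatio(cleanBytes, dirtyBytes) >= minDirtyRatio;
    }

    public static void main(String[] args) {
        // 900 MB already compacted + 100 MB new writes -> ratio 0.10, eligible
        System.out.println(cleanable(900, 100, 0.1)); // true
        // only 50 MB new writes -> ratio 0.05, not yet eligible
        System.out.println(cleanable(950, 50, 0.1));  // false
    }
}
```

Lowering the ratio (as in the example above) makes compaction run more often at the cost of extra cleaner I/O.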
| Aspect | ZooKeeper Mode | KRaft Mode (Kafka 3.3+) |
| --- | --- | --- |
| Metadata storage | ZooKeeper ensemble | Internal Kafka topic (__cluster_metadata) |
| Dependency | ZooKeeper cluster required | No ZooKeeper — self-contained |
| Controller | One elected Kafka broker | Dedicated controller nodes (quorum) |
| Partition limit | ~200k partitions | Millions of partitions |
| Recovery speed | Slower — ZK read on startup | Faster controller failover |
| Production ready | Stable (being deprecated) | GA since Kafka 3.3 |
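The controller quorum follows standard majority math: a quorum of n voters stays available while a majority (n/2 + 1) is up, so it tolerates floor((n−1)/2) failures. A quick sketch (illustrative names):

```java
// Sketch: fault tolerance of a KRaft controller quorum.
public class QuorumMath {
    static int majority(int voters)          { return voters / 2 + 1; }
    static int toleratedFailures(int voters) { return (voters - 1) / 2; }

    public static void main(String[] args) {
        System.out.println(majority(3));          // 2 voters needed
        System.out.println(toleratedFailures(3)); // survives 1 failure
        System.out.println(toleratedFailures(5)); // survives 2 failures
    }
}
```

This is why controller.quorum.voters is almost always sized at 3 or 5: even numbers add nodes without adding fault tolerance.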
```properties
# KRaft broker configuration (no ZooKeeper)
# process.roles=broker,controller runs in combined mode, suitable for
# small clusters; larger clusters should use dedicated controller nodes.
# (Comments must be on their own line: .properties files treat trailing
# text after a value as part of the value.)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER
log.dirs=/var/kafka/data
```