Contents
- Basic Producer Example
- Acknowledgment (acks)
- Partitioning
- Batching and Compression
- Idempotent and Transactional Producers
- Key Configuration Summary
## Basic Producer Example

Add the Kafka clients dependency to your project:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>
```
```java
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Send message asynchronously (with callback)
ProducerRecord<String, String> record =
        new ProducerRecord<>("my-topic", "user-123", "event-payload");

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Error: " + exception.getMessage());
    } else {
        System.out.printf("Sent to topic=%s partition=%d offset=%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});

producer.close(); // flushes buffered records, then closes
```
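When the result is needed before proceeding, the same send can be made blocking by waiting on the returned `Future`. This is a sketch against the `producer` and `record` defined above (it needs a running broker, so it is shown as a fragment; `java.util.concurrent.ExecutionException` must also be imported):

```java
// Blocking send: get() waits for the broker acknowledgment and
// throws ExecutionException if delivery ultimately failed
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("Sent to partition=%d offset=%d%n",
            metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    // Delivery failed or the wait was interrupted
    e.printStackTrace();
}
```

Blocking on every send serializes requests and gives up most of the producer's throughput, so prefer the callback form unless per-record confirmation is genuinely required.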
## Acknowledgment (acks)

`acks` controls how many broker acknowledgments the producer waits for before considering a request complete. It directly trades durability against throughput.
| `acks` value | Meaning | Durability | Throughput |
|---|---|---|---|
| `0` | Fire and forget: no ack awaited | Lowest (data loss possible) | Highest |
| `1` | Leader broker acks (default before Kafka 3.0) | Medium (loss if leader crashes before replication) | High |
| `all` / `-1` | All in-sync replicas ack (default since Kafka 3.0) | Highest (no loss when `min.insync.replicas` ≥ 2) | Lower |
```java
props.put(ProducerConfig.ACKS_CONFIG, "all");           // strongest durability
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
```
## Partitioning

Kafka routes messages to partitions in one of three ways:

- Key-based (default): `murmur2(key) % numPartitions`. The same key always maps to the same partition, so ordering is guaranteed per key.
- No key: the sticky partitioner (Kafka 2.4+) fills a batch for one partition before switching to another; older clients distributed keyless records round-robin.
- Custom partitioner: implement `org.apache.kafka.clients.producer.Partitioner`.
```java
// Message with key: always goes to the same partition
new ProducerRecord<>("orders", "customer-456", orderJson);

// Message without key: sticky partitioning (round-robin in older clients)
new ProducerRecord<>("logs", null, logLine);

// Explicit partition (here: partition 2)
new ProducerRecord<>("orders", 2, "customer-789", orderJson);
```
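The key-based rule can be illustrated with plain Java. This sketch substitutes `String.hashCode()` for Kafka's murmur2 (which lives in kafka-clients, `org.apache.kafka.common.utils.Utils`) so it runs with the JDK alone; it demonstrates the property that matters — stable, in-range routing per key — not Kafka's exact hash values:

```java
// Illustrates key-based partition routing: hash(key) mod partition count.
// String.hashCode() stands in for Kafka's murmur2 so this runs without
// any dependencies; real partition numbers will differ from Kafka's.
public class KeyRouting {
    static int choosePartition(String key, int numPartitions) {
        // Mask off the sign bit so the result is non-negative,
        // then take the remainder to land inside [0, numPartitions)
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = choosePartition("customer-456", 6);
        int p2 = choosePartition("customer-456", 6);
        System.out.println(p1 == p2);           // same key, same partition: true
        System.out.println(p1 >= 0 && p1 < 6);  // always a valid partition: true
    }
}
```

Because the partition is a pure function of the key, all events for one customer land on one partition and are consumed in order.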
## Batching and Compression

The producer buffers records into per-partition batches to improve throughput, and compression reduces network and storage overhead.
```java
// Batching
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16 KB batch size
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);      // wait up to 5 ms to fill a batch

// Compression (snappy / gzip / lz4 / zstd)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

// Total memory available for producer-side buffering
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // 32 MB
```
## Idempotent and Transactional Producers

Idempotent producer (Kafka 0.11+): guarantees exactly-once delivery within a single partition for a single producer session. The broker uses per-producer sequence numbers to deduplicate retried sends.
```java
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Requires: acks=all, retries > 0, max.in.flight.requests.per.connection <= 5
```
Transactional producer: atomic multi-partition / multi-topic writes.
All messages in a transaction are either all committed or all aborted.
```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-1");

producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", key, value));
    producer.send(new ProducerRecord<>("audit-log", key, auditValue));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the producer cannot be reused and must be closed
    producer.close();
} catch (KafkaException e) {
    // Transient error: abort so the transaction can be retried
    producer.abortTransaction();
}
```
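One consumer-side note: transactions only deliver end-to-end atomicity if consumers opt in. The default `isolation.level` is `read_uncommitted`, under which consumers also see records from aborted transactions:

```properties
# Consumer configuration: hide records from aborted or in-flight transactions
isolation.level=read_committed
```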
## Key Configuration Summary

| Config | Default | Purpose |
|---|---|---|
| `bootstrap.servers` | — | Comma-separated list of broker `host:port` |
| `acks` | `all` (Kafka 3.0+; previously `1`) | Durability level |
| `retries` | `2147483647` | Retry on transient failures |
| `batch.size` | `16384` | Maximum batch size in bytes |
| `linger.ms` | `0` | Extra delay to allow batching |
| `compression.type` | `none` | `snappy` / `gzip` / `lz4` / `zstd` |
| `enable.idempotence` | `true` (Kafka 3.0+) | Exactly-once within a partition |
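Putting the durability-oriented settings from the sections above together, a reliability-focused producer configuration might look like this (a sketch; the broker address and client id are placeholders):

```properties
bootstrap.servers=localhost:9092
client.id=order-producer
acks=all
enable.idempotence=true
compression.type=snappy
linger.ms=5
```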