Contents
- Why Multi-Cluster Replication
- MirrorMaker 2 Architecture
- Active/Passive Setup
- Active/Active Setup
- Offset Translation & Consumer Failover
- Monitoring MirrorMaker 2
- Topic & ACL Sync
- Running as Dedicated Cluster
- Common Pitfalls
Why Multi-Cluster Replication

A single Kafka cluster, no matter how well provisioned, cannot satisfy every operational requirement across a distributed organisation.
Cross-cluster replication addresses four distinct needs:
- Disaster Recovery (DR): A passive DR cluster in a separate availability zone or region receives a continuous copy of all production topics. If the primary cluster fails, consumers are redirected to the DR cluster with minimal data loss.
- Geo-Distribution: Data produced in one region is replicated to another so that consumers in each region read from a nearby cluster, reducing latency and cross-region egress costs.
- Data Locality & Compliance: Regulations such as GDPR may require that personal data is processed only within a specific jurisdiction. Replication lets you keep a local copy while still feeding a central analytics cluster with anonymised data.
- Edge-to-Central Aggregation: IoT or on-premise Kafka deployments produce data locally. MirrorMaker 2 fans those streams into a central cloud cluster for global analytics without requiring direct producer access to the central broker.
MirrorMaker 2 vs MirrorMaker 1
| Feature | MirrorMaker 1 | MirrorMaker 2 |
| --- | --- | --- |
| Foundation | Standalone consumer + producer process | Built on Kafka Connect framework |
| Offset sync | Not supported | Native consumer group offset translation |
| Topic config sync | Not supported | Syncs retention, compaction, partition count |
| Active/Active | Cycle prevention is manual and fragile | Built-in cycle prevention via topic prefix policy |
| Monitoring | Limited JMX metrics | Rich Connect metrics + heartbeat topic |
| Status | Deprecated, removed in Kafka 4.0 | Actively maintained, recommended |
Confluent Replicator is a commercial alternative to MirrorMaker 2 that adds capabilities such as schema migration and integration with Confluent's management tooling. For open-source deployments, MirrorMaker 2 is the standard choice.
MirrorMaker 2 Architecture

MirrorMaker 2 is implemented as a set of Kafka Connect connectors. When you run connect-mirror-maker.sh, it starts an embedded Connect worker and deploys three connectors automatically for each configured replication flow.

| Connector | Role |
| --- | --- |
| MirrorSourceConnector | Reads records from the source cluster and writes them to the target cluster. Handles topic creation, partition count matching, and record-level replication. |
| MirrorCheckpointConnector | Periodically reads consumer group offsets from the source cluster, translates them to the equivalent target offsets, and writes checkpoints to the <source-alias>.checkpoints.internal topic on the target. Enables seamless consumer failover. |
| MirrorHeartbeatConnector | Produces synthetic heartbeat records to the heartbeats topic. These records propagate through the replication pipeline, letting you measure end-to-end replication latency. |

(A fourth connector, MirrorSinkConnector, was proposed in KIP-382 but has not shipped in Apache Kafka; it is not part of a standard MM2 deployment and is not configured in mm2.properties.)
Remote Topic Naming
By default MM2 uses DefaultReplicationPolicy, which prefixes replicated topics with the source cluster alias and a dot separator.
A topic named orders on cluster us-east becomes us-east.orders on the target cluster.
This prefix is the key mechanism that prevents replication cycles in active/active topologies — a connector will never replicate a topic that already carries the target cluster's prefix.
- Original topic on source: orders
- Replicated topic on target: us-east.orders
- Heartbeat topic: us-east.heartbeats
- Checkpoint topic: us-east.checkpoints.internal
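The naming convention is mechanical enough to sketch in a few lines. This is an illustration of the convention only; the class and method names below are invented, not the actual ReplicationPolicy API.

```java
// Illustration of DefaultReplicationPolicy's naming convention.
// Class and method names are invented for this sketch; the real policy
// lives in org.apache.kafka.connect.mirror.
public class RemoteTopicNames {
    private static final String SEPARATOR = ".";

    // "us-east" + "orders" -> "us-east.orders"
    public static String remoteTopic(String sourceAlias, String topic) {
        return sourceAlias + SEPARATOR + topic;
    }

    // "us-east.orders" -> "orders" (recover the upstream name)
    public static String upstreamTopic(String remoteTopic) {
        int sep = remoteTopic.indexOf(SEPARATOR);
        return sep < 0 ? remoteTopic : remoteTopic.substring(sep + 1);
    }
}
```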
Offset Translation
Because a replicated topic on the target cluster has different internal offsets than the source (due to compaction differences, replication lag, and independent segment rolling), consumer group offsets cannot be directly reused.
MirrorCheckpointConnector maintains a mapping by writing translated offset checkpoints to a dedicated internal topic. Consumers failing over to the target use these checkpoints to resume from the correct position.
Active/Passive Setup

In an active/passive topology, one cluster (us-east) is the primary where producers write. The second cluster (us-west)
is a passive DR replica. Consumers normally read from us-east and fail over to us-west only when needed.
Replication flows in one direction only: us-east → us-west.
Full mm2.properties — Active/Passive
```properties
# ── Cluster aliases ──────────────────────────────────────────────
clusters = us-east, us-west

# ── Bootstrap servers for each cluster ───────────────────────────
us-east.bootstrap.servers = kafka-east-1:9092,kafka-east-2:9092,kafka-east-3:9092
us-west.bootstrap.servers = kafka-west-1:9092,kafka-west-2:9092,kafka-west-3:9092

# ── Replication flow: us-east → us-west only ─────────────────────
us-east->us-west.enabled = true
us-west->us-east.enabled = false

# ── Topics to replicate (regex) ──────────────────────────────────
us-east->us-west.topics = .*

# ── Topics to exclude (topics.blacklist is the deprecated old name)
us-east->us-west.topics.exclude = .*\.internal, .*-changelog, connect-.*, __.*

# ── Replication factor for remote topics on target ───────────────
replication.factor = 3

# ── Consumer group offset sync ───────────────────────────────────
us-east->us-west.sync.group.offsets.enabled = true
us-east->us-west.sync.group.offsets.interval.seconds = 60

# ── Topic config and ACL sync ────────────────────────────────────
us-east->us-west.sync.topic.configs.enabled = true
us-east->us-west.sync.topic.acls.enabled = true

# ── Replication policy ───────────────────────────────────────────
# DefaultReplicationPolicy prefixes replicated topics: us-east.orders
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy
replication.policy.separator = .

# ── Heartbeat & checkpoint connectors ────────────────────────────
us-east->us-west.emit.heartbeats.enabled = true
us-east->us-west.emit.heartbeats.interval.seconds = 5
us-east->us-west.emit.checkpoints.enabled = true
us-east->us-west.emit.checkpoints.interval.seconds = 60

# ── Connect worker settings (shared across both clusters) ────────
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
config.storage.replication.factor = 3

# ── Security (if cross-DC TLS is required) ───────────────────────
# us-east.security.protocol = SSL
# us-east.ssl.truststore.location = /etc/kafka/certs/truststore.jks
# us-east.ssl.truststore.password = changeit
# us-east.ssl.keystore.location = /etc/kafka/certs/keystore.jks
# us-east.ssl.keystore.password = changeit
#
# us-west.security.protocol = SSL
# us-west.ssl.truststore.location = /etc/kafka/certs/truststore.jks
# us-west.ssl.truststore.password = changeit
# us-west.ssl.keystore.location = /etc/kafka/certs/keystore.jks
# us-west.ssl.keystore.password = changeit
```
Starting MirrorMaker 2
```shell
# Run as a standalone MirrorMaker 2 process (embedded Connect worker)
bin/connect-mirror-maker.sh config/mm2.properties

# Alternatively — run against a specific pair of clusters only
bin/connect-mirror-maker.sh \
  --clusters us-east,us-west \
  config/mm2.properties
```
Resulting Topic Names on us-west
- us-east.orders — replicated from orders on us-east
- us-east.payments — replicated from payments on us-east
- us-east.heartbeats — heartbeat flow from us-east
- us-east.checkpoints.internal — offset checkpoints written by MirrorCheckpointConnector
Consumer group offset sync runs on the interval defined by sync.group.offsets.interval.seconds. For tighter RPO targets, reduce this value, but be aware of the additional load on both clusters.
Active/Active Setup

In an active/active topology, both clusters accept writes from producers. Each cluster replicates its local topics to the other.
The critical challenge is cycle prevention — without it, a record produced on us-east would be replicated
to us-west, then replicated back to us-east, and so on indefinitely.
Cycle Prevention via DefaultReplicationPolicy
DefaultReplicationPolicy prevents cycles by checking the topic name prefix before replicating.
If a topic on us-west is named us-east.orders, the us-west→us-east connector recognises the us-east. prefix and skips it — the record originated on us-east and must not be sent back.
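The check reduces to a prefix comparison. The sketch below is a simplified model of that behaviour, with invented names; the real logic lives inside MirrorSourceConnector and the replication policy.

```java
// Simplified model of MM2's cycle prevention: a flow INTO cluster
// `targetAlias` skips any topic already carrying that cluster's prefix,
// because such a record originated there. Illustrative only.
public class CycleCheck {
    public static boolean shouldReplicate(String topic, String targetAlias) {
        return !topic.startsWith(targetAlias + ".");
    }
}
```

For the us-west→us-east flow, `shouldReplicate("us-east.orders", "us-east")` is false, which is exactly the skip described above.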
Full mm2.properties — Active/Active
```properties
# ── Cluster aliases ──────────────────────────────────────────────
clusters = us-east, us-west

# ── Bootstrap servers ────────────────────────────────────────────
us-east.bootstrap.servers = kafka-east-1:9092,kafka-east-2:9092,kafka-east-3:9092
us-west.bootstrap.servers = kafka-west-1:9092,kafka-west-2:9092,kafka-west-3:9092

# ── Bidirectional replication ────────────────────────────────────
us-east->us-west.enabled = true
us-west->us-east.enabled = true

# ── Topics to replicate ──────────────────────────────────────────
us-east->us-west.topics = .*
us-west->us-east.topics = .*

# ── Exclude internal and already-replicated topics ───────────────
# DefaultReplicationPolicy auto-excludes prefixed topics, but an
# explicit exclude list guards against edge cases.
us-east->us-west.topics.exclude = .*\.internal, .*-changelog, connect-.*, __.*
us-west->us-east.topics.exclude = .*\.internal, .*-changelog, connect-.*, __.*

# ── Replication factor ───────────────────────────────────────────
replication.factor = 3

# ── Consumer group offset sync ───────────────────────────────────
us-east->us-west.sync.group.offsets.enabled = true
us-east->us-west.sync.group.offsets.interval.seconds = 60
us-west->us-east.sync.group.offsets.enabled = true
us-west->us-east.sync.group.offsets.interval.seconds = 60

# ── Topic config sync ────────────────────────────────────────────
us-east->us-west.sync.topic.configs.enabled = true
us-west->us-east.sync.topic.configs.enabled = true

# ── ACL sync ─────────────────────────────────────────────────────
us-east->us-west.sync.topic.acls.enabled = true
us-west->us-east.sync.topic.acls.enabled = true

# ── Replication policy — MUST be DefaultReplicationPolicy for A/A ─
# IdentityReplicationPolicy does NOT add a prefix and will cause cycles.
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy
replication.policy.separator = .

# ── Heartbeat & checkpoint ───────────────────────────────────────
us-east->us-west.emit.heartbeats.enabled = true
us-east->us-west.emit.heartbeats.interval.seconds = 5
us-east->us-west.emit.checkpoints.enabled = true
us-east->us-west.emit.checkpoints.interval.seconds = 60
us-west->us-east.emit.heartbeats.enabled = true
us-west->us-east.emit.heartbeats.interval.seconds = 5
us-west->us-east.emit.checkpoints.enabled = true
us-west->us-east.emit.checkpoints.interval.seconds = 60

# ── Connect worker shared settings ───────────────────────────────
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
config.storage.replication.factor = 3
```
DefaultReplicationPolicy vs IdentityReplicationPolicy
| Policy | Topic Naming | Cycle Safe | Use Case |
| --- | --- | --- | --- |
| DefaultReplicationPolicy | source-alias.topic-name | Yes | Active/active and active/passive. Standard choice. |
| IdentityReplicationPolicy | topic-name (unchanged) | No | Active/passive only, when consumers must not change their topic subscription. Requires disabling the reverse flow explicitly. |
Conflict Resolution in Active/Active
MirrorMaker 2 does not perform record-level merge or conflict resolution. If both clusters accept writes to the same logical topic
(e.g., orders on us-east and orders on us-west), consumers on each cluster will see:
- Local records in the original orders topic.
- Remote records in the us-west.orders (or us-east.orders) prefixed topic.
Applications that need a unified view must read from both topics and deduplicate by record key or a business-level identifier.
Conflict resolution logic (last-write-wins, CRDT, version vectors) is the responsibility of the application layer.
For most active/active deployments, producers in each region only write to region-specific topics (e.g., orders-east, orders-west). MirrorMaker 2 then replicates each to the other cluster. This avoids same-key conflicts entirely.
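Where a unified view is unavoidable, the deduplication described above can be sketched as follows. The class and the last-write-wins-by-arrival policy are illustrative assumptions; real conflict resolution is application-specific.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of a unified view over a local topic ("orders") and its
// replicated counterpart ("us-west.orders"): keep one value per business
// key. Last write wins by arrival order here; a real application might
// compare timestamps or version vectors instead.
public class KeyDedup {
    private final Map<String, String> latestByKey = new LinkedHashMap<>();

    // Called for every record consumed from either topic
    public void accept(String key, String value) {
        latestByKey.put(key, value);
    }

    public Map<String, String> unifiedView() {
        return latestByKey;
    }
}
```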
Offset Translation & Consumer Failover

The offset of a record on the target cluster is almost never the same as on the source cluster. Compaction, different retention
periods, and independent log segment lifecycles cause offsets to diverge. MirrorCheckpointConnector bridges this gap.
How MirrorCheckpointConnector Works
- Every emit.checkpoints.interval.seconds seconds, it reads the committed offsets for all tracked consumer groups from the source cluster's __consumer_offsets topic.
- It translates each source offset to the equivalent target offset by consulting the internal offset mapping maintained by MirrorSourceConnector.
- The translated checkpoints are written to <source-alias>.checkpoints.internal on the target cluster (e.g., us-east.checkpoints.internal).
- Optionally, with sync.group.offsets.enabled = true, MM2 also commits the translated offsets directly to __consumer_offsets on the target, so consumers can start consuming without any code change.
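The translation step above can be pictured as a lookup against periodically recorded sync points. The TreeMap model below is a conceptual illustration only; MM2's real bookkeeping (the offset-syncs topic) is more conservative than this simple delta arithmetic.

```java
import java.util.Map;
import java.util.TreeMap;

// Conceptual model of offset translation: MirrorSourceConnector records
// sync points (source offset -> target offset); a committed source offset
// is translated via the nearest sync point at or below it.
public class OffsetTranslation {
    private final TreeMap<Long, Long> syncPoints = new TreeMap<>();

    public void addSyncPoint(long sourceOffset, long targetOffset) {
        syncPoints.put(sourceOffset, targetOffset);
    }

    public long translate(long committedSourceOffset) {
        Map.Entry<Long, Long> sync = syncPoints.floorEntry(committedSourceOffset);
        if (sync == null) {
            return 0L; // no sync point yet: fall back to earliest
        }
        // advance from the sync point by the same delta
        return sync.getValue() + (committedSourceOffset - sync.getKey());
    }
}
```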
Programmatic Offset Translation
If you need to translate offsets manually (e.g., to determine the exact target partition/offset before a failover), use the
RemoteClusterUtils utility class.
```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class FailoverOffsets {
    public static void main(String[] args) throws Exception {
        // Client properties pointing at the TARGET (DR) cluster
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "kafka-west-1:9092");

        // Translate consumer group offsets from us-east to us-west.
        // "us-east" is the source cluster alias as seen from the target.
        Map<TopicPartition, OffsetAndMetadata> translatedOffsets =
            RemoteClusterUtils.translateOffsets(
                props,
                "us-east",            // remote (source) cluster alias
                "my-consumer-group",  // group whose offsets to translate
                Duration.ofSeconds(10));

        // Assign translated offsets to a new consumer on the target cluster
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> drConsumer = new KafkaConsumer<>(props)) {
            drConsumer.assign(translatedOffsets.keySet());
            translatedOffsets.forEach(drConsumer::seek);
        }
    }
}
```
Consumer Failover Steps
- Step 1: Confirm MirrorCheckpointConnector is running and sync.group.offsets.enabled = true is set for the replication flow.
- Step 2: Update consumer bootstrap.servers to point to the DR (target) cluster.
- Step 3: Change the consumer's topic subscription from orders to us-east.orders (the prefixed name on the DR cluster).
- Step 4: The consumer group's translated offsets have already been committed to __consumer_offsets on the DR cluster by MM2. The consumer will pick up from the correct position automatically on the next poll().
- Step 5: If offsets are stale or missing, use RemoteClusterUtils.translateOffsets() to compute and seek to the correct position manually.
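Steps 2 and 3 amount to a configuration change on the consumer side. A before/after sketch using this chapter's example cluster names (the group id and topic are illustrative):

```properties
# Before failover — consumer pointed at the primary (us-east)
bootstrap.servers = kafka-east-1:9092,kafka-east-2:9092,kafka-east-3:9092
group.id = my-consumer-group
# subscribed topic: orders

# After failover — same group.id, DR cluster, prefixed topic name
bootstrap.servers = kafka-west-1:9092,kafka-west-2:9092,kafka-west-3:9092
group.id = my-consumer-group
# subscribed topic: us-east.orders
# (translated offsets were already committed by MM2, so the next poll()
#  resumes from the right position)
```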
The lag between when a record is produced on the source and when its translated offset checkpoint is written to the target is bounded by emit.checkpoints.interval.seconds. For critical DR scenarios, set this to 30 seconds or less.
Monitoring MirrorMaker 2

MM2 exposes monitoring data through three channels: the heartbeat topic, JMX metrics from the embedded Connect worker, and
standard Kafka consumer group lag tooling.
Heartbeat Topic
MirrorHeartbeatConnector produces a record to the heartbeats topic every
emit.heartbeats.interval.seconds seconds. As this record propagates through the replication pipeline to the target,
you can measure the end-to-end replication latency by comparing the record timestamp to the current time on the target cluster.
```shell
# Consume heartbeats on the target cluster to verify replication is alive
bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-west-1:9092 \
  --topic us-east.heartbeats \
  --from-beginning \
  --property print.timestamp=true

# Expected output (one record per heartbeat interval):
# CreateTime:1713456000000 {"sourceClusterAlias":"us-east","targetClusterAlias":"us-west","timestamp":1713456000000}
```
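Turning the printed CreateTime into a latency figure is simple arithmetic, sketched below. The class name is invented, and the calculation is only meaningful if the two clusters' clocks are synchronised (e.g. via NTP).

```java
// End-to-end replication latency from a heartbeat record: the difference
// between the record's CreateTime (stamped on the source) and the wall
// clock on the target when the record is consumed.
public class HeartbeatLatency {
    public static long latencyMs(long recordCreateTimeMs, long consumedAtMs) {
        return consumedAtMs - recordCreateTimeMs;
    }
}
```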
Key JMX Metrics
| Metric | MBean | What it Measures |
| --- | --- | --- |
| Replication latency | kafka.connect:type=MirrorSourceConnector,target=*,topic=*,partition=* | Milliseconds between the record timestamp on the source and when it was written to the target |
| Record count | kafka.connect:type=MirrorSourceConnector,... | Number of records replicated per topic/partition |
| Connector status | kafka.connect:type=connector-metrics,connector=* | RUNNING / PAUSED / FAILED state of each connector |
| Consumer lag | kafka.consumer:type=consumer-fetch-manager-metrics | How far MM2's internal source-connector consumer is behind the source topic head |
Checking Consumer Lag on Source Connector
```shell
# Check lag of the MirrorSourceConnector's internal consumer group on the
# source cluster (quote the group name — ">" is a shell redirect)
bin/kafka-consumer-groups.sh \
  --bootstrap-server kafka-east-1:9092 \
  --group 'us-east->us-west' \
  --describe

# Check a translated consumer group on the target cluster
bin/kafka-consumer-groups.sh \
  --bootstrap-server kafka-west-1:9092 \
  --group my-app-consumer-group \
  --describe
```
High replication latency combined with zero consumer lag on the source connector usually indicates a slow network link between DCs, not a MM2 configuration problem. Consider tuning fetch.max.bytes and producer.batch.size in the connector config.
Topic & ACL Sync

Beyond record replication, MM2 can synchronise topic configurations and ACLs from the source cluster to the target,
reducing the operational overhead of keeping cluster configurations in lockstep.
What Gets Synced
- Topic retention: retention.ms, retention.bytes — replicated from source topic configs.
- Compaction policy: cleanup.policy (delete / compact / compact,delete).
- Partition count: MM2 will create the remote topic with the same number of partitions as the source. Note: partition count can only increase, not decrease.
- Other topic configs: min.insync.replicas, max.message.bytes, compression.type, segment sizes.
- ACLs: Requires Kafka 2.7+ on both clusters. Broker ACLs for the replicated (prefixed) topic name are synced.
What Does Not Get Synced
- Cluster-level configs (e.g., default replication factor, log directories).
- Broker quotas and user quotas.
- Consumer group memberships and active sessions.
- Connector and Schema Registry configurations.
- ACLs on clusters running Kafka below 2.7 — the sync will silently skip ACL operations.
Configuration
```properties
# Enable topic config sync (retention, compaction, etc.)
us-east->us-west.sync.topic.configs.enabled = true

# Topic configs to exclude from sync (comma-separated regex);
# config.properties.blacklist is the deprecated older name
us-east->us-west.config.properties.exclude = follower\.replication\.throttled\.replicas, \
    leader\.replication\.throttled\.replicas

# Enable ACL sync — requires Kafka 2.7+ on both clusters
us-east->us-west.sync.topic.acls.enabled = true
```
Even with sync.topic.configs.enabled = true, changes on the source propagate to the target on the next config-sync cycle (default: every 10 minutes). For immediate propagation, restart the MirrorSourceConnector or force a task restart via the Connect REST API.
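A restart can be triggered through the standard Connect REST API. The host and connector name below are examples that assume a dedicated-Connect-cluster deployment; substitute whatever names your deployment uses.

```shell
# Restart the MM2 source connector to force an immediate config re-sync
curl -X POST http://connect-worker:8083/connectors/us-east-to-us-west-source/restart

# Kafka 3.0+ — restart the connector and all of its tasks in one call
curl -X POST "http://connect-worker:8083/connectors/us-east-to-us-west-source/restart?includeTasks=true"
```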
Running as Dedicated Cluster

MirrorMaker 2 can be deployed in two modes: as a standalone process that embeds its own Connect worker, or as a set of
connectors deployed to an existing dedicated Connect cluster.
Mode 1 — Standalone (Embedded) Process
The simplest deployment. Run connect-mirror-maker.sh with your mm2.properties file.
MM2 starts an embedded Connect worker, creates the required connectors, and manages their lifecycle.
This mode is self-contained but does not use a distributed Connect cluster's offset or config storage topics.
```shell
# Single node — start the embedded MM2 process
bin/connect-mirror-maker.sh config/mm2.properties

# Restrict to a specific pair of clusters (useful when mm2.properties
# defines many clusters but a given node should only handle one flow)
bin/connect-mirror-maker.sh \
  --clusters us-east,us-west \
  config/mm2.properties
```
Mode 2 — Distributed Connect Cluster
For production, deploy MM2 connectors to a dedicated distributed Connect cluster. This provides horizontal scalability,
fault tolerance via task redistribution, and a REST management API.
```shell
# Start a distributed Connect worker (uses the standard worker config,
# not mm2.properties — the MM2 connectors are deployed via REST below)
bin/connect-distributed.sh config/connect-distributed.properties &

# Deploy the MirrorSourceConnector via the Connect REST API
curl -X POST http://connect-worker:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "us-east-to-us-west-source",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "us-east",
      "target.cluster.alias": "us-west",
      "source.cluster.bootstrap.servers": "kafka-east-1:9092",
      "target.cluster.bootstrap.servers": "kafka-west-1:9092",
      "topics": ".*",
      "replication.factor": "3",
      "sync.topic.configs.enabled": "true",
      "replication.policy.class": "org.apache.kafka.connect.mirror.DefaultReplicationPolicy"
    }
  }'

# Deploy the MirrorCheckpointConnector
curl -X POST http://connect-worker:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "us-east-to-us-west-checkpoint",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
      "source.cluster.alias": "us-east",
      "target.cluster.alias": "us-west",
      "source.cluster.bootstrap.servers": "kafka-east-1:9092",
      "target.cluster.bootstrap.servers": "kafka-west-1:9092",
      "sync.group.offsets.enabled": "true",
      "emit.checkpoints.interval.seconds": "60"
    }
  }'

# Deploy the MirrorHeartbeatConnector
curl -X POST http://connect-worker:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "us-east-to-us-west-heartbeat",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
      "source.cluster.alias": "us-east",
      "target.cluster.alias": "us-west",
      "source.cluster.bootstrap.servers": "kafka-east-1:9092",
      "target.cluster.bootstrap.servers": "kafka-west-1:9092",
      "emit.heartbeats.interval.seconds": "5"
    }
  }'
```
Scaling
- Add more Connect worker nodes to the group (same group.id in connect-distributed.properties). Kafka Connect distributes tasks across all workers automatically.
- Increase tasks.max on the MirrorSourceConnector to parallelise replication across more partitions simultaneously.
- Each Connect worker should be co-located with (or close to) the source cluster to minimise consumer fetch latency.
The --clusters flag in standalone mode controls which cluster pairs a given MM2 process handles. In large multi-region setups, deploy one MM2 process per region pair rather than one central process handling all flows — this avoids cross-region consumer traffic from the MM2 process itself.
Common Pitfalls

The following issues account for the majority of MM2 support cases in production environments.
1. Topic Name Prefix Confusion in Active/Active
- Consumers that were reading orders on us-east must read us-east.orders on us-west after failover. Forgetting to update the topic name is the most common cause of "no messages received" after a failover.
- When using IdentityReplicationPolicy in a misguided attempt to avoid topic renaming, the reverse replication direction MUST be disabled. Failure to do so causes an infinite replication loop that saturates broker I/O.
2. Consumer Offset Lag When DR Cluster is Behind
- If the replication pipeline is lagging (e.g., due to a network partition between DCs), the DR cluster may be minutes or hours behind the source. After failover, consumers will reprocess already-handled records.
- Monitor replication latency continuously. Set up alerting when it exceeds your RPO tolerance.
- Use idempotent consumers (or exactly-once semantics) to handle potential reprocessing gracefully.
3. Partition Count Mismatch
- If a topic is manually created on the target with a different partition count before MM2 first replicates it, MM2 will not change the partition count. Records will replicate but the key-based partition assignment will differ from the source.
- Always let MM2 create remote topics automatically, or ensure manual creation matches the source partition count exactly.
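The reason a mismatch matters can be seen from how the default partitioner works: a key's partition is its hash modulo the partition count. The sketch below uses a simplified stand-in hash (Kafka's default partitioner actually applies murmur2 to the serialised key), so it illustrates the modulo effect, not Kafka's exact placement.

```java
// Why partition counts must match: the same key maps to different
// partitions when the partition count differs, breaking key-based
// ordering guarantees between source and target.
// String.hashCode() here is a simplified stand-in for Kafka's murmur2.
public class PartitionDrift {
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```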
4. Forgetting to Enable the Heartbeat Connector
- Without MirrorHeartbeatConnector, there is no built-in way to verify that the replication pipeline is alive and measure latency. Operators discover replication has been silently failing only when a DR failover is attempted.
- Always deploy all three connectors: MirrorSourceConnector, MirrorCheckpointConnector, and MirrorHeartbeatConnector.
5. Missing SSL Config for Cross-DC Communication
```properties
# Both the source consumer AND the target producer inside MM2
# need their own SSL settings. Prefix with the cluster alias.
us-east.security.protocol = SSL
us-east.ssl.truststore.location = /etc/kafka/certs/east-truststore.jks
us-east.ssl.truststore.password = changeit
us-east.ssl.keystore.location = /etc/kafka/certs/east-keystore.jks
us-east.ssl.keystore.password = changeit
us-east.ssl.key.password = changeit

us-west.security.protocol = SSL
us-west.ssl.truststore.location = /etc/kafka/certs/west-truststore.jks
us-west.ssl.truststore.password = changeit
us-west.ssl.keystore.location = /etc/kafka/certs/west-keystore.jks
us-west.ssl.keystore.password = changeit
us-west.ssl.key.password = changeit

# SASL/PLAIN example for clusters using authentication
# us-east.security.protocol = SASL_SSL
# us-east.sasl.mechanism = PLAIN
# us-east.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required \
#   username="mm2-user" password="mm2-secret";
```
- Each cluster alias gets its own prefixed security properties. Mixing them up (e.g., using us-east certs for the us-west producer) results in TLS handshake failures that are difficult to diagnose from MM2 logs alone.
- Verify connectivity independently with kafka-broker-api-versions.sh using the same SSL config before starting MM2.
6. Replication Factor Exceeds Target Cluster Broker Count
- If replication.factor = 3 is set but the target cluster has fewer than 3 brokers, MM2 will fail to create remote topics and log errors. Set replication.factor to match the target cluster's available broker count.
Always run a full DR drill — stop the source cluster, trigger consumer failover, and verify end-to-end message flow on the DR cluster — before you need it in a real incident. Paper DR plans are not a substitute for a tested runbook.