Contents
- What is Kafka Connect
- Source vs Sink Connectors
- Running Connect (Standalone vs Distributed)
- Connector Configuration
- Common Connectors (JDBC, File)
- REST API
Kafka Connect is a component of the Kafka ecosystem that makes it easy to move large amounts of data into and out of Kafka
without writing custom code. It runs as a separate process (or cluster of processes) and is configured via JSON over HTTP.
- Connector — the high-level abstraction that specifies where data comes from or goes to.
- Task — the unit of work. A connector may be split into multiple tasks for parallelism.
- Worker — a JVM process running Kafka Connect. Workers host tasks and connectors.
- Converter — serialises/deserialises data between Connect's internal format and Kafka (e.g., JSON, Avro).
- Transform (SMT) — Single Message Transform applied to each record inline (e.g., rename fields, add timestamp).
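The converter setting changes the bytes that actually land in Kafka. A minimal sketch of the difference between `schemas.enable=false` (plain JSON payload) and `schemas.enable=true` (payload wrapped in a schema envelope, as the JsonConverter does); the record and field types here are illustrative:

```python
import json

record = {"id": 42, "name": "alice"}

# schemas.enable=false: the JsonConverter writes just the payload
plain = json.dumps(record)

# schemas.enable=true: the payload is wrapped in a schema envelope
# (sketch of the envelope shape, not an exhaustive schema)
enveloped = json.dumps({
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "id", "type": "int32"},
            {"field": "name", "type": "string"},
        ],
    },
    "payload": record,
})

print(plain)
print(enveloped)
```

The envelope roughly doubles message size for small records, which is why `schemas.enable=false` is common when a schema registry (e.g., with Avro) is not in use.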
| Type | Direction | Examples |
| --- | --- | --- |
| Source Connector | External system → Kafka topic | JDBC Source (read from DB), Debezium CDC, File Source, S3 Source |
| Sink Connector | Kafka topic → External system | JDBC Sink (write to DB), Elasticsearch Sink, S3 Sink, HDFS Sink |
A single Kafka Connect worker can run many connectors simultaneously. Source connectors poll external systems for new data;
sink connectors consume from topics and write to external systems.
Kafka Connect supports two deployment modes:
| Mode | Use case | How to start |
| --- | --- | --- |
| Standalone | Development, simple single-worker pipelines | connect-standalone.sh worker.properties connector.properties |
| Distributed | Production, fault-tolerant, multi-worker | connect-distributed.sh worker.properties, then deploy connectors via REST API |
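Standalone mode keeps source offsets in a local file rather than in internal topics. A minimal standalone worker config might look like this (the file path is illustrative):

```properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Standalone mode persists source offsets to a local file instead of a topic
offset.storage.file.filename=/tmp/connect.offsets
```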
Minimal distributed worker config (connect-distributed.properties):
bootstrap.servers=localhost:9092
# Unique name for this Connect cluster — used for internal topics
group.id=connect-cluster
# Internal topics for config, offsets, and status storage
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
# Default converters
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# REST API port
listeners=HTTP://:8083
In distributed mode, connectors and tasks are automatically rebalanced across the surviving workers if a worker fails, and you scale out by adding workers with the same group.id.
Connectors themselves are created by POSTing a JSON config to the REST API.
Here is the structure of a connector configuration:
{
  "name": "my-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "3",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "kafka",
    "connection.password": "secret",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "orders,customers",
    "topic.prefix": "pg-",
    "poll.interval.ms": "5000",
    "transforms": "addTimestamp",
    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "ingestion_ts"
  }
}
Key common config properties across all connectors:
| Property | Description |
| --- | --- |
| connector.class | Fully qualified connector class name |
| tasks.max | Max number of tasks (parallelism) |
| topics | Comma-separated topics (sink connectors) |
| key.converter / value.converter | Override worker-level converter per connector |
| transforms | Comma-separated list of SMTs to apply |
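Sink connectors use the same JSON structure; the main difference is that they consume from a topics list instead of producing one. A sketch of a JDBC sink config (property names follow the Confluent JDBC sink connector; connection details are illustrative):

```json
{
  "name": "jdbc-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/analytics",
    "connection.user": "kafka",
    "connection.password": "secret",
    "topics": "mysql-orders",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "auto.create": "true",
    "tasks.max": "2"
  }
}
```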
JDBC Source Connector — polls a relational database and publishes rows to Kafka.
{
  "name": "jdbc-source-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/shop",
    "connection.user": "connect",
    "connection.password": "pass",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "table.whitelist": "orders",
    "topic.prefix": "mysql-",
    "tasks.max": "1"
  }
}
File Source Connector (bundled) — reads lines from a file and publishes to a topic.
# connector.properties for standalone mode
name=file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/var/log/app.log
topic=app-logs
File Sink Connector (bundled) — consumes from a topic and appends to a file.
name=file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/output.txt
topics=app-logs
Kafka Connect exposes a REST API (default port 8083) for managing connectors.
| Method | Endpoint | Action |
| --- | --- | --- |
| GET | /connectors | List all connector names |
| POST | /connectors | Create a new connector |
| GET | /connectors/{name} | Get connector info |
| GET | /connectors/{name}/status | Get connector and task status |
| PUT | /connectors/{name}/config | Update connector config |
| POST | /connectors/{name}/restart | Restart connector |
| PUT | /connectors/{name}/pause | Pause connector |
| PUT | /connectors/{name}/resume | Resume paused connector |
| DELETE | /connectors/{name} | Delete connector |
# Create connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @connector-config.json
# Check status
curl http://localhost:8083/connectors/jdbc-source-orders/status
# Pause connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-orders/pause
# Delete connector
curl -X DELETE http://localhost:8083/connectors/jdbc-source-orders
Failed tasks are not restarted automatically. Monitor connector status and use POST /connectors/{name}/tasks/{taskId}/restart to restart individual failed tasks.
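The monitor-and-restart pattern can be sketched as a small helper. This is illustrative, not a full monitoring tool: it assumes the JSON shape returned by GET /connectors/{name}/status (a "tasks" array whose entries carry "id" and "state"), and the actual HTTP calls are left out so the parsing logic stands alone:

```python
def failed_task_ids(status):
    """Return the ids of tasks in FAILED state from a connector status payload."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]

def restart_paths(name, status):
    """Build the REST paths to POST in order to restart each failed task."""
    return [f"/connectors/{name}/tasks/{tid}/restart"
            for tid in failed_task_ids(status)]

# Example payload shaped like a GET /connectors/{name}/status response
status = {
    "name": "jdbc-source-orders",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083",
         "trace": "org.apache.kafka.connect.errors.ConnectException: ..."},
    ],
}

print(restart_paths("jdbc-source-orders", status))
# → ['/connectors/jdbc-source-orders/tasks/1/restart']
```

In a real deployment you would fetch the status payload with an HTTP GET, POST to each returned path, and run the check on a schedule.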