Kafka Connect is a component of the Kafka ecosystem that makes it easy to move large amounts of data into and out of Kafka without writing custom code. It runs as a separate process (or cluster of processes) and is configured via JSON over HTTP.

| Type | Direction | Examples |
| --- | --- | --- |
| Source Connector | External system → Kafka topic | JDBC Source (read from DB), Debezium CDC, File Source, S3 Source |
| Sink Connector | Kafka topic → External system | JDBC Sink (write to DB), Elasticsearch Sink, S3 Sink, HDFS Sink |

A single Kafka Connect worker can run many connectors simultaneously. Source connectors poll external systems for new data; sink connectors consume from topics and write to external systems.
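The two directions can be pictured with a purely conceptual sketch (plain Python with an in-memory list standing in for a topic; real connectors are Java classes, and the framework handles offsets, retries, and serialization):

```python
# Conceptual sketch only -- not the Kafka Connect Java API.
# A list stands in for a Kafka topic.

def source_poll(external_rows, topic):
    """Source connector direction: external system -> Kafka topic."""
    for row in external_rows:          # poll the external system
        topic.append(row)              # publish each record to the topic

def sink_consume(topic, external_store):
    """Sink connector direction: Kafka topic -> external system."""
    for record in topic:               # consume records from the topic
        external_store.append(record)  # write them to the external system

topic = []
source_poll([{"id": 1}, {"id": 2}], topic)

store = []
sink_consume(topic, store)
print(store)  # -> [{'id': 1}, {'id': 2}]
```

The point of the sketch is only the direction of data flow; everything else (parallel tasks, offset tracking, converters) is what the framework adds on top.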

Kafka Connect supports two deployment modes:

| Mode | Use case | How to start |
| --- | --- | --- |
| Standalone | Development, simple single-worker pipelines | `connect-standalone.sh worker.properties connector.properties` |
| Distributed | Production, fault-tolerant, multi-worker | `connect-distributed.sh worker.properties` (connectors deployed via REST API) |

Minimal distributed worker config (connect-distributed.properties):

```properties
bootstrap.servers=localhost:9092

# Unique name for this Connect cluster, used for internal topics
group.id=connect-cluster

# Internal topics for config, offsets, and status storage
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# Default converters
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# REST API port
listeners=HTTP://:8083
```

In distributed mode, connectors and tasks are automatically rebalanced if a worker fails. Add more workers with the same `group.id` to scale out.

In distributed mode, connectors are created by POSTing a JSON config to the REST API. Here is the structure of a connector configuration:

```json
{
  "name": "my-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "3",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "kafka",
    "connection.password": "secret",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "orders,customers",
    "topic.prefix": "pg-",
    "poll.interval.ms": "5000",
    "transforms": "addTimestamp",
    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "ingestion_ts"
  }
}
```
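Deployment is just an HTTP POST of that JSON document. As a sketch, using only the Python standard library (the worker URL and the trimmed config are assumptions; any HTTP client or plain curl works equally well):

```python
import json
import urllib.request

# Assumed address of a local distributed Connect worker.
CONNECT_URL = "http://localhost:8083"

# Trimmed-down version of the connector config shown above.
connector = {
    "name": "my-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "3",
        "topic.prefix": "pg-",
    },
}

def create_connector(payload: dict) -> urllib.request.Request:
    """Build the POST request Connect expects: JSON body to /connectors."""
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = create_connector(connector)
# urllib.request.urlopen(req)  # uncomment against a running Connect worker
print(req.get_method(), req.full_url)  # -> POST http://localhost:8083/connectors
```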

Key common config properties across all connectors:

| Property | Description |
| --- | --- |
| `connector.class` | Fully qualified connector class name |
| `tasks.max` | Max number of tasks (parallelism) |
| `topics` | Comma-separated topics to consume (sink connectors) |
| `key.converter` / `value.converter` | Override worker-level converter per connector |
| `transforms` | Comma-separated list of SMTs to apply |
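Because these keys are shared by every connector, a small pre-submission check can catch an obviously incomplete config before it ever hits the REST API. This helper is illustrative, not part of Kafka Connect (and note that in real Connect, `tasks.max` actually defaults to 1 if omitted):

```python
# Illustrative helper, not part of any Connect client library: check that a
# connector 'config' map carries the common properties from the table above.
REQUIRED = {"connector.class", "tasks.max"}

def missing_properties(config: dict, sink: bool = False) -> set:
    """Return the required keys absent from a connector config map.

    Sink connectors additionally need 'topics' (or 'topics.regex')."""
    required = set(REQUIRED)
    if sink and "topics.regex" not in config:
        required.add("topics")
    return required - config.keys()

print(missing_properties({"connector.class": "x"}))       # -> {'tasks.max'}
print(missing_properties({"connector.class": "x",
                          "tasks.max": "1"}, sink=True))  # -> {'topics'}
```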

JDBC Source Connector — polls a relational database and publishes rows to Kafka.

```json
{
  "name": "jdbc-source-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/shop",
    "connection.user": "connect",
    "connection.password": "pass",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "table.whitelist": "orders",
    "topic.prefix": "mysql-",
    "tasks.max": "1"
  }
}
```

File Source Connector (bundled) — reads lines from a file and publishes to a topic.

```properties
# connector.properties for standalone mode
name=file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/var/log/app.log
topic=app-logs
```

File Sink Connector (bundled) — consumes from a topic and appends to a file.

```properties
name=file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/output.txt
topics=app-logs
```

Kafka Connect exposes a REST API (default port 8083) for managing connectors.

| Method | Endpoint | Action |
| --- | --- | --- |
| GET | `/connectors` | List all connector names |
| POST | `/connectors` | Create a new connector |
| GET | `/connectors/{name}` | Get connector info |
| GET | `/connectors/{name}/status` | Get connector and task status |
| PUT | `/connectors/{name}/config` | Update connector config |
| POST | `/connectors/{name}/restart` | Restart connector |
| PUT | `/connectors/{name}/pause` | Pause connector |
| PUT | `/connectors/{name}/resume` | Resume paused connector |
| DELETE | `/connectors/{name}` | Delete connector |
```bash
# Create a connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector-config.json

# Check status
curl http://localhost:8083/connectors/jdbc-source-orders/status

# Pause the connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-orders/pause

# Delete the connector
curl -X DELETE http://localhost:8083/connectors/jdbc-source-orders
```

Failed tasks are not restarted automatically. Monitor connector status and use `POST /connectors/{name}/tasks/{taskId}/restart` to restart individual failed tasks.
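The `/status` response lists each task with its state, so pulling out the failed task ids for targeted restarts is straightforward. A stdlib-only sketch, operating on an already-fetched status document (the sample payload below is illustrative but follows the shape Connect returns):

```python
# Sketch: find FAILED tasks in a /connectors/{name}/status response so
# each one can be restarted individually.
def failed_task_ids(status: dict) -> list:
    """Return the ids of tasks whose state is FAILED."""
    return [t["id"] for t in status.get("tasks", []) if t["state"] == "FAILED"]

# Illustrative sample shaped like a real /status response.
sample = {
    "name": "jdbc-source-orders",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083"},
    ],
}

for task_id in failed_task_ids(sample):
    # POST /connectors/jdbc-source-orders/tasks/{task_id}/restart for each
    print(f"restart task {task_id}")  # -> restart task 1
```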