Contents
- Connector Architecture
- Custom SourceConnector
- SourceTask — Polling & Offset Management
- Custom SinkConnector
- SinkTask — Writing to External System
- Error Handling & Dead Letter Queue
- Build & Deploy as a Plugin
- Custom Single Message Transform (SMT)
| Component | Responsibility |
| Connector | Validates config, determines parallelism, distributes config to tasks |
| Task | Does the actual data movement — polls source or writes to sink |
| Worker | Kafka Connect runtime — manages tasks, offsets, REST API, transforms |
| OffsetStorageReader | Lets a SourceTask read previously committed offsets to resume after restart |
| SourceRecord | A Kafka record produced by a SourceTask (topic, key, value, sourcePartition, sourceOffset) |
| SinkRecord | A Kafka record delivered to a SinkTask (includes topic, offset, key, value) |
The Connector class validates configuration and tells the framework how many tasks to create and what config each task receives.
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
public class HttpPollingSourceConnector extends SourceConnector {

    /** Connector-level configuration captured in start() and fanned out to tasks. */
    private Map<String, String> config;

    /**
     * Validates required configuration and stores it for later task fan-out.
     *
     * @param props connector configuration supplied by the Connect framework
     * @throws ConnectException if {@code http.url} is missing or blank
     */
    @Override
    public void start(Map<String, String> props) {
        this.config = props;
        // Reject blank values too, not just a missing key — a present-but-empty
        // "http.url" would otherwise produce zero usable endpoints at runtime.
        String urlList = props.get("http.url");
        if (urlList == null || urlList.trim().isEmpty()) {
            throw new ConnectException("http.url is required");
        }
    }

    @Override
    public Class<? extends Task> taskClass() {
        return HttpPollingSourceTask.class;
    }

    /**
     * One task per endpoint: each task independently polls a single URL.
     *
     * @param maxTasks upper bound on tasks the framework will run
     * @return one config map per task, each with a single "http.url" value
     */
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Trim entries and drop blanks so stray commas ("a,,b" or a trailing
        // comma) cannot create a task with an empty URL.
        List<String> urls = new ArrayList<>();
        for (String url : config.get("http.url").split(",")) {
            String trimmed = url.trim();
            if (!trimmed.isEmpty()) {
                urls.add(trimmed);
            }
        }
        // NOTE(review): if more URLs are configured than maxTasks, the surplus
        // endpoints are silently never polled — raise tasks.max to cover all
        // endpoints, or extend the task to accept multiple URLs.
        int numTasks = Math.min(maxTasks, urls.size());
        List<Map<String, String>> taskConfigs = new ArrayList<>(numTasks);
        for (int i = 0; i < numTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(config);
            taskConfig.put("http.url", urls.get(i));
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    @Override
    public void stop() { /* release connector-level resources */ }

    /** Declares the connector's configuration surface for validation and docs. */
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("http.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                "Comma-separated list of HTTP endpoints to poll")
            .define("http.poll.interval.ms", ConfigDef.Type.LONG, 5000L,
                ConfigDef.Importance.MEDIUM, "Polling interval in milliseconds")
            .define("kafka.topic", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                "Target Kafka topic");
    }

    @Override
    public String version() { return "1.0.0"; }
}
The task polls the source and returns SourceRecord objects. It uses context.offsetStorageReader() to resume from the last committed position after a restart.
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
public class HttpPollingSourceTask extends SourceTask {

    private String httpUrl;
    private String kafkaTopic;
    private long pollIntervalMs;
    // Highest record ID emitted so far; restored from committed offsets on restart.
    private long lastId = 0;

    /**
     * Reads task configuration and resumes from the last committed offset
     * for this endpoint, if one exists.
     */
    @Override
    public void start(Map<String, String> props) {
        httpUrl = props.get("http.url");
        kafkaTopic = props.get("kafka.topic");
        pollIntervalMs = Long.parseLong(props.getOrDefault("http.poll.interval.ms", "5000"));
        // The sourcePartition map here must exactly match the one used when
        // emitting records in poll(), or the stored offset will not be found.
        Map<String, Object> partition = Map.of("url", httpUrl);
        Map<String, Object> offset = context.offsetStorageReader().offset(partition);
        if (offset != null && offset.containsKey("last_id")) {
            // Depending on the configured converter, the stored number may come
            // back as Integer or Long — casting straight to Long can throw
            // ClassCastException. Go through Number instead.
            lastId = ((Number) offset.get("last_id")).longValue();
        }
    }

    /**
     * Polls the HTTP endpoint for records newer than the last seen ID and
     * converts them to SourceRecords.
     *
     * @return new records since the last poll (possibly empty)
     * @throws InterruptedException if the framework interrupts the pacing sleep
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        Thread.sleep(pollIntervalMs); // simple fixed-interval pacing between polls
        List<SourceRecord> records = new ArrayList<>();
        List<JsonNode> items = fetchFromHttp(httpUrl, lastId);
        for (JsonNode item : items) {
            long id = item.get("id").asLong();
            // Advance the high-water mark BEFORE building the offset, and commit
            // the running max rather than the raw id: if items arrive out of ID
            // order, a per-item offset could regress below one already emitted
            // earlier in this batch, causing re-delivery after a restart.
            lastId = Math.max(lastId, id);
            // sourcePartition identifies the "stream" — used for offset tracking.
            Map<String, Object> sourcePartition = Map.of("url", httpUrl);
            // sourceOffset is what the worker commits — used on restart.
            Map<String, Object> sourceOffset = Map.of("last_id", lastId);
            records.add(new SourceRecord(
                sourcePartition, sourceOffset,
                kafkaTopic,
                null,                     // partition — let Kafka choose
                Schema.STRING_SCHEMA,     // key schema
                item.get("key").asText(), // key — assumes every item has "key"; TODO confirm
                Schema.STRING_SCHEMA,     // value schema
                item.toString()           // value: the raw JSON text
            ));
        }
        return records;
    }

    @Override
    public void stop() { /* close HTTP client */ }

    @Override
    public String version() { return "1.0.0"; }
}
The HttpSinkConnector below mirrors the source connector's structure: it validates configuration, declares the task class, and fans out task configs. Key points are highlighted in the inline comments.
public class HttpSinkConnector extends SinkConnector {

    /** Connector-level configuration captured in start() and copied to each task. */
    private Map<String, String> config;

    /**
     * Validates required configuration and stores it for task fan-out.
     *
     * @throws ConnectException if {@code http.endpoint} is missing
     */
    @Override
    public void start(Map<String, String> props) {
        this.config = props;
        if (!props.containsKey("http.endpoint"))
            throw new ConnectException("http.endpoint is required");
    }

    @Override
    public Class<? extends Task> taskClass() {
        return HttpSinkTask.class;
    }

    /**
     * All tasks use the same settings for this simple sink, but each task must
     * receive its OWN map: Collections.nCopies(n, map) returns n references to
     * the SAME mutable instance, so a mutation by one task (or the framework)
     * would silently affect all of them.
     */
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(new HashMap<>(config));
        }
        return taskConfigs;
    }

    @Override
    public void stop() {}

    /** Declares the connector's configuration surface for validation and docs. */
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("http.endpoint", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                "HTTP endpoint to POST records to")
            .define("http.batch.size", ConfigDef.Type.INT, 100,
                ConfigDef.Importance.MEDIUM, "Max records per HTTP batch");
    }

    @Override
    public String version() { return "1.0.0"; }
}
The Connect framework calls put() with batches of records. After successful delivery, offsets are committed automatically by the worker.
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
public class HttpSinkTask extends SinkTask {
private String httpEndpoint;
private int batchSize;
private HttpClient httpClient;
@Override
public void start(Map<String, String> props) {
httpEndpoint = props.get("http.endpoint");
batchSize = Integer.parseInt(props.getOrDefault("http.batch.size", "100"));
httpClient = HttpClient.newHttpClient();
}
@Override
public void put(Collection<SinkRecord> records) {
if (records.isEmpty()) return;
// Batch records and POST to HTTP endpoint
List<String> payloads = records.stream()
.map(r -> r.value().toString())
.collect(Collectors.toList());
// Send in chunks of batchSize
Lists.partition(payloads, batchSize).forEach(batch -> {
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(httpEndpoint))
.POST(HttpRequest.BodyPublishers.ofString(
"[" + String.join(",", batch) + "]"))
.header("Content-Type", "application/json")
.build();
HttpResponse<String> response =
httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() >= 400) {
throw new RetriableException("HTTP error: " + response.statusCode());
}
} catch (IOException | InterruptedException e) {
throw new RetriableException("Failed to send records to " + httpEndpoint, e);
}
});
}
@Override
public void stop() { /* close HTTP client */ }
@Override
public String version() { return "1.0.0"; }
}
Throw RetriableException for transient errors (network timeout, 5xx). The Connect framework will retry the put() call. Throw ConnectException for permanent errors that should send records to the DLQ.
Configure error handling in the connector deployment request so failed records go to a dead letter topic rather than stopping the task.
{
"name": "http-sink",
"config": {
"connector.class": "com.example.HttpSinkConnector",
"topics": "order.created",
"http.endpoint": "https://api.example.com/orders",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "order.created.DLQ",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
}
The commands below build the connector as a fat JAR and deploy it to a Kafka Connect worker's plugin path — no Docker is required for these steps.
<!-- pom.xml — shade plugin to create a fat JAR -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<!-- NOTE(review): declare connect-api (and kafka-clients) with scope
     "provided" in the pom so the shaded JAR does not bundle framework
     classes the Connect worker already supplies — confirm in pom.xml. -->
<executions>
<execution>
<!-- Bind shading to the package phase so `mvn package` produces the fat JAR. -->
<phase>package</phase>
<goals><goal>shade</goal></goals>
</execution>
</executions>
</plugin>
# 1. Build the fat JAR (tests skipped for a quick packaging run)
mvn clean package -DskipTests
# 2. Place it in Connect's plugin.path directory — one subdirectory per plugin
mkdir -p /opt/kafka/plugins/http-connector
cp target/http-connector-1.0.0.jar /opt/kafka/plugins/http-connector/
# 3. Set plugin.path in connect-distributed.properties
# NOTE(review): ">>" appends — rerunning this adds duplicate plugin.path
# lines; edit the properties file in place if the key may already exist.
echo "plugin.path=/opt/kafka/plugins" >> connect-distributed.properties
# 4. Restart the Kafka Connect worker so it rescans plugin.path
# 5. Register the connector via the Connect REST API
curl -X POST http://connect:8083/connectors \
-H "Content-Type: application/json" \
-d @connector-config.json
# 6. Verify connector and task status
curl http://connect:8083/connectors/http-sink/status
SMTs transform individual records in the Connect pipeline — add fields, rename keys, filter, or route. Implement Transformation<R> to create a custom one.
import org.apache.kafka.connect.transforms.Transformation;
public class AddTimestampTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    /**
     * Adds a "processed_at" ISO-8601 timestamp field to schemaless map values.
     * Tombstones (null values), schema'd records, and non-map values pass
     * through unchanged.
     */
    @Override
    public R apply(R record) {
        // Tombstone records carry a null value — the original code would NPE
        // in new LinkedHashMap<>(null). Pass them through untouched.
        if (record.value() == null) {
            return record;
        }
        // Guard the cast: a schemaless value is not guaranteed to be a Map.
        if (record.valueSchema() == null && record.value() instanceof Map) {
            @SuppressWarnings("unchecked")
            Map<String, Object> value = (Map<String, Object>) record.value();
            // Copy before mutating — never modify the incoming record's value in place.
            Map<String, Object> updated = new LinkedHashMap<>(value);
            updated.put("processed_at", Instant.now().toString());
            return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), updated,
                record.timestamp());
        }
        return record; // schema-based transform omitted for brevity
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void close() { /* no resources to release */ }

    @Override
    public void configure(Map<String, ?> configs) { /* no configuration needed */ }
}
// Apply SMT in connector config
{
"transforms": "addTs",
"transforms.addTs.type": "com.example.AddTimestampTransform"
}