Contents

| Component | Responsibility |
| --- | --- |
| Connector | Validates config, determines parallelism, distributes config to tasks |
| Task | Does the actual data movement — polls source or writes to sink |
| Worker | Kafka Connect runtime — manages tasks, offsets, REST API, transforms |
| OffsetStorageReader | Lets a SourceTask read previously committed offsets to resume after restart |
| SourceRecord | A Kafka record produced by a SourceTask (topic, key, value, sourcePartition, sourceOffset) |
| SinkRecord | A Kafka record delivered to a SinkTask (includes topic, partition, offset, key, value) |

The Connector class validates configuration and tells the framework how many tasks to create and what config each task receives.

import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; public class HttpPollingSourceConnector extends SourceConnector { private Map<String, String> config; @Override public void start(Map<String, String> props) { this.config = props; // Validate required config here if (!props.containsKey("http.url")) throw new ConnectException("http.url is required"); } @Override public Class<? extends Task> taskClass() { return HttpPollingSourceTask.class; } @Override public List<Map<String, String>> taskConfigs(int maxTasks) { // Each task gets an independent URL to poll (one task per endpoint) List<String> urls = Arrays.asList(config.get("http.url").split(",")); int numTasks = Math.min(maxTasks, urls.size()); List<Map<String, String>> taskConfigs = new ArrayList<>(); for (int i = 0; i < numTasks; i++) { Map<String, String> taskConfig = new HashMap<>(config); taskConfig.put("http.url", urls.get(i).trim()); taskConfigs.add(taskConfig); } return taskConfigs; } @Override public void stop() { /* release connector-level resources */ } @Override public ConfigDef config() { return new ConfigDef() .define("http.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Comma-separated list of HTTP endpoints to poll") .define("http.poll.interval.ms", ConfigDef.Type.LONG, 5000L, ConfigDef.Importance.MEDIUM, "Polling interval in milliseconds") .define("kafka.topic", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Target Kafka topic"); } @Override public String version() { return "1.0.0"; } }

The task polls the source and returns SourceRecord objects. It uses context.offsetStorageReader() to resume from the last committed position after a restart.

import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; public class HttpPollingSourceTask extends SourceTask { private String httpUrl; private String kafkaTopic; private long pollIntervalMs; private long lastId = 0; // tracks the last processed record ID @Override public void start(Map<String, String> props) { httpUrl = props.get("http.url"); kafkaTopic = props.get("kafka.topic"); pollIntervalMs = Long.parseLong(props.getOrDefault("http.poll.interval.ms", "5000")); // Resume from last committed offset Map<String, Object> partition = Map.of("url", httpUrl); Map<String, Object> offset = context.offsetStorageReader().offset(partition); if (offset != null && offset.containsKey("last_id")) { lastId = (Long) offset.get("last_id"); } } @Override public List<SourceRecord> poll() throws InterruptedException { Thread.sleep(pollIntervalMs); List<SourceRecord> records = new ArrayList<>(); // Fetch new records from the HTTP endpoint since lastId List<JsonNode> items = fetchFromHttp(httpUrl, lastId); for (JsonNode item : items) { long id = item.get("id").asLong(); // sourcePartition identifies the "stream" — used for offset tracking Map<String, Object> sourcePartition = Map.of("url", httpUrl); // sourceOffset is what gets committed — used on restart Map<String, Object> sourceOffset = Map.of("last_id", id); records.add(new SourceRecord( sourcePartition, sourceOffset, kafkaTopic, null, // partition — let Kafka choose Schema.STRING_SCHEMA, // key schema item.get("key").asText(), // key Schema.STRING_SCHEMA, // value schema item.toString() // value )); lastId = Math.max(lastId, id); } return records; } @Override public void stop() { /* close HTTP client */ } @Override public String version() { return "1.0.0"; } }

The sink connector below validates that http.endpoint is set and gives every task the same configuration, since any task can POST to the endpoint.

public class HttpSinkConnector extends SinkConnector { private Map<String, String> config; @Override public void start(Map<String, String> props) { this.config = props; if (!props.containsKey("http.endpoint")) throw new ConnectException("http.endpoint is required"); } @Override public Class<? extends Task> taskClass() { return HttpSinkTask.class; } @Override public List<Map<String, String>> taskConfigs(int maxTasks) { // All tasks share the same config for a simple sink return Collections.nCopies(maxTasks, new HashMap<>(config)); } @Override public void stop() {} @Override public ConfigDef config() { return new ConfigDef() .define("http.endpoint", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "HTTP endpoint to POST records to") .define("http.batch.size", ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM, "Max records per HTTP batch"); } @Override public String version() { return "1.0.0"; } }

The Connect framework calls put() with batches of records. After successful delivery, offsets are committed automatically by the worker.

import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; public class HttpSinkTask extends SinkTask { private String httpEndpoint; private int batchSize; private HttpClient httpClient; @Override public void start(Map<String, String> props) { httpEndpoint = props.get("http.endpoint"); batchSize = Integer.parseInt(props.getOrDefault("http.batch.size", "100")); httpClient = HttpClient.newHttpClient(); } @Override public void put(Collection<SinkRecord> records) { if (records.isEmpty()) return; // Batch records and POST to HTTP endpoint List<String> payloads = records.stream() .map(r -> r.value().toString()) .collect(Collectors.toList()); // Send in chunks of batchSize Lists.partition(payloads, batchSize).forEach(batch -> { try { HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(httpEndpoint)) .POST(HttpRequest.BodyPublishers.ofString( "[" + String.join(",", batch) + "]")) .header("Content-Type", "application/json") .build(); HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); if (response.statusCode() >= 400) { throw new RetriableException("HTTP error: " + response.statusCode()); } } catch (IOException | InterruptedException e) { throw new RetriableException("Failed to send records to " + httpEndpoint, e); } }); } @Override public void stop() { /* close HTTP client */ } @Override public String version() { return "1.0.0"; } } Throw RetriableException for transient errors (network timeout, 5xx). The Connect framework will retry the put() call. Throw ConnectException for permanent errors that should send records to the DLQ.

Configure error handling in the connector deployment request so records that fail during conversion or transformation go to a dead letter topic rather than stopping the task.

{ "name": "http-sink", "config": { "connector.class": "com.example.HttpSinkConnector", "topics": "order.created", "http.endpoint": "https://api.example.com/orders", "errors.tolerance": "all", "errors.deadletterqueue.topic.name": "order.created.DLQ", "errors.deadletterqueue.topic.replication.factor": "1", "errors.deadletterqueue.context.headers.enable": "true", "errors.log.enable": "true", "errors.log.include.messages": "true" } }

The steps below package the connector as a fat JAR, install it in the Connect worker's plugin.path, and register it through the Connect REST API.

<!-- pom.xml — shade plugin to create a fat JAR --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals><goal>shade</goal></goals> </execution> </executions> </plugin> # 1. Build the fat JAR mvn clean package -DskipTests # 2. Place in Connect's plugin.path directory mkdir -p /opt/kafka/plugins/http-connector cp target/http-connector-1.0.0.jar /opt/kafka/plugins/http-connector/ # 3. Set plugin.path in connect-distributed.properties echo "plugin.path=/opt/kafka/plugins" >> connect-distributed.properties # 4. Restart Kafka Connect worker # 5. Register the connector via REST API curl -X POST http://connect:8083/connectors \ -H "Content-Type: application/json" \ -d @connector-config.json # 6. Verify status curl http://connect:8083/connectors/http-sink/status

SMTs transform individual records in the Connect pipeline — add fields, rename keys, filter, or route. Implement Transformation<R> to create a custom one.

import org.apache.kafka.connect.transforms.Transformation; public class AddTimestampTransform<R extends ConnectRecord<R>> implements Transformation<R> { @Override public R apply(R record) { if (record.valueSchema() == null) { // Schemaless — value is a Map @SuppressWarnings("unchecked") Map<String, Object> value = (Map<String, Object>) record.value(); Map<String, Object> updated = new LinkedHashMap<>(value); updated.put("processed_at", Instant.now().toString()); return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), updated, record.timestamp()); } return record; // schema-based transform omitted for brevity } @Override public ConfigDef config() { return new ConfigDef(); } @Override public void close() {} @Override public void configure(Map<String, ?> configs) {} } // Apply SMT in connector config { "transforms": "addTs", "transforms.addTs.type": "com.example.AddTimestampTransform" }