
Kafka decouples producers from consumers — they can be deployed, updated, and scaled independently. This independence is a strength, but it also means a producer may write messages using a newer schema while an old consumer is still running, or vice versa. Without a shared schema contract, either side risks receiving bytes it cannot deserialize.

Confluent Schema Registry solves these problems by storing every schema version, assigning each a unique integer ID, and embedding that ID in every Kafka message: the payload begins with a magic byte (0) followed by the 4-byte schema ID, then the serialized data. Before a producer can write, the serializer checks the Registry to confirm the schema is registered and compatible with the subject's configured mode.
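Decoding that 5-byte header by hand makes the layout concrete. A minimal sketch in plain Java (the class and method names here are illustrative, not part of the Confluent serializer API):

```java
import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent wire format: 1 magic byte (0), then a 4-byte big-endian
    // schema ID, then the serialized payload itself.
    public static int extractSchemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt(); // ByteBuffer reads big-endian by default
    }

    public static void main(String[] args) {
        // A synthetic message whose header claims schema ID 5
        byte[] msg = ByteBuffer.allocate(10).put((byte) 0).putInt(5).array();
        System.out.println(extractSchemaId(msg)); // prints 5
    }
}
```

This is how the deserializer knows which writer schema to fetch from the Registry before decoding the rest of the bytes.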

Subject Naming Strategies

A subject is the namespace under which schema versions are grouped in the Registry. The naming strategy determines how subjects are named for a given topic and schema.

| Strategy | Subject Name Pattern | Best For |
| --- | --- | --- |
| TopicNameStrategy | {topic}-key / {topic}-value | Default. One schema per topic. All producers on a topic must use the same schema. |
| RecordNameStrategy | {namespace}.{recordName} | Same record type used across multiple topics. Schema is tied to the record, not the topic. |
| TopicRecordNameStrategy | {topic}-{namespace}.{recordName} | Multiple record types on one topic (e.g., an event bus). Each record type has its own subject per topic. |
Set the strategy with value.subject.name.strategy in producer/consumer config. The default TopicNameStrategy is sufficient for the majority of use cases where one topic carries one event type.
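The three strategies reduce to simple string templates. A sketch (the helper names are ours, not a Confluent API):

```java
public class SubjectNames {
    // TopicNameStrategy: subject derives from the topic alone
    public static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: subject derives from the record's full name
    public static String recordName(String namespace, String record) {
        return namespace + "." + record;
    }

    // TopicRecordNameStrategy: subject combines both
    public static String topicRecordName(String topic, String namespace, String record) {
        return topic + "-" + namespace + "." + record;
    }

    public static void main(String[] args) {
        System.out.println(topicName("orders", false));                    // orders-value
        System.out.println(recordName("io.cscode.kafka.schema", "Order")); // io.cscode.kafka.schema.Order
        System.out.println(topicRecordName("orders", "io.cscode.kafka.schema", "Order"));
    }
}
```

Because the subject is the unit of compatibility checking, the strategy choice decides whether evolution rules apply per topic, per record type, or per topic-and-record pair.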

Apache Avro schemas are defined in JSON using .avsc files. Avro is schema-based — the schema is always required for serialization and deserialization, which makes it an ideal pairing with Schema Registry. Avro data is compact (no field names in the binary payload) and supports rich evolution rules via default values.

Primitive Types

Avro defines eight primitive types: null, boolean, int, long, float, double, bytes, and string.

Complex Types

| Type | Description | Example |
| --- | --- | --- |
| record | Named type with fields (like a struct or class) | An Order, User, Payment event |
| enum | A fixed set of string symbols | ["PENDING","PAID","CANCELLED"] |
| array | List of items of a single type | List of line items in an order |
| map | Key-value map with string keys | Metadata tags |
| union | A value that can be one of several types | ["null","string"] for optional fields |
| fixed | Fixed-length byte sequence | MD5 hash (16 bytes) |
Full Example: Order Schema (v1)
```json
{
  "type": "record",
  "name": "Order",
  "namespace": "io.cscode.kafka.schema",
  "doc": "Represents a customer order placed in the system.",
  "fields": [
    { "name": "orderId", "type": "string", "doc": "Unique identifier for the order (UUID)." },
    { "name": "customerId", "type": "string", "doc": "Unique identifier for the customer." },
    { "name": "amount", "type": "double", "doc": "Total order amount in USD." },
    {
      "name": "status",
      "type": { "type": "enum", "name": "OrderStatus", "symbols": ["PENDING", "PAID", "SHIPPED", "CANCELLED"] },
      "doc": "Current status of the order."
    },
    {
      "name": "lineItems",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "LineItem",
          "fields": [
            { "name": "productId", "type": "string" },
            { "name": "quantity", "type": "int" },
            { "name": "unitPrice", "type": "double" }
          ]
        }
      },
      "doc": "Individual items included in this order."
    },
    {
      "name": "metadata",
      "type": { "type": "map", "values": "string" },
      "default": {},
      "doc": "Arbitrary key-value metadata (e.g. source channel, promo code)."
    },
    {
      "name": "createdAt",
      "type": { "type": "long", "logicalType": "timestamp-millis" },
      "doc": "Epoch milliseconds when the order was created."
    },
    {
      "name": "shippingAddress",
      "type": ["null", "string"],
      "default": null,
      "doc": "Optional shipping address. Null for digital products."
    }
  ]
}
```

Note that logicalType must be nested inside the type object; placed as a sibling of the field's "type" it is silently ignored. Always define optional fields as a union of ["null", "T"] with "default": null. This pattern is the foundation of safe schema evolution — it allows the field to be added or removed while remaining compatible with older readers.

Schema Registry enforces a compatibility mode per subject. When a new schema version is registered, the Registry checks it against existing versions according to the configured mode. If the check fails, the registration is rejected with a 409 Conflict response.

| Mode | New schema is checked against | What is allowed | Upgrade order |
| --- | --- | --- | --- |
| BACKWARD | Latest registered version | Delete fields; add fields with defaults | Upgrade consumers first, then producers |
| FORWARD | Latest registered version | Add fields; delete fields with defaults | Upgrade producers first, then consumers |
| FULL | Latest registered version | Add fields with defaults only; delete fields with defaults only | Either order — both old and new can read each other |
| BACKWARD_TRANSITIVE | All previous versions | Same as BACKWARD, validated against the entire history | Upgrade consumers first |
| FORWARD_TRANSITIVE | All previous versions | Same as FORWARD, validated against the entire history | Upgrade producers first |
| FULL_TRANSITIVE | All previous versions | Same as FULL, validated against the entire history | Either order — safest for long-lived topics |
| NONE | No check | Any change is allowed — no compatibility enforcement | Manual coordination required |

The default global compatibility mode in Confluent Schema Registry is BACKWARD. You can override it globally or set a different mode per subject.

BACKWARD vs BACKWARD_TRANSITIVE: BACKWARD checks only against the immediately preceding version. BACKWARD_TRANSITIVE checks against every previously registered version. Use the transitive variants when consumers may be running versions that are many releases behind — for example in a multi-datacenter environment.
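The difference is easy to see in code. A sketch in which a BiPredicate stands in for a real compatibility checker such as Avro's SchemaCompatibility (all names here are illustrative):

```java
import java.util.List;
import java.util.function.BiPredicate;

public class CompatCheck {
    // "history" is ordered oldest to newest; "compatible" answers whether a
    // candidate schema is compatible with one existing version.
    public static boolean latestOnly(List<String> history, String candidate,
                                     BiPredicate<String, String> compatible) {
        return compatible.test(history.get(history.size() - 1), candidate);
    }

    public static boolean transitive(List<String> history, String candidate,
                                     BiPredicate<String, String> compatible) {
        return history.stream().allMatch(existing -> compatible.test(existing, candidate));
    }

    public static void main(String[] args) {
        // Toy rule, purely illustrative: a candidate is "compatible" with an
        // existing version if it starts with it.
        BiPredicate<String, String> rule = (existing, cand) -> cand.startsWith(existing);
        List<String> history = List.of("x", "ab");

        System.out.println(latestOnly(history, "abc", rule)); // true: only "ab" is checked
        System.out.println(transitive(history, "abc", rule)); // false: fails against "x"
    }
}
```

A candidate can pass the latest-only check yet fail the transitive one, which is exactly the gap the _TRANSITIVE variants close for consumers running several versions behind.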

BACKWARD compatibility means the new schema can read data written with the old schema. A consumer upgraded to the new schema can still deserialize messages that were produced before the upgrade. This is the most common mode because consumers are usually upgraded before producers in a rolling deploy.

Rules: fields may be deleted freely, and new fields may be added only if they have default values. Upgrade consumers first, then producers.
Example: v1 → v2 (add optional field)

Schema v1 — original Order schema registered to the subject orders-value:

```json
{
  "type": "record",
  "name": "Order",
  "namespace": "io.cscode.kafka.schema",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "customerId", "type": "string" },
    { "name": "amount", "type": "double" },
    { "name": "createdAt", "type": "long" }
  ]
}
```

Schema v2 — adding an optional couponCode field with a default of null (BACKWARD-compatible):

```json
{
  "type": "record",
  "name": "Order",
  "namespace": "io.cscode.kafka.schema",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "customerId", "type": "string" },
    { "name": "amount", "type": "double" },
    { "name": "createdAt", "type": "long" },
    {
      "name": "couponCode",
      "type": ["null", "string"],
      "default": null,
      "doc": "Optional promotional coupon code applied to the order."
    }
  ]
}
```
Register v2 via Schema Registry REST API
```shell
# Register schema v2 for subject "orders-value"
curl -X POST http://localhost:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{ "schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"io.cscode.kafka.schema\",\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"},{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"createdAt\",\"type\":\"long\"},{\"name\":\"couponCode\",\"type\":[\"null\",\"string\"],\"default\":null}]}" }'

# Response — the new schema ID
# {"id": 2}
```
Verify Compatibility Before Registering
```shell
# Check compatibility of a candidate schema against the latest version
# Returns {"is_compatible": true} or {"is_compatible": false}
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{ "schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"io.cscode.kafka.schema\",\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"},{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"createdAt\",\"type\":\"long\"},{\"name\":\"couponCode\",\"type\":[\"null\",\"string\"],\"default\":null}]}" }'
```

Run the compatibility check in your CI/CD pipeline before merging schema changes. A failed compatibility check should block the merge the same way a failing unit test would.

FORWARD compatibility means the old schema can read data written with the new schema. An old consumer running the previous schema version can still deserialize messages produced by a newer producer. Unknown fields in the new message are simply ignored by the old reader. In this mode producers are upgraded first, then consumers catch up.

Rules: new fields may be added freely, and fields may be deleted only if they have default values. Upgrade producers first, then consumers.
Example: v1 → v2 (add required field — FORWARD-compatible)

The new schema adds region without a default. Old consumers using GenericRecord will receive the field but simply ignore it if they do not reference it.

```json
{
  "type": "record",
  "name": "Order",
  "namespace": "io.cscode.kafka.schema",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "customerId", "type": "string" },
    { "name": "amount", "type": "double" },
    { "name": "createdAt", "type": "long" },
    {
      "name": "region",
      "type": "string",
      "doc": "Geographic region of the order. Required as of v2."
    }
  ]
}
```
Example: v1 → v2 (remove field with default — FORWARD-compatible)

Suppose v1 also carried a legacySource field with a default of "WEB"; v2 removes it. Old readers that expect this field will fall back to the default value when reading the new (shorter) message.

```json
{
  "type": "record",
  "name": "Order",
  "namespace": "io.cscode.kafka.schema",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "customerId", "type": "string" },
    { "name": "amount", "type": "double" },
    { "name": "createdAt", "type": "long" }
  ]
}
```

When using GenericRecord on the consumer side, call record.hasField("region") before accessing newly added fields — old messages will not carry them.

FULL compatibility is the intersection of BACKWARD and FORWARD: consumers on the old schema can read data written with the new schema, and consumers on the new schema can read data written with the old one. This is the most restrictive mode but also the safest for rolling deploys — you can upgrade producers and consumers in any order without risk.

Rules (must satisfy both BACKWARD and FORWARD simultaneously): add fields only if they have defaults; delete fields only if they have defaults.
Example: v1 → v2 under FULL (add field with default, remove field with default)

v1 had legacyChannel with "default": "UNKNOWN" and lacked priority.

```json
{
  "type": "record",
  "name": "Order",
  "namespace": "io.cscode.kafka.schema",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "customerId", "type": "string" },
    { "name": "amount", "type": "double" },
    { "name": "createdAt", "type": "long" },
    {
      "name": "priority",
      "type": { "type": "enum", "name": "OrderPriority", "symbols": ["NORMAL", "HIGH", "URGENT"] },
      "default": "NORMAL",
      "doc": "Processing priority. Old messages default to NORMAL."
    }
  ]
}
```

Because priority has a default, new consumers reading old messages will receive NORMAL. Because legacyChannel had a default, old consumers reading new messages will use UNKNOWN. Both directions are covered — FULL compatibility is satisfied.
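The mechanics of this two-way guarantee can be sketched with plain maps, where a reader schema is reduced to its field-to-default mapping (an illustrative model, not Avro's actual resolver):

```java
import java.util.HashMap;
import java.util.Map;

public class DefaultResolution {
    // Avro's default-value rule, modeled simply: for every field the reader's
    // schema declares, take the writer's value if the message carries it,
    // otherwise fall back to the reader's declared default.
    public static Map<String, Object> resolve(Map<String, Object> writtenRecord,
                                              Map<String, Object> readerDefaults) {
        Map<String, Object> view = new HashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            view.put(field.getKey(),
                     writtenRecord.getOrDefault(field.getKey(), field.getValue()));
        }
        return view;
    }

    public static void main(String[] args) {
        // New reader (declares priority, default NORMAL) reading an old message
        Map<String, Object> oldMessage = Map.of("orderId", "ord-001", "legacyChannel", "MOBILE");
        Map<String, Object> newReader = Map.of("orderId", "", "priority", "NORMAL");
        System.out.println(resolve(oldMessage, newReader).get("priority"));     // NORMAL

        // Old reader (still declares legacyChannel, default UNKNOWN) reading a new message
        Map<String, Object> newMessage = Map.of("orderId", "ord-002", "priority", "HIGH");
        Map<String, Object> oldReader = Map.of("orderId", "", "legacyChannel", "UNKNOWN");
        System.out.println(resolve(newMessage, oldReader).get("legacyChannel")); // UNKNOWN
    }
}
```

Both directions succeed only because every added or removed field carries a default, which is precisely what the FULL rules enforce.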

Set FULL compatibility at the subject level
```shell
curl -X PUT http://localhost:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL"}'

# Response
# {"compatibility": "FULL"}
```

FULL_TRANSITIVE extends this guarantee to every version in the history, not just the latest. Use it for topics that retain data for months or years, where consumers may replay from the very beginning of the topic.

Schema Registry exposes a REST API on port 8081 by default. All schema management operations — registration, retrieval, compatibility checks, and deletion — are available over HTTP.

Register a Schema
```shell
# POST /subjects/{subject}/versions
# Registers a new schema version. Returns the assigned schema ID.
curl -X POST http://localhost:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"io.cscode.kafka.schema\",\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"},{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"createdAt\",\"type\":\"long\"}]}"}'
# {"id": 1}
```
List All Subjects
```shell
# GET /subjects
curl http://localhost:8081/subjects
# ["orders-value","payments-value","users-key"]
```
List Versions for a Subject
```shell
# GET /subjects/{subject}/versions
curl http://localhost:8081/subjects/orders-value/versions
# [1, 2, 3]
```
Fetch Latest Schema
```shell
# GET /subjects/{subject}/versions/latest
curl http://localhost:8081/subjects/orders-value/versions/latest
# {
#   "subject": "orders-value",
#   "version": 3,
#   "id": 5,
#   "schema": "{...}"
# }
```
Fetch Schema by Version Number
```shell
curl http://localhost:8081/subjects/orders-value/versions/2
```
Fetch Schema by Global ID
```shell
# GET /schemas/ids/{id}
curl http://localhost:8081/schemas/ids/5
# {"schema": "{...}"}
```
Check Schema Compatibility
```shell
# Test candidate schema against a specific version
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/2 \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"io.cscode.kafka.schema\",\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"},{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"createdAt\",\"type\":\"long\"},{\"name\":\"couponCode\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}'
# {"is_compatible": true}
```
Set Subject-Level Compatibility
```shell
# PUT /config/{subject}
# Override global compatibility for a specific subject
curl -X PUT http://localhost:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD_TRANSITIVE"}'

# Check current subject config
curl http://localhost:8081/config/orders-value
# {"compatibilityLevel": "BACKWARD_TRANSITIVE"}

# Check global default config
curl http://localhost:8081/config
# {"compatibilityLevel": "BACKWARD"}
```
Delete a Schema Version
```shell
# DELETE /subjects/{subject}/versions/{version}
# Soft delete — marks the version as deleted but does not remove it from storage
curl -X DELETE http://localhost:8081/subjects/orders-value/versions/1
# 1

# Hard delete — permanently removes (requires soft delete first)
curl -X DELETE "http://localhost:8081/subjects/orders-value/versions/1?permanent=true"
```
Confluent CLI (kafka-schema-registry-cli)
```shell
# List subjects
confluent schema-registry subject list

# Describe a subject (lists all versions)
confluent schema-registry subject describe orders-value

# Register from a file
confluent schema-registry schema create \
  --subject orders-value \
  --schema ./src/main/avro/Order.avsc \
  --type AVRO

# Check compatibility
confluent schema-registry schema compatibility \
  --subject orders-value \
  --schema ./src/main/avro/OrderV2.avsc \
  --type AVRO

# Update compatibility mode
confluent schema-registry subject update orders-value \
  --compatibility FULL_TRANSITIVE
```

Prefer the Confluent CLI for local development and the REST API for CI/CD automation. Both interact with the same Schema Registry — the CLI is a thin wrapper around the REST endpoints.

The Confluent Kafka client library includes KafkaAvroSerializer and KafkaAvroDeserializer that handle schema registration, ID embedding, and schema lookup transparently. You only need to configure the Schema Registry URL.

Maven Dependencies
```xml
<!-- pom.xml -->
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <!-- Confluent Avro serializer -->
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.6.0</version>
  </dependency>
  <!-- Apache Avro runtime -->
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.3</version>
  </dependency>
</dependencies>

<!-- Avro Maven Plugin for code generation from .avsc files -->
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.3</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals><goal>schema</goal></goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
            <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
            <stringType>String</stringType>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```
Producer Configuration
```java
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Use KafkaAvroSerializer for both key and value
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

// Point to Schema Registry
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

// Auto-registration defaults to true; disable it in production so schemas
// are registered through a controlled CI/CD process instead
props.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);

// Use the specific generated class (Order.java generated by avro-maven-plugin)
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);

Order order = Order.newBuilder()
    .setOrderId("ord-001")
    .setCustomerId("cust-42")
    .setAmount(149.99)
    .setCreatedAt(System.currentTimeMillis())
    .build();

ProducerRecord<String, Order> record = new ProducerRecord<>("orders", order.getOrderId(), order);
producer.send(record, (metadata, ex) -> {
    if (ex != null) ex.printStackTrace();
    else System.out.printf("Sent to partition %d offset %d%n", metadata.partition(), metadata.offset());
});
producer.close();
```
Consumer Configuration — Specific Class
```java
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

// Deserialize directly into the generated Order class (not GenericRecord)
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

while (true) {
    ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, Order> rec : records) {
        Order order = rec.value();
        System.out.printf("OrderId=%s Amount=%.2f%n", order.getOrderId(), order.getAmount());
    }
}
```
Consumer — GenericRecord (Schema-Agnostic)
```java
import org.apache.avro.generic.GenericRecord;

// Set specific.avro.reader=false (the default) to receive GenericRecord
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, GenericRecord> rec : records) {
    GenericRecord order = rec.value();
    // Safe field access — check before reading newly added fields
    if (order.hasField("couponCode") && order.get("couponCode") != null) {
        System.out.println("Coupon: " + order.get("couponCode"));
    }
    System.out.println("Amount: " + order.get("amount"));
}
```

Set AUTO_REGISTER_SCHEMAS to false in production. This forces schema registration to go through a controlled CI/CD process and prevents accidentally registering an incompatible schema from a developer's local machine.

The following changes break compatibility regardless of mode and will cause runtime failures in either producers, consumers, or both.

1. Renaming a Field

Avro has no concept of field aliases that survive binary round-trips in a transparent way. Renaming customerId to userId is treated as removing the old field and adding a new one. Old consumers reading GenericRecord will get null when they ask for customerId; consumers on a generated class whose reader schema still requires customerId will fail outright. Either way, they will not find the value under userId.

{ "name": "userId", "aliases": ["customerId"], "type": "string" }

The aliases field helps Avro's schema resolution during deserialization, but only when both writer and reader schemas are available to the decoder (e.g., in Avro file-based reads). In Kafka with Schema Registry, consumers fetch the writer schema by ID — aliases are not a reliable rename strategy. Introduce a new field, migrate consumers, deprecate and remove the old field across multiple releases.
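On the consumer side, the transitional phase of such a migration can be sketched as a dual-read helper (the helper name is ours, and the record is modeled as a plain map for illustration):

```java
import java.util.Map;

public class RenameMigration {
    // Transitional reader during a customerId -> userId rename: prefer the new
    // field, fall back to the old one while messages written under either
    // schema version are still in flight.
    public static String customerKey(Map<String, Object> record) {
        Object v = record.get("userId");
        if (v == null) v = record.get("customerId");
        if (v == null) throw new IllegalStateException("neither userId nor customerId present");
        return v.toString();
    }

    public static void main(String[] args) {
        System.out.println(customerKey(Map.of("customerId", "cust-42"))); // old message
        System.out.println(customerKey(Map.of("userId", "cust-42")));     // new message
    }
}
```

Once all retained messages carry userId, the fallback branch (and then the old field itself) can be removed in a later release.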

2. Changing a Field's Type

Changing amount from double to string is a binary-incompatible change. Avro's wire encoding for these types is completely different — a consumer using the old schema will misread the bytes as a double and produce garbage or throw an exception.

{ "name": "amount", "type": "string" }

Avro does support a limited set of type promotions during schema resolution (e.g., int → long, float → double), but changing double to string is never safe. Always introduce a new field with a new name if you need a different type.
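The legal promotions form a small fixed table. A sketch of a checker over Avro's documented writer-to-reader promotion rules (the helper class itself is ours, not an Avro API):

```java
import java.util.Map;
import java.util.Set;

public class Promotions {
    // Avro schema resolution: the writer's type (key) may be promoted to any
    // of the reader types in the corresponding set.
    private static final Map<String, Set<String>> PROMOTIONS = Map.of(
        "int",    Set.of("long", "float", "double"),
        "long",   Set.of("float", "double"),
        "float",  Set.of("double"),
        "string", Set.of("bytes"),
        "bytes",  Set.of("string")
    );

    public static boolean isPromotable(String writerType, String readerType) {
        return writerType.equals(readerType)
            || PROMOTIONS.getOrDefault(writerType, Set.of()).contains(readerType);
    }

    public static void main(String[] args) {
        System.out.println(isPromotable("int", "long"));      // true
        System.out.println(isPromotable("float", "double"));  // true
        System.out.println(isPromotable("double", "string")); // false — never safe
    }
}
```

Anything outside this table, such as double to string, is a binary-incompatible change and needs a new field instead.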

3. Removing a Required Field

Removing a field that has no default value is forbidden under BACKWARD and FULL modes. Even under FORWARD mode, producers that still run the old code will continue to produce the field — but consumers using the new schema that drops it will silently lose that data because it will no longer be mapped to any field in the deserialized object.

4. Skipping Version Registrations

Going from v1 directly to v3 without registering v2 bypasses the transitive checks. Under BACKWARD_TRANSITIVE the Registry checks all registered versions — but if v2 was never registered, there is a gap in compatibility guarantees and consumers running v2 code may encounter unexpected messages.

5. Using NONE Compatibility in Production

Setting NONE disables all Registry enforcement and delegates compatibility entirely to manual coordination. In a team environment or microservices architecture, this is equivalent to having no schema contract at all — it is only appropriate for exploratory development topics that are never replayed.

6. Reordering Enum Symbols

Avro encodes an enum value as an integer: the symbol's index in the writer's symbols list. Reordering ["PENDING","PAID","SHIPPED"] to ["PAID","PENDING","SHIPPED"] maps index 0 to a different symbol, so any consumer that decodes old bytes directly against the new schema (rather than resolving through the writer schema fetched from the Registry) silently misreads them without any error.
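A toy encoder/decoder makes the failure mode visible. This models a consumer that decodes with a hard-coded schema instead of the writer's schema (all names are illustrative):

```java
public class EnumReorder {
    // Avro writes an enum as the symbol's index in the *writer's* symbols list.
    public static int encode(String[] writerSymbols, String symbol) {
        for (int i = 0; i < writerSymbols.length; i++) {
            if (writerSymbols[i].equals(symbol)) return i;
        }
        throw new IllegalArgumentException("unknown symbol: " + symbol);
    }

    // Decoding the raw index against a *different* symbols list yields the
    // wrong symbol with no exception at all.
    public static String decode(String[] symbols, int index) {
        return symbols[index];
    }

    public static void main(String[] args) {
        String[] v1 = {"PENDING", "PAID", "SHIPPED"};
        String[] v2 = {"PAID", "PENDING", "SHIPPED"}; // reordered

        int wire = encode(v1, "PENDING");     // a v1 writer encodes index 0
        System.out.println(decode(v1, wire)); // PENDING — correct
        System.out.println(decode(v2, wire)); // PAID — silently wrong
    }
}
```

The corruption is silent precisely because the index is still in range; only the mapping has shifted.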

The safest policy for long-lived production topics: use FULL_TRANSITIVE, always add fields with defaults, never remove fields without a two-phase deprecation cycle, and enforce schema registration in CI/CD.

Schema Registry supports three serialization formats: Avro, Protobuf, and JSON Schema. While Avro is the most widely used with Kafka, each format has specific strengths.

| Aspect | Avro | Protobuf | JSON Schema |
| --- | --- | --- | --- |
| Wire format | Compact binary (no field names) | Compact binary (field numbers) | JSON text (verbose) |
| Evolution mechanism | Default values + schema resolution rules | Field numbers — never reuse them | additionalProperties, required array |
| Code generation | avro-maven-plugin / avro-tools | protoc compiler | jsonschema2pojo or manual |
| Human readable | .avsc JSON — moderate | .proto IDL — concise | JSON — fully readable |
| Null safety | Explicit — requires union ["null","T"] | Every field is optional by default in proto3 | Controlled via required array |
| Best for | High-throughput Kafka pipelines with Java/Python consumers | Cross-language gRPC services + Kafka with polyglot consumers | REST APIs where JSON payloads also flow through Kafka |
Protobuf — Field Numbers for Evolution

Protobuf schema evolution relies on stable field numbers. As long as you never reuse a field number for a different type, old and new messages are always forward and backward compatible.

```protobuf
// order.proto
syntax = "proto3";
package io.cscode.kafka.schema;

message Order {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  int64 created_at = 4;

  // Added in v2 — safe: field number 5 has never been used before
  string coupon_code = 5;

  // legacy_channel (field 6) was removed in v2. Reserving its number and name
  // prevents a future field from accidentally reusing them.
  reserved 6;
  reserved "legacy_channel";
}
```

Register a Protobuf schema with Schema Registry by setting "schemaType": "PROTOBUF" in the registration request body.

```shell
curl -X POST http://localhost:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "PROTOBUF",
    "schema": "syntax = \"proto3\"; package io.cscode.kafka.schema; message Order { string order_id = 1; string customer_id = 2; double amount = 3; int64 created_at = 4; string coupon_code = 5; }"
  }'
```
JSON Schema — $schema Validation

JSON Schema does not have a compact binary encoding — messages are plain JSON. It is suitable when consumers are HTTP-based services, the payload is already JSON, and you want schema validation without a separate serialization layer.

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order",
  "type": "object",
  "properties": {
    "orderId": { "type": "string" },
    "customerId": { "type": "string" },
    "amount": { "type": "number", "minimum": 0 },
    "createdAt": { "type": "integer" },
    "couponCode": { "type": "string" }
  },
  "required": ["orderId", "customerId", "amount", "createdAt"],
  "additionalProperties": false
}
```

```shell
# Register JSON Schema
curl -X POST http://localhost:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "JSON",
    "schema": "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Order\",\"type\":\"object\",\"properties\":{\"orderId\":{\"type\":\"string\"},\"customerId\":{\"type\":\"string\"},\"amount\":{\"type\":\"number\"},\"createdAt\":{\"type\":\"integer\"}},\"required\":[\"orderId\",\"customerId\",\"amount\",\"createdAt\"],\"additionalProperties\":false}"
  }'
```
When to Choose Each Format
All three formats are fully supported by Schema Registry's compatibility checks and REST API. The serializer/deserializer class changes (KafkaAvroSerializer → KafkaProtobufSerializer / KafkaJsonSchemaSerializer), but the Registry workflow — register, check, embed ID — is identical.