The Scheme Governance Problem

Imagine a Kafka system with 20 teams producing messages on 50 different topics. Without a registry schema, each team decides the message format independently: today JSON with field user_id, tomorrow they change to userId. Consumers who read that topic silently break down. The data that arrives on the data lake and inconsistent. Debugging an inconsistency becomes an investigation forensics between code versions from different teams.

Lo Schema Registry (Confluent, open-source) solves this with a formal contract: the producer registers the schema, the consumer validates the message received conforms to a compatible scheme. Any attempt to send data incompatible fails immediately with an explicit error, instead of corrupting the downstream system silently.

What You Will Learn

  • Schema Registry architecture: how it interacts with producers and consumers
  • Avro vs Protobuf vs JSON Schema: when to use each
  • Compatibility types: backward, forward, full, transitive
  • Schema evolution: add/remove fields without breaking change
  • Java setup with Avro and Confluent's Avro serializer
  • Best practices: subject naming, versioning, global vs per-subject config

Architecture: How the Schema Registry Works

The Schema Registry is a separate HTTP service from Kafka that exposes a REST API. Each scheme is identified by a subject (normally the name of the topic + suffix -value o -key) and a numerical version. Communication happens like this:

# Flusso producer:
# 1. Producer crea un ProducerRecord con un oggetto Avro/Protobuf
# 2. L'Avro Serializer fa una chiamata HTTP GET al Registry:
#    "Esiste lo schema X per il subject 'orders-value'?"
# 3. Se non esiste (o e cambiato), POST per registrarlo:
#    Schema Registry valida la compatibilita con le versioni precedenti
#    Se compatibile: OK, assegna schema ID intero (es: 42)
# 4. Il serializer scrive il messaggio come:
#    [0x00] [schema_id: 4 bytes] [payload Avro serializzato]
# 5. I primi 5 byte identificano il formato "magic byte + schema ID"

# Flusso consumer:
# 1. Consumer riceve i byte del messaggio
# 2. L'Avro Deserializer legge i primi 5 byte: magic byte + schema ID
# 3. Chiama il Registry: GET /schemas/ids/42
# 4. Registry risponde con lo schema (cachato localmente dopo la prima chiamata)
# 5. Deserializza il payload usando lo schema writer (come e stato scritto)
#    e lo schema reader (come il consumer si aspetta di leggerlo)
# 6. Avro fa la conversione automatica se gli schemi sono compatibili

# Struttura del payload serializzato:
# | 0x00 | schema_id (4 bytes BE) | avro binary payload |
#   ^magic byte     ^es: 0x0000002A = 42

# Avvia lo Schema Registry (via Docker)
docker run -d \
  -p 8081:8081 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="kafka-1:9092,kafka-2:9092" \
  -e SCHEMA_REGISTRY_KAFKASTORE_TOPIC="_schemas" \
  confluentinc/cp-schema-registry:7.6.0

Avro: Schema and Serialization

Apache Avro It is the most used serialization format with Kafka for its compactness and strong support in the Confluent ecosystem. The Avro scheme It is defined in JSON and is saved in the Registry.

// Schema Avro per un ordine - orders-value v1
{
  "type": "record",
  "namespace": "dev.federicocalo.orders",
  "name": "Order",
  "doc": "Schema per gli ordini e-commerce",
  "fields": [
    {
      "name": "order_id",
      "type": "string",
      "doc": "Identificatore univoco dell'ordine"
    },
    {
      "name": "user_id",
      "type": "string"
    },
    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2
      }
    },
    {
      "name": "currency",
      "type": "string",
      "default": "EUR"
    },
    {
      "name": "created_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
      },
      "default": "PENDING"
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "product_id", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "unit_price", "type": "double"}
          ]
        }
      }
    }
  ]
}
// Producer con Avro serializer e Schema Registry (Maven: io.confluent:kafka-avro-serializer)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Avro Value Serializer (registra automaticamente lo schema nel Registry)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroSerializer");

// URL dello Schema Registry
props.put("schema.registry.url", "http://schema-registry:8081");

// Opzionale: autenticazione al Registry (se con Confluent Cloud)
// props.put("basic.auth.credentials.source", "USER_INFO");
// props.put("basic.auth.user.info", "api-key:api-secret");

KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

// Carica lo schema da file .avsc
Schema schema = new Schema.Parser().parse(
    new File("src/main/avro/Order.avsc")
);

// Crea un record Avro generico
GenericRecord order = new GenericData.Record(schema);
order.put("order_id", UUID.randomUUID().toString());
order.put("user_id", "user-42");
order.put("amount", new BigDecimal("99.99"));
order.put("currency", "EUR");
order.put("created_at", Instant.now().toEpochMilli());
order.put("status", new GenericData.EnumSymbol(schema.getField("status").schema(), "CONFIRMED"));

List<GenericRecord> items = new ArrayList<>();
GenericRecord item = new GenericData.Record(schema.getField("items").schema().getElementType());
item.put("product_id", "prod-789");
item.put("quantity", 2);
item.put("unit_price", 49.99);
items.add(item);
order.put("items", items);

producer.send(new ProducerRecord<>("orders", order.get("order_id").toString(), order));
producer.flush();

Compatibility Rules

A subject's compatibility rule determines which changes to the schema are allowed. This is the most critical feature of the Registry: getting this wrong parameter can break consumers in production.

# Tipi di compatibilita disponibili:

# BACKWARD (default): le nuove versioni dello schema possono leggere
# i dati scritti con la versione precedente.
# Operazioni consentite: aggiungere campi CON default, rimuovere campi senza default
# Use case: consumer viene aggiornato PRIMA del producer
# Esempio: aggiungi campo "shipping_address" con default ""

# FORWARD: le versioni precedenti dello schema possono leggere
# i dati scritti con la nuova versione.
# Operazioni consentite: aggiungere campi senza default, rimuovere campi CON default
# Use case: producer viene aggiornato PRIMA del consumer

# FULL: sia backward che forward. La piu restrittiva.
# Operazioni consentite: SOLO aggiungere/rimuovere campi CON default
# Use case: non sai quale viene aggiornato prima

# NONE: nessun controllo di compatibilita (pericoloso in produzione)

# BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE:
# Come le versioni non-transitive ma la compatibilita e verificata
# rispetto a TUTTE le versioni precedenti, non solo l'ultima

# Configurazione globale e per-subject via REST API:
# Configurazione globale (default per tutti gli subject nuovi):
curl -X PUT http://schema-registry:8081/config \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "FULL"}'

# Configurazione per subject specifico (override del globale):
curl -X PUT http://schema-registry:8081/config/orders-value \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "BACKWARD"}'

# Verifica compatibilita prima di registrare (dry-run):
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/json" \
  -d '{"schema": "{\"type\": \"record\", \"name\": \"Order\", ...}"}'
# Response: {"is_compatible": true}

Evolution Scheme: Practical Example

// Schema v1 (attuale in produzione)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"}
  ]
}

// Schema v2 - AGGIUNTA backward-compatible:
// Aggiungi campo con default -> OK con BACKWARD e FULL
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"},
    // NUOVO: campo con default (null per Avro union o valore stringa)
    {"name": "discount_code", "type": ["null", "string"], "default": null}
  ]
}

// Schema v3 - RIMOZIONE backward-compatible:
// Rimuovi campo che aveva default -> OK con BACKWARD
// (Consumer con schema v2 riceveranno il default per discount_code quando leggono v3)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"}
    // discount_code rimosso: OK perche aveva default null
  ]
}

// Schema v4 - CAMBIAMENTO di tipo NON COMPATIBILE:
// Cambiare "amount" da double a string ROMPE backward e forward
// -> Rifiutato da Schema Registry con BACKWARD/FORWARD/FULL
// -> Devi usare un nuovo subject (nuovo topic) oppure NONE (rischio!)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "string"},  // BREAKING: double -> string
    {"name": "currency", "type": "string", "default": "EUR"}
  ]
}

Protobuf as an Alternative to Avro

Protocol Buffers (Protobuf) and the choice of Google and many teams who prefer a more expressive type system and a data-separated IDL. Supported by Confluent's Schema Registry since version 5.5.

// orders.proto - Schema Protobuf per ordini
syntax = "proto3";

package dev.federicocalo.orders;

option java_package = "dev.federicocalo.orders";
option java_outer_classname = "OrderProto";

message Order {
  string order_id = 1;
  string user_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at_ms = 5;  // timestamp in milliseconds
  OrderStatus status = 6;
  repeated OrderItem items = 7;

  // Campo aggiunto in v2: backward-compatible in Protobuf
  // (campi non presenti vengono ignorati)
  string shipping_address = 8;
}

enum OrderStatus {
  PENDING = 0;
  CONFIRMED = 1;
  SHIPPED = 2;
  DELIVERED = 3;
  CANCELLED = 4;
}

message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double unit_price = 3;
}
// Producer con Protobuf serializer
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");

KafkaProducer<String, Order> producer = new KafkaProducer<>(props);

Order order = Order.newBuilder()
    .setOrderId(UUID.randomUUID().toString())
    .setUserId("user-42")
    .setAmount(99.99)
    .setCurrency("EUR")
    .setCreatedAtMs(Instant.now().toEpochMilli())
    .setStatus(OrderStatus.CONFIRMED)
    .addItems(OrderItem.newBuilder()
        .setProductId("prod-789")
        .setQuantity(2)
        .setUnitPrice(49.99)
        .build())
    .build();

producer.send(new ProducerRecord<>("orders-proto", order.getOrderId(), order));

Avro vs Protobuf vs JSON Schema: When to Use Which

Comparison Table: Serialization Formats

  • Avro: Compact, efficient for Hadoop/Spark,scheme evolution well documented, great for data engineering. Lack of backwards compatibility by type: Changing a type requires strategy. Choose if you use Confluent Platform or have pipeline to data lake (native Avro on Parquet).
  • Protobuf: Excellent for gRPC microservices, more expressive types (oneof, map, native timestamp), better IDE support. The field numbers guarantee natural backward compatibility (add a new field = new number). Choose if you already have Protobuf in gRPC or prefer typed IDL.
  • JSON Schema: Interoperable, human readable, none compilation. Larger payload. Choose for teams with less IaC experience or for APIs that need to be readable without tools.

Best Practices for Scheme Governance

# 1. Naming convention per i subject
# Default (TopicNameStrategy): {topic-name}-value, {topic-name}-key
# Record name strategy (piu flessibile): {namespace}.{record-name}
# Configura su producer:
# props.put("value.subject.name.strategy",
#     "io.confluent.kafka.serializers.subject.RecordNameStrategy")

# 2. Schema versioning in CI/CD
# Aggiorna lo schema nel repository -> PR review -> test compatibilita
# pre-merge -> register nel Registry di staging -> deploy producer

# Script di verifica compatibilita in CI:
#!/bin/bash
SCHEMA_FILE="src/main/avro/Order.avsc"
SUBJECT="orders-value"
REGISTRY_URL="http://schema-registry-staging:8081"

# Verifica compatibilita prima del merge
RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" \
  -X POST "${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
  -H "Content-Type: application/json" \
  -d "{\"schema\": $(cat $SCHEMA_FILE | jq -Rs .)}")

if [ "$RESPONSE" != "200" ]; then
  echo "ERRORE: Schema non compatibile con la versione in produzione"
  exit 1
fi

echo "Schema compatibile: OK"

# 3. Usa schema IDs fissi nei test (non schema content)
# Questo rende i test stabili anche se lo schema evolve

# 4. Documenta ogni campo con "doc" in Avro
# Il Registry mostra la documentazione nella UI

# 5. Backup del Registry:
# Lo Schema Registry persiste gli schemi su Kafka (_schemas topic)
# Il backup del topic = backup degli schemi
kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9092 \
  --topic _schemas \
  --from-beginning \
  > schemas-backup-$(date +%Y%m%d).json

Conclusions

The Schema Registry transforms Kafka from a messaging system into a real one data platform with governance: formal contracts between producers and consumers, evolution controlled patterns, explicit errors instead of silent corruption. In an organization with multiple teams, it is one of the most critical components to be configured correctly before going into production.

The Complete Series: Apache Kafka

  • Article 01 — Apache Kafka Fundamentals
  • Article 02 — KRaft in Kafka 4.0
  • Article 03 — Advanced Producer and Consumer
  • Article 04 — Exactly-Once Semantics in Kafka
  • Article 05 (this) — Schema Registry: Avro, Protobuf and Schema Evolution
  • Article 06 — Kafka Streams: KTable and Windowing
  • Article 07 — Kafka Connect: Debezium CDC and DB Integration
  • Article 08 — Kafka + Apache Flink: Pipeline Analytics Real-Time