Kafka Connect: Code-Free Integration

Every messaging system needs to move data to and from external systems: databases, object storage, Elasticsearch, Salesforce, legacy systems. Without Kafka Connect, each integration requires custom code: a producer that reads from the database, a consumer that writes to S3. Kafka Connect standardizes this with a configurable connector framework that handles scalability, fault tolerance, and offset tracking automatically, with exactly-once delivery available for connectors that support it.

In 2026, the catalog of available connectors exceeds 1,000, counting both open-source and commercial offerings (Confluent Hub). The most common use cases are CDC (Change Data Capture) from relational databases via Debezium, replication to S3/GCS for the data lake, and sync to Elasticsearch for full-text search.

What You Will Learn

  • Kafka Connect architecture: worker, connector, task, offset management
  • Deployment modes: standalone vs distributed
  • Debezium PostgreSQL Source Connector: setup, WAL configuration, snapshot
  • Debezium MySQL Source Connector: binlog replication
  • Sink Connectors: S3, Elasticsearch, JDBC
  • Single Message Transforms (SMT): filter, rename, enrich records
  • Monitoring: JMX metrics and error management in production

Architecture: Worker, Connector and Task

# Kafka Connect components:

# WORKER: JVM process that runs the connectors
# Distributed mode: multiple workers form a cluster (recommended for production)
# Standalone mode: a single worker, useful for development/testing

# CONNECTOR: logical unit of integration (e.g. "connect PostgreSQL orders to Kafka")
# Each connector creates 1 or more TASKs
# Source connector: reads from an external system -> writes to Kafka
# Sink connector: reads from Kafka -> writes to an external system

# TASK: the actual unit of parallelism (thread)
# Each task is responsible for a subset of the data
# Example: a JDBC source connector with 4 tasks and 4 tables = 1 task per table
# (Debezium's PostgreSQL connector always runs a single task, because a
#  replication slot is read sequentially)

# OFFSET: tracks the position in the source (WAL LSN for Postgres, binlog position for MySQL)
# Stored in the internal offsets topic, named __connect-offsets in this setup
# Guarantees that after a restart, replication resumes where it left off

# Kafka Connect cluster deployment (Docker Compose):
# kafka-connect:
#   image: confluentinc/cp-kafka-connect:7.6.0
#   environment:
#     CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
#     CONNECT_REST_PORT: 8083
#     CONNECT_GROUP_ID: "kafka-connect-cluster"
#     CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
#     CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
#     CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
#     CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
#     CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
#     CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
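#     # Required by the cp-kafka-connect image: advertised host and default converters
#     CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
#     CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
#     CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
#     # Plugin path: directories containing the connector JARs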
#     CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
#   volumes:
#     - ./connect-plugins:/usr/share/confluent-hub-components
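To make the connector/task split concrete, here is a minimal Python sketch of how a table-based source connector could divide its tables across at most tasks.max tasks. The helper name group_tables and the contiguous-split strategy are assumptions made for illustration; each real connector decides its own task layout.

```python
from typing import List


def group_tables(tables: List[str], max_tasks: int) -> List[List[str]]:
    """Split tables into at most max_tasks contiguous groups of near-equal size.

    Hypothetical helper for illustration, not Kafka Connect's own code.
    """
    if max_tasks < 1:
        raise ValueError("max_tasks must be at least 1")
    num_groups = min(max_tasks, len(tables))
    groups: List[List[str]] = []
    start = 0
    for i in range(num_groups):
        # The first (len % num_groups) groups receive one extra table.
        extra = 1 if i < len(tables) % num_groups else 0
        size = len(tables) // num_groups + extra
        groups.append(tables[start:start + size])
        start += size
    return groups


if __name__ == "__main__":
    tables = ["orders", "users", "products", "payments"]
    for task_id, assigned in enumerate(group_tables(tables, 4)):
        print(f"task {task_id}: {assigned}")
```

With more tables than tasks, some tasks handle several tables; with fewer tables than tasks, the extra task slots simply stay unused.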

Debezium: Change Data Capture from PostgreSQL

Debezium is the most widely used CDC connector for Kafka. Instead of polling the database (which adds load and cannot capture deletes), Debezium reads PostgreSQL's Write-Ahead Log (WAL) directly: each INSERT, UPDATE, and DELETE produces a Kafka event carrying the row state before and after the change. The full "before" image requires REPLICA IDENTITY FULL on the table; with the default setting only the primary key columns of the old row are available.
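As a rough illustration of that before/after structure, the Python sketch below models the payload of a Debezium change event and extracts the columns that changed. The sample values and the helper name changed_columns are hypothetical; only the envelope fields (before, after, source, op, ts_ms) follow Debezium's event format.

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical UPDATE event for a row in public.orders (payload only, schema omitted).
update_event: Dict[str, Any] = {
    "before": {"id": 123, "user_id": "user-42", "amount": 89.99},
    "after": {"id": 123, "user_id": "user-42", "amount": 99.99},
    "source": {"db": "ecommerce", "schema": "public", "table": "orders",
               "ts_ms": 1710928200000},
    "op": "u",  # c=create, u=update, d=delete, r=snapshot read
    "ts_ms": 1710928200123,
}


def changed_columns(event: Dict[str, Any]) -> Dict[str, Tuple[Any, Any]]:
    """Return {column: (old, new)} for columns that differ between before and after."""
    before: Optional[Dict[str, Any]] = event.get("before")
    after: Optional[Dict[str, Any]] = event.get("after")
    if before is None or after is None:
        # Inserts carry no "before" image and deletes carry no "after" image.
        return {}
    return {
        column: (before.get(column), new_value)
        for column, new_value in after.items()
        if before.get(column) != new_value
    }


if __name__ == "__main__":
    print(changed_columns(update_event))
```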

PostgreSQL configuration for CDC

-- PostgreSQL: enable logical replication for Debezium

-- 1. postgresql.conf: change wal_level (requires a restart)
-- wal_level = logical
-- max_wal_senders = 10
-- max_replication_slots = 10

-- 2. Create a dedicated user for Debezium with the minimum required privileges
CREATE ROLE debezium_user WITH
  LOGIN
  PASSWORD 'secure_password_here'
  REPLICATION;  -- Required to read the WAL through a replication slot

-- 3. Grant SELECT on the tables to replicate (used by the initial snapshot)
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;

-- 4. Allow the user to access objects in the schema
GRANT USAGE ON SCHEMA public TO debezium_user;

-- 5. Create a publication (Debezium 2.x uses pgoutput as the default plugin)
-- Debezium can create it automatically if its user has sufficient privileges;
-- creating it manually as a privileged user keeps debezium_user minimal:
CREATE PUBLICATION debezium_pub FOR TABLE orders, users, products;

-- 6. Verify that logical replication is enabled
SHOW wal_level;  -- must be: logical
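The checks above can be automated before registering a connector. The following Python sketch validates a dictionary of PostgreSQL settings (for example, values read from pg_settings) against the prerequisites listed in step 1. The function name check_cdc_prerequisites and the input shape are assumptions made for this example.

```python
from typing import Dict, List


def check_cdc_prerequisites(settings: Dict[str, str]) -> List[str]:
    """Return a list of problems; an empty list means the settings look CDC-ready.

    `settings` maps PostgreSQL parameter names to their current string values.
    """
    problems: List[str] = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    for name in ("max_wal_senders", "max_replication_slots"):
        # A missing parameter is treated as 0 so that it is reported as a problem.
        if int(settings.get(name, "0")) < 1:
            problems.append(f"{name} must be at least 1")
    return problems


if __name__ == "__main__":
    current = {"wal_level": "replica", "max_wal_senders": "10", "max_replication_slots": "10"}
    for problem in check_cdc_prerequisites(current):
        print("NOT READY:", problem)
```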

Debezium PostgreSQL Connector configuration

{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-host",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "secure_password_here",
    "database.dbname": "ecommerce",

    "plugin.name": "pgoutput",
    "publication.name": "debezium_pub",
    "slot.name": "debezium_slot",

    "table.include.list": "public.orders,public.users,public.products",

    "topic.prefix": "cdc",

    "decimal.handling.mode": "double",
    "binary.handling.mode": "base64",

    "snapshot.mode": "initial",

    "heartbeat.interval.ms": "10000",
    "slot.drop.on.stop": "false",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,source.ts_ms",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "errors.retry.timeout": "60000",
    "errors.retry.delay.max.ms": "5000",
    "errors.tolerance": "all",
    "errors.log.enable": "true"
  }
}

# Notes on this configuration:
# - topic.prefix replaces database.server.name in Debezium 2.x
# - Converters must be Connect converter classes (AvroConverter), not Kafka serializer classes
# - errors.deadletterqueue.* applies only to sink connectors, so it is omitted here
# Register the connector via the REST API
curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-source-connector.json

# Check the connector status
curl http://kafka-connect:8083/connectors/postgres-source-connector/status
# {
#   "name": "postgres-source-connector",
#   "connector": {"state": "RUNNING", "worker_id": "kafka-connect-1:8083"},
#   "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect-1:8083"}]
# }

# Structure of the CDC output message (with Debezium Avro):
# Topic: cdc.public.orders
# Key: {"id": 123}
# Value (after the ExtractNewRecordState unwrap):
# {
#   "id": 123,
#   "user_id": "user-42",
#   "amount": 99.99,
#   "__deleted": "false",     # "true" for DELETE events (with delete.handling.mode=rewrite)
#   "__op": "u",              # c=insert, u=update, d=delete, r=read (snapshot); needs add.fields=op
#   "__source_ts_ms": 1710928200000   # needs add.fields=source.ts_ms
# }
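On the consuming side, the unwrapped records can be applied to a target store as upserts and deletes. The sketch below is one way to do that against an in-memory dictionary, assuming records shaped like the example above and a key of the form {"id": ...}. The function name apply_event is hypothetical, and a real consumer would first deserialize the Avro payload.

```python
from typing import Any, Dict, Optional

Table = Dict[int, Dict[str, Any]]


def apply_event(table: Table, key: Dict[str, Any], value: Optional[Dict[str, Any]]) -> None:
    """Apply one unwrapped CDC record to an in-memory copy of the table."""
    row_id = key["id"]
    if value is None:
        # Tombstone: follows a delete when drop.tombstones=false.
        table.pop(row_id, None)
        return
    if value.get("__deleted") == "true":
        # Delete rewritten into a regular record by delete.handling.mode=rewrite.
        table.pop(row_id, None)
        return
    # Inserts, updates, and snapshot reads all become an upsert.
    # Metadata fields (prefixed with "__") are not part of the row itself.
    table[row_id] = {k: v for k, v in value.items() if not k.startswith("__")}


if __name__ == "__main__":
    orders: Table = {}
    apply_event(orders, {"id": 123},
                {"id": 123, "user_id": "user-42", "amount": 99.99,
                 "__deleted": "false", "__op": "u"})
    print(orders)
```

Treating every non-delete record as an upsert keeps the consumer idempotent, which matters because Connect may redeliver records after a restart.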

Debezium MySQL: Binlog Configuration

-- MySQL: enable the binlog in ROW format
-- Add to /etc/mysql/mysql.conf.d/mysqld.cnf:
-- [mysqld]
-- server-id                  = 1        -- the connector's database.server.id must differ from this
-- log_bin                    = mysql-bin
-- binlog_format              = ROW
-- binlog_row_image           = FULL
-- binlog_expire_logs_seconds = 864000   -- 10 days (on MySQL 5.7 use expire_logs_days = 10)

-- Create the Debezium user for MySQL
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.server.id": "184054",
    "topic.prefix": "cdc-mysql",
    "database.include.list": "shop",
    "table.include.list": "shop.orders,shop.customers",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-1:9092",
    "schema.history.internal.kafka.topic": "__debezium-schema-history",
    "snapshot.mode": "initial",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

Single Message Transforms (SMT)

Single Message Transforms (SMTs) are lightweight transformations applied to each record in the Connect pipeline, without the need for Kafka Streams. They can be chained and are useful for simple cases such as renaming fields, adding timestamps, filtering records, or routing to different topics.

{
  "transforms": "dropTombstones,renameField,addTimestamp,routeByTable",

  "predicates": "isTombstone",
  "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",

  "transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.dropTombstones.predicate": "isTombstone",

  "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.renameField.renames": "user_id:userId,order_id:orderId",

  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "kafka_ingestion_time",

  "transforms.routeByTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeByTable.regex": "cdc\\.public\\.(.*)",
  "transforms.routeByTable.replacement": "db.changes.$1"
}

The built-in Filter transform drops every record for which its predicate matches, and predicates are declared separately under the predicates key. Kafka Connect ships three predicates (TopicNameMatches, HasHeaderKey, RecordIsTombstone), none of which inspects field values, so filtering on a value field such as __deleted requires a third-party SMT.
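To see what a chain of this shape does to a single record, the following Python sketch simulates four steps: drop a record, rename fields, insert the record timestamp, and reroute the topic. It is a simplified model, not the Connect API: records are plain dictionaries, the filter step here drops tombstones (records with a null value), and all function names are made up for the example. Python's regex replacement uses \1 where the Java-based RegexRouter uses $1.

```python
import re
from typing import Any, Dict, Optional

Record = Dict[str, Any]  # keys used here: "topic", "timestamp", "value"


def drop_tombstones(record: Record) -> Optional[Record]:
    """Filter step: returning None means the record is dropped from the pipeline."""
    return None if record["value"] is None else record


def rename_fields(record: Record, renames: Dict[str, str]) -> Record:
    value = {renames.get(name, name): v for name, v in record["value"].items()}
    return {**record, "value": value}


def insert_timestamp(record: Record, field: str) -> Record:
    # Copies the record's own timestamp into the value under the given field name.
    return {**record, "value": {**record["value"], field: record["timestamp"]}}


def regex_route(record: Record, pattern: str, replacement: str) -> Record:
    # The topic is renamed only when the whole topic name matches the pattern.
    match = re.fullmatch(pattern, record["topic"])
    if match is None:
        return record
    return {**record, "topic": match.expand(replacement)}


def apply_chain(record: Record) -> Optional[Record]:
    """Run the transforms in order; a dropped record short-circuits the chain."""
    kept = drop_tombstones(record)
    if kept is None:
        return None
    kept = rename_fields(kept, {"user_id": "userId", "order_id": "orderId"})
    kept = insert_timestamp(kept, "kafka_ingestion_time")
    return regex_route(kept, r"cdc\.public\.(.*)", r"db.changes.\1")


if __name__ == "__main__":
    sample = {"topic": "cdc.public.orders", "timestamp": 1710928200000,
              "value": {"order_id": 1, "user_id": "user-42"}}
    print(apply_chain(sample))
```

Order matters in a chain: placing the filter first means later transforms never see a null value.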

Sink Connector: From Kafka to S3

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "cdc.public.orders,cdc.public.users",

    "s3.region": "eu-west-1",
    "s3.bucket.name": "data-lake-raw",
    "s3.part.size": "5242880",

    "flush.size": "10000",
    "rotate.interval.ms": "600000",
    "rotate.schedule.interval.ms": "3600000",

    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",

    "schema.compatibility": "FULL",

    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "partition.duration.ms": "3600000",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at",

    "s3.credentials.provider.class": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",

    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq.s3-sink-connector",
    "errors.deadletterqueue.topic.replication.factor": "3"
  }
}
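The time-based partitioning in this configuration determines where each record lands in the bucket. As a rough sketch, the Python code below derives the hourly partition path from a record timestamp in the same year=/month=/day=/hour= layout, in UTC. The function names are hypothetical, and the "topics" directory prefix is an assumption based on the connector's default topics.dir setting.

```python
from datetime import datetime, timezone


def encoded_partition(timestamp_ms: int) -> str:
    """Format an epoch-millisecond timestamp as an hourly partition path in UTC."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return ts.strftime("year=%Y/month=%m/day=%d/hour=%H")


def object_prefix(topic: str, timestamp_ms: int, topics_dir: str = "topics") -> str:
    """Build the S3 key prefix under which a record's file would be written.

    topics_dir is assumed to be the connector default; adjust it if topics.dir is set.
    """
    return f"{topics_dir}/{topic}/{encoded_partition(timestamp_ms)}/"


if __name__ == "__main__":
    print(object_prefix("cdc.public.orders", 1710928200000))
```

Because the timestamp comes from a record field (created_at) rather than wall-clock time, late-arriving records still land in the partition of the hour in which they were created.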

Monitoring and Error Management

# Key JMX metrics for Kafka Connect

# kafka.connect:type=source-task-metrics,connector="{connector-name}",task="{task-id}"
# -> source-record-poll-total: records read from the source
# -> source-record-poll-rate: read rate (records/sec)
# -> source-record-write-total: records written to Kafka
# -> source-record-write-rate: write rate (records/sec)

# kafka.connect:type=connector-task-metrics,connector="{connector-name}",task="{task-id}"
# -> offset-commit-success-percentage: percentage of successful offset commits
# -> batch-size-avg: average batch size

# kafka.connect:type=task-error-metrics,connector="{connector-name}",task="{task-id}"
# -> total-record-errors: total number of record processing errors

# Prometheus scraping via JMX Exporter
# jmx_prometheus_javaagent.jar configured with kafka-connect-rules.yml

# Check replication delay (lag) for Debezium
# Compare the WAL timestamp with the event timestamp on Kafka
# If there is lag, check:
# - Replication slot lag on PostgreSQL:
SELECT
    slot_name,
    active,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
# If lag_bytes > 1GB: the connector is not keeping up

# List connectors and their status (URL quoted so the shell does not interpret "?")
curl "http://kafka-connect:8083/connectors?expand=status" | jq .

# Restart a connector (in case of error)
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/restart

# Restart only a specific task
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/tasks/0/restart

# Pause/Resume
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/pause
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/resume
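These REST calls lend themselves to simple automation. The Python sketch below takes the JSON returned by GET /connectors?expand=status (already parsed into a dictionary), lists the tasks in FAILED state, and builds the matching task-restart URLs. The function names are hypothetical and no HTTP request is made here; sending the POST requests is left to the caller.

```python
from typing import Any, Dict, List, Tuple


def failed_tasks(status_by_connector: Dict[str, Any]) -> List[Tuple[str, int]]:
    """Return (connector name, task id) pairs for every task in FAILED state."""
    failed: List[Tuple[str, int]] = []
    for name, entry in sorted(status_by_connector.items()):
        for task in entry.get("status", {}).get("tasks", []):
            if task.get("state") == "FAILED":
                failed.append((name, task["id"]))
    return failed


def restart_urls(base_url: str, failed: List[Tuple[str, int]]) -> List[str]:
    """Build the task-restart endpoint URL for each failed task."""
    return [f"{base_url}/connectors/{name}/tasks/{task_id}/restart"
            for name, task_id in failed]


if __name__ == "__main__":
    # Hypothetical response body, shaped like the output of ?expand=status.
    payload = {
        "postgres-source-connector": {
            "status": {
                "name": "postgres-source-connector",
                "connector": {"state": "RUNNING", "worker_id": "kafka-connect-1:8083"},
                "tasks": [{"id": 0, "state": "FAILED", "worker_id": "kafka-connect-1:8083"}],
            }
        }
    }
    for url in restart_urls("http://kafka-connect:8083", failed_tasks(payload)):
        print("POST", url)
```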

Warning: Replication Slots and Disk Space

A PostgreSQL replication slot retains WAL records until the connector has consumed them. If the connector goes offline for hours or days, the slot accumulates WAL without bound and can fill the PostgreSQL server's disk, which brings the database down. Always set up an alert on pg_replication_slots and consider setting max_slot_wal_keep_size (PostgreSQL 13+) as a safeguard. Be aware that once a slot exceeds that limit PostgreSQL invalidates it, and the connector then needs a new snapshot to resume.
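An alert of this kind can be as simple as a threshold check over the rows of pg_replication_slots. The sketch below assumes the rows have already been fetched with a query that returns slot_name, active, and lag_bytes; the SlotStatus type, the slot_alerts function, and the 1 GiB default threshold are choices made for this example.

```python
from typing import List, NamedTuple

ONE_GIB = 1024 ** 3


class SlotStatus(NamedTuple):
    slot_name: str
    active: bool
    lag_bytes: int


def slot_alerts(slots: List[SlotStatus], max_lag_bytes: int = ONE_GIB) -> List[str]:
    """Return one alert message per problem found across the given slots."""
    alerts: List[str] = []
    for slot in slots:
        if not slot.active:
            # No consumer is attached, so WAL keeps piling up behind this slot.
            alerts.append(f"{slot.slot_name}: slot is inactive, WAL is being retained")
        if slot.lag_bytes > max_lag_bytes:
            alerts.append(
                f"{slot.slot_name}: lag of {slot.lag_bytes} bytes exceeds {max_lag_bytes}"
            )
    return alerts


if __name__ == "__main__":
    rows = [SlotStatus("debezium_slot", active=False, lag_bytes=3 * ONE_GIB)]
    for alert in slot_alerts(rows):
        print("ALERT:", alert)
```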

Conclusions

Kafka Connect with Debezium is the standard stack for Change Data Capture in 2026: it captures every change from the database with low latency (typically milliseconds to seconds) and little impact on production database performance, with fault tolerance and, where the connector supports it, exactly-once delivery. Combining it with SMTs allows lightweight transformations without adding separate components to the pipeline.

The Complete Series: Apache Kafka

  • Article 01 — Apache Kafka Fundamentals
  • Article 02 — KRaft in Kafka 4.0
  • Article 03 — Advanced Producer and Consumer
  • Article 04 — Exactly-Once Semantics in Kafka
  • Article 05 — Schema Registry: Avro and Protobuf
  • Article 06 — Kafka Streams: KTable and Windowing
  • Article 07 (this) — Kafka Connect: Source/Sink Connector and Debezium CDC
  • Article 08 — Kafka + Apache Flink: Pipeline Analytics Real-Time