The Three Guarantees of Delivery in Kafka

Before tackling exactly-once, it is essential to understand the three delivery guarantees that a distributed messaging system can offer. When the network fails or a broker crashes, each one behaves very differently:

At-most-once: The message is sent at most once. If there is an error, the message is lost without retry. Suitable only for non-critical metrics or high-volume logs where some loss is acceptable.

At-least-once: The message is delivered at least once but may be duplicated when a send is retried. It is Kafka's default guarantee and is sufficient for most cases if the consumers are idempotent.

Exactly-once: The message is delivered exactly once. It is the most difficult guarantee to implement in distributed systems and comes at a cost in terms of latency and throughput.
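
The difference between the first two guarantees can be sketched with a toy model in plain Java. Nothing here is Kafka API: FlakyBroker, Fault and DeliveryDemo are hypothetical names, and the "broker" fails only on the first attempt, either by dropping the request or by dropping the ack.

```java
import java.util.ArrayList;
import java.util.List;

/** How the first send attempt fails in this toy model (hypothetical, not Kafka API). */
enum Fault { NONE, DROP_REQUEST, DROP_ACK }

/** In-memory stand-in for a broker whose first interaction may fail. */
class FlakyBroker {
    final List<String> log = new ArrayList<>();
    private Fault nextFault;

    FlakyBroker(Fault firstFault) {
        this.nextFault = firstFault;
    }

    /** Returns true only when the sender receives an ack. */
    boolean send(String message) {
        Fault fault = nextFault;
        nextFault = Fault.NONE;              // only the first attempt fails
        if (fault == Fault.DROP_REQUEST) {
            return false;                    // the message never reached the log
        }
        log.add(message);
        return fault != Fault.DROP_ACK;      // stored, but the ack may be lost
    }
}

class DeliveryDemo {
    /** At-most-once: send once and never retry. Returns how many copies were stored. */
    static int atMostOnce(Fault fault) {
        FlakyBroker broker = new FlakyBroker(fault);
        broker.send("m1");
        return broker.log.size();
    }

    /** At-least-once: retry until an ack arrives. Returns how many copies were stored. */
    static int atLeastOnce(Fault fault) {
        FlakyBroker broker = new FlakyBroker(fault);
        while (!broker.send("m1")) {
            // no ack: the sender cannot tell "lost" from "stored", so it sends again
        }
        return broker.log.size();
    }
}
```

With a dropped request, at-most-once stores zero copies (loss); with a dropped ack, at-least-once stores two copies (duplicate). Exactly-once has to close both gaps at the same time.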

What You Will Learn

  • How the idempotent producer eliminates duplicates at the producer level
  • The Transactional API: how to wrap multiple writes in an atomic transaction
  • The role of the Transaction Coordinator in the Kafka broker
  • Exactly-once end-to-end: read-process-write with Kafka Streams
  • The throughput implications and when EOS is really needed
  • Complete Java setup for production with EOS enabled

Step 1: The Idempotent Producer

The first layer of exactly-once is the idempotent producer, introduced in Kafka 0.11. The problem it solves: when a producer sends a message and does not receive the ack from the broker (timeout, network failure), it does not know whether the message was received or not. A retry can therefore create duplicates.

The idempotent producer solves this by assigning each producer a unique Producer ID (PID) and each message a sequence number. The broker keeps track of the last sequence number received from each PID for every partition and automatically discards retried messages it has already written.

// Idempotent producer configuration in Java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// ENABLE the idempotent producer
// Idempotence requires: acks=all, retries > 0, max.in.flight.requests.per.connection <= 5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// With idempotence enabled these values are applied automatically unless you override them;
// conflicting explicit values are rejected with a ConfigException:
// props.put(ProducerConfig.ACKS_CONFIG, "all");                          // auto
// props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);           // auto
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);    // auto (max)

// Timeouts: per request, and overall per record (including retries)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Usage is identical to a non-idempotent producer
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",
    "order-123",
    "{\"orderId\": \"123\", \"amount\": 99.99}"
);

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("Sent: partition=%d, offset=%d%n",
        metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    // Retries inside the producer are duplicate-free. Reaching this point means
    // the send ultimately failed (for example, delivery.timeout.ms expired).
    System.err.println("Send error: " + e.getMessage());
} finally {
    producer.close();
}
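
To make the broker-side deduplication described above concrete, here is a minimal sketch in plain Java. DedupPartitionLog is a hypothetical class modelling a single partition; the real broker tracks metadata for the last few batches per producer rather than a single number, so treat this as a simplification of the idea, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one partition that deduplicates by (producer ID, sequence number). */
class DedupPartitionLog {
    private final List<String> log = new ArrayList<>();
    private final Map<Long, Integer> lastSeqByPid = new HashMap<>();

    /**
     * Appends the value if the sequence number is the next expected one.
     * Returns false when the write is recognised as a retry of something already stored.
     */
    boolean append(long pid, int seq, String value) {
        int last = lastSeqByPid.getOrDefault(pid, -1);
        if (seq <= last) {
            return false;                    // duplicate retry: acknowledge, do not append
        }
        if (seq != last + 1) {
            // a gap means an earlier message is missing, so the write is rejected
            throw new IllegalStateException(
                "Out of order: expected " + (last + 1) + " but got " + seq);
        }
        log.add(value);
        lastSeqByPid.put(pid, seq);
        return true;
    }

    int size() {
        return log.size();
    }
}
```

A retry that reuses sequence number 0 for the same PID is silently dropped, while a different PID starts its own sequence at 0.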

Step 2: The Transactional API

The idempotent producer guarantees no duplicates for the writes of a single producer. But what if you have to write to two topics atomically? For example, a read-process-write loop that must:

  1. Read from topic-input
  2. Write the result on topic-output
  3. Commit the offset to __consumer_offsets

If the application crashes between step 2 and step 3, the message has been written to the output but the offset has not been committed: upon restart, it reprocesses the same message and writes a duplicate. The Transactional API solves this by wrapping all three steps in an atomic transaction.
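
The failure window can be reproduced with a small in-memory model before looking at the real API. This is only a sketch: ReadProcessWriteModel is a hypothetical class, "processing" is just upper-casing, and a crash is simulated by returning early.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy read-process-write loop with an in-memory input, output and committed offset. */
class ReadProcessWriteModel {
    final List<String> input;
    final List<String> output = new ArrayList<>();
    int committedOffset = 0;

    ReadProcessWriteModel(List<String> input) {
        this.input = input;
    }

    /** Writes the output and commits the offset as two separate steps. */
    void runNonAtomic(boolean crashAfterWrite) {
        while (committedOffset < input.size()) {
            output.add(input.get(committedOffset).toUpperCase());
            if (crashAfterWrite) {
                return;                      // crash between step 2 and step 3
            }
            committedOffset++;
        }
    }

    /** Stages the output and the new offset, then applies both together. */
    void runAtomic(boolean crashBeforeCommit) {
        while (committedOffset < input.size()) {
            String staged = input.get(committedOffset).toUpperCase();
            int stagedOffset = committedOffset + 1;
            if (crashBeforeCommit) {
                return;                      // staged work is discarded, like an abort
            }
            output.add(staged);              // "commit": both effects become visible together
            committedOffset = stagedOffset;
        }
    }
}
```

Running runNonAtomic(true) and then runNonAtomic(false) on the input [a, b] leaves [A, A, B] in the output; the same sequence with runAtomic leaves [A, B].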

// Complete transactional producer
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.*;

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// REQUIRED for transactions: a transactional.id unique to each producer instance
// Recommended format: {app-name}-{partition-number}
// IMPORTANT: two producers with the same transactional.id cannot be active
// at the same time (the second one aborts the first one's transaction: "fencing")
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-0");

// A transactional producer enables idempotence automatically
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

// REQUIRED: initialize transactions BEFORE any other operation
// This registers the producer with the Transaction Coordinator
producer.initTransactions();

// Consumer that reads the input
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
// CRITICAL for EOS: disable automatic offset commits
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Read only committed messages (skip messages of open or aborted transactions)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("orders-input"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (records.isEmpty()) continue;

    try {
        // BEGIN the transaction
        producer.beginTransaction();

        for (ConsumerRecord<String, String> record : records) {
            // Process the message
            String processed = processOrder(record.value());

            // WRITE to the output topic (inside the transaction)
            producer.send(new ProducerRecord<>("orders-processed", record.key(), processed));
        }

        // COMMIT the consumer offsets INSIDE the transaction
        // This makes the offset commit and the output writes atomic:
        // either both happen or neither does
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // The committed offset is the NEXT offset to read, hence +1
            offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }
        producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());

        // COMMIT the transaction: the offsets and the output messages are now
        // visible to consumers with isolation.level=read_committed
        producer.commitTransaction();

    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Unrecoverable errors: close the producer and stop
        producer.close();
        throw e;
    } catch (KafkaException e) {
        // Recoverable error: abort the transaction and retry the batch
        producer.abortTransaction();
        // poll() has already advanced the consumer position, so rewind each
        // partition to the first record of the aborted batch before polling again
        for (TopicPartition partition : records.partitions()) {
            consumer.seek(partition, records.records(partition).get(0).offset());
        }
    }
}
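
The consumer above sets isolation.level=read_committed. What that setting hides can be modelled in a few lines of plain Java. This is a simplified sketch with hypothetical names (LogEntry, ReadCommittedView): it assumes one transaction at a time per producer ID and treats the offset of the earliest still-open transaction as the point beyond which nothing is returned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** One entry in a toy partition log: a data record or a transaction marker. */
class LogEntry {
    enum Kind { DATA, COMMIT, ABORT }

    final Kind kind;
    final long pid;
    final String value;                      // null for markers

    private LogEntry(Kind kind, long pid, String value) {
        this.kind = kind;
        this.pid = pid;
        this.value = value;
    }

    static LogEntry data(long pid, String value) { return new LogEntry(Kind.DATA, pid, value); }
    static LogEntry commit(long pid) { return new LogEntry(Kind.COMMIT, pid, null); }
    static LogEntry abort(long pid) { return new LogEntry(Kind.ABORT, pid, null); }
}

class ReadCommittedView {
    /** Returns the values a read_committed consumer would see in this toy model. */
    static List<String> read(List<LogEntry> log) {
        Map<Long, List<Integer>> open = new HashMap<>();   // pid -> offsets of its open transaction
        boolean[] committed = new boolean[log.size()];
        for (int i = 0; i < log.size(); i++) {
            LogEntry e = log.get(i);
            if (e.kind == LogEntry.Kind.DATA) {
                open.computeIfAbsent(e.pid, k -> new ArrayList<>()).add(i);
            } else {
                List<Integer> offsets = open.remove(e.pid);
                if (offsets != null && e.kind == LogEntry.Kind.COMMIT) {
                    for (int o : offsets) {
                        committed[o] = true;
                    }
                }
            }
        }
        // Nothing at or after the first record of an open transaction is returned
        int lastStable = log.size();
        for (List<Integer> offsets : open.values()) {
            lastStable = Math.min(lastStable, offsets.get(0));
        }
        List<String> visible = new ArrayList<>();
        for (int i = 0; i < lastStable; i++) {
            if (committed[i]) {
                visible.add(log.get(i).value);
            }
        }
        return visible;
    }
}
```

Aborted records are filtered out, and a committed record stays hidden while an earlier transaction on the same partition is still open, which is one source of the extra consumer latency under EOS.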

The Transaction Coordinator: How It Works

The Transaction Coordinator is a component of the Kafka broker that manages the transaction lifecycle. Every transaction is recorded in a special internal topic: __transaction_state (partitioned for scalability, 50 partitions by default).
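
Which broker coordinates a given producer follows from that partitioning: the transactional.id is hashed onto one of the __transaction_state partitions, and the leader of that partition acts as coordinator. The sketch below shows the idea in plain Java; the exact hash handling is an assumption about the broker's internals, and TxnCoordinatorLookup is a hypothetical name.

```java
/** Sketch: map a transactional.id to a partition of __transaction_state. */
class TxnCoordinatorLookup {
    /**
     * Assumed rule: non-negative hash of the id, modulo the partition count.
     * The leader of the resulting partition would be the Transaction Coordinator.
     */
    static int partitionFor(String transactionalId, int partitionCount) {
        int hash = transactionalId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so handle it explicitly
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % partitionCount;
    }
}
```

The practical consequence is that the same transactional.id always reaches the same coordinator, which is what lets the coordinator keep a single epoch per ID.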

# Phases of a Kafka transaction:

# 1. initTransactions() called by the producer
#    -> The Transaction Coordinator returns the PID and bumps the epoch of the transactional.id
#    -> If a transaction was still open under the same ID (previous producer):
#       - It aborts the old transaction (producer fencing)
#       - It guarantees that only one instance is active

# 2. beginTransaction() - producer side only (no network call)

# 3. send() for each message
#    -> The producer registers each new partition with the Transaction Coordinator
#    -> The producer sends to the partition leader with its PID and epoch
#    -> The partition leader appends the records to the log; read_committed
#       consumers do not see them until the transaction is committed

# 4. sendOffsetsToTransaction()
#    -> The Transaction Coordinator records which consumer group offsets
#       are part of this transaction

# 5. commitTransaction()
#    -> The producer asks the Transaction Coordinator to end the transaction with a commit
#    -> The TC writes PREPARE_COMMIT to __transaction_state (durable)
#    -> The TC notifies all partition leaders involved
#    -> The partition leaders write a COMMIT marker to their logs
#    -> The messages become visible to read_committed consumers
#    -> The TC writes COMPLETE_COMMIT to __transaction_state

# 6. abortTransaction()
#    -> ABORT markers are written: the transaction's messages are skipped
#    -> read_committed consumers never see aborted messages

# Check the state of transactions (Kafka CLI):
kafka-transactions.sh --bootstrap-server kafka-1:9092 \
  list
# Illustrative output (the exact columns depend on the Kafka version):
# TransactionalId     Producer Epoch  State        LastModifiedTime
# order-processor-0   5               Complete     2026-03-20 10:30:00
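
The commit and abort paths above form a small state machine. The sketch below models a simplified subset of the coordinator's states in plain Java; TxnState and TxnStateMachine are hypothetical names, and the real coordinator tracks more states and metadata than shown here.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Simplified subset of the states a transaction moves through. */
enum TxnState { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

class TxnStateMachine {
    private static final Map<TxnState, Set<TxnState>> ALLOWED = new EnumMap<>(TxnState.class);
    static {
        ALLOWED.put(TxnState.EMPTY, EnumSet.of(TxnState.ONGOING));
        // ONGOING -> ONGOING models adding more partitions to the open transaction
        ALLOWED.put(TxnState.ONGOING,
            EnumSet.of(TxnState.ONGOING, TxnState.PREPARE_COMMIT, TxnState.PREPARE_ABORT));
        // Once the prepare record is durable, the outcome can no longer change
        ALLOWED.put(TxnState.PREPARE_COMMIT, EnumSet.of(TxnState.COMPLETE_COMMIT));
        ALLOWED.put(TxnState.PREPARE_ABORT, EnumSet.of(TxnState.COMPLETE_ABORT));
        // A completed transaction can be followed by the next one on the same ID
        ALLOWED.put(TxnState.COMPLETE_COMMIT, EnumSet.of(TxnState.ONGOING));
        ALLOWED.put(TxnState.COMPLETE_ABORT, EnumSet.of(TxnState.ONGOING));
    }

    private TxnState state = TxnState.EMPTY;

    TxnState state() {
        return state;
    }

    /** Moves to the next state or throws if the transition is not allowed. */
    void transitionTo(TxnState next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException("Illegal transition " + state + " -> " + next);
        }
        state = next;
    }
}
```

The key property is that PREPARE_COMMIT has exactly one successor: after that record is written, a coordinator that crashes and recovers can only finish the commit, never turn it into an abort.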

Producer Fencing: Protection from Zombie Instances

# Fencing protects against split-brain scenarios:
# - Producer A begins a transaction (transactional.id="proc-0", epoch=5)
# - Producer A stalls (GC pause, network partition)
# - The system starts Producer B with the same transactional.id
# - Producer B calls initTransactions(): the TC bumps the epoch to 6 and invalidates A
# - Producer A resumes and tries to commit: ProducerFencedException
#   (epoch 5 < current epoch 6)
# - Producer A can no longer commit: safe!

# Practical implication:
# Do not use the same transactional.id for several instances running at the same time
# In Kubernetes: use the pod name as part of the transactional.id
# Note: if a pod dies and a new pod takes over the same name,
# the new pod correctly fences the old one
# (this relies on stable pod names, as in a StatefulSet)

# Recommended configuration for Kubernetes:
TRANSACTIONAL_ID="${APP_NAME}-${KAFKA_PARTITION}-${POD_NAME}"
# Or use the pod index within the StatefulSet:
TRANSACTIONAL_ID="${APP_NAME}-${POD_INDEX}"
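
The epoch check behind fencing fits in a few lines. The following is a toy model in plain Java, not the broker's code: FencingCoordinator is a hypothetical class, init() stands in for initTransactions(), and a plain IllegalStateException stands in for ProducerFencedException.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy coordinator that keeps one epoch per transactional.id. */
class FencingCoordinator {
    private final Map<String, Integer> epochs = new HashMap<>();

    /** Models initTransactions(): bumps the epoch and hands it to the caller. */
    int init(String transactionalId) {
        // first registration gets epoch 0, every later one gets the previous epoch + 1
        return epochs.merge(transactionalId, 0, (current, unused) -> current + 1);
    }

    /** Models a commit request: rejected when the caller holds a stale epoch. */
    void commit(String transactionalId, int producerEpoch) {
        Integer current = epochs.get(transactionalId);
        if (current == null || producerEpoch < current) {
            throw new IllegalStateException(
                "Fenced: epoch " + producerEpoch + " is older than current epoch " + current);
        }
        // a real coordinator would now drive the commit protocol
    }
}
```

If instance A calls init("proc-0") and instance B later calls it with the same ID, B's commits succeed while any commit A attempts with its older epoch is rejected.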

EOS End-to-End with Kafka Streams

Kafka Streams (see Article 06 of the series) supports EOS natively with a single configuration: processing.guarantee=exactly_once_v2. The library automatically manages the transactional producer, the read_committed consumer, and the offset commits inside the same transaction.

// Kafka Streams with EOS enabled
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;

Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enrichment-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");

// EOS v2: more efficient than v1. Available since Kafka 2.5 (as exactly_once_beta),
// named exactly_once_v2 since Kafka 3.0.
// v2 uses one transactional producer per stream thread instead of one per task
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.EXACTLY_ONCE_V2);

// Under EOS the commit interval sets how often transactions are committed;
// a larger value amortizes the transactional overhead at the cost of latency
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200);  // 100 ms is the default under EOS

StreamsBuilder builder = new StreamsBuilder();

// EOS pipeline: the effects of each record are applied exactly once
KStream<String, String> inputStream = builder.stream("orders-raw");

inputStream
    .filter((key, value) -> value != null && !value.isEmpty())
    .mapValues(value -> enrichOrder(value))
    .peek((key, value) -> log.info("Processed order: {}", key))
    .to("orders-enriched");

KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);

// Graceful shutdown (important for transactions)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // close() with a timeout waits for in-flight transactions to complete
    streams.close(Duration.ofSeconds(10));
}));

streams.start();

Implications on Throughput and Latency

Exactly-once is not free. Here are approximate costs, as seen in typical benchmarks:

# Approximate benchmark (Kafka 3.x/4.x on standard hardware):
# Throughput (messages/sec, 1 KB payload):

# At-least-once (acks=all, no transactions):
# Producer: ~200,000 msg/s
# Consumer: ~500,000 msg/s

# Exactly-once with the Transactional API:
# Producer: ~100,000 msg/s (-50%)
# Consumer (read_committed): ~400,000 msg/s (-20%)

# Additional P99 latency:
# initTransactions(): one-off cost at startup (~100 ms)
# beginTransaction() -> commitTransaction(): +5-15 ms per batch
# The overhead depends on: number of partitions involved, batch size

# Optimizations to reduce EOS overhead:
# 1. Increase linger.ms and batch.size so each transaction is sent in fewer, larger requests
# 2. Increase commit.interval.ms in Kafka Streams
# 3. Use EXACTLY_ONCE_V2 (not v1): fewer producers and less coordination overhead
# 4. Minimize the number of partitions per transaction

// Recommended producer settings for EOS in production:
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);            // wait up to 5 ms to fill a batch
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);       // 64 KB batches
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64 MB buffer
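
Why larger transactions help follows from simple arithmetic: the commit cost is paid once per transaction, so it is shared by every record inside it. The helper below is a back-of-the-envelope sketch in plain Java; EosOverhead and its parameters are hypothetical, and the model ignores batching, network pipelining and broker load.

```java
/** Rough model of how a fixed per-transaction cost is amortized over its records. */
class EosOverhead {
    /** Share of the commit cost carried by each record, in milliseconds. */
    static double perRecordOverheadMs(double commitCostMs, int recordsPerTransaction) {
        if (recordsPerTransaction <= 0) {
            throw new IllegalArgumentException("recordsPerTransaction must be positive");
        }
        return commitCostMs / recordsPerTransaction;
    }

    /** Records per second for one producer running transactions back to back. */
    static double recordsPerSecond(double commitCostMs,
                                   double processMsPerRecord,
                                   int recordsPerTransaction) {
        if (recordsPerTransaction <= 0) {
            throw new IllegalArgumentException("recordsPerTransaction must be positive");
        }
        double transactionMs = commitCostMs + processMsPerRecord * recordsPerTransaction;
        return recordsPerTransaction * 1000.0 / transactionMs;
    }
}
```

With an assumed commit cost of 10 ms, in the middle of the 5-15 ms range quoted above, going from 100 to 1,000 records per transaction cuts the per-record share from 0.1 ms to 0.01 ms. The price is that output becomes visible to read_committed consumers less often.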

When to Use EOS and When Not to Use It

Checklist: Is EOS Necessary?

  • Use EOS for: payments, financial transfers, inventory updates, critical status events (order created/cancelled) where a duplicate causes real business problems.
  • Skip EOS for: analytics logs, monitoring metrics, click/view events where some duplication or loss is acceptable. Use at-least-once with idempotent consumers.
  • Alternative to EOS: Make consumers idempotent (check if you have already processed the message via a unique ID in the database) + at-least-once. Often simpler and more efficient than EOS at Kafka level.
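
The idempotent-consumer alternative from the last bullet can be sketched as follows. This is plain Java with hypothetical names: the in-memory set stands in for a database table with a unique constraint on the message ID, and amounts are kept in cents to avoid floating-point rounding.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy idempotent consumer: applies each message ID at most once. */
class IdempotentOrderHandler {
    // Stand-in for a table with a unique constraint on the message ID
    private final Set<String> processedIds = new HashSet<>();
    private long totalCents = 0;

    /** Returns true if the message was applied, false if it was a duplicate. */
    boolean handle(String messageId, long amountCents) {
        if (!processedIds.add(messageId)) {
            return false;                    // already processed: skip the side effect
        }
        totalCents += amountCents;
        return true;
    }

    long totalCents() {
        return totalCents;
    }
}
```

In a real system the ID check and the business update would need to happen in the same database transaction; otherwise a crash between the two reopens the duplicate window.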

Please note: EOS does not cover external side effects

Kafka's exactly-once guarantee holds only within the Kafka cluster. If your consumer writes to an external database (PostgreSQL, Redis, etc.), Kafka EOS guarantees nothing for those operations. For hybrid systems (Kafka + external database), you need additional patterns such as Transactional Outbox or Saga.

Conclusions

Exactly-once semantics in Kafka is one of the more sophisticated mechanisms in mainstream distributed systems. The idempotent producer eliminates duplicates caused by producer retries, the Transactional API guarantees atomicity across multiple topics, and Kafka Streams encapsulates all of this in a single configuration. The cost in latency and throughput is real but manageable with the right optimizations.

The Complete Series: Apache Kafka

  • Article 01 — Apache Kafka Fundamentals: Topics, Partitions and Consumer Groups
  • Article 02 — KRaft in Kafka 4.0: Goodbye ZooKeeper
  • Article 03 — Advanced Kafka Producer and Consumer: Acks, Idempotency and Retry
  • Article 04 (this) — Exactly-Once Semantics in Kafka: Transactions and Coordination
  • Article 05 — Schema Registry: Avro, Protobuf and Schema Evolution
  • Article 06 — Kafka Streams: KTable and Windowing
  • Article 07 — Kafka Connect: Debezium CDC and DB Integration
  • Article 08 — Kafka + Apache Flink: Pipeline Analytics Real-Time