Advanced Kafka Producer and Consumer: Acks, Idempotency and Retry
The behavior of a Kafka producer depends critically on three parameters: acks, retries and
max.in.flight.requests.per.connection. A wrong configuration can lead to data loss
or to undetected duplicates. This guide covers each scenario with real-world configurations, explains the idempotent producer
introduced in Kafka 0.11, and shows how the consumer handles offsets and reprocessing.
The Three Delivery Guarantees
Before getting into setup, it's critical to understand the three delivery modes that Kafka supports and their practical meaning:
- At-most-once: The message is sent once, without retry. If the broker does not receive it, it is permanently lost. Zero duplicates, possible data loss. Acceptable for non-critical metrics, debug logs, click-stream events where the loss of some events is tolerable.
- At-least-once: The producer tries again if there is an error. The message will arrive at least once, but it may arrive multiple times (duplicates) if the broker received it but did not send confirmation before the timeout. The consumer must be idempotent to handle duplicates. Most common scenario in production.
- Exactly-once: The message is processed exactly once, with no loss and no duplicates. Requires the idempotent producer (for exactly-once on the producer side) or Kafka transactions (for exactly-once end-to-end between producer and consumer). Significant performance overhead.
The Golden Rule
In a distributed system, it is impossible to guarantee exactly-once without overhead. Roughly 90% of Kafka systems in production use at-least-once with idempotent consumers (consumer-side deduplication via database or cache). Exactly-once via Kafka transactions is used for financial pipelines, billing systems, or anywhere duplication of an event causes real harm.
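Consumer-side deduplication can be sketched in a few lines. This is a minimal illustration, not tied to any specific library: the in-memory `processed_ids` set stands in for a persistent store such as Redis or a database table with a unique key.

```python
# Minimal sketch of an idempotent consumer: at-least-once delivery
# plus consumer-side deduplication on a unique event id.
# The in-memory set stands in for a persistent store (Redis, a DB
# table with a unique key, ...).
processed_ids = set()

def handle_event(event):
    """Apply an event once; return True if it was actually applied."""
    event_id = event["id"]
    if event_id in processed_ids:
        return False          # duplicate delivery: skip silently
    # ... business logic would run here ...
    processed_ids.add(event_id)
    return True

# At-least-once: the same event may be delivered more than once
deliveries = [{"id": "evt-1"}, {"id": "evt-2"}, {"id": "evt-1"}]
applied = [e["id"] for e in deliveries if handle_event(e)]
print(applied)  # ['evt-1', 'evt-2']
```

In production the set must survive restarts and be shared across consumer instances, which is why a database unique constraint or a cache with TTL is the usual choice.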
The acks Parameter: How many Confirmations to Wait for
The producer's acks parameter defines how many replicas must acknowledge receipt before
the producer considers the request complete:
acks=0 (Fire and Forget)
The producer sends the record and does not wait for any response from the broker. Maximum throughput, minimum latency, but no guarantee: if the broker is down, or the record is written to a broker that then fails before replicating it, the record is lost. The semantics are at-most-once.
// Java producer with acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Retries are ignored with acks=0:
// there is no feedback from the broker, so there is no error to retry on
props.put("retries", 0);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// send() returns immediately; no useful callback
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// No guarantee that the broker ever received the record
acks=1 (Leader Only)
The producer waits for confirmation only from the partition's leader broker. Followers may not yet have replicated the record when the confirmation arrives. If the leader fails immediately after sending the confirmation but before the followers have replicated, the record is lost. At-least-once semantics, with a risk of data loss in very short fault windows.
// Producer with acks=1 (default up to Kafka 2.8; the default is now "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);
// With acks=1 and retries>0:
// - Record acknowledged but leader fails before replication: lost (no retry possible)
// - Record not acknowledged (timeout, network error): the retry sends it again
//   -> possible duplicate if the broker received it but did not respond
acks=all (or acks=-1): All ISRs
The producer waits for the record to be written to all the ISRs (In-Sync Replicas) of the partition.
With min.insync.replicas=2 and replication.factor=3, this means that at least
2 replicas (leader + 1 follower) must confirm. Only then does the producer receive the ack.
At-least-once semantics with no data loss, as long as at least min.insync.replicas
brokers are in sync.
// Producer with acks=all: maximum durability
Properties props = new Properties();
props.put("acks", "all"); // equivalent to "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000); // blocks up to 60s if the broker is congested
// WARNING: without idempotence enabled, retries with acks=all
// can create duplicates if the broker received the record
// but failed to send the ack (e.g. a network timeout)
// Topic configuration (broker-side): min.insync.replicas=2
// If fewer than 2 replicas are available, the broker returns
// NotEnoughReplicasException -> the producer retries
The Duplicate Problem with Retry
Consider this scenario with acks=all and retries=3:
- The producer sends record R1 to the leader broker
- The broker writes R1 to disk and sends ack to the producer
- The ack is lost (network times out before it reaches the producer)
- The producer, not receiving the ack within request.timeout.ms, considers the write failed
- The producer retries and sends R1 again
- The broker receives R1 a second time and writes it as a separate record
- The topic now contains duplicate R1
This is the expected behavior of at-least-once: each record arrives at least once, but duplicates may arrive too. To eliminate duplicates at the producer level, you use the idempotent producer.
The Idempotent Producer
The idempotent producer, introduced in Kafka 0.11 (2017), eliminates duplicates caused by producer retries. The mechanism is based on two concepts:
- Producer ID (PID): When the idempotent producer connects, the broker assigns a unique PID. The PID persists for the life of the producer; if the producer restarts, it gets a new PID.
- Sequence Number: each record sent carries a monotonically increasing sequence number (0, 1, 2, ...). The broker keeps track of the last sequence number received for each PID+partition. If a record arrives with sequence number already seen, it is silently discarded (deduplication on the broker side).
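The PID + sequence-number mechanism can be illustrated with a toy model of the broker's bookkeeping (a deliberate simplification: the real broker tracks a window of recent batch sequence numbers, not just the highest one):

```python
# Toy model of broker-side deduplication: the broker remembers the
# highest sequence number seen per (PID, partition) and silently drops
# anything at or below it.
last_seq = {}
log = []

def broker_append(pid, partition, seq, record):
    key = (pid, partition)
    if seq <= last_seq.get(key, -1):
        return False          # duplicate from a producer retry: discarded
    last_seq[key] = seq
    log.append(record)
    return True

broker_append(pid=7, partition=0, seq=0, record="R1")
broker_append(pid=7, partition=0, seq=0, record="R1")  # retry after a lost ack
broker_append(pid=7, partition=0, seq=1, record="R2")
print(log)  # ['R1', 'R2']
```

The retry of R1 carries the same sequence number as the original attempt, so the broker recognizes it as already written and the log stays duplicate-free.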
// Enabling the idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Enable idempotence
props.put("enable.idempotence", true);
// With enable.idempotence=true, these values are set automatically:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Setting them to incompatible values makes Kafka throw a ConfigException
// Optional but recommended: linger and batch settings for performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Retries no longer create duplicates in the Kafka log
// The guarantee is "exactly-once on the producer side" (not end-to-end)
Limits of the Idempotent Producer
The idempotent producer ensures that every record appears exactly once in the Kafka log for the producer's current session. If the producer restarts (new PID), an in-flight record could be written again. Above all, it guarantees nothing about consumer processing: if the consumer processes a record and then crashes before committing the offset, on the next startup it will reprocess the same record. For exactly-once end-to-end you need the transactional API.
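The consumer-side reprocessing window is easy to see in a small simulation (a sketch of the commit/crash timing, not real consumer code):

```python
# Sketch of the reprocessing window with manual commit (at-least-once):
# the consumer commits after each record, crashes before committing the
# last one, and reprocesses it after the restart.
committed = 0          # next offset to read, as stored in Kafka
side_effects = []      # what the business logic actually applied

def process(offset):
    side_effects.append(offset)

# First run: offsets 1..3 are processed, but the crash happens
# after processing offset 3 and before committing it
for offset in [1, 2, 3]:
    process(offset)
    if offset < 3:
        committed = offset + 1   # commit "next offset to read"

# Restart: resume from the committed offset -> offset 3 is seen again
for offset in range(committed, 4):
    process(offset)

print(side_effects)  # [1, 2, 3, 3] -- offset 3 applied twice
```

This is exactly why the idempotent producer alone is not exactly-once end-to-end: the duplicate here is created by the consumer restart, not by a producer retry.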
max.in.flight.requests.per.connection and Ordering
The max.in.flight.requests.per.connection (MIFR) parameter controls how many produce requests
can be in flight to a single broker at the same time. It has a critical impact on message
ordering in case of retries:
- MIFR=1: each request must be confirmed before the next one is sent. Ordering is guaranteed, but throughput is reduced (no pipelining).
- MIFR > 1 without idempotence: pipelining is active and throughput is higher, but if batch N fails while batch N+1 is already in flight, after the retry of N the records appear in the order N+1, N, even though N should have preceded N+1. Ordering is no longer guaranteed.
- MIFR ≤ 5 with idempotence: with enable.idempotence=true, Kafka guarantees ordering even with up to 5 requests in flight, thanks to sequence numbers. 5 is the default when idempotence is enabled and the maximum supported while preserving the guarantee.
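The reordering case can be made concrete with a toy simulation (not the real client, just the retry timing):

```python
# Toy illustration of reordering with MIFR > 1 and no idempotence:
# two batches are pipelined at once; if the first fails transiently
# while the second is already in flight, its retry lands after the
# second batch in the partition log.
def simulate(fail_first_attempt):
    log, retry_queue = [], []
    for batch in ["A", "B"]:            # both in flight at once (MIFR=2)
        if batch in fail_first_attempt:
            retry_queue.append(batch)   # transient error: retried later
        else:
            log.append(batch)           # appended to the partition log
    for batch in retry_queue:           # retries arrive after later batches
        log.append(batch)
    return log

print(simulate({"A"}))    # ['B', 'A'] -- ordering broken by the retry
print(simulate(set()))    # ['A', 'B'] -- happy path, order preserved
```

With idempotence enabled, the broker would reject batch B until A's sequence number has been accepted, which is how ordering survives pipelined retries.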
// SCENARIO 1: Ordering guaranteed without idempotence
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1); // no pipelining
// Limited throughput, but guaranteed ordering
// SCENARIO 2: Maximum throughput, no ordering guarantees (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Maximum throughput, possible reordering on retry
// SCENARIO 3: Standard production setup (recommended)
props.put("enable.idempotence", true);
// max.in.flight is set to 5 automatically
// acks is set to "all" automatically
// Good throughput + guaranteed ordering + no producer-side duplicates
Complete Producer Configuration for Production
// ProducerFactory.java: factory for production-ready producers
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerFactory {

    /**
     * Creates a producer optimized for high throughput,
     * with at-least-once guarantees and idempotence enabled.
     */
    public static KafkaProducer<String, byte[]> createHighThroughputProducer(
            String bootstrapServers) {
        Properties props = new Properties();
        // Connection
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Durability and idempotence
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // acks=all and retries=MAX are set automatically
        // Batching: wait up to 10ms to accumulate records
        // Fewer requests to the broker, higher throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB total buffer
        // Compression: snappy balances CPU cost and compression ratio
        // lz4 for maximum speed; zstd for maximum ratio
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // Timeout: if the buffer is full, block up to max.block.ms
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        // Request timeout: how long to wait for a broker response
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // Delivery timeout: total timeout including all retries
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        return new KafkaProducer<>(props);
    }

    /**
     * Creates a producer for critical events (financial, billing)
     * with minimum latency and maximum guarantees.
     */
    public static KafkaProducer<String, byte[]> createLowLatencyProducer(
            String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Maximum durability
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // No batching: send every record immediately
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
        // No compression: avoids CPU latency
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        return new KafkaProducer<>(props);
    }
}
Advanced Consumer: Commit and Reprocessing
Auto-commit: Simple but Risky
With enable.auto.commit=true (the default), Kafka automatically commits offsets every
auto.commit.interval.ms milliseconds (default: 5000ms = 5 seconds). The problem:
the commit happens in the background, regardless of whether the records have actually been processed.
Problematic scenario with auto-commit:
- poll() returns 100 records with offsets 1-100
- The consumer begins processing the records
- After 5 seconds the auto-commit timer fires: offset 100 is committed automatically
- The consumer crashes after processing only records 1-60
- On restart, the consumer resumes from offset 100: records 61-100 were skipped (data loss)
Synchronous Manual Commit
// Consumer with synchronous manual commit (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");
// Controls how many records are returned per poll()
props.put("max.poll.records", 500);
// If processing takes longer than this, Kafka considers the consumer dead
// and triggers a rebalance (its partitions go to another consumer in the group)
props.put("max.poll.interval.ms", 300000); // 5 minutes

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

try {
    while (true) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            elaboraOrdine(record);
        }
        // commitSync() blocks until the broker confirms
        // On error, it throws CommitFailedException
        // Guarantees at-least-once: the commit happens after processing
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    // The consumer exceeded max.poll.interval.ms and lost its partitions
    // Handle the rebalance and resume
    log.error("Commit failed, likely a rebalance", e);
}
Per-Partition Commit (Fine Granularity)
// Commit offsets per partition after processing
// Useful when processing large batches with incremental commits
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

while (true) {
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));
    // Group the records by partition
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        // Process the record
        elaboraOrdine(record);
        // Track the offset for this partition
        // IMPORTANT: commit offset+1 (the next record to read)
        offsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)
        );
    }
    // Commit all accumulated offsets
    consumer.commitSync(offsets);
}
Rebalance: How it works and how to manage it
A rebalance occurs when the consumer group changes: a consumer joins, leaves, or crashes. During the rebalance, all consumers in the group stop reading (stop-the-world) while the group coordinator reassigns partitions. Recent Kafka versions (2.4+) offer incremental cooperative rebalancing, which reduces the impact: only the partitions that change owner are revoked and reassigned.
// Implement ConsumerRebalanceListener to handle rebalances gracefully
// (currentOffsets is a map maintained by the processing loop, not shown here)
consumer.subscribe(
    Collections.singletonList("ordini-effettuati"),
    new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Called before the partitions are reassigned
            // Commit the offsets of the partitions we are about to lose
            log.info("Partitions revoked: {}", partitions);
            // Commit not-yet-committed offsets for the revoked partitions
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition tp : partitions) {
                if (currentOffsets.containsKey(tp)) {
                    toCommit.put(tp, currentOffsets.get(tp));
                }
            }
            if (!toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Called after the new partitions have been assigned
            log.info("Partitions assigned: {}", partitions);
            // Local state for the new partitions can be initialized here
        }
    }
);
Consumer in Python with Advanced Offset Management
from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging

log = logging.getLogger(__name__)

class AdvancedKafkaConsumer:
    """Kafka consumer with manual commit and rebalance handling."""

    def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
        self.topic = topic
        self.pending_offsets: Dict[tuple, int] = {}
        config = {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            # Cooperative rebalancing: fewer interruptions
            "partition.assignment.strategy": "cooperative-sticky",
            "session.timeout.ms": 30000,
            "max.poll.interval.ms": 300000,
        }
        self.consumer = Consumer(config)
        self.consumer.subscribe([topic], on_assign=self._on_assign,
                                on_revoke=self._on_revoke)

    def _on_assign(self, consumer, partitions):
        log.info(f"Assigned partitions: {[p.partition for p in partitions]}")

    def _on_revoke(self, consumer, partitions):
        """Commit pending offsets before losing the partitions."""
        log.info(f"Revoked partitions: {[p.partition for p in partitions]}")
        to_commit = []
        for p in partitions:
            key = (p.topic, p.partition)
            if key in self.pending_offsets:
                to_commit.append(
                    TopicPartition(p.topic, p.partition,
                                   self.pending_offsets[key])
                )
        if to_commit:
            consumer.commit(offsets=to_commit, asynchronous=False)

    def process_messages(self, handler_fn, batch_size: int = 100):
        """Main processing loop."""
        batch_count = 0
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        log.error(f"Kafka error: {msg.error()}")
                    continue
                # Process the message
                try:
                    handler_fn(msg)
                    # Track the offset (offset+1 = next to read)
                    self.pending_offsets[(msg.topic(), msg.partition())] = \
                        msg.offset() + 1
                    batch_count += 1
                except Exception as e:
                    log.error(f"Processing error at offset {msg.offset()}: {e}")
                    # A DLQ (Dead Letter Queue) could be implemented here,
                    # or the problematic record skipped
                # Commit every N records
                if batch_count >= batch_size:
                    self._commit_pending()
                    batch_count = 0
        finally:
            # Final commit before closing
            self._commit_pending()
            self.consumer.close()

    def _commit_pending(self):
        """Commit all pending offsets."""
        if not self.pending_offsets:
            return
        offsets = [
            TopicPartition(topic, partition, offset)
            for (topic, partition), offset in self.pending_offsets.items()
        ]
        self.consumer.commit(offsets=offsets, asynchronous=False)
        self.pending_offsets.clear()
        log.debug(f"Committed {len(offsets)} offsets")
Performance Tuning: Key Parameters
Producer side
| Parameter | Default | Effect |
|---|---|---|
| linger.ms | 0 | Waits up to N ms to accumulate larger batches |
| batch.size | 16384 (16KB) | Maximum batch size per partition |
| buffer.memory | 33554432 (32MB) | Total in-memory buffer for all batches |
| compression.type | none | none/gzip/snappy/lz4/zstd |
Consumer side
| Parameter | Default | Effect |
|---|---|---|
| fetch.min.bytes | 1 | Minimum amount of data the broker returns per fetch |
| fetch.max.wait.ms | 500 | Maximum wait if fetch.min.bytes is not reached |
| max.poll.records | 500 | Maximum records per single poll() |
| max.partition.fetch.bytes | 1048576 (1MB) | Maximum data per partition per fetch |
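The interaction between fetch.min.bytes and fetch.max.wait.ms is worth seeing concretely. The sketch below models when the broker answers a fetch request under a constant inflow of data; it is a simplification of the real broker logic, and `fetch_response_time` is a hypothetical helper for illustration, not a Kafka API:

```python
# Simplified model of when the broker answers a consumer fetch request:
# as soon as fetch.min.bytes are available, or when fetch.max.wait.ms
# expires, whichever comes first.
def fetch_response_time(bytes_per_ms, fetch_min_bytes, fetch_max_wait_ms):
    """Milliseconds until the broker responds, assuming a constant input rate."""
    if bytes_per_ms <= 0:
        return fetch_max_wait_ms          # no new data: wait out the timeout
    time_to_min = fetch_min_bytes / bytes_per_ms
    return min(time_to_min, fetch_max_wait_ms)

# Fast topic: 1KB/ms fills a 64KB threshold in 64ms -> responds early
print(fetch_response_time(1024, 65536, 500))   # 64.0
# Slow topic: 10B/ms would need 6553.6ms -> the 500ms timeout wins
print(fetch_response_time(10, 65536, 500))     # 500
```

Raising fetch.min.bytes therefore trades latency on slow topics for fewer, fuller responses on busy ones.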
Testing with EmbeddedKafka
Testing code that uses Kafka requires an available cluster. For unit and integration tests,
Spring Kafka provides @EmbeddedKafka, which starts an in-memory broker during the tests:
// Maven dependency
// <dependency>
//   <groupId>org.springframework.kafka</groupId>
//   <artifactId>spring-kafka-test</artifactId>
//   <scope>test</scope>
// </dependency>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import java.time.Duration;
import java.util.Map;
import static org.assertj.core.api.Assertions.*;

@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"ordini-test"},
    brokerProperties = {
        "auto.create.topics.enable=false",
        "default.replication.factor=1"
    }
)
class OrdineProducerTest {

    @Autowired
    private OrdineProducer producer;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldSendOrderSuccessfully() throws Exception {
        // Create a test consumer to verify the message arrived
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
                "test-group", "true", embeddedKafka
        );
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");

        // Send an order
        producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");

        // Verify the message was received within 3 seconds
        ConsumerRecords<String, String> records =
                KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));
        assertThat(records.count()).isEqualTo(1);
        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record.key()).isEqualTo("order-001");
        assertThat(record.value()).contains("99.99");
        consumer.close();
    }
}
For tests with Testcontainers (more realistic: it uses a real broker in Docker):
// With Testcontainers: a real Docker broker during the tests
import org.junit.jupiter.api.Test;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class OrdineProducerIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("apache/kafka:4.0.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Test
    void testConBrokerReale() {
        // Test against a real Kafka 4.0 broker in Docker
        // Ensures the behavior matches production
    }
}
Next Steps in the Series
With advanced producers and consumers covered, you can tackle the most complex challenge:
- Article 4 – Exactly-Once Semantics: Kafka transactions for exactly-once end-to-end processing, with the transaction coordinator and its throughput implications. Indispensable for financial pipelines.
- Article 5 – Schema Registry: How to use Avro and Protobuf with Schema Registry to avoid schema incompatibilities between producers and consumers on different teams.
- Article 6 – Kafka Streams: embedded stream processing in Java with the Streams DSL, which internally uses the same producer and consumer APIs explored in this article.
Link with Other Series
- Architecture (From Microservices to Modular Monolith): Kafka producers and consumers as a concrete implementation of event-driven patterns in distributed architectures.
- Advanced Java: Deep dive into Kafka's Java API with thread safety, lifecycle management and testing with EmbeddedKafka.