Apache Kafka Fundamentals: Topics, Partitions, Offsets and Consumer Groups
Kafka is not simply a message queue: it is a distributed commit log designed to sustain millions of events per second with guaranteed durability. In this foundational guide, we explore the internal structure of topics and partitions, how offsets track each consumer's precise position, and why the consumer group is the key mechanism for scaling consumption in parallel.
Why Kafka is Different from a Traditional Queue
When designing a distributed system that needs to handle streams of events in real time, the first temptation is to use a classic message queue like RabbitMQ or ActiveMQ. These solutions work well for simple scenarios, but they have a key structural limitation: once a consumer has consumed a message, the message is deleted. There is no way to reread it, to have multiple independent consumers process it differently, or to replay the entire history of events.
Apache Kafka was born at LinkedIn in 2011 with a radically different philosophy: messages (called records) are written to an append-only log and remain there for a configurable period (default: 7 days). Different consumers can read the same records at different times, each keeping track of its position via the offset. This pattern turns Kafka into much more than a queue: it becomes the source of truth for the entire event history of your system.
Kafka in 2026: Key Numbers
- Used by over 80% of Fortune 500 companies for streaming use cases
- Kafka 4.0 (March 2025) permanently removed ZooKeeper, moving to KRaft
- Theoretical throughput: 1 million+ messages/second per broker (commodity hardware)
- Confluent Cloud: managed Kafka available on AWS, GCP, Azure with latency < 10ms p99
- Ecosystem: 200+ connectors via Kafka Connect, Kafka Streams, Apache Flink integration
The Fundamental Model: Broker, Topic and Partition
Broker: The Cluster Node
A broker is simply a Kafka server. A Kafka cluster is composed of one or more brokers, each identified by a unique broker.id. In production, 3, 6 or 9 brokers are typically used to guarantee fault tolerance. Brokers handle writing and reading records, maintaining logs on disk, and replicating data between nodes.
With Kafka 4.0 and the new KRaft mode, one or more brokers also take on the role of controller, managing the cluster metadata (which broker is the leader of which partition, which brokers are active, etc.) via an internal Raft consensus log. There is no longer any need for a separate ZooKeeper ensemble.
Topic: The Logical Category of Records
A topic is the logical name under which producers publish records and consumers read them. You can think of it as a thematic channel: ordini-effettuati, pagamenti-confermati, eventi-utente. Each topic has its own configuration for retention, number of partitions, replication factor and compaction policy.
Topics are partitioned: each topic is divided into N physical partitions, distributed among the brokers. It is this distribution that makes Kafka horizontally scalable for both writes and reads.
Partition: The Unit of Parallelism and Ordering
A partition is an ordered, immutable append-only log. Each record written to a partition receives a monotonically increasing offset (0, 1, 2, ...). Ordering is guaranteed within a partition, not across different partitions.
The distribution of records across partitions is determined by the partition key: if the producer specifies a key, the record always goes to the same partition (hash of the key modulo the number of partitions), guaranteeing ordering for that key. If the key is missing, Kafka uses a sticky round-robin strategy (batching records onto the same partition before rotating).
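The key-to-partition mapping can be sketched in a few lines of Python. Note the hedge: Kafka's Java client actually hashes the serialized key with murmur2; `zlib.crc32` here is only a stand-in deterministic hash to illustrate the "hash modulo partition count" idea.

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    """Map a record key to a partition: hash(key) mod num_partitions.

    Illustrative only: the real Kafka producer uses murmur2 on the
    serialized key bytes, not crc32.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# The same key always lands on the same partition,
# which is exactly what guarantees per-key ordering.
assert partition_for("ordine-42", 6) == partition_for("ordine-42", 6)
```

Because the mapping depends on the number of partitions, adding partitions to an existing topic changes where keys land, which is one reason to plan the partition count up front.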
# Create a topic with 6 partitions and replication factor 3
# (Kafka 4.0 with KRaft, no --zookeeper flag)
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config min.insync.replicas=2
# Describe the topic to verify the distribution
kafka-topics.sh --describe \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati
The output of --describe shows, for each partition, the leader broker, the replicas, and the in-sync replicas (ISR). ISRs are the replicas that have replicated all of the leader's records: if the leader fails, only an ISR can be elected as the new leader, ensuring no data loss.
The Producer: Writing Records in Kafka
The producer is the component that publishes records to topics. The producer configuration determines the delivery guarantees. The most critical properties are:
- bootstrap.servers: list of brokers for the initial connection (the client discovers the other brokers automatically)
- key.serializer and value.serializer: how to serialize key and value (StringSerializer, AvroSerializer, etc.)
- acks: how many broker acknowledgments to wait for before considering a write successful (0, 1, all)
- retries: number of retry attempts in case of transient errors
- linger.ms: milliseconds to wait before sending a batch (increases throughput at the expense of latency)
- batch.size: maximum batch size in bytes (default: 16KB)
// Production-ready Java producer configuration
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class OrdineProducer {
    public static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        // Delivery guarantees: all = acks from every in-sync replica
        props.put("acks", "all");
        // Automatic retry with backoff
        props.put("retries", 3);
        props.put("retry.backoff.ms", 100);
        // Batching for throughput
        props.put("linger.ms", 5);
        props.put("batch.size", 32768); // 32KB
        // Compression: cuts network I/O by 60-80%
        props.put("compression.type", "snappy");
        // Idempotence: avoids duplicates on retry
        props.put("enable.idempotence", true);
        return new KafkaProducer<>(props);
    }

    public static void inviaOrdine(KafkaProducer<String, String> producer,
                                   String ordineId, String payload) {
        // The key (ordineId) determines the target partition
        ProducerRecord<String, String> record =
            new ProducerRecord<>("ordini-effettuati", ordineId, payload);
        // Asynchronous send with callback
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send error: " + exception.getMessage());
            } else {
                System.out.printf("Record sent: topic=%s, partition=%d, offset=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }
}
Attention: acks and Throughput
Increasing guarantees has a cost: with acks=all and min.insync.replicas=2, the producer waits until at least 2 replicas have written the record before proceeding. This adds latency (typically 1-5ms extra), but ensures no data loss if a broker fails immediately after the acknowledgment. For analytical systems that tolerate some loss, acks=1 or acks=0 offer much higher throughput.
The Consumer: Reading Records from Kafka
The Polling Loop
The Kafka consumer uses a pull model: it does not receive messages via push, but actively requests them from the broker via poll() calls. This design ensures that the consumer is never overwhelmed by a volume of messages beyond its processing capacity.
// Basic Java consumer with manual offset management
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

public class OrdineConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put("group.id", "servizio-inventario");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // Behavior on first read (no offset saved for the group)
        // "earliest" = from the beginning; "latest" = only new messages
        props.put("auto.offset.reset", "earliest");
        // Disable auto-commit for precise control
        props.put("enable.auto.commit", false);
        // Max timeout for joining the consumer group (default: 45s)
        props.put("session.timeout.ms", 30000);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("ordini-effettuati"));
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Offset: %d | Partition: %d | Key: %s%n",
                        record.offset(), record.partition(), record.key());
                    // Process the record...
                    elaboraOrdine(record.value());
                }
                // Manual commit AFTER processing
                // Guarantees at-least-once semantics
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }

    private static void elaboraOrdine(String payload) {
        // business logic...
    }
}
Consumer Group: The Scaling Mechanism
The consumer group is the fundamental mechanism for scaling consumption in parallel. All consumers that share the same group.id belong to the same group and divide the topic's partitions among themselves. The rule is simple: each partition can be assigned to only one consumer per group at a time.
This means the maximum number of parallel consumers in a group equals the number of partitions. If you have 6 partitions and start 6 consumers in the same group, each gets exactly 1 partition. If you start a 7th consumer, it will remain idle on standby (useful for fast failover).
Consumer Group: Scaling Scenarios
- 1 consumer, 6 partitions → one consumer processes everything, no parallelism
- 3 consumers, 6 partitions → each consumer manages 2 partitions in parallel
- 6 consumers, 6 partitions → maximum parallelism, 1 partition per consumer
- 9 consumers, 6 partitions → 6 active, 3 on standby for failover
- Two distinct groups, same topic → each group receives ALL messages independently
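The scenarios above follow mechanically from the "one partition, one consumer per group" rule. As a hedged illustration, here is a toy round-robin assignment; Kafka's real assignors (range, round-robin, cooperative-sticky) are richer, but the counting logic is the same.

```python
def assign(partitions: int, consumers: list) -> dict:
    """Toy round-robin assignment of partitions to consumers in a group.

    Illustrative only: real Kafka assignors (range, round-robin,
    cooperative-sticky) handle multiple topics and rebalances.
    """
    assignment = {c: [] for c in consumers}
    for p in range(partitions):
        # Consumers beyond the partition count simply get nothing (standby)
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

print(assign(6, ["c1", "c2", "c3"]))
# {'c1': [0, 3], 'c2': [1, 4], 'c3': [2, 5]}
```

Running the same function with 9 consumers and 6 partitions leaves 3 consumers with an empty list, matching the standby scenario in the list above.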
Offsets: The Position Tracking Mechanism
The offset is an integer that uniquely identifies the position of a record within a partition. The broker assigns offsets sequentially as records are written: the first record has offset 0, the second 1, and so on.
The consumer group stores its own committed offset — i.e. the offset of the last successfully processed record —
in a special internal Kafka topic called __consumer_offsets. This is the starting point in the event of a restart
or consumer failover.
Understanding the difference between these offsets is critical for error handling:
- Log End Offset (LEO): the offset of the next record that will be written to the log (head position)
- High Watermark (HW): the offset of the last record replicated across all ISRs (the consumer only sees records ≤ HW)
- Current Offset: the offset of the next record that the consumer will read in the next poll() call
- Committed Offset: the offset saved in the __consumer_offsets topic (the point to restart from after a crash)
- Consumer Lag: the difference between LEO and committed offset; indicates how many records the consumer still has to process
# Check the consumer lag of a group
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--describe \
--group servizio-inventario
# Typical output:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# servizio-inventario ordini-effettuati 0 1250 1280 30
# servizio-inventario ordini-effettuati 1 890 890 0
# servizio-inventario ordini-effettuati 2 2100 2105 5
# Reset offsets to the beginning (for reprocessing)
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--group servizio-inventario \
--topic ordini-effettuati \
--reset-offsets --to-earliest \
--execute
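The LAG column in the output above is simple arithmetic: log-end offset minus the group's committed offset, per partition. A minimal sketch of that computation, using the figures from the example output:

```python
def consumer_lag(log_end_offsets: dict, committed_offsets: dict) -> dict:
    """Per-partition lag, mirroring kafka-consumer-groups.sh --describe.

    A partition with no committed offset is treated as lagging from 0.
    """
    return {p: log_end_offsets[p] - committed_offsets.get(p, 0)
            for p in log_end_offsets}

# Figures from the example --describe output above
leo = {0: 1280, 1: 890, 2: 2105}
committed = {0: 1250, 1: 890, 2: 2100}
print(consumer_lag(leo, committed))          # {0: 30, 1: 0, 2: 5}
print(sum(consumer_lag(leo, committed).values()))  # total lag: 35
```

In production, lag is typically scraped continuously (e.g. via Burrow or the Kafka exporter for Prometheus) rather than computed by hand; a lag that grows without bound means consumers cannot keep up with producers.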
Replication: Durability and Fault Tolerance
Each partition has one leader and zero or more followers (replicas). Producers and consumers always communicate with the leader. Followers replicate data from the leader asynchronously, but typically very quickly.
The set of followers that are "caught up enough" to become leader forms the ISR (In-Sync Replicas). A follower is removed from the ISR if it falls behind by more than replica.lag.time.max.ms milliseconds (default: 30s). When the leader fails, the Kafka controller elects the follower with the highest offset among the ISRs as the new leader.
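The ISR membership rule can be sketched as a time-based check. This is a simplified model (the broker tracks the last fetch in which each follower was fully caught up); the constant name mirrors the real config key, everything else is illustrative.

```python
REPLICA_LAG_TIME_MAX_MS = 30_000  # broker default for replica.lag.time.max.ms

def in_sync_replicas(now_ms: int, last_caught_up_ms: dict) -> set:
    """Followers whose last fully-caught-up fetch is recent enough.

    Simplified model: a follower stays in the ISR only if it has been
    caught up with the leader within replica.lag.time.max.ms.
    """
    return {replica for replica, ts in last_caught_up_ms.items()
            if now_ms - ts <= REPLICA_LAG_TIME_MAX_MS}

# broker-2 caught up 5s ago, broker-3 has been lagging for 40s
print(in_sync_replicas(100_000, {"broker-2": 95_000, "broker-3": 60_000}))
# {'broker-2'}
```

A follower evicted this way is not lost: once it catches up again, it rejoins the ISR automatically.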
The combination of replication.factor and min.insync.replicas defines the trade-off between durability and availability:
# Recommended production configuration
# replication.factor=3 means: 1 leader + 2 followers
# topic-level overrides
kafka-topics.sh --alter \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati \
--config min.insync.replicas=2
# With this configuration:
# - acks=all: the producer waits for acknowledgment from the leader + at least 1 follower
# - If 2 out of 3 brokers are down: the cluster rejects writes (but no data loss)
# - If only 1 broker is down: the cluster continues normally
# broker-level defaults in server.properties
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
Retention Policy: How Long Data Remains
Records in Kafka are removed according to policies configurable per topic. There are three main modes:
- Time-based retention (retention.ms): records are deleted after a certain period from their timestamp. Default: 604800000 ms = 7 days. For critical topics such as audit logs, much higher values (years) are used.
- Size-based retention (retention.bytes): the log per partition does not exceed a certain size. When the limit is exceeded, the oldest segments are deleted.
- Log compaction (cleanup.policy=compact): instead of deleting by time/size, Kafka keeps only the latest record for each key. Ideal for state topics (such as database tables replicated via CDC).
# Configure retention for different use cases
# Real-time event topic: short retention, high throughput
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic click-stream \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=3600000 \
--config retention.bytes=1073741824
# retention.ms=3600000 -> 1 hour
# retention.bytes=1073741824 -> 1GB per partition

# Audit log topic: long retention for compliance
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic audit-log \
--partitions 3 \
--replication-factor 3 \
--config retention.ms=31536000000 \
--config compression.type=gzip
# retention.ms=31536000000 -> 1 year

# State topic with log compaction (e.g. user profiles)
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic profili-utente \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5
Consumer Python: A Practical Example
The Kafka ecosystem supports many languages. Here is a Python example using the confluent-kafka library (the official Confluent binding based on librdkafka, much more performant than kafka-python):
# pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import signal

TOPIC = "ordini-effettuati"
GROUP_ID = "servizio-analytics-py"

config = {
    "bootstrap.servers": "kafka1:9092,kafka2:9092",
    "group.id": GROUP_ID,
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "session.timeout.ms": 30000,
    "max.poll.interval.ms": 300000,  # 5 minutes for slow processing
}

consumer = Consumer(config)
running = True

def graceful_shutdown(signum, frame):
    global running
    running = False

signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)

def elabora_ordine(ordine):
    # Business logic...
    pass

try:
    consumer.subscribe([TOPIC])
    print(f"Consumer started, group: {GROUP_ID}")
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Reached the end of the partition, wait for new messages
                print(f"Reached EOF: {msg.topic()} [{msg.partition()}] offset {msg.offset()}")
            else:
                raise KafkaException(msg.error())
        else:
            ordine = json.loads(msg.value().decode("utf-8"))
            print(f"Received order {ordine['id']} from partition {msg.partition()}")
            # Process the order...
            elabora_ordine(ordine)
            # Manual commit after successful processing
            consumer.commit(asynchronous=False)
finally:
    consumer.close()
    print("Consumer closed cleanly")
Recommended Architecture: How Many Partitions?
One of the most common questions when starting with Kafka is: how many partitions should a topic have? The answer depends on several factors:
- Maximum parallelism of the consumer group: the number of partitions caps the number of parallel consumers. Estimate the number of consumers you expect at peak.
- Target throughput: each partition can typically sustain 10-50 MB/s of writes (depending on the disk). Divide the total throughput by this figure to get the minimum number of partitions needed.
- Ordering: if you need to guarantee ordering for a given key (e.g. all events of the same customer), records for that key always land on the same partition. More partitions = better load distribution across different keys.
- Memory overhead: each partition requires memory on the broker (~1-2 MB of overhead). Around 100K total partitions in a cluster, this starts to take a toll.
Practical Rule for Partitions
A rough formula: max(throughput_MB_s / 10, max_parallel_consumers). For most applications, 6, 12 or 24 partitions are reasonable values. Kafka lets you increase the number of partitions later, but not decrease it: plan with some margin.
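The rule of thumb above is trivial to encode. The 10 MB/s per-partition figure is the rough assumption stated earlier, not a Kafka constant:

```python
import math

def suggested_partitions(throughput_mb_s: float, max_parallel_consumers: int) -> int:
    """Rule-of-thumb partition count: max(throughput / 10, consumer parallelism).

    Assumes ~10 MB/s of sustained writes per partition, a rough figure
    that depends heavily on disks, message size and compression.
    """
    return max(math.ceil(throughput_mb_s / 10), max_parallel_consumers)

print(suggested_partitions(45, 3))   # 5  -> throughput-bound
print(suggested_partitions(20, 12))  # 12 -> parallelism-bound
```

Since partitions can be added but never removed, rounding the result up to the next "comfortable" value (6, 12, 24) leaves margin for growth.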
Log Compaction: The Use Case for State Topics
Log compaction is an advanced Kafka feature that completely changes the retention semantics: instead of deleting records by time or size, Kafka keeps only the latest record for each key. All older records with the same key are deleted during the compaction process.
This makes compacted topics ideal for representing the current state of entities: user profiles, current prices, system configurations, inventory. A consumer connecting to a compacted topic for the first time can reconstruct the complete state by reading all the records present (one per key), without having to read the entire event history.
A record with a null value (a "tombstone" record) is how you delete a key from a compacted topic: after compaction, the key itself disappears from the log.
# Create a topic with log compaction
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic profili-utente \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--config segment.ms=86400000 \
--config delete.retention.ms=86400000
# cleanup.policy=compact: enables compaction
# min.cleanable.dirty.ratio=0.5: compact when >50% of the log is "dirty"
# segment.ms=86400000: roll a new segment every 24h
# delete.retention.ms: how long to keep tombstone records before removing them

# Send a profile update (key = userId)
kafka-console-producer.sh \
--bootstrap-server kafka1:9092 \
--topic profili-utente \
--property parse.key=true \
--property key.separator=:
# Type: user123:{"nome":"Mario","email":"mario@example.com","eta":30}
# Type: user456:{"nome":"Anna","email":"anna@example.com","eta":25}
# Type: user123:{"nome":"Mario","email":"mario.rossi@example.com","eta":31}
# After compaction, only the last line for user123 remains in the topic
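The compaction semantics demonstrated above can be simulated in memory: keep only the latest record per key, and drop keys whose latest value is a tombstone. Kafka actually compacts per log segment in the background; this sketch shows only the end result.

```python
def compact(log):
    """Simulate log compaction: latest value per key, tombstones delete keys.

    `log` is a list of (key, value) pairs in write order; value None
    represents a tombstone record.
    """
    state = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)  # tombstone: the key disappears
        else:
            state[key] = value    # newer value supersedes the older one
    return state

log = [
    ("user123", '{"email":"mario@example.com"}'),
    ("user456", '{"email":"anna@example.com"}'),
    ("user123", '{"email":"mario.rossi@example.com"}'),  # supersedes the first
    ("user456", None),  # tombstone: user456 is deleted
]
print(compact(log))
# {'user123': '{"email":"mario.rossi@example.com"}'}
```

This is exactly what a consumer reconstructing state from a compacted topic ends up with: one record per live key, regardless of how many updates preceded it.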
Kafka Internal Topics: __consumer_offsets and __transaction_state
Kafka internally uses special topics to manage its own state. Knowing them helps you understand how the system works and troubleshoot issues:
- __consumer_offsets: stores the committed offsets of each consumer group. It has 50 partitions by default (offsets.topic.num.partitions). Each consumer group is assigned to a partition via a hash of its group.id. If this topic has replication problems, consumer groups fail to commit offsets.
- __transaction_state: manages the state of in-flight transactions. Used by the Kafka transactional API to guarantee exactly-once semantics. It has 50 partitions by default.
- __cluster_metadata (KRaft only): the controller quorum's metadata log. Contains all cluster metadata (topics, partitions, brokers, ACLs, configurations). Accessible only internally by the controllers.
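The group-to-partition mapping for __consumer_offsets can be illustrated with a Python port of Java's String.hashCode(); the broker uses (roughly) the absolute value of that hash modulo the partition count. The port below is for illustration only.

```python
def java_string_hashcode(s: str) -> int:
    """Python port of Java's String.hashCode() (signed 32-bit result)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Reinterpret as a signed 32-bit integer, like Java
    return h - 0x100000000 if h >= 0x80000000 else h

def offsets_partition(group_id: str, num_partitions: int = 50) -> int:
    """Which __consumer_offsets partition holds a group's commits (sketch).

    Masking with 0x7FFFFFFF mimics clearing the sign bit before the modulo.
    """
    return (java_string_hashcode(group_id) & 0x7FFFFFFF) % num_partitions

p = offsets_partition("servizio-inventario")
assert 0 <= p < 50
```

This is why a single broker (the group coordinator, the leader of that one partition) handles all offset commits for a given group.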
# Inspect the __consumer_offsets topic (advanced troubleshooting)
# WARNING: read-only operation, never modify these topics
kafka-console-consumer.sh \
--bootstrap-server kafka1:9092 \
--topic __consumer_offsets \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--from-beginning \
--max-messages 20
# Example output:
# [servizio-inventario,ordini-effettuati,0]::OffsetAndMetadata(offset=1250, ...)
# [servizio-inventario,ordini-effettuati,1]::OffsetAndMetadata(offset=890, ...)

# List all active consumer groups
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--list

# Detail of a specific group
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--group servizio-inventario \
--describe \
--state # includes the group state (Stable, Rebalancing, Empty, Dead)
Message Headers and Timestamps
Each Kafka record has a precise structure:
- Key (optional): determines the target partition, serialized in bytes
- Value: the message payload, serialized in bytes
- Timestamp: creation time on the producer side (CreateTime) or broker-side ingestion time (LogAppendTime), configurable
- Headers: key-value pairs for metadata (correlation ID, event type, schema version, etc.)
- Partition + Offset: assigned by the broker at the time of writing
// Add headers to a Java ProducerRecord
ProducerRecord<String, String> record = new ProducerRecord<>(
"ordini-effettuati",
ordineId,
payload
);
// Headers for traceability and versioning
record.headers()
.add("correlation-id", UUID.randomUUID().toString().getBytes())
.add("schema-version", "2".getBytes())
.add("source-service", "checkout-service".getBytes())
.add("event-type", "OrdineCreato".getBytes());
producer.send(record);
Docker Compose: Quick Start for Local Development
To start experimenting with Kafka locally without dealing with complex configurations, the quickest way is to use Docker Compose with the official Apache Kafka 4.0 image:
# Minimal docker-compose.yml for local development (single-node KRaft)
version: "3.9"
services:
kafka:
image: apache/kafka:4.0.0
container_name: kafka-local
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_LISTENERS: "PLAINTEXT://kafka-local:9092,CONTROLLER://kafka-local:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-local:9093"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
CLUSTER_ID: "local-dev-cluster-id-001"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
# Start:
# docker-compose up -d
#
# Verify:
# docker exec kafka-local kafka-topics.sh --bootstrap-server localhost:9092 --list
Summary: The Fundamental Concepts
After exploring the internal structure of Kafka, here is a summary of the key concepts to memorize:
- Broker: cluster node, manages on-disk logs and replication
- Topic: logical category of records, divided into partitions
- Partition: ordered append-only log, the unit of parallelism; ordering guaranteed only within it
- Offset: progressive position of each record in a partition
- Consumer Group: scaling mechanism; each partition is assigned to only one consumer per group
- ISR: set of up-to-date replicas, from which a new leader is elected in the event of a failure
- Consumer Lag: critical health indicator, the difference between LEO and committed offset
- Retention: records remain for a configurable period; they are not deleted after consumption
Next Steps in the Series
Now that you have a solid foundation, the next articles in the series delve into more advanced aspects:
- Article 2 – KRaft in Kafka 4.0: how the new controller works without ZooKeeper, the migration process from Kafka 3.x and the operational benefits in production.
- Article 3 – Advanced Producer and Consumer: detailed configuration of acks, retries, max.in.flight.requests and the idempotent producer for exactly-once guarantees at the producer level.
- Article 4 – Exactly-Once Semantics: Kafka transactions for atomic writes across multiple topics, the transaction coordinator and the implications for throughput.
Link with Other Series
- Observability and OpenTelemetry: How to instrument a Kafka application with OpenTelemetry to track the propagation of events between producers and consumers.
- Platform Engineering: Kafka as a fundamental component of an Internal Developer Platform for event-driven communication between teams.
- PostgreSQL AI: CDC (Change Data Capture) pattern with Debezium to synchronize PostgreSQL to Kafka in real time, the topic of Article 7 of this series.