Kafka es at-least-once por diseño. Un mensaje puede entregarse más de una vez. Lo que no debería pasar es que un mensaje se procese de forma incorrecta y el offset avance de todas formas — eso es pérdida silenciosa de datos.
El auto-commit de Kafka avanza el offset en un intervalo fijo, independientemente de si el procesamiento del mensaje fue exitoso. Es el modo por defecto y la causa más común de pérdida de datos en consumers mal configurados.
Este post es el diseño de un consumer Kafka con aiokafka que hace commit manual, maneja rebalanceo de particiones, y envía mensajes que fallan a una dead letter queue.
Por qué auto-commit pierde mensajes
t=0: Consumer recibe mensaje M1
t=1: Consumer empieza a procesar M1
t=2: auto-commit avanza offset a M1 ← mensaje "committed"
t=3: Consumer falla procesando M1
t=4: Consumer reinicia
t=5: Consumer empieza desde M2 ← M1 nunca se procesó correctamente
Con auto-commit habilitado, el offset avanza aunque el procesamiento haya fallado. La semántica real es “at-most-once” — el mensaje puede procesarse cero o una vez, nunca más de una.
Para garantías at-least-once, el commit del offset debe ocurrir después de que el procesamiento fue exitoso.
Consumer básico con commit manual
import asyncio
from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError
import logging
logger = logging.getLogger(__name__)
async def consume(
bootstrap_servers: str,
topic: str,
group_id: str,
) -> None:
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
enable_auto_commit=False, # Commit manual
auto_offset_reset="earliest",
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
await consumer.start()
try:
async for message in consumer:
try:
await process_message(message)
# Commit SOLO si el procesamiento fue exitoso
await consumer.commit()
logger.info("message.processed",
extra={"topic": message.topic,
"partition": message.partition,
"offset": message.offset})
except ProcessingError as e:
logger.error("message.processing_failed",
extra={"offset": message.offset, "error": str(e)})
# No hacemos commit — el mensaje se reentregará
# En un sistema real: enviar a DLQ antes de continuar
await send_to_dlq(message, error=e)
await consumer.commit() # Commit después de DLQ
finally:
await consumer.stop()
El flujo: procesar → si exitoso, commit → si falla, enviar a DLQ → commit. El commit siempre ocurre, pero después del DLQ cuando hay error — el mensaje no se pierde ni bloquea el consumer indefinidamente.
Manejo de rebalanceo de particiones
Cuando un consumer entra o sale del grupo (scale-up, deploy, crash), Kafka rebalancea las particiones entre los consumers activos. Durante el rebalanceo, el consumer recibe notificaciones:
from aiokafka import AIOKafkaConsumer
from aiokafka.abc import ConsumerRebalanceListener
class RebalanceHandler(ConsumerRebalanceListener):
def __init__(self, consumer: AIOKafkaConsumer):
self._consumer = consumer
self._pending_commits: dict = {}
async def on_partitions_revoked(self, revoked):
# Particiones que se van a reasignar
# Hacer commit de los offsets pendientes ANTES de perder las particiones
logger.info("partitions.revoked", extra={"partitions": str(revoked)})
if self._pending_commits:
try:
await self._consumer.commit(self._pending_commits)
self._pending_commits.clear()
logger.info("partitions.committed_before_revoke")
except KafkaError as e:
logger.error("partitions.commit_failed", extra={"error": str(e)})
async def on_partitions_assigned(self, assigned):
# Particiones que se acaban de asignar
logger.info("partitions.assigned", extra={"partitions": str(assigned)})
async def consume_with_rebalance(bootstrap_servers: str, topic: str, group_id: str):
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
enable_auto_commit=False,
)
handler = RebalanceHandler(consumer)
await consumer.start()
try:
consumer.subscribe([topic], listener=handler)
async for message in consumer:
await process_message(message)
handler._pending_commits[
TopicPartition(message.topic, message.partition)
] = OffsetAndMetadata(message.offset + 1, "")
finally:
await consumer.stop()
on_partitions_revoked es el momento crítico. Si hay offsets sin commitear cuando las particiones se reasignan, esos mensajes se reprocessarán por el consumer que recibe esas particiones. El commit en on_partitions_revoked minimiza el reprocessing.
Dead Letter Queue
Los mensajes que fallan después de N reintentos van a un topic DLQ:
from aiokafka import AIOKafkaProducer
import json
from datetime import datetime, timezone
async def send_to_dlq(
message,
error: Exception,
dlq_producer: AIOKafkaProducer,
dlq_topic: str = "dlq",
) -> None:
dlq_payload = {
"original_topic": message.topic,
"original_partition": message.partition,
"original_offset": message.offset,
"original_key": message.key.decode() if message.key else None,
"original_value": message.value,
"error_type": type(error).__name__,
"error_message": str(error),
"failed_at": datetime.now(tz=timezone.utc).isoformat(),
}
await dlq_producer.send(
dlq_topic,
key=message.key,
value=json.dumps(dlq_payload).encode("utf-8"),
)
logger.warning("message.sent_to_dlq",
extra={"topic": message.topic, "offset": message.offset})
El DLQ preserva el mensaje original con metadata adicional (error, timestamp, topic de origen). Un proceso separado puede:
- Monitorear el DLQ para alertas
- Reprocessar mensajes del DLQ después de corregir el bug
- Analizar patrones de fallos
Retry con backoff antes del DLQ
No todo fallo debe ir al DLQ inmediatamente. Los errores transitorios (timeout de red, base de datos temporalmente no disponible) se resuelven con retry:
import tenacity
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=True,
)
async def process_with_retry(message) -> None:
await process_message(message)
async def handle_message(
message,
consumer: AIOKafkaConsumer,
dlq_producer: AIOKafkaProducer,
) -> None:
try:
await process_with_retry(message)
await consumer.commit()
except Exception as e:
# Agotó los reintentos — ir al DLQ
logger.error("message.max_retries_exceeded",
extra={"offset": message.offset, "attempts": 3})
await send_to_dlq(message, error=e, dlq_producer=dlq_producer)
await consumer.commit() # Commit después de DLQ para no reprocessar
3 reintentos con backoff exponencial cubren la mayoría de errores transitorios. Los errores persistentes (bug de código, schema inválido) van al DLQ después del tercer intento.
Procesamiento por batch
Para mensajes livianos donde el overhead de procesar uno a uno es significativo, el procesamiento por batch mejora el throughput:
async def consume_batched(
consumer: AIOKafkaConsumer,
batch_size: int = 100,
max_wait_ms: int = 500,
) -> None:
batch = []
async for message in consumer:
batch.append(message)
if len(batch) >= batch_size:
await process_batch(batch)
await consumer.commit()
batch.clear()
# Procesar el batch final (puede ser < batch_size)
if batch:
await process_batch(batch)
await consumer.commit()
El riesgo del procesamiento por batch: si el proceso cae cuando hay 50 mensajes en el batch sin commitear, todos los 50 se reprocesarán en el próximo run. La idempotencia del process_batch es obligatoria.
Lo que aprendí
auto-commit = at-most-once, no at-least-once. Este es el malentendido más común en Kafka. Kafka el broker es at-least-once. El consumer con auto-commit es at-most-once porque puede commitear offsets de mensajes que no se procesaron correctamente.
El commit en on_partitions_revoked es la única protección contra reprocessing masivo en rebalanceos. Sin él, cada deploy que causa rebalanceo puede reprocesar cientos o miles de mensajes.
La DLQ es infraestructura obligatoria, no opcional. Sin DLQ, un mensaje que falla consistentemente bloquea el consumer para siempre (o hasta que lo elimines manualmente). Con DLQ, el consumer avanza y el mensaje problemático queda aislado para análisis.
Idempotencia en el procesador es el seguro final. Con at-least-once delivery y commit manual, los mensajes se pueden procesar más de una vez en casos de fallo. El procesador tiene que manejar duplicados correctamente.