pythonetldelta-lakepolarspyarrowdata-engineeringazurepatterns

ETL que no explota en producción: chunking adaptivo, high-water marks y Delta Lake

El ETL que heredé funcionaba con datasets pequeños. Cuando el volumen creció, empezó a fallar. Los jobs de sincronización se colgaban después de 30 minutos, el proceso moría con timeout, y la tabla de destino quedaba en estado inconsistente — parcialmente actualizada, sin forma fácil de saber hasta dónde había llegado.

El problema era un chunk fijo de 10,000 registros que asumía que todos los registros pesan lo mismo. No pesan. Algunos rangos de IDs tienen records pequeños. Otros tienen records con arrays de imágenes, JSONs anidados, datos de geo. El mismo chunk de 10,000 puede ser 2MB o 400MB dependiendo del contenido.

Este post es el diseño del ETL reescrito: chunking adaptativo, high-water marks, streaming inserts a Delta Lake, y skip inteligente para no reprocesar lo que ya está.

El problema con chunk size fijo

# Antes — chunk fijo, timeout garantizado en datasets pesados
def sync_records(source_db, target_table, chunk_size=10_000):
    offset = 0
    while True:
        records = source_db.fetch(limit=chunk_size, offset=offset)
        if not records:
            break
        target_table.insert(records)
        offset += chunk_size

Dos problemas: el OFFSET en SQL escanea todas las filas anteriores en cada página (performance degrada linealmente), y el chunk fijo ignora el peso real de los datos.

Adaptive chunking por ID y fecha

La solución parte de una observación: los datos tienen una distribución temporal. La mayoría de los sistemas tienen más datos nuevos que datos viejos modificados. Y los IDs suelen ser monótonos crecientes.

El chunking adaptativo divide por rangos de IDs o por días, y ajusta dinámicamente el tamaño del chunk según el volumen real encontrado:

from dataclasses import dataclass
from datetime import date, timedelta

@dataclass
class ChunkSpec:
    id_start: int
    id_end: int
    date_filter: date | None = None
    estimated_rows: int = 0

async def compute_adaptive_chunks(
    source: SourceDB,
    target_hwm: int,        # high-water mark — último ID procesado
    max_chunk_rows: int = 50_000,
    max_chunk_mb: int = 100,
) -> list[ChunkSpec]:
    chunks = []
    cursor = target_hwm

    while True:
        # Estimar peso del próximo bloque antes de descargarlo
        probe = await source.estimate_range(
            id_start=cursor,
            id_end=cursor + max_chunk_rows
        )

        if probe.total_rows == 0:
            break

        # Si el bloque estimado supera el límite en MB, reducir el rango
        if probe.estimated_mb > max_chunk_mb:
            adjusted_end = cursor + int(
                max_chunk_rows * (max_chunk_mb / probe.estimated_mb)
            )
            chunks.append(ChunkSpec(cursor, adjusted_end, estimated_rows=probe.total_rows))
            cursor = adjusted_end
        else:
            chunks.append(ChunkSpec(cursor, cursor + max_chunk_rows, estimated_rows=probe.total_rows))
            cursor += max_chunk_rows

    return chunks

El estimate_range es un query rápido (solo COUNT y SUM de tamaño estimado de columnas) que no descarga los datos. Eso permite ajustar el chunk antes de hacer el fetch real.

High-water marks UTC-safe

Un high-water mark es el puntero al último registro procesado. Permite reanudar un ETL interrumpido sin reprocesar desde el inicio.

El problema con timestamps como high-water mark: zonas horarias. Un registro creado en UTC+5 puede tener timestamp local que parece “anterior” a un registro UTC ya procesado. El ETL puede saltar registros.

La regla: siempre UTC en el high-water mark, siempre UTC en el filtro de query.

from datetime import datetime, timezone

@dataclass
class HighWaterMark:
    last_id: int
    last_processed_at: datetime  # Siempre UTC
    source_name: str

    @classmethod
    def from_db(cls, record: dict) -> "HighWaterMark":
        return cls(
            last_id=record["last_id"],
            last_processed_at=record["last_processed_at"].replace(
                tzinfo=timezone.utc  # Forzar UTC aunque la DB no lo tenga
            ),
            source_name=record["source_name"],
        )

    def advance(self, new_last_id: int) -> "HighWaterMark":
        return HighWaterMark(
            last_id=new_last_id,
            last_processed_at=datetime.now(tz=timezone.utc),
            source_name=self.source_name,
        )

El high-water mark se guarda en una tabla de control separada, actualizada al final de cada chunk exitoso — no al final del ETL completo. Si el proceso se interrumpe en el chunk 7 de 20, el próximo run retoma desde el chunk 7, no desde cero.

Per-day skip

Para datos históricos o re-sincronizaciones completas, el chunking por día agrega una optimización: si ya procesé un día completo y no hubo cambios en esa fuente ese día, saltarlo.

async def should_skip_day(
    source: SourceDB,
    target: DeltaTable,
    day: date,
) -> bool:
    source_count = await source.count_for_day(day)
    target_count = await target.count_for_day(day)

    # Si los conteos coinciden, asumir que el día está sincronizado
    # (Para fuentes sin soft-delete, esto es suficiente)
    if source_count == target_count:
        return True

    return False

Para fuentes con soft-deletes o updates frecuentes, el skip por conteo no es suficiente — necesitas comparar checksums o usar change data capture (CDC).

Streaming inserts a Delta Lake

Delta Lake con Polars y PyArrow permite inserts incrementales sin reescribir la tabla completa. El patrón MERGE INTO (upsert) actualiza registros existentes e inserta nuevos en una sola operación:

import polars as pl
from deltalake import DeltaTable, write_deltalake
from deltalake.writer import write_deltalake

def stream_chunk_to_delta(
    records: list[dict],
    delta_path: str,
    merge_keys: list[str],
) -> None:
    if not records:
        return

    df = pl.DataFrame(records)

    # Convertir a PyArrow para DeltaLake
    arrow_table = df.to_arrow()

    if DeltaTable.is_deltatable(delta_path):
        dt = DeltaTable(delta_path)
        # MERGE: update existentes + insert nuevos
        (
            dt.merge(
                source=arrow_table,
                predicate=" AND ".join(
                    f"target.{k} = source.{k}" for k in merge_keys
                ),
                source_alias="source",
                target_alias="target",
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute()
        )
    else:
        # Primera escritura — crear tabla
        write_deltalake(delta_path, arrow_table)

El merge garantiza idempotencia: si el chunk se reprocesa (por un retry), los registros existentes se actualizan con los mismos valores, sin duplicados.

Chunked deletes para storelive

Un caso especial: sincronizar deletes. Los registros que existían en la fuente y ya no existen deben eliminarse del destino.

El problema con DELETE FROM target WHERE id NOT IN (SELECT id FROM source) en tablas grandes: el NOT IN con millones de IDs es catastrófico en performance.

La alternativa: delta de deletes en chunks.

async def sync_deletes_chunked(
    source: SourceDB,
    target: DeltaTable,
    chunk_size: int = 10_000,
) -> int:
    total_deleted = 0
    cursor = 0

    while True:
        # IDs que existen en target pero no en source, en chunks
        orphan_ids = await find_orphan_ids(
            source, target, offset=cursor, limit=chunk_size
        )
        if not orphan_ids:
            break

        # DELETE en Delta Lake
        dt = DeltaTable(target.path)
        dt.delete(f"id IN ({','.join(str(i) for i in orphan_ids)})")

        total_deleted += len(orphan_ids)
        cursor += chunk_size

    return total_deleted

Los deletes también se loggean en el high-water mark para poder auditar qué se borró y cuándo.

Lo que aprendí

Los timeouts de ETL son síntomas, no el problema real. El problema real es la ausencia de checkpointing y chunking adaptativo. Un ETL que puede reanudarse desde el punto de falla no genera timeouts — simplemente toma más tiempo en el peor caso.

UTC everywhere no es opcional. Un bug de timezone en el high-water mark puede causar saltos silenciosos de datos — registros que nunca se sincronizan porque el filtro los excluye. El seguro mínimo es hacer explícita la timezone en cada timestamp de control.

Delta Lake es la respuesta correcta para ETL incremental. El MERGE nativo, el time travel para auditoría, y la compatibilidad con el ecosistema Arrow/Parquet lo hacen más robusto que alternativas SQL puras para este tipo de pipeline.

El estimate antes del fetch vale el query extra. El costo de un COUNT rápido antes del fetch real es despreciable comparado con el costo de abortar un fetch de 400MB a mitad.

Volver al blog