El ETL que heredé funcionaba con datasets pequeños. Cuando el volumen creció, empezó a fallar. Los jobs de sincronización se colgaban después de 30 minutos, el proceso moría con timeout, y la tabla de destino quedaba en estado inconsistente — parcialmente actualizada, sin forma fácil de saber hasta dónde había llegado.
El problema era un chunk fijo de 10,000 registros que asumía que todos los registros pesan lo mismo. No pesan. Algunos rangos de IDs tienen records pequeños. Otros tienen records con arrays de imágenes, JSONs anidados, datos de geo. El mismo chunk de 10,000 puede ser 2MB o 400MB dependiendo del contenido.
Este post es el diseño del ETL reescrito: chunking adaptativo, high-water marks, streaming inserts a Delta Lake, y skip inteligente para no reprocesar lo que ya está.
El problema con chunk size fijo
# Antes — chunk fijo, timeout garantizado en datasets pesados
def sync_records(source_db, target_table, chunk_size=10_000):
offset = 0
while True:
records = source_db.fetch(limit=chunk_size, offset=offset)
if not records:
break
target_table.insert(records)
offset += chunk_size
Dos problemas: el OFFSET en SQL escanea todas las filas anteriores en cada página (performance degrada linealmente), y el chunk fijo ignora el peso real de los datos.
Adaptive chunking por ID y fecha
La solución parte de una observación: los datos tienen una distribución temporal. La mayoría de los sistemas tienen más datos nuevos que datos viejos modificados. Y los IDs suelen ser monótonos crecientes.
El chunking adaptativo divide por rangos de IDs o por días, y ajusta dinámicamente el tamaño del chunk según el volumen real encontrado:
from dataclasses import dataclass
from datetime import date, timedelta
@dataclass
class ChunkSpec:
id_start: int
id_end: int
date_filter: date | None = None
estimated_rows: int = 0
async def compute_adaptive_chunks(
source: SourceDB,
target_hwm: int, # high-water mark — último ID procesado
max_chunk_rows: int = 50_000,
max_chunk_mb: int = 100,
) -> list[ChunkSpec]:
chunks = []
cursor = target_hwm
while True:
# Estimar peso del próximo bloque antes de descargarlo
probe = await source.estimate_range(
id_start=cursor,
id_end=cursor + max_chunk_rows
)
if probe.total_rows == 0:
break
# Si el bloque estimado supera el límite en MB, reducir el rango
if probe.estimated_mb > max_chunk_mb:
adjusted_end = cursor + int(
max_chunk_rows * (max_chunk_mb / probe.estimated_mb)
)
chunks.append(ChunkSpec(cursor, adjusted_end, estimated_rows=probe.total_rows))
cursor = adjusted_end
else:
chunks.append(ChunkSpec(cursor, cursor + max_chunk_rows, estimated_rows=probe.total_rows))
cursor += max_chunk_rows
return chunks
El estimate_range es un query rápido (solo COUNT y SUM de tamaño estimado de columnas) que no descarga los datos. Eso permite ajustar el chunk antes de hacer el fetch real.
High-water marks UTC-safe
Un high-water mark es el puntero al último registro procesado. Permite reanudar un ETL interrumpido sin reprocesar desde el inicio.
El problema con timestamps como high-water mark: zonas horarias. Un registro creado en UTC+5 puede tener timestamp local que parece “anterior” a un registro UTC ya procesado. El ETL puede saltar registros.
La regla: siempre UTC en el high-water mark, siempre UTC en el filtro de query.
from datetime import datetime, timezone
@dataclass
class HighWaterMark:
last_id: int
last_processed_at: datetime # Siempre UTC
source_name: str
@classmethod
def from_db(cls, record: dict) -> "HighWaterMark":
return cls(
last_id=record["last_id"],
last_processed_at=record["last_processed_at"].replace(
tzinfo=timezone.utc # Forzar UTC aunque la DB no lo tenga
),
source_name=record["source_name"],
)
def advance(self, new_last_id: int) -> "HighWaterMark":
return HighWaterMark(
last_id=new_last_id,
last_processed_at=datetime.now(tz=timezone.utc),
source_name=self.source_name,
)
El high-water mark se guarda en una tabla de control separada, actualizada al final de cada chunk exitoso — no al final del ETL completo. Si el proceso se interrumpe en el chunk 7 de 20, el próximo run retoma desde el chunk 7, no desde cero.
Per-day skip
Para datos históricos o re-sincronizaciones completas, el chunking por día agrega una optimización: si ya procesé un día completo y no hubo cambios en esa fuente ese día, saltarlo.
async def should_skip_day(
source: SourceDB,
target: DeltaTable,
day: date,
) -> bool:
source_count = await source.count_for_day(day)
target_count = await target.count_for_day(day)
# Si los conteos coinciden, asumir que el día está sincronizado
# (Para fuentes sin soft-delete, esto es suficiente)
if source_count == target_count:
return True
return False
Para fuentes con soft-deletes o updates frecuentes, el skip por conteo no es suficiente — necesitas comparar checksums o usar change data capture (CDC).
Streaming inserts a Delta Lake
Delta Lake con Polars y PyArrow permite inserts incrementales sin reescribir la tabla completa. El patrón MERGE INTO (upsert) actualiza registros existentes e inserta nuevos en una sola operación:
import polars as pl
from deltalake import DeltaTable, write_deltalake
from deltalake.writer import write_deltalake
def stream_chunk_to_delta(
records: list[dict],
delta_path: str,
merge_keys: list[str],
) -> None:
if not records:
return
df = pl.DataFrame(records)
# Convertir a PyArrow para DeltaLake
arrow_table = df.to_arrow()
if DeltaTable.is_deltatable(delta_path):
dt = DeltaTable(delta_path)
# MERGE: update existentes + insert nuevos
(
dt.merge(
source=arrow_table,
predicate=" AND ".join(
f"target.{k} = source.{k}" for k in merge_keys
),
source_alias="source",
target_alias="target",
)
.when_matched_update_all()
.when_not_matched_insert_all()
.execute()
)
else:
# Primera escritura — crear tabla
write_deltalake(delta_path, arrow_table)
El merge garantiza idempotencia: si el chunk se reprocesa (por un retry), los registros existentes se actualizan con los mismos valores, sin duplicados.
Chunked deletes para storelive
Un caso especial: sincronizar deletes. Los registros que existían en la fuente y ya no existen deben eliminarse del destino.
El problema con DELETE FROM target WHERE id NOT IN (SELECT id FROM source) en tablas grandes: el NOT IN con millones de IDs es catastrófico en performance.
La alternativa: delta de deletes en chunks.
async def sync_deletes_chunked(
source: SourceDB,
target: DeltaTable,
chunk_size: int = 10_000,
) -> int:
total_deleted = 0
cursor = 0
while True:
# IDs que existen en target pero no en source, en chunks
orphan_ids = await find_orphan_ids(
source, target, offset=cursor, limit=chunk_size
)
if not orphan_ids:
break
# DELETE en Delta Lake
dt = DeltaTable(target.path)
dt.delete(f"id IN ({','.join(str(i) for i in orphan_ids)})")
total_deleted += len(orphan_ids)
cursor += chunk_size
return total_deleted
Los deletes también se loggean en el high-water mark para poder auditar qué se borró y cuándo.
Lo que aprendí
Los timeouts de ETL son síntomas, no el problema real. El problema real es la ausencia de checkpointing y chunking adaptativo. Un ETL que puede reanudarse desde el punto de falla no genera timeouts — simplemente toma más tiempo en el peor caso.
UTC everywhere no es opcional. Un bug de timezone en el high-water mark puede causar saltos silenciosos de datos — registros que nunca se sincronizan porque el filtro los excluye. El seguro mínimo es hacer explícita la timezone en cada timestamp de control.
Delta Lake es la respuesta correcta para ETL incremental. El MERGE nativo, el time travel para auditoría, y la compatibilidad con el ecosistema Arrow/Parquet lo hacen más robusto que alternativas SQL puras para este tipo de pipeline.
El estimate antes del fetch vale el query extra. El costo de un COUNT rápido antes del fetch real es despreciable comparado con el costo de abortar un fetch de 400MB a mitad.