Los datos analíticos en SQL Server funcionan. Hasta que no funcionan. Cuando las tablas crecen a decenas de millones de registros, los queries analíticos compiten con el OLTP, los backups tardan horas, y agregar columnas nuevas a tablas grandes es una operación de bloqueo.
Delta Lake sobre almacenamiento de objetos (Azure Blob, ADLS Gen2, S3) separa el storage del compute, permite queries en paralelo desde Spark o DuckDB, y agrega ACID transactions y time travel sobre formato Parquet.
Este post es el proceso de migración: desde SQL Server con pyodbc hasta Delta Lake con Polars y PyArrow, con todos los detalles que los tutoriales omiten.
Por qué Delta Lake sobre Parquet plano
Parquet es el formato base. Delta Lake agrega:
- Transaction log: cada operación (insert, update, delete) queda registrada en
_delta_log/. Garantiza que lecturas concurrentes ven estado consistente. - Time travel:
DeltaTable("path").load_as_version(5)lee el estado de la tabla en la versión 5. Útil para debugging y para comparar datos históricos. - Schema enforcement: escrituras que no respetan el schema fallan explícitamente.
- MERGE (upsert):
when_matched_update_all().when_not_matched_insert_all()en una sola operación atómica.
Para datos analíticos que se actualizan incremetalmente, Delta Lake es estrictamente superior a Parquet plano.
Type mapping SQL Server → Arrow/Parquet
El primer obstáculo: los tipos de SQL Server no tienen mapeo 1:1 con Arrow. Algunos casos que requieren atención:
from pyarrow import schema, field
import pyarrow as pa
# Mapeo manual de tipos SQL Server problemáticos
TYPE_MAPPING = {
# SQL Server DATETIME2 → timestamp[us, tz=UTC]
# pyodbc retorna datetime naive → forzar UTC explícitamente
"datetime": pa.timestamp("us", tz="UTC"),
"datetime2": pa.timestamp("us", tz="UTC"),
"smalldatetime": pa.timestamp("s", tz="UTC"),
# SQL Server MONEY → decimal(19, 4)
# No usar float — pérdida de precisión en valores monetarios
"money": pa.decimal128(19, 4),
"smallmoney": pa.decimal128(10, 4),
# SQL Server NVARCHAR(MAX) → large_string
"nvarchar_max": pa.large_string(),
"varchar_max": pa.large_string(),
# SQL Server UNIQUEIDENTIFIER → string (UUID como string)
"uniqueidentifier": pa.string(),
# SQL Server BIT → bool
"bit": pa.bool_(),
}
El caso más problemático: DATETIME2 sin timezone en SQL Server. pyodbc retorna objetos datetime naive (sin timezone). Si los conviertes directamente a Arrow, quedan como timestamp[us] sin tz — después de migrar no sabes si eran UTC o local. La regla: forzar UTC en el punto de extracción.
Extracción con pyodbc en chunks
Para tablas grandes, extraer en chunks por rango de ID o fecha:
import pyodbc
import polars as pl
from datetime import date, timedelta
def extract_chunk(
conn_str: str,
table: str,
date_col: str,
start_date: date,
end_date: date,
) -> pl.DataFrame:
query = f"""
SELECT *
FROM {table}
WHERE {date_col} >= ? AND {date_col} < ?
ORDER BY {date_col}
"""
with pyodbc.connect(conn_str) as conn:
conn.execute("SET NOCOUNT ON") # Evitar overhead de rowcount messages
cursor = conn.cursor()
cursor.execute(query, start_date, end_date)
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
if not rows:
return pl.DataFrame()
return pl.DataFrame(
[dict(zip(columns, row)) for row in rows],
infer_schema_length=len(rows),
)
El infer_schema_length=len(rows) hace que Polars infiera el schema del chunk completo, no solo de las primeras 100 filas. Evita errores de tipo cuando los primeros registros tienen valores nulos y los siguientes tienen datos.
Normalización de tipos antes de escribir
Después de extraer, normalizar los tipos problemáticos:
def normalize_types(df: pl.DataFrame) -> pl.DataFrame:
result = df
for col_name in df.columns:
col = df[col_name]
# Datetime naive → UTC (SQL Server suele almacenar en UTC sin marcarlo)
if col.dtype == pl.Datetime:
result = result.with_columns(
pl.col(col_name)
.dt.replace_time_zone("UTC")
.alias(col_name)
)
# Strings con valores "NULL" como texto → null real
elif col.dtype == pl.Utf8:
result = result.with_columns(
pl.when(pl.col(col_name).is_in(["NULL", "null", "None", ""]))
.then(None)
.otherwise(pl.col(col_name))
.alias(col_name)
)
# Decimal de Polars como Float64 → truncar a precisión correcta
elif col.dtype == pl.Float64 and col_name in MONEY_COLUMNS:
result = result.with_columns(
pl.col(col_name).round(4).alias(col_name)
)
return result
Escritura a Delta Lake con particionado
El particionado por fecha permite que las queries analíticas lean solo las particiones relevantes, reduciendo el scan de datos:
from deltalake import write_deltalake, DeltaTable
def write_chunk_to_delta(
df: pl.DataFrame,
delta_path: str,
partition_cols: list[str] = ["year", "month"],
) -> None:
if df.is_empty():
return
# Agregar columnas de partición derivadas de la fecha
df_with_partitions = df.with_columns([
pl.col("fecha_registro").dt.year().alias("year"),
pl.col("fecha_registro").dt.month().alias("month"),
])
arrow_table = df_with_partitions.to_arrow()
if DeltaTable.is_deltatable(delta_path):
dt = DeltaTable(delta_path)
(
dt.merge(
source=arrow_table,
predicate="target.id = source.id",
source_alias="source",
target_alias="target",
)
.when_matched_update_all()
.when_not_matched_insert_all()
.execute()
)
else:
write_deltalake(
delta_path,
arrow_table,
partition_by=partition_cols,
mode="overwrite",
)
El particionado por year/month es el más común para datos con timestamp. Para tablas con distribución irregular, particionado por categoría o región puede ser más eficiente.
El pipeline completo de migración histórica
import asyncio
from datetime import date, timedelta
from pathlib import Path
async def migrate_table_historical(
table_name: str,
date_col: str,
start_date: date,
end_date: date,
delta_path: str,
chunk_days: int = 7,
) -> None:
current = start_date
total_rows = 0
while current < end_date:
chunk_end = min(current + timedelta(days=chunk_days), end_date)
print(f"Processing {current} → {chunk_end}")
df = extract_chunk(CONN_STR, table_name, date_col, current, chunk_end)
if not df.is_empty():
df_normalized = normalize_types(df)
write_chunk_to_delta(df_normalized, delta_path)
total_rows += len(df)
print(f" Written {len(df)} rows (total: {total_rows})")
current = chunk_end
print(f"Migration complete: {total_rows} total rows → {delta_path}")
Para tablas de 50M+ registros, usar chunks de 7 días y ejecutar el pipeline con checkpointing (high-water mark) para poder reanudar si se interrumpe.
Verificación post-migración
Antes de desactivar SQL Server como fuente:
def verify_migration(
sql_conn_str: str,
table_name: str,
delta_path: str,
) -> dict:
# Count en SQL Server
with pyodbc.connect(sql_conn_str) as conn:
cursor = conn.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
sql_count = cursor.fetchone()[0]
# Count en Delta Lake
dt = DeltaTable(delta_path)
delta_count = len(dt.to_pandas()) # O usar DuckDB para count eficiente
# Sample comparison
with pyodbc.connect(sql_conn_str) as conn:
sql_sample = pl.read_database(
f"SELECT TOP 1000 * FROM {table_name} ORDER BY NEWID()",
conn
)
delta_df = pl.read_delta(delta_path)
# Comparar los 1000 registros del sample...
return {
"sql_count": sql_count,
"delta_count": delta_count,
"count_match": sql_count == delta_count,
"sample_rows": 1000,
}
La verificación por conteo es necesaria pero no suficiente. Para tablas con soft-deletes o updates frecuentes, verificar también que los últimos registros actualizados llegaron correctamente.
Lo que aprendí
El type mapping no es automático. pyodbc + Polars infiere tipos razonablemente, pero datetime naive, MONEY, y UNIQUEIDENTIFIER necesitan mapeo explícito. Si no lo haces en la extracción, lo pagas con corrupción silenciosa de datos.
El particionado es una decisión de diseño, no de implementación. Cambiar el esquema de partición de una tabla Delta Lake grande es costoso (reescribir todo). Pensar el patrón de queries esperado antes de decidir cómo particionar.
Delta Lake MERGE es idempotente para migración incremental. Puedes re-ejecutar el pipeline de migración sin duplicar datos. Si un chunk falla a mitad, el re-run desde ese punto es seguro.
DuckDB para verificación, no Pandas. Para verificar una tabla Delta Lake de 50M registros, pd.read_parquet(delta_path) carga todo en memoria. DuckDB lee directamente los archivos Parquet con predicado pushdown y usa una fracción de la memoria.