delta-lakesql-serverpolarspyarrowpythondata-engineeringmigration

Cómo migré tablas de SQL Server a Delta Lake sin perder datos

Los datos analíticos en SQL Server funcionan. Hasta que no funcionan. Cuando las tablas crecen a decenas de millones de registros, los queries analíticos compiten con el OLTP, los backups tardan horas, y agregar columnas nuevas a tablas grandes es una operación de bloqueo.

Delta Lake sobre almacenamiento de objetos (Azure Blob, ADLS Gen2, S3) separa el storage del compute, permite queries en paralelo desde Spark o DuckDB, y agrega ACID transactions y time travel sobre formato Parquet.

Este post es el proceso de migración: desde SQL Server con pyodbc hasta Delta Lake con Polars y PyArrow, con todos los detalles que los tutoriales omiten.

Por qué Delta Lake sobre Parquet plano

Parquet es el formato base. Delta Lake agrega:

Para datos analíticos que se actualizan incremetalmente, Delta Lake es estrictamente superior a Parquet plano.

Type mapping SQL Server → Arrow/Parquet

El primer obstáculo: los tipos de SQL Server no tienen mapeo 1:1 con Arrow. Algunos casos que requieren atención:

from pyarrow import schema, field
import pyarrow as pa

# Mapeo manual de tipos SQL Server problemáticos
TYPE_MAPPING = {
    # SQL Server DATETIME2 → timestamp[us, tz=UTC]
    # pyodbc retorna datetime naive → forzar UTC explícitamente
    "datetime": pa.timestamp("us", tz="UTC"),
    "datetime2": pa.timestamp("us", tz="UTC"),
    "smalldatetime": pa.timestamp("s", tz="UTC"),

    # SQL Server MONEY → decimal(19, 4)
    # No usar float — pérdida de precisión en valores monetarios
    "money": pa.decimal128(19, 4),
    "smallmoney": pa.decimal128(10, 4),

    # SQL Server NVARCHAR(MAX) → large_string
    "nvarchar_max": pa.large_string(),
    "varchar_max": pa.large_string(),

    # SQL Server UNIQUEIDENTIFIER → string (UUID como string)
    "uniqueidentifier": pa.string(),

    # SQL Server BIT → bool
    "bit": pa.bool_(),
}

El caso más problemático: DATETIME2 sin timezone en SQL Server. pyodbc retorna objetos datetime naive (sin timezone). Si los conviertes directamente a Arrow, quedan como timestamp[us] sin tz — después de migrar no sabes si eran UTC o local. La regla: forzar UTC en el punto de extracción.

Extracción con pyodbc en chunks

Para tablas grandes, extraer en chunks por rango de ID o fecha:

import pyodbc
import polars as pl
from datetime import date, timedelta

def extract_chunk(
    conn_str: str,
    table: str,
    date_col: str,
    start_date: date,
    end_date: date,
) -> pl.DataFrame:
    query = f"""
        SELECT *
        FROM {table}
        WHERE {date_col} >= ? AND {date_col} < ?
        ORDER BY {date_col}
    """

    with pyodbc.connect(conn_str) as conn:
        conn.execute("SET NOCOUNT ON")  # Evitar overhead de rowcount messages
        cursor = conn.cursor()
        cursor.execute(query, start_date, end_date)

        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()

        if not rows:
            return pl.DataFrame()

        return pl.DataFrame(
            [dict(zip(columns, row)) for row in rows],
            infer_schema_length=len(rows),
        )

El infer_schema_length=len(rows) hace que Polars infiera el schema del chunk completo, no solo de las primeras 100 filas. Evita errores de tipo cuando los primeros registros tienen valores nulos y los siguientes tienen datos.

Normalización de tipos antes de escribir

Después de extraer, normalizar los tipos problemáticos:

def normalize_types(df: pl.DataFrame) -> pl.DataFrame:
    result = df

    for col_name in df.columns:
        col = df[col_name]

        # Datetime naive → UTC (SQL Server suele almacenar en UTC sin marcarlo)
        if col.dtype == pl.Datetime:
            result = result.with_columns(
                pl.col(col_name)
                  .dt.replace_time_zone("UTC")
                  .alias(col_name)
            )

        # Strings con valores "NULL" como texto → null real
        elif col.dtype == pl.Utf8:
            result = result.with_columns(
                pl.when(pl.col(col_name).is_in(["NULL", "null", "None", ""]))
                  .then(None)
                  .otherwise(pl.col(col_name))
                  .alias(col_name)
            )

        # Decimal de Polars como Float64 → truncar a precisión correcta
        elif col.dtype == pl.Float64 and col_name in MONEY_COLUMNS:
            result = result.with_columns(
                pl.col(col_name).round(4).alias(col_name)
            )

    return result

Escritura a Delta Lake con particionado

El particionado por fecha permite que las queries analíticas lean solo las particiones relevantes, reduciendo el scan de datos:

from deltalake import write_deltalake, DeltaTable

def write_chunk_to_delta(
    df: pl.DataFrame,
    delta_path: str,
    partition_cols: list[str] = ["year", "month"],
) -> None:
    if df.is_empty():
        return

    # Agregar columnas de partición derivadas de la fecha
    df_with_partitions = df.with_columns([
        pl.col("fecha_registro").dt.year().alias("year"),
        pl.col("fecha_registro").dt.month().alias("month"),
    ])

    arrow_table = df_with_partitions.to_arrow()

    if DeltaTable.is_deltatable(delta_path):
        dt = DeltaTable(delta_path)
        (
            dt.merge(
                source=arrow_table,
                predicate="target.id = source.id",
                source_alias="source",
                target_alias="target",
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute()
        )
    else:
        write_deltalake(
            delta_path,
            arrow_table,
            partition_by=partition_cols,
            mode="overwrite",
        )

El particionado por year/month es el más común para datos con timestamp. Para tablas con distribución irregular, particionado por categoría o región puede ser más eficiente.

El pipeline completo de migración histórica

import asyncio
from datetime import date, timedelta
from pathlib import Path

async def migrate_table_historical(
    table_name: str,
    date_col: str,
    start_date: date,
    end_date: date,
    delta_path: str,
    chunk_days: int = 7,
) -> None:
    current = start_date
    total_rows = 0

    while current < end_date:
        chunk_end = min(current + timedelta(days=chunk_days), end_date)

        print(f"Processing {current}{chunk_end}")

        df = extract_chunk(CONN_STR, table_name, date_col, current, chunk_end)

        if not df.is_empty():
            df_normalized = normalize_types(df)
            write_chunk_to_delta(df_normalized, delta_path)
            total_rows += len(df)
            print(f"  Written {len(df)} rows (total: {total_rows})")

        current = chunk_end

    print(f"Migration complete: {total_rows} total rows → {delta_path}")

Para tablas de 50M+ registros, usar chunks de 7 días y ejecutar el pipeline con checkpointing (high-water mark) para poder reanudar si se interrumpe.

Verificación post-migración

Antes de desactivar SQL Server como fuente:

def verify_migration(
    sql_conn_str: str,
    table_name: str,
    delta_path: str,
) -> dict:
    # Count en SQL Server
    with pyodbc.connect(sql_conn_str) as conn:
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        sql_count = cursor.fetchone()[0]

    # Count en Delta Lake
    dt = DeltaTable(delta_path)
    delta_count = len(dt.to_pandas())  # O usar DuckDB para count eficiente

    # Sample comparison
    with pyodbc.connect(sql_conn_str) as conn:
        sql_sample = pl.read_database(
            f"SELECT TOP 1000 * FROM {table_name} ORDER BY NEWID()",
            conn
        )

    delta_df = pl.read_delta(delta_path)
    # Comparar los 1000 registros del sample...

    return {
        "sql_count": sql_count,
        "delta_count": delta_count,
        "count_match": sql_count == delta_count,
        "sample_rows": 1000,
    }

La verificación por conteo es necesaria pero no suficiente. Para tablas con soft-deletes o updates frecuentes, verificar también que los últimos registros actualizados llegaron correctamente.

Lo que aprendí

El type mapping no es automático. pyodbc + Polars infiere tipos razonablemente, pero datetime naive, MONEY, y UNIQUEIDENTIFIER necesitan mapeo explícito. Si no lo haces en la extracción, lo pagas con corrupción silenciosa de datos.

El particionado es una decisión de diseño, no de implementación. Cambiar el esquema de partición de una tabla Delta Lake grande es costoso (reescribir todo). Pensar el patrón de queries esperado antes de decidir cómo particionar.

Delta Lake MERGE es idempotente para migración incremental. Puedes re-ejecutar el pipeline de migración sin duplicar datos. Si un chunk falla a mitad, el re-run desde ese punto es seguro.

DuckDB para verificación, no Pandas. Para verificar una tabla Delta Lake de 50M registros, pd.read_parquet(delta_path) carga todo en memoria. DuckDB lee directamente los archivos Parquet con predicado pushdown y usa una fracción de la memoria.

Volver al blog