Agregar Transformaciones de Datos y Carga a Destino

Video
30 min~10 min lectura
Objetivo de la lección

Sin embargo, el corazón de un pipeline ETL reside en la transformación .

Puntos de control
  • Transformaciones de Datos en Airflow: Más Allá de la Extracción Simple
  • Concepto Clave: El Patrón de Transformación y Carga Idempotente
  • Cómo Funciona en la Práctica: Un Flujo Paso a Paso
  • Código en Acción: Implementación de un DAG de Transformación y Carga

Reproductor de video

Transformaciones de Datos en Airflow: Más Allá de la Extracción Simple

En lecciones anteriores, hemos dominado la extracción de datos desde múltiples fuentes y su almacenamiento provisional. Sin embargo, el corazón de un pipeline ETL reside en la transformación. Esta fase es donde los datos crudos se convierten en información valiosa, estructurada y lista para el análisis. En Airflow, las transformaciones no ocurren por arte de magia; deben ser diseñadas intencionalmente dentro de las tareas de nuestros DAGs, considerando la idempotencia, la eficiencia y la trazabilidad. No se trata solo de cambiar el formato de una columna, sino de aplicar lógica de negocio, unir conjuntos de datos, limpiar inconsistencias y enriquecer la información para cumplir con los requisitos del esquema de destino.

La carga a destino, por su parte, es el acto final que pone a disposición de los usuarios finales o de otros sistemas los datos transformados. Esta etapa debe garantizar la integridad de los datos y manejar escenarios complejos como actualizaciones incrementales, upserts (actualizar o insertar), o la carga completa de tablas de hechos y dimensiones en un entorno de data warehouse. Un error común es subestimar esta fase, asumiendo que es una simple copia. En realidad, requiere una estrategia definida para resolver conflictos, gestionar transacciones y, en muchos casos, desencadenar procesos posteriores como la actualización de vistas materializadas o índices.

En este proyecto integrador, elevaremos nuestro pipeline para incluir transformaciones complejas que simulan un escenario real: procesaremos datos de ventas y de clientes para generar un conjunto de datos analítico listo para reportes. Utilizaremos PythonOperator para lógica personalizada y PostgresOperator o MySqlHook para cargas eficientes en bases de datos relacionales, asegurando que cada paso sea monitorizable y recuperable.

Concepto Clave: El Patrón de Transformación y Carga Idempotente

Imagina que eres un chef en un restaurante de alta cocina. Recibes ingredientes crudos (extracción). Tu misión no es solo cocinarlos (transformación), sino asegurarte de que cada vez que prepares el mismo plato a partir de los mismos ingredientes, el resultado sea exactamente igual, independientemente de cuántas veces repitas el proceso. Además, debes emplatarlo (carga) de manera que, si el camarero tropieza y deja caer el plato, puedas preparar otro idéntico rápidamente sin empezar desde cero con los ingredientes crudos. Esto es la idempotencia en ETL: la capacidad de ejecutar una tarea múltiples veces produciendo el mismo resultado final, sin duplicados o efectos secundarios no deseados.

En el contexto técnico, esto se traduce en transformaciones cuyos resultados son deterministas y en estrategias de carga que manejan inteligentemente los datos existentes. Por ejemplo, en lugar de hacer un simple INSERT que fallaría con registros duplicados, usamos comandos como INSERT ON CONFLICT UPDATE en PostgreSQL o REPLACE / cargas en tablas temporales seguidas de merges en MySQL. El DAG debe estar diseñado para que, si falla la tarea de carga y se re-ejecuta, no cree registros duplicados ni corrompa los datos ya cargados correctamente. La clave es que cada ejecución del DAG, dada una misma entrada de datos, debe producir una salida idéntica en el destino.

Cómo Funciona en la Práctica: Un Flujo Paso a Paso

Vamos a detallar el proceso para nuestra tarea de transformación y carga. Supongamos que tenemos dos tablas en nuestro staging area: stg_ventas y stg_clientes. El objetivo es crear una tabla consolidada llamada hechos_ventas_analiticas. Primero, la tarea de transformación se activa después de que las extracciones a staging hayan finalizado con éxito. Esta tarea, implementada con un PythonOperator, leerá los datos desde las tablas de staging usando un hook de base de datos (como PostgresHook). Luego, aplicará la lógica de negocio: unirá las tablas por la clave del cliente, calculará el importe total neto (restándole descuentos e impuestos), categorizará al cliente según su antigüedad y agregará una marca temporal de transformación.

El resultado de esta transformación no se escribirá directamente a la base de datos de destino. En su lugar, se guardará en un DataFrame de Pandas o, para conjuntos muy grandes, se persistirá en una tabla temporal de staging llamada tmp_ventas_transformadas. Este es un paso crucial para separar la lógica de transformación de la lógica de carga, facilitando las pruebas y la depuración. A continuación, una segunda tarea, dedicada exclusivamente a la carga, tomará los datos de esta tabla temporal y los volcará a la tabla final de hechos. Utilizará una sentencia SQL de MERGE o UPSERT para actualizar los registros existentes (identificados por una clave de negocio compuesta) e insertar los nuevos. Finalmente, una tarea de limpieza opcional eliminará la tabla temporal, manteniendo el entorno ordenado.

Código en Acción: Implementación de un DAG de Transformación y Carga

A continuación, presentamos un DAG completo que orquesta este proceso. Incluye la transformación compleja y la carga idempotente a una base de datos PostgreSQL.


from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
import pandas as pd

default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'proyecto_integrador_transformacion_carga',
    default_args=default_args,
    description='DAG para transformar datos de staging y cargar a tabla de hechos',
    schedule_interval='@daily',
    catchup=False
)

def transformar_datos(**context):
    """
    Tarea Python para realizar transformaciones complejas.
    Lee de staging, aplica lógica y escribe en una tabla temporal.
    """
    # 1. Conectar a la BD de staging (usando el hook)
    pg_hook = PostgresHook(postgres_conn_id='postgres_staging_conn')
    engine = pg_hook.get_sqlalchemy_engine()

    # 2. Extraer datos de las tablas de staging
    query_ventas = "SELECT venta_id, cliente_id, producto_id, cantidad, precio_unitario, descuento, fecha_venta FROM stg_ventas WHERE fecha_carga = '{{ ds }}'"
    query_clientes = "SELECT cliente_id, nombre, fecha_registro, segmento FROM stg_clientes"

    df_ventas = pd.read_sql(query_ventas, engine)
    df_clientes = pd.read_sql(query_clientes, engine)

    # 3. Aplicar transformaciones
    # Unir los datos
    df_completo = pd.merge(df_ventas, df_clientes, on='cliente_id', how='left')
    # Calcular importe neto
    df_completo['importe_neto'] = (df_completo['cantidad'] * df_completo['precio_unitario']) - df_completo['descuento']
    # Categorizar antigüedad del cliente
    df_completo['dias_antiguedad'] = (pd.to_datetime('today') - pd.to_datetime(df_completo['fecha_registro'])).dt.days
    df_completo['categoria_antiguedad'] = pd.cut(df_completo['dias_antiguedad'],
                                                 bins=[0, 365, 730, 2000],
                                                 labels=['Nuevo', 'Regular', 'Antiguo'])
    # Agregar marca de transformación
    df_completo['fecha_transformacion'] = datetime.now()

    # 4. Seleccionar columnas finales y guardar en tabla temporal
    df_final = df_completo[['venta_id', 'cliente_id', 'producto_id', 'fecha_venta',
                            'nombre', 'segmento', 'cantidad', 'importe_neto',
                            'categoria_antiguedad', 'fecha_transformacion']]

    # 5. Escribir a una tabla temporal en la BD de DESTINO (no en staging)
    dest_hook = PostgresHook(postgres_conn_id='postgres_destino_conn')
    dest_engine = dest_hook.get_sqlalchemy_engine()
    df_final.to_sql('tmp_ventas_transformadas', dest_engine, if_exists='replace', index=False)

    # Podríamos pasar metadatos, como el número de filas, via XCom
    context['ti'].xcom_push(key='filas_transformadas', value=len(df_final))
    print(f"Transformación completada. {len(df_final)} filas procesadas.")

# Definición de tareas
tarea_transformar = PythonOperator(
    task_id='transformar_datos_ventas_clientes',
    python_callable=transformar_datos,
    provide_context=True,
    dag=dag,
)

# SQL para carga idempotente usando UPSERT (INSERT ... ON CONFLICT)
sql_carga_upsert = """
INSERT INTO hechos_ventas_analiticas
    (venta_id, cliente_id, producto_id, fecha_venta, nombre_cliente, segmento_cliente,
     cantidad, importe_neto, categoria_antiguedad, fecha_carga)
SELECT
    venta_id, cliente_id, producto_id, fecha_venta, nombre, segmento,
    cantidad, importe_neto, categoria_antiguedad, fecha_transformacion
FROM tmp_ventas_transformadas
ON CONFLICT (venta_id, fecha_venta) DO UPDATE SET
    nombre_cliente = EXCLUDED.nombre_cliente,
    segmento_cliente = EXCLUDED.segmento_cliente,
    cantidad = EXCLUDED.cantidad,
    importe_neto = EXCLUDED.importe_neto,
    categoria_antiguedad = EXCLUDED.categoria_antiguedad,
    fecha_carga = EXCLUDED.fecha_carga;
"""

tarea_cargar_destino = PostgresOperator(
    task_id='cargar_a_tabla_hechos',
    postgres_conn_id='postgres_destino_conn',
    sql=sql_carga_upsert,
    dag=dag,
)

tarea_limpiar_temporal = PostgresOperator(
    task_id='limpiar_tabla_temporal',
    postgres_conn_id='postgres_destino_conn',
    sql="DROP TABLE IF EXISTS tmp_ventas_transformadas;",
    dag=dag,
)

# Orquestación del flujo
tarea_transformar >> tarea_cargar_destino >> tarea_limpiar_temporal
Tip Importante: Observa el uso de '{{ ds }}' en la consulta SQL dentro de la función Python. Airflow no lo reemplaza automáticamente allí. Para que funcione, necesitarías usar el método get_template_context o, mejor aún, pasar la fecha como un parámetro a la función usando op_kwargs en el PythonOperator. Una alternativa más limpia es usar un SQLOperator para la extracción o templating de Jinja dentro de strings si se usa PostgresOperator. Este ejemplo simplificado asume que la lógica de filtrado por fecha se maneja internamente.

Para complementar, aquí está el código SQL para crear la tabla de destino, que debe existir antes de ejecutar el DAG.


-- Tabla de hechos de destino en el data warehouse
CREATE TABLE IF NOT EXISTS hechos_ventas_analiticas (
    venta_id INT NOT NULL,
    cliente_id INT NOT NULL,
    producto_id INT NOT NULL,
    fecha_venta DATE NOT NULL,
    nombre_cliente VARCHAR(255),
    segmento_cliente VARCHAR(50),
    cantidad DECIMAL(10,2),
    importe_neto DECIMAL(15,2),
    categoria_antiguedad VARCHAR(20),
    fecha_carga TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Clave primaria compuesta para identificar unívocamente un registro y permitir el UPSERT
    PRIMARY KEY (venta_id, fecha_venta)
);

-- Índice para mejorar rendimiento en consultas por cliente y fecha
CREATE INDEX IF NOT EXISTS idx_hechos_cliente_fecha ON hechos_ventas_analiticas(cliente_id, fecha_venta);

Errores Comunes y Cómo Evitarlos

Al implementar transformaciones y cargas en Airflow, varios escollos pueden aparecer. Identificarlos de antemano ahorra horas de depuración.

1. Transformaciones No Idempotentes: Usar funciones aleatorias o basadas en la hora actual sin control en la lógica de transformación puede generar resultados diferentes en cada ejecución. Cómo evitarlo: Aísla la lógica no determinista. Para marcas de tiempo, usa la fecha de ejecución del DAG ({{ ds }}) o la hora de procesamiento del lote de datos, no datetime.now() de manera indiscriminada.

2. Manejo Inadecuado de la Memoria en Transformaciones Grandes: Cargar DataFrames de Pandas completos con millones de registros puede colapsar el worker de Airflow. Cómo evitarlo: Utiliza procesamiento por lotes (chunking) en las lecturas SQL, considera el uso de PySparkOperator para datos masivos, o realiza las transformaciones directamente en la base de datos con SQL si es posible.

3. Falta de Aislamiento entre Ejecuciones: Si múltiples ejecuciones del mismo DAG (por ejemplo, por catchup) escriben en la misma tabla temporal sin aislarse, se pisan los datos. Cómo evitarlo: Nombra las tablas temporales incluyendo la fecha de ejecución (ej. tmp_ventas_{{ ds_nodash }}) o usa esquemas de base de datos separados por ejecución.

4. UPSERTs Ineficientes o Peligrosos: Un ON CONFLICT o MERGE mal planteado puede bloquear tablas enteras o actualizar columnas innecesariamente. Cómo evitarlo: Asegúrate de que la condición de conflicto use una clave primaria o única bien definida. En el código de actualización (parte DO UPDATE SET), actualiza solo las columnas que realmente pueden cambiar, no todas.

5. No Validar la Calidad de los Datos Antes de la Carga: Cargar datos con nulos en columnas NOT NULL o tipos de datos incorrectos hará fallar toda la tarea. Cómo evitarlo: Incluye una tarea de validación explícita entre la transformación y la carga. Esta tarea puede verificar recuentos, la ausencia de nulos críticos, o reglas de negocio usando Python o SQL, y fallar deliberadamente si no se cumplen.

Checklist de Dominio

Antes de considerar esta lección completamente integrada, asegúrate de poder verificar los siguientes puntos:

  • Puedo diseñar una tarea de transformación en Python que lea desde una fuente de datos relacional usando Hooks de Airflow y aplique lógica de negocio compleja (uniones, cálculos, categorizaciones).
  • Comprendo el principio de idempotencia y sé implementar una estrategia de carga UPSERT o MERGE en la base de datos de destino para evitar duplicados.
  • Sé separar claramente las responsabilidades: una tarea para transformar (y escribir a un área temporal) y otra tarea distinta para cargar al destino final.
  • Puedo manejar el paso de parámetros (como la fecha de ejecución) entre el contexto de Airflow y mis funciones Python para hacer las transformaciones deterministas.
  • Soy capaz de identificar y proponer soluciones para al menos tres de los errores comunes listados en la sección anterior.
  • He creado y probado un DAG que incluye al menos una transformación Python y una carga SQL idempotente, con sus dependencias correctamente encadenadas.
  • Puedo explicar la importancia de la tabla temporal intermedia (tmp_ventas_transformadas) y los pros y contras de su uso.
  • Sé dónde y cómo agregar tareas de monitoreo básico, como el envío del número de filas procesadas vía XCom o el registro de métricas personalizadas.
Falar no WhatsApp
De lección a portfolio

Convertí esta lección en evidencia para Data Analyst.

Sumá un mini caso con datos, una conclusión de negocio y una captura del resultado. Eso pesa más que decir que viste la herramienta.

Paso 1

Publicá una consulta, dashboard o notebook con una conclusión clara.

Paso 2

Agregá contexto: problema, dato usado, decisión recomendada y limitación.

Paso 3

Guardá el enlace en tu CV, LinkedIn o portfolio antes de postular.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión

Agregar Transformaciones de Datos y Carga a Des... | Cursalo