Concepto clave
En los pipelines ETL, la transformación de datos es la fase donde los datos crudos se convierten en información útil y estructurada. Imagina que eres un chef: los ingredientes crudos (datos sin procesar) llegan a tu cocina, pero necesitas cortarlos, mezclarlos y cocinarlos para crear un plato delicioso (datos analizables). En el contexto de Airflow, esta transformación ocurre dentro de las tareas de un DAG, utilizando herramientas como Pandas, una biblioteca de Python especializada en manipulación de datos.
La integración de Pandas en Airflow permite automatizar transformaciones complejas como limpieza, agregación, uniones y cálculos estadísticos. A diferencia de procesar datos manualmente, Airflow orquesta estas transformaciones de manera repetible y monitorizable. Piensa en un sistema de correo: Airflow es el cartero que entrega paquetes (datos) a diferentes oficinas (tareas), y Pandas es el empleado que abre, clasifica y organiza el contenido dentro de cada oficina.
Cómo funciona en la práctica
Para transformar datos con Pandas en Airflow, se crea una tarea PythonOperator que ejecuta un script de transformación. Este script carga datos desde una fuente (por ejemplo, un archivo CSV o una base de datos), aplica operaciones de Pandas y guarda el resultado. Aquí un ejemplo paso a paso:
- Define una función en Python que use Pandas para leer datos, por ejemplo, desde un archivo CSV temporal.
- Dentro de la función, aplica transformaciones como eliminar valores nulos, renombrar columnas o calcular nuevas métricas.
- Guarda el resultado en un formato como Parquet o en una base de datos.
- Integra esta función en un DAG de Airflow usando PythonOperator, asegurando dependencias con otras tareas.
Ejemplo de código básico en una tarea Airflow:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import pandas as pd
def transform_data():
df = pd.read_csv('/tmp/raw_data.csv')
df = df.dropna() # Limpieza
df['total'] = df['quantity'] * df['price'] # Transformación
df.to_parquet('/tmp/transformed_data.parquet')
dag = DAG('etl_pipeline', schedule_interval='@daily')
transform_task = PythonOperator(
task_id='transform_with_pandas',
python_callable=transform_data,
dag=dag
)Caso de estudio
Supongamos que trabajas en una empresa de e-commerce que necesita analizar ventas diarias. Los datos crudos llegan en un archivo CSV con columnas como order_id, product, quantity, price, y date. El objetivo es transformar estos datos para calcular el total de ventas por producto y día, eliminando pedidos cancelados (donde quantity es negativa).
Usando Pandas en Airflow, se crea un DAG que:
- Carga el CSV en un DataFrame de Pandas.
- Filtra filas con quantity > 0.
- Agrega una columna total_sale = quantity * price.
- Agrupa por product y date para sumar total_sale.
- Guarda el resultado en una tabla de base de datos para reportes.
Esto automatiza el proceso, asegurando que cada día los datos se transformen consistentemente sin intervención manual.
Errores comunes
- No manejar memoria eficientemente: Pandas carga datos en memoria; para datasets grandes, esto puede causar fallos. Solución: Usar técnicas como chunking o considerar herramientas como Dask para big data.
- Ignorar el manejo de errores en las tareas: Si una transformación falla, el DAG puede detenerse sin limpieza. Solución: Incluir excepciones en el código Python y usar retries en Airflow.
- Acoplar lógica de transformación con lógica de orquestación: Mezclar código de Pandas directamente en el DAG hace el mantenimiento difícil. Solución: Separar la lógica en funciones modulares y reutilizables.
- No validar datos después de la transformación: Asumir que los datos son correctos puede llevar a errores en análisis posteriores. Solución: Agregar pasos de validación, como verificar tipos de datos o rangos esperados.
- Olvidar la idempotencia: Si una tarea se re-ejecuta, podría duplicar datos. Solución: Diseñar transformaciones que produzcan el mismo resultado sin importar cuántas veces se ejecuten, por ejemplo, sobrescribiendo archivos de salida.
Checklist de dominio
- Puedo crear una función Python que use Pandas para limpiar y transformar un dataset simple.
- Entiendo cómo integrar una función de Pandas en un DAG de Airflow usando PythonOperator.
- Sé manejar errores comunes en transformaciones, como valores nulos o tipos incorrectos.
- Puedo diseñar una transformación idempotente que no cause duplicados en re-ejecuciones.
- Conozco técnicas para optimizar el uso de memoria con Pandas en pipelines grandes.
- Sé validar datos después de una transformación para asegurar calidad.
- Puedo documentar un proceso de transformación para colaboración en equipo.
Crear un Pipeline ETL con Pandas en Airflow para Datos de Ventas
En este ejercicio, construirás un DAG de Airflow que transforma datos de ventas usando Pandas. Sigue estos pasos:
- Prepara un archivo CSV llamado
sales_raw.csvcon datos de ejemplo. Usa esta estructura:order_id product quantity price date 1 Laptop 2 800 2023-10-01 2 Mouse 5 20 2023-10-01 3 Laptop 1 800 2023-10-02 4 Keyboard -1 50 2023-10-02 - Crea una función Python llamada
transform_sales_dataque:- Cargue
sales_raw.csven un DataFrame de Pandas. - Filtre filas donde quantity sea mayor a 0 (eliminando cancelaciones).
- Calcule una nueva columna
total_salecomo quantity * price. - Agrupe los datos por product y date, sumando total_sale.
- Guarde el resultado en un archivo Parquet llamado
sales_transformed.parquet.
- Cargue
- Define un DAG en Airflow con un PythonOperator que ejecute
transform_sales_data. Configura el DAG para ejecutarse diariamente. - Prueba el DAG ejecutándolo manualmente en tu entorno Airflow y verifica que
sales_transformed.parquetse genere correctamente.
- Recuerda que Pandas requiere rutas absolutas o relativas al directorio de trabajo; ajusta las rutas según tu entorno.
- Para agrupar y sumar en Pandas, usa df.groupby(['product', 'date'])['total_sale'].sum().reset_index().
- Asegúrate de que tu DAG tenga un start_date válido y no use ejecuciones futuras (por ejemplo, usa dag.run() para pruebas).
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.