Implementar el DAG con Extracción, Transformación y Carga Continua

Video
30 min~4 min lectura

Reproductor de video

Concepto clave

En esta lección, implementaremos un DAG (Directed Acyclic Graph) en Apache Airflow que orquesta un pipeline de datos en tiempo real con extracción, transformación y carga continua. A diferencia de los pipelines batch tradicionales que se ejecutan en intervalos fijos, este enfoque maneja flujos de datos que llegan constantemente, como logs de aplicaciones, métricas de servidores o transacciones financieras.

La clave está en diseñar tareas que sean idempotentes (pueden ejecutarse múltiples veces sin efectos secundarios) y utilizar sensores o triggers basados en eventos para iniciar el procesamiento cuando hay datos nuevos. Imagina esto como una línea de ensamblaje en una fábrica que nunca se detiene: los datos son las piezas que llegan continuamente, y cada estación (tarea del DAG) realiza una operación específica antes de pasar al siguiente paso.

Cómo funciona en la práctica

Vamos a construir un DAG paso a paso para procesar datos de ventas en tiempo real desde una API REST. El pipeline incluye:

  1. Extracción: Una tarea que consulta una API cada 5 minutos para obtener nuevas ventas.
  2. Transformación: Limpieza de datos, cálculo de totales y agregación por categoría.
  3. Carga: Inserción de los resultados en una base de datos analítica como PostgreSQL.

Usaremos operadores de Airflow como PythonOperator para lógica personalizada y PostgresOperator para la carga. Aquí un fragmento de código clave para la extracción:

def extract_sales_data(**context):
    import requests
    response = requests.get('https://api.ejemplo.com/ventas')
    data = response.json()
    context['ti'].xcom_push(key='sales_data', value=data)
    return data

Este código se ejecuta como parte de una tarea en el DAG, empujando los datos a XCom para que otras tareas los usen.

Caso de estudio

Considera una empresa de e-commerce que necesita analizar ventas en tiempo real para ajustar inventarios. Implementamos un DAG con las siguientes tareas:

TareaOperadorFrecuencia
extraer_ventasPythonOperatorCada 5 min
transformar_datosPythonOperatorDepende de extraer_ventas
cargar_a_postgresPostgresOperatorDepende de transformar_datos

Los datos de entrada son JSON con campos como product_id, quantity, y price. En la transformación, calculamos el total por venta y agrupamos por categoría de producto. La salida se carga en una tabla ventas_agregadas para dashboards en tiempo real.

En un despliegue real, este DAG procesa hasta 10,000 ventas por hora, con latencia menor a 1 minuto desde la venta hasta el reporte.

Errores comunes

  • No manejar fallos en la extracción: Si la API falla, el pipeline se detiene. Solución: Usar retries y timeouts en las tareas, o implementar un sensor que espere hasta que la API esté disponible.
  • Procesamiento no idempotente Ejecutar la misma tarea múltiples veces puede duplicar datos. Solución: Diseñar transformaciones que usen timestamps o IDs únicos para evitar duplicados.
  • Falta de monitoreo de recursos Pipelines en tiempo real pueden consumir mucha memoria o CPU. Solución: Configurar límites en los operadores y usar métricas de Airflow para alertas.
  • Ignorar la calidad de datos Datos corruptos pueden propagarse. Solución: Incluir validaciones en la transformación, como checks de tipos de datos o rangos.

Checklist de dominio

  1. ¿Puedes diseñar un DAG con tareas para extracción, transformación y carga continua?
  2. ¿Sabes usar XCom para pasar datos entre tareas en Airflow?
  3. ¿Implementas idempotencia en tus pipelines para evitar duplicados?
  4. ¿Configuras retries y alertas para manejar fallos en tiempo real?
  5. ¿Validas la calidad de datos durante la transformación?
  6. ¿Optimizas el rendimiento ajustando la concurrencia y recursos?
  7. ¿Documentas el DAG con descripciones claras y tags?

Implementa un DAG para procesar logs de servidor en tiempo real

En este ejercicio, crearás un DAG en Airflow que procese logs de servidor en tiempo real. Sigue estos pasos:

  1. Configura un entorno local de Airflow o usa un sandbox en la nube.
  2. Crea un archivo Python para el DAG llamado logs_realtime_dag.py.
  3. Define una tarea de extracción que simule leer logs desde un archivo /tmp/server.log cada 2 minutos. Usa un PythonOperator con una función que lea las líneas nuevas.
  4. Añade una tarea de transformación que cuente el número de errores (líneas que contengan 'ERROR') en los logs. Pasa el resultado usando XCom.
  5. Implementa una tarea de carga que inserte el conteo de errores en una tabla SQLite o PostgreSQL, usando un PostgresOperator o SQLiteOperator.
  6. Configura las dependencias entre tareas para que se ejecuten en secuencia.
  7. Prueba el DAG ejecutándolo manualmente y verifica los resultados en la base de datos.
Pistas
  • Usa open() con modo 'r' para leer el archivo de logs, y considera almacenar la última posición leída para evitar reprocesar.
  • Para la transformación, puedes usar una expresión regular o str.count() para identificar errores.
  • Si usas SQLite, asegúrate de que la base de datos esté accesible desde Airflow y crea la tabla de antemano.

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.