Concepto clave
En esta lección, implementaremos un DAG (Directed Acyclic Graph) en Apache Airflow que orquesta un pipeline de datos en tiempo real con extracción, transformación y carga continua. A diferencia de los pipelines batch tradicionales que se ejecutan en intervalos fijos, este enfoque maneja flujos de datos que llegan constantemente, como logs de aplicaciones, métricas de servidores o transacciones financieras.
La clave está en diseñar tareas que sean idempotentes (pueden ejecutarse múltiples veces sin efectos secundarios) y utilizar sensores o triggers basados en eventos para iniciar el procesamiento cuando hay datos nuevos. Imagina esto como una línea de ensamblaje en una fábrica que nunca se detiene: los datos son las piezas que llegan continuamente, y cada estación (tarea del DAG) realiza una operación específica antes de pasar al siguiente paso.
Cómo funciona en la práctica
Vamos a construir un DAG paso a paso para procesar datos de ventas en tiempo real desde una API REST. El pipeline incluye:
- Extracción: Una tarea que consulta una API cada 5 minutos para obtener nuevas ventas.
- Transformación: Limpieza de datos, cálculo de totales y agregación por categoría.
- Carga: Inserción de los resultados en una base de datos analítica como PostgreSQL.
Usaremos operadores de Airflow como PythonOperator para lógica personalizada y PostgresOperator para la carga. Aquí un fragmento de código clave para la extracción:
def extract_sales_data(**context):
import requests
response = requests.get('https://api.ejemplo.com/ventas')
data = response.json()
context['ti'].xcom_push(key='sales_data', value=data)
return dataEste código se ejecuta como parte de una tarea en el DAG, empujando los datos a XCom para que otras tareas los usen.
Caso de estudio
Considera una empresa de e-commerce que necesita analizar ventas en tiempo real para ajustar inventarios. Implementamos un DAG con las siguientes tareas:
| Tarea | Operador | Frecuencia |
|---|---|---|
| extraer_ventas | PythonOperator | Cada 5 min |
| transformar_datos | PythonOperator | Depende de extraer_ventas |
| cargar_a_postgres | PostgresOperator | Depende de transformar_datos |
Los datos de entrada son JSON con campos como product_id, quantity, y price. En la transformación, calculamos el total por venta y agrupamos por categoría de producto. La salida se carga en una tabla ventas_agregadas para dashboards en tiempo real.
En un despliegue real, este DAG procesa hasta 10,000 ventas por hora, con latencia menor a 1 minuto desde la venta hasta el reporte.
Errores comunes
- No manejar fallos en la extracción: Si la API falla, el pipeline se detiene. Solución: Usar retries y timeouts en las tareas, o implementar un sensor que espere hasta que la API esté disponible.
- Procesamiento no idempotente Ejecutar la misma tarea múltiples veces puede duplicar datos. Solución: Diseñar transformaciones que usen timestamps o IDs únicos para evitar duplicados.
- Falta de monitoreo de recursos Pipelines en tiempo real pueden consumir mucha memoria o CPU. Solución: Configurar límites en los operadores y usar métricas de Airflow para alertas.
- Ignorar la calidad de datos Datos corruptos pueden propagarse. Solución: Incluir validaciones en la transformación, como checks de tipos de datos o rangos.
Checklist de dominio
- ¿Puedes diseñar un DAG con tareas para extracción, transformación y carga continua?
- ¿Sabes usar XCom para pasar datos entre tareas en Airflow?
- ¿Implementas idempotencia en tus pipelines para evitar duplicados?
- ¿Configuras retries y alertas para manejar fallos en tiempo real?
- ¿Validas la calidad de datos durante la transformación?
- ¿Optimizas el rendimiento ajustando la concurrencia y recursos?
- ¿Documentas el DAG con descripciones claras y tags?
Implementa un DAG para procesar logs de servidor en tiempo real
En este ejercicio, crearás un DAG en Airflow que procese logs de servidor en tiempo real. Sigue estos pasos:
- Configura un entorno local de Airflow o usa un sandbox en la nube.
- Crea un archivo Python para el DAG llamado
logs_realtime_dag.py. - Define una tarea de extracción que simule leer logs desde un archivo
/tmp/server.logcada 2 minutos. Usa unPythonOperatorcon una función que lea las líneas nuevas. - Añade una tarea de transformación que cuente el número de errores (líneas que contengan 'ERROR') en los logs. Pasa el resultado usando XCom.
- Implementa una tarea de carga que inserte el conteo de errores en una tabla SQLite o PostgreSQL, usando un
PostgresOperatoroSQLiteOperator. - Configura las dependencias entre tareas para que se ejecuten en secuencia.
- Prueba el DAG ejecutándolo manualmente y verifica los resultados en la base de datos.
- Usa
open()con modo 'r' para leer el archivo de logs, y considera almacenar la última posición leída para evitar reprocesar. - Para la transformación, puedes usar una expresión regular o
str.count()para identificar errores. - Si usas SQLite, asegúrate de que la base de datos esté accesible desde Airflow y crea la tabla de antemano.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.