Concepto clave
En esta lección, aprenderás a construir y desplegar un pipeline de datos en tiempo real completo usando Apache Airflow. Un pipeline en tiempo real procesa datos a medida que llegan, en lugar de hacerlo en lotes programados. Imagina una cadena de montaje en una fábrica: cada estación (tarea en Airflow) realiza una operación específica (como limpiar, transformar o cargar datos) de forma continua, asegurando que el producto final (datos analíticos) esté siempre actualizado.
El corazón de este proceso es el DAG (Directed Acyclic Graph), que define las dependencias entre tareas. En un entorno simulado, replicarás condiciones reales, como fuentes de datos que emiten eventos (por ejemplo, logs de aplicaciones o transacciones financieras) y destinos como bases de datos o dashboards. Esto te prepara para escenarios de producción donde la latencia y la confiabilidad son críticas.
Cómo funciona en la práctica
Para implementar un pipeline en tiempo real con Airflow, sigue estos pasos:
- Diseña el DAG: Define tareas como
extraer_datos,transformar_datosycargar_datos, con dependencias claras. Usa sensores (por ejemplo,FileSensor) para detectar nuevos archivos de datos en un directorio simulado. - Configura el entorno: En un contenedor Docker o máquina virtual, instala Airflow y configura conexiones a fuentes (como una API REST simulada) y destinos (por ejemplo, PostgreSQL).
- Ejecuta y monitorea: Inicia el DAG con un programador frecuente (por ejemplo, cada minuto) y usa la interfaz web de Airflow para verificar el estado de las tareas y logs.
Ejemplo de código para una tarea de extracción:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def extraer_datos():
# Simula la lectura de un archivo JSON nuevo
import json
with open('/data/input/eventos.json', 'r') as f:
datos = json.load(f)
return datos
dag = DAG('pipeline_tiempo_real', start_date=datetime(2023, 1, 1), schedule_interval='* * * * *')
extraer = PythonOperator(task_id='extraer_datos', python_callable=extraer_datos, dag=dag)Caso de estudio
Considera una empresa de e-commerce que necesita analizar las visitas a su sitio web en tiempo real. El pipeline con Airflow incluye:
- Fuente: Un servidor web que genera logs en formato JSON cada vez que un usuario visita una página.
- Procesamiento: Un DAG con tareas para: (1) leer nuevos archivos de log, (2) filtrar visitas válidas, (3) agregar datos por país y página, y (4) cargar resultados a una tabla en BigQuery.
- Resultado: Un dashboard que muestra tráfico en vivo, actualizado cada minuto.
Tabla de ejemplo de datos transformados:
| País | Página | Visitas | Timestamp |
|---|---|---|---|
| México | /productos | 150 | 2023-10-01 10:00:00 |
| España | /carrito | 85 | 2023-10-01 10:01:00 |
En tiempo real, la latencia debe ser inferior a 5 segundos para que los datos sean útiles en toma de decisiones inmediatas.
Errores comunes
- Programación demasiado frecuente: Configurar
schedule_intervalen segundos puede saturar el sistema. Usa intervalos como*/5 * * * *(cada 5 minutos) para balancear actualización y rendimiento. - Falta de manejo de errores: No incluir reintentos o alertas en tareas críticas. En el DAG, define
retries=3yretry_delaypara recuperarse de fallos temporales. - Dependencias circulares: Crear un ciclo en el DAG (por ejemplo, tarea A depende de B y B depende de A) causa fallos. Verifica el grafo en la interfaz web para asegurar que sea acíclico.
- Ignorar la escalabilidad: Usar operadores pesados en un solo nodo puede ralentizar el pipeline. Considera
KubernetesPodOperatorpara distribuir carga en entornos grandes.
Checklist de dominio
- Diseñé un DAG con al menos 4 tareas interconectadas para un flujo en tiempo real.
- Configuré sensores para detectar nuevos datos en una fuente simulada (por ejemplo, archivos o mensajes).
- Implementé tareas de transformación que procesen datos en streaming (por ejemplo, agregaciones con ventanas de tiempo).
- Desplegué el pipeline en un entorno Docker y verifiqué su ejecución continua.
- Monitoreé el rendimiento usando métricas como tiempo de ejecución y tasa de errores en la UI de Airflow.
- Documenté el proceso de despliegue y configuración para replicación en producción.
- Probé el manejo de fallos simulando una interrupción en la fuente de datos.
Construye un Pipeline de Logs de Aplicación en Tiempo Real
En este ejercicio, crearás y desplegarás un pipeline que procesa logs de una aplicación web simulada en tiempo real usando Airflow. Sigue estos pasos:
- Prepara el entorno: Instala Apache Airflow en un contenedor Docker. Crea un directorio
/data/logspara almacenar archivos de log simulados (por ejemplo, en formato JSON con campos como timestamp, usuario, acción). - Diseña el DAG: Define un DAG llamado
procesar_logs_tiempo_realcon las siguientes tareas:detectar_log_nuevo: UsaFileSensorpara monitorear nuevos archivos en/data/logs.leer_log: Lee el archivo detectado y carga los datos en memoria.filtrar_errores: Filtra entradas con nivel de log "ERROR".agregar_por_usuario: Agrupa errores por usuario y cuenta ocurrencias.guardar_resultados: Guarda los resultados agregados en un archivo CSV en/data/output.
- Simula datos: Escribe un script en Python que genere archivos de log cada 30 segundos en
/data/logs, con datos aleatorios (por ejemplo, timestamp actual, usuario entre "user1" y "user5", acción como "login" o "purchase", nivel como "INFO" o "ERROR"). - Ejecuta y prueba: Inicia el DAG con un
schedule_intervalde*/2 * * * *(cada 2 minutos). Usa la UI de Airflow para verificar que las tareas se completen exitosamente y que los archivos CSV se generen con datos agregados. - Optimiza: Añade manejo de errores configurando
retries=2en tareas críticas y monitorea el rendimiento en los logs.
- Usa el operador PythonOperator para tareas personalizadas como leer y filtrar logs.
- Configura el FileSensor con un timeout adecuado (por ejemplo, 300 segundos) para evitar bloqueos.
- Verifica que las rutas de archivos en Docker estén montadas correctamente usando volúmenes.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.