Proyecto: Construir y Desplegar el Pipeline Completo en un Entorno Simulado

Lectura
40 min~5 min lectura

Concepto clave

En esta lección, aprenderás a construir y desplegar un pipeline de datos en tiempo real completo usando Apache Airflow. Un pipeline en tiempo real procesa datos a medida que llegan, en lugar de hacerlo en lotes programados. Imagina una cadena de montaje en una fábrica: cada estación (tarea en Airflow) realiza una operación específica (como limpiar, transformar o cargar datos) de forma continua, asegurando que el producto final (datos analíticos) esté siempre actualizado.

El corazón de este proceso es el DAG (Directed Acyclic Graph), que define las dependencias entre tareas. En un entorno simulado, replicarás condiciones reales, como fuentes de datos que emiten eventos (por ejemplo, logs de aplicaciones o transacciones financieras) y destinos como bases de datos o dashboards. Esto te prepara para escenarios de producción donde la latencia y la confiabilidad son críticas.

Cómo funciona en la práctica

Para implementar un pipeline en tiempo real con Airflow, sigue estos pasos:

  1. Diseña el DAG: Define tareas como extraer_datos, transformar_datos y cargar_datos, con dependencias claras. Usa sensores (por ejemplo, FileSensor) para detectar nuevos archivos de datos en un directorio simulado.
  2. Configura el entorno: En un contenedor Docker o máquina virtual, instala Airflow y configura conexiones a fuentes (como una API REST simulada) y destinos (por ejemplo, PostgreSQL).
  3. Ejecuta y monitorea: Inicia el DAG con un programador frecuente (por ejemplo, cada minuto) y usa la interfaz web de Airflow para verificar el estado de las tareas y logs.

Ejemplo de código para una tarea de extracción:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def extraer_datos():
    # Simula la lectura de un archivo JSON nuevo
    import json
    with open('/data/input/eventos.json', 'r') as f:
        datos = json.load(f)
    return datos

dag = DAG('pipeline_tiempo_real', start_date=datetime(2023, 1, 1), schedule_interval='* * * * *')
extraer = PythonOperator(task_id='extraer_datos', python_callable=extraer_datos, dag=dag)

Caso de estudio

Considera una empresa de e-commerce que necesita analizar las visitas a su sitio web en tiempo real. El pipeline con Airflow incluye:

  • Fuente: Un servidor web que genera logs en formato JSON cada vez que un usuario visita una página.
  • Procesamiento: Un DAG con tareas para: (1) leer nuevos archivos de log, (2) filtrar visitas válidas, (3) agregar datos por país y página, y (4) cargar resultados a una tabla en BigQuery.
  • Resultado: Un dashboard que muestra tráfico en vivo, actualizado cada minuto.

Tabla de ejemplo de datos transformados:

PaísPáginaVisitasTimestamp
México/productos1502023-10-01 10:00:00
España/carrito852023-10-01 10:01:00
En tiempo real, la latencia debe ser inferior a 5 segundos para que los datos sean útiles en toma de decisiones inmediatas.

Errores comunes

  • Programación demasiado frecuente: Configurar schedule_interval en segundos puede saturar el sistema. Usa intervalos como */5 * * * * (cada 5 minutos) para balancear actualización y rendimiento.
  • Falta de manejo de errores: No incluir reintentos o alertas en tareas críticas. En el DAG, define retries=3 y retry_delay para recuperarse de fallos temporales.
  • Dependencias circulares: Crear un ciclo en el DAG (por ejemplo, tarea A depende de B y B depende de A) causa fallos. Verifica el grafo en la interfaz web para asegurar que sea acíclico.
  • Ignorar la escalabilidad: Usar operadores pesados en un solo nodo puede ralentizar el pipeline. Considera KubernetesPodOperator para distribuir carga en entornos grandes.

Checklist de dominio

  1. Diseñé un DAG con al menos 4 tareas interconectadas para un flujo en tiempo real.
  2. Configuré sensores para detectar nuevos datos en una fuente simulada (por ejemplo, archivos o mensajes).
  3. Implementé tareas de transformación que procesen datos en streaming (por ejemplo, agregaciones con ventanas de tiempo).
  4. Desplegué el pipeline en un entorno Docker y verifiqué su ejecución continua.
  5. Monitoreé el rendimiento usando métricas como tiempo de ejecución y tasa de errores en la UI de Airflow.
  6. Documenté el proceso de despliegue y configuración para replicación en producción.
  7. Probé el manejo de fallos simulando una interrupción en la fuente de datos.

Construye un Pipeline de Logs de Aplicación en Tiempo Real

En este ejercicio, crearás y desplegarás un pipeline que procesa logs de una aplicación web simulada en tiempo real usando Airflow. Sigue estos pasos:

  1. Prepara el entorno: Instala Apache Airflow en un contenedor Docker. Crea un directorio /data/logs para almacenar archivos de log simulados (por ejemplo, en formato JSON con campos como timestamp, usuario, acción).
  2. Diseña el DAG: Define un DAG llamado procesar_logs_tiempo_real con las siguientes tareas:
    • detectar_log_nuevo: Usa FileSensor para monitorear nuevos archivos en /data/logs.
    • leer_log: Lee el archivo detectado y carga los datos en memoria.
    • filtrar_errores: Filtra entradas con nivel de log "ERROR".
    • agregar_por_usuario: Agrupa errores por usuario y cuenta ocurrencias.
    • guardar_resultados: Guarda los resultados agregados en un archivo CSV en /data/output.
    Configura dependencias para que las tareas se ejecuten en orden.
  3. Simula datos: Escribe un script en Python que genere archivos de log cada 30 segundos en /data/logs, con datos aleatorios (por ejemplo, timestamp actual, usuario entre "user1" y "user5", acción como "login" o "purchase", nivel como "INFO" o "ERROR").
  4. Ejecuta y prueba: Inicia el DAG con un schedule_interval de */2 * * * * (cada 2 minutos). Usa la UI de Airflow para verificar que las tareas se completen exitosamente y que los archivos CSV se generen con datos agregados.
  5. Optimiza: Añade manejo de errores configurando retries=2 en tareas críticas y monitorea el rendimiento en los logs.
Pistas
  • Usa el operador PythonOperator para tareas personalizadas como leer y filtrar logs.
  • Configura el FileSensor con un timeout adecuado (por ejemplo, 300 segundos) para evitar bloqueos.
  • Verifica que las rutas de archivos en Docker estén montadas correctamente usando volúmenes.

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.