Concepto clave
Un pipeline de datos en tiempo real con Apache Airflow no procesa datos instantáneamente como un sistema de streaming, sino que orquesta tareas que se ejecutan con alta frecuencia (por ejemplo, cada minuto) para manejar datos casi en tiempo real. Piensa en esto como una línea de ensamblaje en una fábrica: los datos llegan continuamente, y Airflow coordina los pasos de extracción, transformación y carga (ETL) de manera automatizada y confiable, asegurando que cada lote se procese en el momento adecuado.
En este contexto, los DAGs (Directed Acyclic Graphs) son el corazón de Airflow, definiendo las dependencias entre tareas. Para pipelines en tiempo real, se configuran con intervalos cortos usando schedule_interval (como '*/1 * * * *' para cada minuto). La clave está en diseñar DAGs que sean idempotentes (pueden ejecutarse múltiples veces sin efectos secundarios) y tolerantes a fallos, ya que los datos fluyen constantemente y los errores deben manejarse sin interrumpir el flujo completo.
Cómo funciona en la práctica
Imagina que estás construyendo un pipeline para analizar transacciones de una tienda online en tiempo real. Sigue estos pasos:
- Diseño del DAG: Define un DAG con un schedule_interval de 5 minutos. Las tareas incluyen: extraer datos de una API de transacciones, transformarlos para calcular métricas como ventas por producto, y cargar los resultados en una base de datos analítica.
- Configuración de tareas: Usa operadores de Airflow como PythonOperator para la extracción y transformación, y PostgresOperator para la carga. Establece dependencias con set_upstream o >> para que las tareas se ejecuten en orden.
- Manejo de errores: Implementa retries y alertas en caso de fallos en la API o la base de datos, usando parámetros como retries y email_on_failure.
- Monitoreo: Utiliza la interfaz web de Airflow para ver el estado de las ejecuciones y métricas como duración de tareas.
Ejemplo de código para un DAG básico:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def extract_data():
# Lógica para extraer datos de una API
pass
def transform_data():
# Lógica para transformar datos
pass
def load_data():
# Lógica para cargar a una base de datos
pass
default_args = {
'owner': 'data_engineer',
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'tienda_online_realtime',
default_args=default_args,
description='Pipeline para transacciones en tiempo real',
schedule_interval='*/5 * * * *',
start_date=datetime(2023, 10, 1)
)
extract_task = PythonOperator(
task_id='extract_transactions',
python_callable=extract_data,
dag=dag
)
transform_task = PythonOperator(
task_id='transform_metrics',
python_callable=transform_data,
dag=dag
)
load_task = PythonOperator(
task_id='load_to_db',
python_callable=load_data,
dag=dag
)
extract_task >> transform_task >> load_taskCaso de estudio
Una empresa de logística necesita monitorear entregas en tiempo real para optimizar rutas. Implementan un pipeline con Airflow que:
- Extrae: Datos de GPS de camiones cada 2 minutos desde una API REST.
- Transforma: Calcula la distancia recorrida y estima tiempos de entrega usando Python y pandas.
- Carga: Inserta los resultados en una tabla de PostgreSQL para dashboards en tiempo real.
El DAG se ejecuta cada 2 minutos (schedule_interval='*/2 * * * *'), con tareas paralelas para diferentes regiones. Usan XComs para pasar datos entre tareas, como las coordenadas procesadas. Un error común fue no manejar datos duplicados; lo solucionaron agregando una capa de idempotencia en la transformación.
Resultado: Reducción del 15% en tiempos de entrega gracias a la visibilidad en tiempo real proporcionada por el pipeline.
Errores comunes
- Programar DAGs con start_date en el futuro o muy atrás: Esto puede causar ejecuciones no deseadas o faltantes. Solución: Usa una start_date fija en el pasado y prueba con airflow backfill si es necesario.
- No manejar fallos en fuentes de datos externas: Si una API falla, el pipeline puede detenerse. Solución: Implementa retries y timeouts en las tareas, y considera usar sensores para esperar datos.
- Olvidar la idempotencia: En pipelines en tiempo real, los datos pueden reprocesarse accidentalmente. Solución: Diseña tareas que verifiquen si los datos ya fueron procesados, usando marcas de tiempo o IDs únicos.
- Sobrecargar el scheduler con intervalos muy cortos: Un schedule_interval de segundos puede saturar Airflow. Solución: Usa intervalos de minutos o más, y optimiza el código para ejecuciones rápidas.
- Ignorar el monitoreo de recursos: Pipelines frecuentes pueden consumir mucha CPU o memoria. Solución: Configura límites en los operadores y monitorea métricas del sistema.
Checklist de dominio
- ¿Puedes diseñar un DAG con schedule_interval para ejecuciones cada 1-5 minutos?
- ¿Sabes configurar retries y manejo de errores en tareas de Airflow?
- ¿Eres capaz de implementar idempotencia en un pipeline de datos?
- ¿Puedes usar XComs para pasar datos entre tareas en un DAG?
- ¿Sabes monitorear y depurar ejecuciones de DAGs en la interfaz web de Airflow?
- ¿Puedes optimizar un DAG para reducir su tiempo de ejecución?
- ¿Eres capaz de integrar Airflow con fuentes de datos comunes como APIs o bases de datos?
Implementa un Pipeline de Datos en Tiempo Real para Análisis de Redes Sociales
En este ejercicio, crearás un DAG en Airflow que simule un pipeline en tiempo real para analizar publicaciones de redes sociales. Sigue estos pasos:
- Configura el entorno: Asegúrate de tener Airflow instalado y funcionando localmente o en un entorno de prueba.
- Diseña el DAG: Crea un archivo Python llamado
social_media_realtime.py. Define un DAG con:- Nombre:
social_media_analysis - schedule_interval: '*/3 * * * *' (cada 3 minutos)
- start_date: una fecha pasada, como datetime(2023, 10, 1)
- default_args con owner='tu_nombre' y retries=2
- Nombre:
- Agrega tareas: Implementa tres tareas usando PythonOperator:
- extraer_publicaciones: Simula la extracción de datos de una API de redes sociales (puedes usar una lista fija de diccionarios con claves como 'usuario', 'texto', 'timestamp').
- analizar_sentimiento: Transforma los datos para analizar el sentimiento del texto (puede ser simple, como contar palabras positivas/negativas).
- guardar_resultados: Carga los resultados en un archivo CSV o una base de datos SQLite, asegurándote de evitar duplicados usando el timestamp.
- Establece dependencias: Ordena las tareas para que se ejecuten secuencialmente.
- Prueba el DAG: Ejecuta
airflow dags listpara verificar que aparezca, y usaairflow tasks test social_media_analysis extraer_publicaciones 2023-10-01para probar una tarea. - Monitorea: Accede a la interfaz web de Airflow y verifica que el DAG se active cada 3 minutos y complete las ejecuciones exitosamente.
Entrega: El archivo Python del DAG y una captura de pantalla de la interfaz web mostrando al menos una ejecución exitosa.
Pistas- Usa el módulo datetime para manejar timestamps y evitar duplicados en la carga.
- Para simular datos, puedes hardcodear una lista de publicaciones en la función de extracción.
- Si usas SQLite, recuerda cerrar conexiones después de cada tarea para evitar bloqueos.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.