Concepto clave
Apache Airflow es una plataforma de orquestación de flujos de trabajo que permite programar y monitorear pipelines de datos mediante DAGs (Directed Acyclic Graphs). Un DAG es una colección de tareas con dependencias definidas que se ejecutan en un orden específico, similar a cómo una receta de cocina tiene pasos secuenciales: no puedes hornear un pastel antes de mezclar los ingredientes.
La arquitectura de Airflow se basa en componentes clave: el scheduler (que planifica las ejecuciones), el executor (que ejecuta las tareas), y la base de datos (que almacena metadatos). Imagina un controlador de tráfico aéreo: el scheduler coordina cuándo despegan los aviones (tareas), el executor es la pista donde aterrizan, y la base de datos registra todos los vuelos.
Cómo funciona en la práctica
Para crear un pipeline ETL básico en Airflow, primero defines un DAG en Python. Por ejemplo, un pipeline que extrae datos de una API, los transforma y los carga en una base de datos. Los pasos son:
- Importar los módulos necesarios:
from airflow import DAGyfrom airflow.operators.python import PythonOperator. - Definir funciones Python para cada tarea (extraer, transformar, cargar).
- Crear un objeto DAG con parámetros como
schedule_interval(ej., diario). - Instanciar operadores que referencien las funciones y establecer dependencias con
>>(ej.,extraer >> transformar >> cargar).
Al ejecutar, Airflow visualiza el DAG en su interfaz web, mostrando el estado de cada tarea (éxito, fallo, en ejecución).
Caso de estudio
Una empresa de retail necesita actualizar diariamente su dashboard de ventas. Implementamos un DAG en Airflow con estas tareas:
| Tarea | Operador | Descripción |
|---|---|---|
| extraer_ventas | PythonOperator | Consulta una API REST para obtener datos de ventas del día anterior |
| limpiar_datos | PythonOperator | Limpia valores nulos y formatea fechas |
| calcular_metricas | PythonOperator | Calcula KPI como ventas totales y promedio por tienda |
| cargar_bigquery | BigQueryOperator | Inserta los datos procesados en BigQuery para análisis |
El DAG se programa para ejecutarse a las 2 AM cada día, asegurando que el dashboard esté actualizado al inicio de la jornada laboral. Usamos schedule_interval='0 2 * * *' (notación cron) y configuramos reintentos automáticos en caso de fallos temporales.
En producción, este pipeline procesa más de 1 millón de registros diarios, reduciendo el tiempo manual de 4 horas a 15 minutos automatizados.
Errores comunes
- No definir correctamente las dependencias entre tareas: Esto puede causar ejecuciones en paralelo no deseadas o errores de orden. Solución: Usar
>>oset_upstream/set_downstreamexplícitamente y validar el gráfico en la UI. - Programar DAGs con intervalos solapados: Configurar
schedule_intervalmuy frecuente puede llevar a ejecuciones concurrentes que colapsen recursos. Solución: Calcular el tiempo de ejecución y añadir márgenes, o usar pools para limitar concurrencia. - No manejar fallos en tareas: Airflow reintenta por defecto, pero sin lógica de recuperación (ej., reintentar solo en errores de red), los pipelines pueden fallar repetidamente. Solución: Implementar
retriesyretry_delayen el DAG, y usar sentencias try-catch en las funciones. - Ignorar la configuración del executor: Usar el executor LocalExecutor en producción limita la escalabilidad. Solución: Migrar a CeleryExecutor o KubernetesExecutor para distribuir carga en clusters.
Checklist de dominio
- Puedo explicar qué es un DAG y cómo representa un flujo de trabajo en Airflow.
- He configurado un entorno local de Airflow usando Docker o instalación directa.
- Sé crear un DAG básico con al menos tres tareas y dependencias definidas.
- Entiendo cómo programar ejecuciones con
schedule_interval(ej., usando cron o expresiones de intervalo). - Puedo monitorear y depurar tareas fallidas en la interfaz web de Airflow.
- He manejado parámetros como
start_dateycatchuppara evitar ejecuciones no deseadas. - Sé configurar conexiones y hooks en Airflow para integrar con bases de datos o APIs externas.
Configura un entorno local de Airflow y crea tu primer DAG ETL
En este ejercicio, configurarás Apache Airflow en tu máquina local y desarrollarás un DAG simple que simula un pipeline ETL. Sigue estos pasos:
- Instalación: Instala Airflow usando pip en un entorno virtual de Python. Ejecuta:
pip install apache-airflow. Luego, inicializa la base de datos conairflow db init. - Configuración básica: Inicia el servidor web con
airflow webserver --port 8080y el scheduler en otra terminal conairflow scheduler. Accede alocalhost:8080en tu navegador. - Creación del DAG: En la carpeta
~/airflow/dags, crea un archivo Python llamadomi_primer_dag.py. Define un DAG con IDmi_dag_etl, programado para ejecutarse diariamente. - Implementación de tareas: Añade tres tareas usando PythonOperator:
extraer: Una función que devuelve una lista de números aleatorios (simula extracción de datos).transformar: Una función que calcula el cuadrado de cada número en la lista.cargar: Una función que imprime los resultados transformados en la consola (simula carga).
- Definición de dependencias: Establece que
extraerdebe ejecutarse antes quetransformar, ytransformarantes quecargar. - Prueba y monitoreo: Activa el DAG manualmente desde la UI de Airflow y verifica que las tareas se ejecuten en orden y finalicen con éxito.
- Usa
from datetime import datetimepara definirstart_dateen el DAG y evita fechas futuras. - Si el DAG no aparece en la UI, revisa que el archivo esté en la carpeta correcta y que el scheduler esté corriendo.
- Para simular datos en la tarea de extracción, puedes usar
import randomy generar una lista conrandom.randint(1, 10).
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.