Concepto clave: DAGs, Operadores y Tareas en Airflow
En Apache Airflow, un DAG (Grafo Acíclico Dirigido) es la estructura fundamental que define un flujo de trabajo. Imagina un DAG como un plano de construcción para tu pipeline de datos: especifica qué tareas deben ejecutarse, en qué orden y bajo qué condiciones. A diferencia de un script lineal, un DAG permite dependencias complejas, ejecución paralela y manejo de errores de manera elegante.
Los operadores son los componentes que realizan el trabajo real dentro de un DAG. Cada operador representa una unidad de trabajo, como ejecutar un script Python, consultar una base de datos o enviar un correo electrónico. Airflow incluye operadores predefinidos (como PythonOperator, BashOperator, PostgresOperator) y permite crear operadores personalizados. Una tarea es una instancia específica de un operador dentro de un DAG, con parámetros concretos como rutas de archivos o credenciales.
La analogía del mundo real: piensa en un DAG como la receta de un pastel (el plan), los operadores como los utensilios de cocina (batidora, horno, espátula), y las tareas como usar esos utensilios con ingredientes específicos (batir huevos con azúcar, hornear a 180°C). La receta define el orden (no puedes decorar antes de hornear), pero diferentes partes pueden hacerse en paralelo (preparar el glaseado mientras se hornea).
Cómo funciona en la práctica: Construyendo tu primer DAG
Vamos a crear un DAG simple que extrae datos de una API, los transforma y los carga en una base de datos. Este ejemplo usa operadores comunes y muestra cómo definir dependencias.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
def extract_data():
# Simula extracción de datos de una API
print("Extrayendo datos de la API...")
return [1, 2, 3, 4, 5]
def transform_data(data):
# Transforma los datos (ejemplo: duplicar valores)
print(f"Transformando datos: {data}")
return [x * 2 for x in data]
def load_data(transformed_data):
# Simula carga a una base de datos
print(f"Cargando datos: {transformed_data}")
# Definir el DAG
default_args = {
'owner': 'data_engineer',
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'mi_primer_etl',
default_args=default_args,
description='Un DAG simple de ETL',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1)
)
# Definir tareas
extract_task = PythonOperator(
task_id='extraer_datos',
python_callable=extract_data,
dag=dag
)
transform_task = PythonOperator(
task_id='transformar_datos',
python_callable=transform_data,
op_args=[extract_task.output],
dag=dag
)
load_task = BashOperator(
task_id='cargar_datos',
bash_command='echo "Datos cargados exitosamente"',
dag=dag
)
# Establecer dependencias
extract_task >> transform_task >> load_taskEste DAG se ejecuta diariamente, comenzando el 1 de enero de 2023. La tarea extraer_datos (PythonOperator) extrae datos, transformar_datos (PythonOperator) los procesa usando la salida de la primera tarea, y cargar_datos (BashOperator) simula la carga. Las dependencias se definen con >>, asegurando que transformar no ocurra antes de extraer, y cargar no antes de transformar.
Caso de estudio: Pipeline de análisis de ventas diarias
Una empresa minorista necesita un pipeline que procese ventas diarias desde múltiples fuentes. El DAG debe:
- Extraer datos de una API REST y un archivo CSV en paralelo
- Unir y limpiar los datos
- Calcular métricas como ventas totales y promedio por región
- Guardar resultados en un data warehouse y enviar un reporte por email
Implementación con datos de ejemplo:
| Tarea | Operador | Descripción | Dependencias |
|---|---|---|---|
| extraer_api | PythonOperator | Extrae ventas de API (ej: [{"id": 1, "monto": 100}, {"id": 2, "monto": 200}]) | Ninguna |
| extraer_csv | PythonOperator | Lee archivo sales.csv con columnas: fecha, producto, cantidad | Ninguna |
| unir_datos | PythonOperator | Combina datos de ambas fuentes, elimina duplicados | extraer_api, extraer_csv |
| calcular_metricas | PythonOperator | Calcula total_ventas = suma(montos), promedio = total_ventas / count | unir_datos |
| guardar_resultados | PostgresOperator | Inserta métricas en tabla sales_metrics | calcular_metricas |
| enviar_reporte | EmailOperator | Envía email con resumen: "Ventas totales: $X, Promedio: $Y" | guardar_resultados |
En este caso, extraer_api y extraer_csv pueden ejecutarse en paralelo porque no dependen entre sí, mejorando el tiempo total de ejecución. La tarea unir_datos espera a que ambas terminen antes de comenzar.
Errores comunes y cómo evitarlos
Al trabajar con DAGs, operadores y tareas, estos errores son frecuentes:
- Dependencias circulares: Definir tareas que dependen entre sí en un ciclo (A → B → A), causando bloqueos. Solución: Revisar el grafo con
airflow dags listy herramientas de visualización, asegurando que sea acíclico. - Uso incorrecto de start_date: Configurar start_date en el futuro o sin considerar schedule_interval, lo que evita ejecuciones. Ejemplo: start_date=datetime(2024,1,1) con schedule_interval='@daily' no ejecutará hasta esa fecha. Solución: Usar fechas pasadas para pruebas y entender que Airflow ejecuta DAGs después de start_date + schedule_interval.
- Operadores bloqueantes: Usar operadores que no liberan recursos (ej: PythonOperator con bucles infinitos), paralizando el scheduler. Solución: Implementar timeouts, usar operadores asíncronos cuando sea posible, y monitorear logs.
- Manejo pobre de errores: No configurar retries o alertas, perdiendo datos en fallos. Solución: Definir retries y retry_delay en default_args, y agregar tareas de fallback (ej: enviar alerta si carga falla después de 3 intentos).
- Overhead en DAGs simples: Crear DAGs con muchas tareas para procesos triviales, añadiendo complejidad innecesaria. Solución: Evaluar si un script simple es suficiente; usar Airflow para orquestación, no para lógica mínima.
Checklist de dominio
Después de esta lección, debes poder:
- Definir un DAG con schedule_interval, start_date y default_args apropiados
- Crear tareas usando al menos tres tipos de operadores (ej: PythonOperator, BashOperator, EmailOperator)
- Establecer dependencias entre tareas usando
>>,<<oset_upstream/set_downstream - Explicar la diferencia entre un operador y una tarea con un ejemplo concreto
- Identificar y corregir un error común en un DAG (ej: dependencia circular)
- Diseñar un DAG para un caso de ETL con extracción paralela y procesamiento secuencial
- Configurar reintentos y manejo de errores básico en las tareas
Crear un DAG para procesamiento de logs diarios
En este ejercicio, crearás un DAG que procese logs de aplicaciones diarios. Sigue estos pasos:
- Configura el entorno: Asegúrate de tener Airflow instalado y funcionando. Crea un archivo Python en la carpeta
dags/de tu instalación. - Define el DAG: Crea un DAG llamado
procesar_logs_diarioscon:- schedule_interval:
@daily - start_date: 1 de enero de 2024
- default_args: owner='tu_nombre', retries=2, retry_delay=timedelta(minutes=2)
- schedule_interval:
- Implementa las tareas:
- Tarea 1:
descargar_logs(BashOperator) que simula descargar logs conecho 'Descargando logs...' - Tarea 2:
filtrar_errores(PythonOperator) que define una función para filtrar líneas con la palabra 'ERROR' de una lista de logs de ejemplo (ej: ["INFO: inicio", "ERROR: fallo", "INFO: fin"]) - Tarea 3:
guardar_resultados(PythonOperator) que guarda los errores filtrados en un archivo de texto simulado (puede imprimirlos) - Tarea 4:
notificar_equipo(BashOperator) que simula una notificación conecho 'Notificación enviada'
- Tarea 1:
- Establece dependencias:
descargar_logsdebe ejecutarse primero, luegofiltrar_erroresyguardar_resultadosen secuencia, y finalmentenotificar_equipodespués deguardar_resultados. - Prueba el DAG: Usa
airflow dags listpara verificar que aparece, y ejecuta una prueba manual conairflow dags test procesar_logs_diarios 2024-01-01.
Entrega el código completo del DAG y una captura de pantalla de la ejecución exitosa o los logs.
Pistas- Recuerda importar los módulos necesarios: DAG, BashOperator, PythonOperator, datetime, timedelta.
- Para PythonOperator, define funciones fuera del DAG y úsalas en python_callable. Usa op_args para pasar datos entre tareas si es necesario.
- Verifica que no haya errores de sintaxis; usa
python -m py_compile tu_archivo.pypara comprobar.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.