Entender DAGs, Operadores y Tareas en Airflow

Lectura
20 min~6 min lectura

Concepto clave: DAGs, Operadores y Tareas en Airflow

En Apache Airflow, un DAG (Grafo Acíclico Dirigido) es la estructura fundamental que define un flujo de trabajo. Imagina un DAG como un plano de construcción para tu pipeline de datos: especifica qué tareas deben ejecutarse, en qué orden y bajo qué condiciones. A diferencia de un script lineal, un DAG permite dependencias complejas, ejecución paralela y manejo de errores de manera elegante.

Los operadores son los componentes que realizan el trabajo real dentro de un DAG. Cada operador representa una unidad de trabajo, como ejecutar un script Python, consultar una base de datos o enviar un correo electrónico. Airflow incluye operadores predefinidos (como PythonOperator, BashOperator, PostgresOperator) y permite crear operadores personalizados. Una tarea es una instancia específica de un operador dentro de un DAG, con parámetros concretos como rutas de archivos o credenciales.

La analogía del mundo real: piensa en un DAG como la receta de un pastel (el plan), los operadores como los utensilios de cocina (batidora, horno, espátula), y las tareas como usar esos utensilios con ingredientes específicos (batir huevos con azúcar, hornear a 180°C). La receta define el orden (no puedes decorar antes de hornear), pero diferentes partes pueden hacerse en paralelo (preparar el glaseado mientras se hornea).

Cómo funciona en la práctica: Construyendo tu primer DAG

Vamos a crear un DAG simple que extrae datos de una API, los transforma y los carga en una base de datos. Este ejemplo usa operadores comunes y muestra cómo definir dependencias.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

def extract_data():
    # Simula extracción de datos de una API
    print("Extrayendo datos de la API...")
    return [1, 2, 3, 4, 5]

def transform_data(data):
    # Transforma los datos (ejemplo: duplicar valores)
    print(f"Transformando datos: {data}")
    return [x * 2 for x in data]

def load_data(transformed_data):
    # Simula carga a una base de datos
    print(f"Cargando datos: {transformed_data}")

# Definir el DAG
default_args = {
    'owner': 'data_engineer',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'mi_primer_etl',
    default_args=default_args,
    description='Un DAG simple de ETL',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1)
)

# Definir tareas
extract_task = PythonOperator(
    task_id='extraer_datos',
    python_callable=extract_data,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transformar_datos',
    python_callable=transform_data,
    op_args=[extract_task.output],
    dag=dag
)

load_task = BashOperator(
    task_id='cargar_datos',
    bash_command='echo "Datos cargados exitosamente"',
    dag=dag
)

# Establecer dependencias
extract_task >> transform_task >> load_task

Este DAG se ejecuta diariamente, comenzando el 1 de enero de 2023. La tarea extraer_datos (PythonOperator) extrae datos, transformar_datos (PythonOperator) los procesa usando la salida de la primera tarea, y cargar_datos (BashOperator) simula la carga. Las dependencias se definen con >>, asegurando que transformar no ocurra antes de extraer, y cargar no antes de transformar.

Caso de estudio: Pipeline de análisis de ventas diarias

Una empresa minorista necesita un pipeline que procese ventas diarias desde múltiples fuentes. El DAG debe:

  1. Extraer datos de una API REST y un archivo CSV en paralelo
  2. Unir y limpiar los datos
  3. Calcular métricas como ventas totales y promedio por región
  4. Guardar resultados en un data warehouse y enviar un reporte por email

Implementación con datos de ejemplo:

TareaOperadorDescripciónDependencias
extraer_apiPythonOperatorExtrae ventas de API (ej: [{"id": 1, "monto": 100}, {"id": 2, "monto": 200}])Ninguna
extraer_csvPythonOperatorLee archivo sales.csv con columnas: fecha, producto, cantidadNinguna
unir_datosPythonOperatorCombina datos de ambas fuentes, elimina duplicadosextraer_api, extraer_csv
calcular_metricasPythonOperatorCalcula total_ventas = suma(montos), promedio = total_ventas / countunir_datos
guardar_resultadosPostgresOperatorInserta métricas en tabla sales_metricscalcular_metricas
enviar_reporteEmailOperatorEnvía email con resumen: "Ventas totales: $X, Promedio: $Y"guardar_resultados
En este caso, extraer_api y extraer_csv pueden ejecutarse en paralelo porque no dependen entre sí, mejorando el tiempo total de ejecución. La tarea unir_datos espera a que ambas terminen antes de comenzar.

Errores comunes y cómo evitarlos

Al trabajar con DAGs, operadores y tareas, estos errores son frecuentes:

  • Dependencias circulares: Definir tareas que dependen entre sí en un ciclo (A → B → A), causando bloqueos. Solución: Revisar el grafo con airflow dags list y herramientas de visualización, asegurando que sea acíclico.
  • Uso incorrecto de start_date: Configurar start_date en el futuro o sin considerar schedule_interval, lo que evita ejecuciones. Ejemplo: start_date=datetime(2024,1,1) con schedule_interval='@daily' no ejecutará hasta esa fecha. Solución: Usar fechas pasadas para pruebas y entender que Airflow ejecuta DAGs después de start_date + schedule_interval.
  • Operadores bloqueantes: Usar operadores que no liberan recursos (ej: PythonOperator con bucles infinitos), paralizando el scheduler. Solución: Implementar timeouts, usar operadores asíncronos cuando sea posible, y monitorear logs.
  • Manejo pobre de errores: No configurar retries o alertas, perdiendo datos en fallos. Solución: Definir retries y retry_delay en default_args, y agregar tareas de fallback (ej: enviar alerta si carga falla después de 3 intentos).
  • Overhead en DAGs simples: Crear DAGs con muchas tareas para procesos triviales, añadiendo complejidad innecesaria. Solución: Evaluar si un script simple es suficiente; usar Airflow para orquestación, no para lógica mínima.

Checklist de dominio

Después de esta lección, debes poder:

  1. Definir un DAG con schedule_interval, start_date y default_args apropiados
  2. Crear tareas usando al menos tres tipos de operadores (ej: PythonOperator, BashOperator, EmailOperator)
  3. Establecer dependencias entre tareas usando >>, << o set_upstream/set_downstream
  4. Explicar la diferencia entre un operador y una tarea con un ejemplo concreto
  5. Identificar y corregir un error común en un DAG (ej: dependencia circular)
  6. Diseñar un DAG para un caso de ETL con extracción paralela y procesamiento secuencial
  7. Configurar reintentos y manejo de errores básico en las tareas

Crear un DAG para procesamiento de logs diarios

En este ejercicio, crearás un DAG que procese logs de aplicaciones diarios. Sigue estos pasos:

  1. Configura el entorno: Asegúrate de tener Airflow instalado y funcionando. Crea un archivo Python en la carpeta dags/ de tu instalación.
  2. Define el DAG: Crea un DAG llamado procesar_logs_diarios con:
    • schedule_interval: @daily
    • start_date: 1 de enero de 2024
    • default_args: owner='tu_nombre', retries=2, retry_delay=timedelta(minutes=2)
  3. Implementa las tareas:
    • Tarea 1: descargar_logs (BashOperator) que simula descargar logs con echo 'Descargando logs...'
    • Tarea 2: filtrar_errores (PythonOperator) que define una función para filtrar líneas con la palabra 'ERROR' de una lista de logs de ejemplo (ej: ["INFO: inicio", "ERROR: fallo", "INFO: fin"])
    • Tarea 3: guardar_resultados (PythonOperator) que guarda los errores filtrados en un archivo de texto simulado (puede imprimirlos)
    • Tarea 4: notificar_equipo (BashOperator) que simula una notificación con echo 'Notificación enviada'
  4. Establece dependencias: descargar_logs debe ejecutarse primero, luego filtrar_errores y guardar_resultados en secuencia, y finalmente notificar_equipo después de guardar_resultados.
  5. Prueba el DAG: Usa airflow dags list para verificar que aparece, y ejecuta una prueba manual con airflow dags test procesar_logs_diarios 2024-01-01.

Entrega el código completo del DAG y una captura de pantalla de la ejecución exitosa o los logs.

Pistas
  • Recuerda importar los módulos necesarios: DAG, BashOperator, PythonOperator, datetime, timedelta.
  • Para PythonOperator, define funciones fuera del DAG y úsalas en python_callable. Usa op_args para pasar datos entre tareas si es necesario.
  • Verifica que no haya errores de sintaxis; usa python -m py_compile tu_archivo.py para comprobar.

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.