Definir Dependencias entre Tareas con set_upstream y set_downstream

Lectura
20 min~5 min lectura

Concepto clave

En Apache Airflow, las dependencias entre tareas definen el orden de ejecución de tu pipeline. Imagina que estás construyendo una casa: primero debes poner los cimientos, luego las paredes, y finalmente el techo. No puedes poner el techo antes que las paredes. De la misma manera, en un pipeline ETL, no puedes transformar datos antes de extraerlos. Las funciones set_upstream y set_downstream son los métodos principales para establecer estas relaciones de precedencia entre tareas en un DAG.

Cuando usas task_a.set_downstream(task_b), estás diciendo que task_a debe ejecutarse antes que task_b. Equivalentemente, task_b.set_upstream(task_a) logra exactamente lo mismo. Esta dualidad permite flexibilidad en cómo estructuras tu código. Piensa en estas funciones como conectores que unen las piezas de tu pipeline en el orden correcto.

Cómo funciona en la práctica

Veamos un ejemplo paso a paso de un pipeline simple que procesa datos de ventas. Supongamos que tenemos tres tareas: extraer_datos, transformar_datos, y cargar_datos. Queremos que se ejecuten en ese orden exacto.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def extraer():
    print("Extrayendo datos de la base de datos...")

def transformar():
    print("Transformando datos...")

def cargar():
    print("Cargando datos al data warehouse...")

with DAG('pipeline_ventas', start_date=datetime(2023, 1, 1)) as dag:
    extraer_task = PythonOperator(task_id='extraer', python_callable=extraer)
    transformar_task = PythonOperator(task_id='transformar', python_callable=transformar)
    cargar_task = PythonOperator(task_id='cargar', python_callable=cargar)
    
    # Estableciendo dependencias
    extraer_task.set_downstream(transformar_task)
    transformar_task.set_downstream(cargar_task)

En este código, extraer_task.set_downstream(transformar_task) asegura que la extracción ocurra antes de la transformación. Luego, transformar_task.set_downstream(cargar_task) garantiza que la transformación preceda a la carga. Si visualizas este DAG en la interfaz web de Airflow, verás flechas que van de extraer a transformar y de transformar a cargar, mostrando claramente el flujo de datos.

Caso de estudio

Considera un pipeline ETL para un sistema de recomendación de películas. El proceso involucra múltiples fuentes de datos y transformaciones complejas. Aquí hay una tabla que describe las tareas y sus dependencias:

Tarea IDDescripciónDependencias
extraer_usuariosExtraer datos de usuarios desde MySQLNinguna
extraer_peliculasExtraer catálogo de películas desde APINinguna
limpiar_usuariosLimpiar y normalizar datos de usuariosextraer_usuarios
limpiar_peliculasLimpiar datos de películasextraer_peliculas
calcular_similitudCalcular similitud entre usuarios y películaslimpiar_usuarios, limpiar_peliculas
generar_recomendacionesGenerar recomendaciones personalizadascalcular_similitud
enviar_notificacionesEnviar recomendaciones a usuariosgenerar_recomendaciones

Para implementar esto en Airflow, podrías usar un enfoque como:

# Después de definir todas las tareas...
extraer_usuarios.set_downstream(limpiar_usuarios)
extraer_peliculas.set_downstream(limpiar_peliculas)
limpiar_usuarios.set_downstream(calcular_similitud)
limpiar_peliculas.set_downstream(calcular_similitud)
calcular_similitud.set_downstream(generar_recomendaciones)
generar_recomendaciones.set_downstream(enviar_notificaciones)

Nota cómo calcular_similitud depende de dos tareas anteriores (limpiar_usuarios y limpiar_peliculas). Esto crea una dependencia en paralelo, donde ambas tareas de limpieza deben completarse antes de que calcular_similitud pueda ejecutarse.

Errores comunes

  • Ciclos en las dependencias: Definir task_a.set_downstream(task_b) y luego task_b.set_downstream(task_a) crea un ciclo infinito. Airflow detectará esto y fallará al validar el DAG. Siempre verifica que tus dependencias formen un grafo acíclico dirigido (DAG).
  • Olvidar establecer todas las dependencias: Si no defines una dependencia necesaria, las tareas podrían ejecutarse en paralelo cuando deberían ser secuenciales, llevando a errores de datos. Usa la vista de grafo en la interfaz web para confirmar que el flujo es correcto.
  • Confundir upstream con downstream: Recuerda que set_downstream significa "esta tarea va antes que la otra", mientras que set_upstream significa "esta tarea va después de la otra". Una analogía útil: piensa en un río; upstream es aguas arriba (origen), downstream es aguas abajo (destino).
  • No considerar el tiempo de ejecución: En pipelines complejos, algunas tareas pueden fallar o tardar más de lo esperado. Asegúrate de que las dependencias reflejen no solo el orden lógico, sino también los tiempos reales de procesamiento para evitar cuellos de botella.

Checklist de dominio

  1. Puedo explicar la diferencia entre set_upstream y set_downstream sin mirar la documentación.
  2. He creado un DAG con al menos 5 tareas y dependencias múltiples usando estos métodos.
  3. Sé cómo visualizar y verificar dependencias en la interfaz web de Airflow.
  4. Puedo identificar y corregir ciclos en las dependencias de un DAG.
  5. Entiendo cómo las dependencias afectan la ejecución paralela y secuencial de tareas.
  6. He implementado un caso donde una tarea depende de múltiples tareas anteriores.
  7. Puedo traducir un diagrama de flujo de pipeline a código Airflow con dependencias correctas.

Construye un Pipeline de Procesamiento de Logs con Dependencias Complejas

En este ejercicio, crearás un DAG en Airflow para procesar logs de servidor. Sigue estos pasos:

  1. Crea un nuevo archivo Python para tu DAG llamado procesamiento_logs.py.
  2. Define las siguientes tareas usando PythonOperator:
    • descargar_logs: Simula la descarga de logs desde un servidor remoto (usa una función que imprima "Descargando logs...").
    • parsear_logs: Simula el parseo de logs para extraer información estructurada.
    • filtrar_errores: Simula filtrar solo los logs de error.
    • generar_reporte: Simula generar un reporte de errores.
    • enviar_alerta: Simula enviar una alerta si hay errores críticos.
  3. Establece las dependencias de modo que:
    • parsear_logs dependa de descargar_logs.
    • filtrar_errores dependa de parsear_logs.
    • generar_reporte dependa de filtrar_errores.
    • enviar_alerta dependa de filtrar_errores (debe ejecutarse en paralelo con generar_reporte).
  4. Usa tanto set_upstream como set_downstream en tu código para practicar ambos métodos.
  5. Prueba tu DAG ejecutándolo en un entorno local de Airflow o usando el comando airflow tasks test.
Pistas
  • Recuerda que para tareas en paralelo, ambas deben depender de la misma tarea anterior pero no entre sí.
  • Puedes usar set_upstream para una tarea y set_downstream para otra para ver cómo se relacionan.
  • Verifica que no haya ciclos: cada tarea debe tener un camino claro desde el inicio hasta el fin.

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.