Branching y SubDAGs para Flujos Condicionales

Lectura
25 min~4 min lectura

Concepto clave

En Apache Airflow, los flujos de trabajo no siempre son lineales. Muchas veces necesitamos tomar decisiones basadas en datos o condiciones externas. Aquí es donde entran en juego dos herramientas poderosas: Branching y SubDAGs.

El Branching (ramificación) permite que tu DAG tome diferentes caminos segun el resultado de una tarea. Imagina que eres un chef en un restaurante: cuando recibes un pedido, primero verificas si tienes todos los ingredientes. Si los tienes, procedes a cocinar; si no, debes ir al mercado. Esta decision condicional es exactamente lo que el branching permite en Airflow.

Los SubDAGs son DAGs dentro de DAGs. Piensa en ellos como subrutinas en programacion: encapsulan logica compleja que puedes reutilizar en multiples flujos. Por ejemplo, si tienes un proceso de limpieza de datos que se repite en varios pipelines, puedes convertirlo en un SubDAG y llamarlo donde sea necesario, manteniendo tu codigo organizado y DRY (Don't Repeat Yourself).

Cómo funciona en la práctica

Veamos un ejemplo paso a paso de branching. Supongamos que procesamos datos de ventas diarias:

  1. Creamos una tarea check_data_quality que valida si los datos estan completos.
  2. Usamos el operador BranchPythonOperator para decidir que camino seguir.
  3. Si la calidad es buena (quality_score > 0.8), ejecutamos process_data.
  4. Si es mala, ejecutamos notify_team y fix_data.

El codigo clave se veria asi:

def decide_path(**kwargs):
    ti = kwargs['ti']
    quality = ti.xcom_pull(task_ids='check_data_quality')
    if quality > 0.8:
        return 'process_data'
    else:
        return 'notify_team'

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=decide_path,
    dag=dag
)

Para SubDAGs, primero defines un DAG separado (por ejemplo, clean_data_subdag) con sus propias tareas, luego lo incluyes en tu DAG principal usando SubDagOperator. Esto crea una jerarquia clara y facil de mantener.

Caso de estudio

Una empresa de e-commerce necesita procesar pedidos cada hora. Su pipeline incluye:

PasoDescripcionTipo
1. Extraer pedidosObtener nuevos pedidos de la APIPythonOperator
2. Validar stockVerificar inventario disponibleBranchPythonOperator
3. Procesar normalSi hay stock, continuar flujo normalSubDAG
4. Manejar agotadoSi no hay stock, notificar y reintentarSubDAG
5. ConsolidarUnir resultados de ambas ramasPythonOperator

El SubDAG procesar_normal contiene: calcular impuestos, aplicar descuentos, y generar factura. El SubDAG manejar_agotado envia alertas al equipo de compras y programa reintentos. Usando esta estructura, el equipo puede actualizar la logica de cada SubDAG independientemente, sin afectar el flujo principal.

En produccion, este enfoque redujo los errores en un 40% y permitio reutilizar codigo en 3 pipelines diferentes.

Errores comunes

  • Branching sin XComs: Olvidar pasar datos entre tareas usando XComs, haciendo que las decisiones se basen en informacion incorrecta. Solucion: Siempre usar xcom_push y xcom_pull para compartir estado.
  • SubDAGs como cajas negras: Crear SubDAGs demasiado complejos sin documentacion, dificultando el debug. Solucion: Mantener SubDAGs simples y bien documentados, con logs claros.
  • Dependencias circulares: En branching, crear ciclos donde una tarea depende de si misma indirectamente. Solucion: Mapear el flujo en papel antes de codificar y usar dag.test() para validar.
  • Ignorar estados de tareas: No manejar correctamente los estados skipped o failed en ramas condicionales. Solucion: Configurar trigger_rules apropiadas como none_failed o dummy.
  • SubDAGs con recursos compartidos: Usar el mismo pool o queue para SubDAGs que para tareas principales, causando cuellos de botella. Solucion: Asignar recursos especificos a cada SubDAG.

Checklist de dominio

  1. Puedo explicar la diferencia entre BranchPythonOperator y un if normal en Python.
  2. He implementado al menos un DAG con branching que toma decisiones basadas en datos reales.
  3. Se crear y probar un SubDAG independiente antes de integrarlo.
  4. Uso XComs efectivamente para pasar datos entre tareas en flujos condicionales.
  5. Manejo correctamente los estados de tareas (skipped, failed) en mis ramas.
  6. Documento mis SubDAGs con ejemplos de uso y parametros esperados.
  7. Puedo debuggear un DAG con branching usando la UI de Airflow y logs.

Implementa un pipeline de limpieza de datos con branching y SubDAG

En este ejercicio, crearas un DAG que procesa datos de sensores IoT. Sigue estos pasos:

  1. Crea un DAG llamado iot_data_pipeline que se ejecute diariamente.
  2. Agrega una tarea inicial extract_sensor_data que simule extraer datos (puedes generar datos aleatorios).
  3. Implementa una tarea validate_data que use BranchPythonOperator para decidir:
    • Si mas del 90% de los datos son validos, ir a la rama clean_with_subdag.
    • Si no, ir a la rama manual_review.
  4. Crea un SubDAG llamado clean_data_subdag que realice tres operaciones de limpieza: eliminar duplicados, normalizar formatos, y calcular promedios.
  5. En la rama manual_review, agrega una tarea que simule enviar una alerta por email.
  6. Une ambas ramas en una tarea final load_to_warehouse que consolide los resultados.
  7. Prueba tu DAG con diferentes porcentajes de datos validos para verificar que el branching funciona.

Entrega: Codigo Python del DAG y captura de pantalla de la ejecucion en la UI de Airflow.

Pistas
  • Usa PythonOperator para simular tareas si no tienes acceso real a datos de sensores.
  • Recuerda que BranchPythonOperator debe retornar el task_id de la siguiente tarea a ejecutar.
  • Para el SubDAG, define primero las tareas de limpieza en una funcion separada.

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.