Concepto clave
En Apache Airflow, los flujos de trabajo no siempre son lineales. Muchas veces necesitamos tomar decisiones basadas en datos o condiciones externas. Aquí es donde entran en juego dos herramientas poderosas: Branching y SubDAGs.
El Branching (ramificación) permite que tu DAG tome diferentes caminos segun el resultado de una tarea. Imagina que eres un chef en un restaurante: cuando recibes un pedido, primero verificas si tienes todos los ingredientes. Si los tienes, procedes a cocinar; si no, debes ir al mercado. Esta decision condicional es exactamente lo que el branching permite en Airflow.
Los SubDAGs son DAGs dentro de DAGs. Piensa en ellos como subrutinas en programacion: encapsulan logica compleja que puedes reutilizar en multiples flujos. Por ejemplo, si tienes un proceso de limpieza de datos que se repite en varios pipelines, puedes convertirlo en un SubDAG y llamarlo donde sea necesario, manteniendo tu codigo organizado y DRY (Don't Repeat Yourself).
Cómo funciona en la práctica
Veamos un ejemplo paso a paso de branching. Supongamos que procesamos datos de ventas diarias:
- Creamos una tarea
check_data_qualityque valida si los datos estan completos. - Usamos el operador
BranchPythonOperatorpara decidir que camino seguir. - Si la calidad es buena (
quality_score > 0.8), ejecutamosprocess_data. - Si es mala, ejecutamos
notify_teamyfix_data.
El codigo clave se veria asi:
def decide_path(**kwargs):
ti = kwargs['ti']
quality = ti.xcom_pull(task_ids='check_data_quality')
if quality > 0.8:
return 'process_data'
else:
return 'notify_team'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=decide_path,
dag=dag
)Para SubDAGs, primero defines un DAG separado (por ejemplo, clean_data_subdag) con sus propias tareas, luego lo incluyes en tu DAG principal usando SubDagOperator. Esto crea una jerarquia clara y facil de mantener.
Caso de estudio
Una empresa de e-commerce necesita procesar pedidos cada hora. Su pipeline incluye:
| Paso | Descripcion | Tipo |
|---|---|---|
| 1. Extraer pedidos | Obtener nuevos pedidos de la API | PythonOperator |
| 2. Validar stock | Verificar inventario disponible | BranchPythonOperator |
| 3. Procesar normal | Si hay stock, continuar flujo normal | SubDAG |
| 4. Manejar agotado | Si no hay stock, notificar y reintentar | SubDAG |
| 5. Consolidar | Unir resultados de ambas ramas | PythonOperator |
El SubDAG procesar_normal contiene: calcular impuestos, aplicar descuentos, y generar factura. El SubDAG manejar_agotado envia alertas al equipo de compras y programa reintentos. Usando esta estructura, el equipo puede actualizar la logica de cada SubDAG independientemente, sin afectar el flujo principal.
En produccion, este enfoque redujo los errores en un 40% y permitio reutilizar codigo en 3 pipelines diferentes.
Errores comunes
- Branching sin XComs: Olvidar pasar datos entre tareas usando XComs, haciendo que las decisiones se basen en informacion incorrecta. Solucion: Siempre usar
xcom_pushyxcom_pullpara compartir estado. - SubDAGs como cajas negras: Crear SubDAGs demasiado complejos sin documentacion, dificultando el debug. Solucion: Mantener SubDAGs simples y bien documentados, con logs claros.
- Dependencias circulares: En branching, crear ciclos donde una tarea depende de si misma indirectamente. Solucion: Mapear el flujo en papel antes de codificar y usar
dag.test()para validar. - Ignorar estados de tareas: No manejar correctamente los estados
skippedofaileden ramas condicionales. Solucion: Configurartrigger_rulesapropiadas comonone_failedodummy. - SubDAGs con recursos compartidos: Usar el mismo pool o queue para SubDAGs que para tareas principales, causando cuellos de botella. Solucion: Asignar recursos especificos a cada SubDAG.
Checklist de dominio
- Puedo explicar la diferencia entre BranchPythonOperator y un if normal en Python.
- He implementado al menos un DAG con branching que toma decisiones basadas en datos reales.
- Se crear y probar un SubDAG independiente antes de integrarlo.
- Uso XComs efectivamente para pasar datos entre tareas en flujos condicionales.
- Manejo correctamente los estados de tareas (skipped, failed) en mis ramas.
- Documento mis SubDAGs con ejemplos de uso y parametros esperados.
- Puedo debuggear un DAG con branching usando la UI de Airflow y logs.
Implementa un pipeline de limpieza de datos con branching y SubDAG
En este ejercicio, crearas un DAG que procesa datos de sensores IoT. Sigue estos pasos:
- Crea un DAG llamado
iot_data_pipelineque se ejecute diariamente. - Agrega una tarea inicial
extract_sensor_dataque simule extraer datos (puedes generar datos aleatorios). - Implementa una tarea
validate_dataque use BranchPythonOperator para decidir:- Si mas del 90% de los datos son validos, ir a la rama
clean_with_subdag. - Si no, ir a la rama
manual_review.
- Si mas del 90% de los datos son validos, ir a la rama
- Crea un SubDAG llamado
clean_data_subdagque realice tres operaciones de limpieza: eliminar duplicados, normalizar formatos, y calcular promedios. - En la rama
manual_review, agrega una tarea que simule enviar una alerta por email. - Une ambas ramas en una tarea final
load_to_warehouseque consolide los resultados. - Prueba tu DAG con diferentes porcentajes de datos validos para verificar que el branching funciona.
Entrega: Codigo Python del DAG y captura de pantalla de la ejecucion en la UI de Airflow.
Pistas- Usa PythonOperator para simular tareas si no tienes acceso real a datos de sensores.
- Recuerda que BranchPythonOperator debe retornar el task_id de la siguiente tarea a ejecutar.
- Para el SubDAG, define primero las tareas de limpieza en una funcion separada.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.