Concepto clave
En Apache Airflow, las dependencias definen el orden de ejecución de las tareas dentro de un DAG. Piensa en ellas como las reglas de un juego de dominó: cada ficha (tarea) solo cae cuando la anterior la empuja. El control de flujo es cómo Airflow gestiona estas dependencias, decidiendo qué tareas ejecutar, cuándo y bajo qué condiciones.
En el mundo real, imagina una cadena de montaje en una fábrica de coches. No puedes pintar el chasis antes de soldarlo, ni instalar el motor antes de tener el chasis pintado. Cada paso depende del anterior. Airflow te permite modelar estas dependencias de forma explícita, asegurando que tu pipeline ETL siga un flujo lógico y predecible.
Cómo funciona en la práctica
Veamos un ejemplo paso a paso para un pipeline que procesa datos de ventas:
- Define las tareas como operadores de Python (por ejemplo,
extract_sales_data,transform_customer_info,load_to_data_warehouse). - Establece dependencias usando los operadores
>>(set_upstream) y<<(set_downstream). Por ejemplo:extract_sales_data >> transform_customer_info >> load_to_data_warehouse. - Configura triggers como
all_success(por defecto) oall_failedpara manejar casos de error. - Usa
BranchPythonOperatorpara crear ramificaciones condicionales, como decidir si procesar datos de alta prioridad primero.
Un fragmento de código básico:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def extract():
print("Extrayendo datos...")
def transform():
print("Transformando datos...")
def load():
print("Cargando datos...")
with DAG('sales_pipeline', start_date=datetime(2023, 1, 1)) as dag:
t1 = PythonOperator(task_id='extract', python_callable=extract)
t2 = PythonOperator(task_id='transform', python_callable=transform)
t3 = PythonOperator(task_id='load', python_callable=load)
t1 >> t2 >> t3Caso de estudio
Una empresa de e-commerce necesita un pipeline diario que:
- Extrae datos de ventas de una API REST y de una base de datos MySQL.
- Transforma los datos, unificando formatos y calculando métricas como el valor promedio por pedido.
- Carga los resultados en un data warehouse de Redshift para análisis.
Las dependencias se modelan así:
| Tarea | Dependencias | Condición |
|---|---|---|
| extraer_api | Ninguna | Ejecutar siempre |
| extraer_mysql | Ninguna | Ejecutar siempre |
| transformar_datos | extraer_api, extraer_mysql | Solo si ambas tienen éxito |
| cargar_redshift | transformar_datos | Si tiene éxito |
En este caso, transformar_datos espera a que ambas tareas de extracción terminen, evitando procesar datos incompletos.
Errores comunes
- Ciclos de dependencia: Crear un bucle donde la tarea A depende de B y B depende de A. Airflow lanzará un error. Solución: Revisar el grafo del DAG con herramientas como
airflow dags show. - Dependencias implícitas: Confiar en el orden de definición en lugar de establecer dependencias explícitas. Esto puede llevar a ejecuciones aleatorias. Solución: Siempre usar
>>o métodos comoset_upstream. - Ignorar el estado de las tareas: No manejar casos donde una tarea falla, causando que el pipeline se detenga inesperadamente. Solución: Usar triggers como
all_doneo implementar retries y alertas. - Sobrecarga de ramificaciones: Crear demasiadas ramas condicionales que hacen el DAG difícil de mantener. Solución: Simplificar la lógica o dividir en múltiples DAGs.
- No probar dependencias en desarrollo: Asumir que las dependencias funcionan sin pruebas con datos de ejemplo. Solución: Usar el modo de depuración de Airflow y datasets pequeños.
Checklist de dominio
- Puedo definir dependencias entre tareas usando operadores como
>>y<<. - Sé cómo usar
BranchPythonOperatorpara crear flujos condicionales basados en datos. - Entiendo la diferencia entre triggers como
all_success,one_failed, yall_done. - Puedo detectar y resolver ciclos de dependencia en un DAG.
- Sé configurar retries y timeouts para manejar fallos en tareas dependientes.
- Puedo visualizar el grafo de dependencias en la interfaz web de Airflow.
- Entiendo cómo las dependencias afectan la paralelización y el rendimiento del pipeline.
Diseña un DAG con dependencias complejas para un pipeline de logistica
Crea un DAG en Airflow que modele un pipeline de logistica para una empresa de transporte. Sigue estos pasos:
- Define 5 tareas:
recibir_pedidos(extrae pedidos de una API),validar_inventario(verifica stock en base de datos),asignar_ruta(calcula la ruta optima),notificar_cliente(envia un email), yactualizar_sistema(actualiza el sistema de logistica). - Establece dependencias:
recibir_pedidosdebe ejecutarse primero.validar_inventarioyasignar_rutapueden ejecutarse en paralelo despues derecibir_pedidos, pero ambos deben terminar antes denotificar_cliente. actualizar_sistemadebe ejecutarse solo sinotificar_clientetiene exito, pero con un retry de 2 intentos si falla.- Usa operadores de Python simulados (puedes usar funciones que impriman mensajes) y asegurate de no crear ciclos.
- Prueba tu DAG en un entorno local o con el simulador de Airflow, verificando el orden de ejecucion en el grafo.
- Recuerda que para ejecucion en paralelo, las tareas no deben tener dependencias directas entre ellas, pero si una dependencia comun anterior.
- Usa
PythonOperatorcon funciones simples para simular las tareas, enfocate en la estructura de dependencias. - Verifica que
actualizar_sistematengaretries=2en sus parametros.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.