Concepto clave
En Apache Airflow, las dependencias entre tareas definen el orden de ejecución de tu pipeline. Imagina que estás construyendo una casa: primero debes poner los cimientos, luego las paredes, y finalmente el techo. No puedes poner el techo antes que las paredes. De la misma manera, en un pipeline ETL, no puedes transformar datos antes de extraerlos. Las funciones set_upstream y set_downstream son los métodos principales para establecer estas relaciones de precedencia entre tareas en un DAG.
Cuando usas task_a.set_downstream(task_b), estás diciendo que task_a debe ejecutarse antes que task_b. Equivalentemente, task_b.set_upstream(task_a) logra exactamente lo mismo. Esta dualidad permite flexibilidad en cómo estructuras tu código. Piensa en estas funciones como conectores que unen las piezas de tu pipeline en el orden correcto.
Cómo funciona en la práctica
Veamos un ejemplo paso a paso de un pipeline simple que procesa datos de ventas. Supongamos que tenemos tres tareas: extraer_datos, transformar_datos, y cargar_datos. Queremos que se ejecuten en ese orden exacto.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def extraer():
print("Extrayendo datos de la base de datos...")
def transformar():
print("Transformando datos...")
def cargar():
print("Cargando datos al data warehouse...")
with DAG('pipeline_ventas', start_date=datetime(2023, 1, 1)) as dag:
extraer_task = PythonOperator(task_id='extraer', python_callable=extraer)
transformar_task = PythonOperator(task_id='transformar', python_callable=transformar)
cargar_task = PythonOperator(task_id='cargar', python_callable=cargar)
# Estableciendo dependencias
extraer_task.set_downstream(transformar_task)
transformar_task.set_downstream(cargar_task)
En este código, extraer_task.set_downstream(transformar_task) asegura que la extracción ocurra antes de la transformación. Luego, transformar_task.set_downstream(cargar_task) garantiza que la transformación preceda a la carga. Si visualizas este DAG en la interfaz web de Airflow, verás flechas que van de extraer a transformar y de transformar a cargar, mostrando claramente el flujo de datos.
Caso de estudio
Considera un pipeline ETL para un sistema de recomendación de películas. El proceso involucra múltiples fuentes de datos y transformaciones complejas. Aquí hay una tabla que describe las tareas y sus dependencias:
| Tarea ID | Descripción | Dependencias |
|---|---|---|
| extraer_usuarios | Extraer datos de usuarios desde MySQL | Ninguna |
| extraer_peliculas | Extraer catálogo de películas desde API | Ninguna |
| limpiar_usuarios | Limpiar y normalizar datos de usuarios | extraer_usuarios |
| limpiar_peliculas | Limpiar datos de películas | extraer_peliculas |
| calcular_similitud | Calcular similitud entre usuarios y películas | limpiar_usuarios, limpiar_peliculas |
| generar_recomendaciones | Generar recomendaciones personalizadas | calcular_similitud |
| enviar_notificaciones | Enviar recomendaciones a usuarios | generar_recomendaciones |
Para implementar esto en Airflow, podrías usar un enfoque como:
# Después de definir todas las tareas...
extraer_usuarios.set_downstream(limpiar_usuarios)
extraer_peliculas.set_downstream(limpiar_peliculas)
limpiar_usuarios.set_downstream(calcular_similitud)
limpiar_peliculas.set_downstream(calcular_similitud)
calcular_similitud.set_downstream(generar_recomendaciones)
generar_recomendaciones.set_downstream(enviar_notificaciones)
Nota cómo calcular_similitud depende de dos tareas anteriores (limpiar_usuarios y limpiar_peliculas). Esto crea una dependencia en paralelo, donde ambas tareas de limpieza deben completarse antes de que calcular_similitud pueda ejecutarse.
Errores comunes
- Ciclos en las dependencias: Definir
task_a.set_downstream(task_b)y luegotask_b.set_downstream(task_a)crea un ciclo infinito. Airflow detectará esto y fallará al validar el DAG. Siempre verifica que tus dependencias formen un grafo acíclico dirigido (DAG). - Olvidar establecer todas las dependencias: Si no defines una dependencia necesaria, las tareas podrían ejecutarse en paralelo cuando deberían ser secuenciales, llevando a errores de datos. Usa la vista de grafo en la interfaz web para confirmar que el flujo es correcto.
- Confundir upstream con downstream: Recuerda que
set_downstreamsignifica "esta tarea va antes que la otra", mientras queset_upstreamsignifica "esta tarea va después de la otra". Una analogía útil: piensa en un río; upstream es aguas arriba (origen), downstream es aguas abajo (destino). - No considerar el tiempo de ejecución: En pipelines complejos, algunas tareas pueden fallar o tardar más de lo esperado. Asegúrate de que las dependencias reflejen no solo el orden lógico, sino también los tiempos reales de procesamiento para evitar cuellos de botella.
Checklist de dominio
- Puedo explicar la diferencia entre
set_upstreamyset_downstreamsin mirar la documentación. - He creado un DAG con al menos 5 tareas y dependencias múltiples usando estos métodos.
- Sé cómo visualizar y verificar dependencias en la interfaz web de Airflow.
- Puedo identificar y corregir ciclos en las dependencias de un DAG.
- Entiendo cómo las dependencias afectan la ejecución paralela y secuencial de tareas.
- He implementado un caso donde una tarea depende de múltiples tareas anteriores.
- Puedo traducir un diagrama de flujo de pipeline a código Airflow con dependencias correctas.
Construye un Pipeline de Procesamiento de Logs con Dependencias Complejas
En este ejercicio, crearás un DAG en Airflow para procesar logs de servidor. Sigue estos pasos:
- Crea un nuevo archivo Python para tu DAG llamado
procesamiento_logs.py. - Define las siguientes tareas usando PythonOperator:
descargar_logs: Simula la descarga de logs desde un servidor remoto (usa una función que imprima "Descargando logs...").parsear_logs: Simula el parseo de logs para extraer información estructurada.filtrar_errores: Simula filtrar solo los logs de error.generar_reporte: Simula generar un reporte de errores.enviar_alerta: Simula enviar una alerta si hay errores críticos.
- Establece las dependencias de modo que:
parsear_logsdependa dedescargar_logs.filtrar_erroresdependa deparsear_logs.generar_reportedependa defiltrar_errores.enviar_alertadependa defiltrar_errores(debe ejecutarse en paralelo congenerar_reporte).
- Usa tanto
set_upstreamcomoset_downstreamen tu código para practicar ambos métodos. - Prueba tu DAG ejecutándolo en un entorno local de Airflow o usando el comando
airflow tasks test.
- Recuerda que para tareas en paralelo, ambas deben depender de la misma tarea anterior pero no entre sí.
- Puedes usar
set_upstreampara una tarea yset_downstreampara otra para ver cómo se relacionan. - Verifica que no haya ciclos: cada tarea debe tener un camino claro desde el inicio hasta el fin.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.