Orquestación del DAG: Scheduling, Dependencias y Parámetros Dinámicos

Video
20 min~11 min lectura

Reproductor de video

Introducción a la Orquestación Avanzada en Airflow

La verdadera potencia de Apache Airflow se despliega cuando dominamos la orquestación de nuestros DAGs. Más allá de definir tareas y sus dependencias lineales, un pipeline ETL complejo exige un control preciso sobre cuándo se ejecuta, cómo se adapta a diferentes condiciones y cómo fluyen los datos entre sus componentes. Esta lección se adentra en los mecanismos que convierten un DAG simple en un sistema de producción robusto y confiable. Abordaremos el scheduling sofisticado, la gestión de dependencias más allá de `>>` y `<<`, y el uso de parámetros dinámicos para crear pipelines genéricos y reutilizables.

Imagina que tu pipeline no solo extrae datos de una fuente, sino que debe hacerlo para decenas de clientes, cada uno con configuraciones ligeramente distintas, o que ciertas tareas solo deben ejecutarse si la tarea anterior generó un volumen de datos suficiente. Estos escenarios son comunes en entornos de producción y requieren un diseño deliberado del DAG. La orquestación avanzada es el conjunto de herramientas y patrones que Airflow proporciona para resolver estos problemas, permitiéndote construir sistemas que no solo procesan datos, sino que toman decisiones sobre cómo procesarlos.

Concepto Clave: El DAG como Sistema de Control de Flujo

Antes de sumergirnos en los detalles técnicos, es crucial internalizar un concepto fundamental: en Airflow, un DAG no es simplemente una lista de tareas en orden. Es un sistema de control de flujo dirigido por eventos y tiempo. Piensa en él como el director de orquesta de una sinfonía. El director (el programador o scheduler) no toca ningún instrumento, sino que decide cuándo entra cada sección (las tareas), basándose en la partitura (el código del DAG) y en señales del entorno (dependencias y estados). Algunos instrumentos (tareas) solo pueden comenzar cuando otros han terminado un compás específico (estado de éxito), y la obra puede tener repeticiones o variaciones (parámetros dinámicos) para cada presentación.

Esta analogía se extiende a los parámetros dinámicos. Imagina que la misma sinfonía debe adaptarse para tocar en una catedral, un teatro o al aire libre. La partitura base es la misma, pero la afinación, el volumen y tal vez algunos arreglos (los parámetros) cambian según el contexto. En Airflow, un DAG parametrizado es esa partitura adaptable, que puede ejecutarse con diferentes configuraciones (por ejemplo, `client_id=45` o `region='europe'`) sin necesidad de reescribir el código para cada caso, permitiendo una reutilización masiva y una mantenibilidad superior.

Scheduling Avanzado y Ventanas de Ejecución

El parámetro `schedule_interval` es la piedra angular de la automatización. Mientras que un valor como `@daily` es sencillo, los pipelines complejos a menudo requieren cron expressions más elaboradas o incluso intervalos personalizados. Por ejemplo, un pipeline que consume datos de un sistema que se actualiza cada 4 horas los días laborables necesita un `schedule_interval` como `0 */4 * * 1-5`. Es vital comprender que Airflow programa una ejecución del DAG (una DAG Run) al *final* del intervalo. Una tarea programada con `@daily` se ejecutará justo después de la medianoche, procesando los datos del día *anterior*.

Además del intervalo, los argumentos `start_date` y `end_date` definen la ventana de vida del DAG. Un error común es usar una fecha dinámica como `datetime.now()` para `start_date`. Esto causa problemas porque el scheduler evalúa esta expresión cada vez que analiza el DAG, moviendo constantemente el punto de inicio. Debe ser una fecha fija y estática en el pasado. Para DAGs que deben procesar datos históricos, Airflow permite realizar backfills, ejecutando el DAG para intervalos de tiempo pasados, lo cual es una funcionalidad poderosa para la inicialización de pipelines.


from datetime import datetime, timedelta
from airflow import DAG

# Definición CORRECTA con fecha estática
default_args = {
    'owner': 'data_team',
    'start_date': datetime(2023, 10, 1),  # Fecha fija en el pasado
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# DAG programado para ejecutarse a las 6 AM todos los días, y también los sábados a mediodía.
dag = DAG(
    'mi_pipeline_complejo',
    default_args=default_args,
    description='Un DAG con scheduling complejo',
    schedule_interval='0 6 * * *',  # Todos los días a las 6 AM. También podria ser '0 6,12 * * 6' para sábados.
    catchup=False,  # Importante: Evita backfills automáticos si no se desean.
    tags=['ETL', 'produccion'],
)

Dependencias Complejas y Flujos Condicionales

Las dependencias simples con los operadores `set_upstream`/`set_downstream` o `>>` y `<<` definen un grafo estático. Sin embargo, los pipelines del mundo real necesitan lógica condicional. ¿Qué pasa si una tarea de validación falla y debemos enviar una alerta en lugar de proceder con la transformación? Airflow ofrece varias herramientas. Los Branch Operators, como `BranchPythonOperator`, permiten elegir dinámicamente qué rama del flujo seguir basándose en el resultado de una función. Es como un desvío en una línea de montaje que redirige el producto según un control de calidad.

Otra herramienta poderosa es el concepto de Triggers Rules. Por defecto, una tarea se ejecuta solo si todas sus tareas antecedentes directas han tenido éxito (`all_success`). Pero podemos cambiarlo. La regla `all_done` ejecuta la tarea sin importar el estado de las antecedentes (útil para tareas de limpieza). La regla `one_success` ejecuta la tarea si al menos una antecedente tuvo éxito, y `none_failed` se ejecuta si ninguna antecedente falló (ignorando estados "skipped"). Esto permite construir flujos robustos de manejo de errores y ejecución en paralelo con reunificación condicional.


from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.edgemodifier import Label

def _validar_volumen(**context):
    ti = context['ti']
    # Supongamos que 'extraer_datos' empuja un conteo a XCom
    volumen = ti.xcom_pull(task_ids='extraer_datos', key='numero_registros')
    if volumen > 1000:
        return 'transformar_lote_grande'
    else:
        return 'transformar_lote_pequeno'

with DAG('dag_condicional', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:

    inicio = DummyOperator(task_id='inicio')
    extraer = PythonOperator(task_id='extraer_datos', python_callable=lambda: 1500)  # Simula volumen alto

    validar = BranchPythonOperator(
        task_id='validar_volumen',
        python_callable=_validar_volumen
    )

    transformar_grande = PythonOperator(task_id='transformar_lote_grande', python_callable=lambda: print("Procesando gran volumen"))
    transformar_pequeno = PythonOperator(task_id='transformar_lote_pequeno', python_callable=lambda: print("Procesando volumen pequeño"))

    notificar = PythonOperator(
        task_id='notificar_fin',
        python_callable=lambda: print("Pipeline completado"),
        trigger_rule='none_failed'  # Se ejecuta incluso si una rama fue 'skipped'
    )

    inicio >> extraer >> validar
    validar >> [transformar_grande, transformar_pequeno]
    [transformar_grande, transformar_pequeno] >> notificar

Cómo funciona en la práctica: Parámetros Dinámicos con Jinja y XCom

Vamos a seguir un ejemplo paso a paso de un pipeline que procesa datos para múltiples países. El objetivo es tener un único DAG que, en cada ejecución, procese un país específico pasado como parámetro. Paso 1: Definimos el DAG con un `schedule_interval` (por ejemplo, `@daily`). Paso 2: Configuramos la ejecución para recibir un parámetro. Esto se puede hacer mediante la interfaz web al activar un DAG Run manualmente, o programáticamente usando el concepto de DAG Runs configurados. Paso 3: Dentro de las tareas, accedemos a este parámetro dinámico usando el contexto de Airflow y la plantilla Jinja `{{ dag_run.conf.get('pais', 'default') }}`.

Paso 4: La tarea de extracción usa este parámetro para consultar solo los datos del país correspondiente. Paso 5: Los resultados de la extracción (por ejemplo, el número de registros o una muestra) pueden pasarse a tareas posteriores usando XCom. Paso 6: La tarea de transformación recibe tanto el parámetro original (para lógica específica del país) como los datos de XCom de la extracción. Paso 7: Finalmente, la tarea de carga utiliza toda esta información para escribir los datos en una tabla o ruta de destino que incluya el nombre del país, manteniendo la separación de datos. Este patrón evita la duplicación de código y centraliza la lógica.

Tip Crítico: Los parámetros dinámicos (`dag_run.conf`) son ideales para valores que cambian entre ejecuciones del *mismo* DAG. Para valores que son constantes para todas las ejecuciones pero que podrían cambiar ocasionalmente (como URLs de API o nombres de tablas), usa las Variables de Airflow almacenadas en la metabase. Combina ambos: usa una Variable para la clave de la API y un parámetro dinámico para el `client_id`.

Código en acción: Pipeline ETL Parametrizado Completo

A continuación, un ejemplo integral que combina scheduling, dependencias condicionales y parámetros dinámicos en un DAG funcional. Este pipeline simula la extracción, transformación y carga de datos de ventas para un país dado, con una rama de alerta si los datos están vacíos.


from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
import pandas as pd
import logging

def extraer_ventas(**context):
    pais = context['dag_run'].conf.get('pais', 'Chile')  # Parámetro dinámico
    fecha = context['data_interval_start'].strftime('%Y-%m-%d')
    logging.info(f"Extrayendo datos de ventas para {pais} en la fecha {fecha}")
    # Simulación: Datos extraídos
    datos_simulados = {'fecha': [fecha, fecha], 'monto': [100, 200], 'producto': ['A', 'B']}
    df = pd.DataFrame(datos_simulados)
    # Guardar datos en un archivo temporal o pasar via XCom
    json_data = df.to_json()
    context['ti'].xcom_push(key='datos_extraidos', value=json_data)
    context['ti'].xcom_push(key='pais', value=pais)
    return f"Extracción completada para {pais}"

def validar_datos(**context):
    ti = context['ti']
    json_data = ti.xcom_pull(task_ids='tarea_extraer', key='datos_extraidos')
    df = pd.read_json(json_data)
    if df.empty:
        logging.warning("Datos extraídos están VACÍOS. Se activará rama de alerta.")
        return 'tarea_alertar_vacio'
    else:
        logging.info(f"Datos validados. {len(df)} registros encontrados.")
        return 'tarea_transformar'

def transformar_datos(**context):
    ti = context['ti']
    json_data = ti.xcom_pull(task_ids='tarea_extraer', key='datos_extraidos')
    pais = ti.xcom_pull(task_ids='tarea_extraer', key='pais')
    df = pd.read_json(json_data)
    # Transformación específica por país
    if pais == 'Argentina':
        df['monto'] = df['monto'] * 1.65  # Simula conversión de moneda
    df['procesado_en'] = datetime.now()
    datos_transformados = df.to_json()
    ti.xcom_push(key='datos_transformados', value=datos_transformados)
    return "Transformación completada"

def cargar_datos(**context):
    ti = context['ti']
    datos_transformados = ti.xcom_pull(task_ids='tarea_transformar', key='datos_transformados')
    pais = ti.xcom_pull(task_ids='tarea_extraer', key='pais')
    df = pd.read_json(datos_transformados)
    # Simular carga a un destino (ej. nombre de tabla incluye el país)
    ruta_destino = f"/data/lake/ventas/pais={pais}/ventas_{context['ds']}.parquet"
    # df.to_parquet(ruta_destino) # Comentado para simulación
    logging.info(f"Datos cargados simuladamente en: {ruta_destino}")
    return f"Carga exitosa para {pais}"

def alertar_vacio(**context):
    pais = context['ti'].xcom_pull(task_ids='tarea_extraer', key='pais')
    logging.error(f"ALERTA: No hay datos de ventas para {pais} en la fecha {context['ds']}. Investigar fuente.")
    # Aquí se integraría con un servicio de notificaciones (Slack, Email, PagerDuty)
    return "Alerta enviada"

default_args = {
    'owner': 'ventas_etl',
    'start_date': datetime(2023, 11, 1),
    'retries': 1,
}

with DAG(
    'etl_ventas_por_pais',
    default_args=default_args,
    description='ETL parametrizado para procesar ventas por país',
    schedule_interval='0 2 * * *',  # Ejecución diaria a las 2 AM
    catchup=False,
    tags=['ventas', 'parametrizado', 'produccion'],
) as dag:

    inicio = DummyOperator(task_id='inicio')

    extraer = PythonOperator(
        task_id='tarea_extraer',
        python_callable=extraer_ventas,
        provide_context=True,
    )

    validar = BranchPythonOperator(
        task_id='tarea_validar',
        python_callable=validar_datos,
        provide_context=True,
    )

    transformar = PythonOperator(
        task_id='tarea_transformar',
        python_callable=transformar_datos,
        provide_context=True,
    )

    cargar = PythonOperator(
        task_id='tarea_cargar',
        python_callable=cargar_datos,
        provide_context=True,
    )

    alertar = PythonOperator(
        task_id='tarea_alertar_vacio',
        python_callable=alertar_vacio,
        provide_context=True,
    )

    fin = DummyOperator(
        task_id='fin',
        trigger_rule='none_failed',  # Se ejecuta incluso si fuimos por la rama de alerta (que tiene éxito)
    )

    # Definición del grafo de dependencias
    inicio >> extraer >> validar
    validar >> [transformar, alertar]
    transformar >> cargar
    [cargar, alertar] >> fin

Errores comunes y cómo evitarlos

1. Fecha de inicio dinámica: Usar `datetime.now()` en `start_date`. Esto causa que el scheduler "mueva la meta" constantemente, pudiendo generar ejecuciones inesperadas o ninguna. Solución: Usa siempre una fecha fija y estática en el pasado (ej: `datetime(2023, 1, 1)`). Para retrasar la primera ejecución, ajusta el `schedule_interval`.

2. Olvidar `provide_context=True` o `**context`: En operadores Python personalizados, si necesitas acceder a `dag_run.conf`, `data_interval_start` o `XCom`, debes habilitar el contexto. Solución: Siempre incluye `**context` en la firma de tu función y, en el `PythonOperator`, establece `provide_context=True` (en versiones más nuevas, el valor por defecto suele ser True).

3. Mal manejo del estado "skipped" en ramificaciones: Cuando usas un `BranchOperator`, las tareas no seleccionadas se marcan como `skipped`. Si una tarea posterior depende de ellas con la regla por defecto `all_success`, fallará. Solución: Usa `trigger_rule='none_failed'` o `'all_done'` en las tareas que reúnen las ramas, como se muestra en el ejemplo de `notificar` y `fin`.

4. Abuso de XCom para grandes volúmenes de datos: XCom está diseñado para metadatos y mensajes pequeños (por defecto, 48KB en la base de datos). No lo uses para pasar DataFrames completos. Solución: Usa almacenamiento intermedio como S3, GCS, o el sistema de archivos local, y pasa solo la ruta del archivo vía XCom.

5. Programación ambigua con `catchup=True`: Dejar `catchup=True` (valor por defecto) en un DAG con un `start_date` muy antiguo puede generar una cola masiva de ejecuciones históricas al desplegar el DAG por primera vez, saturando los recursos. Solución: Evalúa conscientemente. Para pipelines operativos, suele ser mejor establecer `catchup=False`. Los backfills históricos se pueden lanzar manualmente con control preciso.

Checklist de dominio

Antes de considerar dominada esta lección, asegúrate de poder verificar los siguientes puntos:

  • Puedo definir un DAG con una expresión cron compleja (ej: "los días 1 y 15 del mes a las 3:30 AM") y explicar cuándo se creará la DAG Run.
  • Sé la diferencia entre `start_date` estática y dinámica, y puedo explicar las consecuencias de usar `datetime.now()`.
  • Puedo diseñar un DAG que use `BranchPythonOperator` para dirigir el flujo a diferentes conjuntos de tareas basándose en los datos de XCom de una tarea anterior.
  • He implementado una tarea que utiliza `trigger_rule='none_failed'` o `'one_success'` para reunificar ramas de ejecución condicional.
  • Puedo crear y ejecutar manualmente un DAG Run pasando parámetros de configuración (ej: `{"cliente_id": 123}`) y acceder a esos parámetros dentro de mis funciones de tareas.
  • Sé cómo y cuándo usar `dag_run.conf` (parámetros dinámicos) versus Variables de Airflow (configuración semi-estática).
  • Puedo explicar el problema de pasar grandes volúmenes de datos a través de XCom y describir al menos una alternativa adecuada.
  • Comprendo el efecto de la configuración `catchup` y puedo decidir cuándo establecerla en `True` o `False` para un caso de uso dado.
De lección a portfolio

Convertí esta lección en una habilidad visible para entrevistas.

Guardá el curso, completá los ejercicios y conectá esta habilidad con una ruta de empleo, data, IA, programación o marketing.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión