Configurar Sensores para Esperar Eventos Externos

Video
20 min~9 min lectura
Objetivo de la lección

Introducción a los Sensores en Apache Airflow En el mundo de la orquestación de datos, no todos los procesos pueden o deben ejecutarse de inmediato.

Puntos de control
  • Introducción a los Sensores en Apache Airflow
  • Concepto Clave: La Espera Inteligente y el Patrón Poke
  • Cómo Funciona en la Práctica: Anatomía de un Sensor
  • Código en Acción: Implementando un DAG con Múltiples Sensores

Reproductor de video

Introducción a los Sensores en Apache Airflow

En el mundo de la orquestación de datos, no todos los procesos pueden o deben ejecutarse de inmediato. A menudo, un paso en tu pipeline depende de que un evento externo ocurra: que un archivo llegue a un bucket de S3, que una partición se materialice en Hive, o que una base de datos termine un proceso de carga. Aquí es donde los Sensores de Apache Airflow brillan. Un Sensor es un tipo especial de operador diseñado para esperar, de manera eficiente y persistente, hasta que se cumpla una condición específica en un sistema externo. A diferencia de los operadores de acción, que hacen algo, los sensores esperan a que algo suceda, vigilando el estado del mundo exterior.

La implementación de sensores es fundamental para construir DAGs robustos y resilientes que reflejen dependencias reales, no solo temporales o secuenciales. Sin ellos, tendrías que recurrir a soluciones rudimentarias como time.sleep en tus tareas, lo que es ineficiente, propenso a errores y desperdicia recursos de tu infraestructura. Los sensores, por el contrario, están diseñados para "despertar" periódicamente, verificar la condición y, si no se cumple, volver a dormir (cediendo su slot de ejecución), un patrón conocido como poke. Esta lección te equipará para dominar el uso de sensores, configurando la espera inteligente que tus pipelines complejos necesitan.

Concepto Clave: La Espera Inteligente y el Patrón Poke

Imagina que estás organizando una cena. Tienes todo listo para servir el plato principal, pero estás esperando a que el panadero entregue el pan fresco. Tienes dos opciones: quedarte parado en la puerta, mirando fijamente la calle sin hacer nada más (equivalente a un while True con sleep), o puedes seguir preparando la mesa, ajustando la música, y solo salir a verificar si el panadero ha llegado cada 5 minutos. Esta segunda estrategia es el patrón poke. Eres productivo mientras esperas el evento externo (la llegada del panadero). En Airflow, el "salir a verificar" es la función de poke del sensor, y el tiempo entre verificaciones se controla con el parámetro poke_interval.

Otro concepto crucial es el timeout. No puedes esperar el pan para siempre. Si después de 2 horas el panadero no llega, decides servir la cena sin pan y marcar ese evento como un problema. En Airflow, el parámetro timeout define el tiempo total máximo que un sensor estará en estado "running" esperando. Si se supera este tiempo, la tarea falla. La combinación de poke_interval y timeout te permite ajustar el balance entre la sensibilidad (verificar muy seguido) y la eficiencia de recursos (no saturar el sistema externo con consultas). Un sensor bien configurado es un ciudadano educado en tu clúster de Airflow.

Tip del Experto: El modo por defecto de un sensor es poke, donde la tarea se ejecuta en un ciclo de espera. Sin embargo, para condiciones que pueden tomar mucho tiempo (horas), considera el modo reschedule. En este modo, si la condición no se cumple, la tarea se "apaga" completamente y libera su worker. Luego, el scheduler la reprograma para un nuevo intento más tarde. Esto es mucho más amigable con tus recursos, especialmente en entornos con muchos sensores de larga duración.

Cómo Funciona en la Práctica: Anatomía de un Sensor

Para implementar un sensor, típicamente heredarás de la clase base BaseSensorOperator e implementarás el método poke(context). Este método contiene la lógica de verificación: debe devolver True si la condición se cumple (éxito) o False si aún no (sigue esperando). Airflow se encarga del ciclo de poke por ti. Sin embargo, la comunidad y los proveedores ya han creado decenas de sensores listos para usar para los sistemas más comunes: S3KeySensor para AWS S3, SqlSensor para bases de datos SQL, HivePartitionSensor para Hive, ExternalTaskSensor para esperar otras tareas de Airflow, y muchos más.

Vamos a desglosar un flujo paso a paso. Supongamos que tu DAG necesita procesar un archivo de ventas diario que un sistema externo sube a un bucket de S3. 1) Primero, defines un S3KeySensor que apunte a la ruta esperada, por ejemplo, s3://mi-bucket/ventas/2023-10-27/data.csv. 2) Configuras el sensor con un poke_interval de 5 minutos y un timeout de 6 horas. 3) Cuando el DAG se ejecuta, el sensor entra en estado "running". Cada 5 minutos, el worker ejecuta la lógica del sensor, que hace una llamada a la API de S3 para listar objetos en ese prefijo. 4) Si el archivo no existe, devuelve False y el sensor "duerme". 5) En el momento en que el sistema externo sube el archivo, en el próximo ciclo de poke, la verificación devuelve True, el sensor se marca como éxito, y el DAG procede con la siguiente tarea, por ejemplo, un PythonOperator para procesar el archivo.

Código en Acción: Implementando un DAG con Múltiples Sensores

A continuación, un ejemplo completo y funcional de un DAG que utiliza dos sensores para orquestar un pipeline que depende de un archivo en S3 y de la finalización de otro DAG. Este DAG se ejecuta diariamente, pero solo procede cuando sus dependencias externas están satisfechas.


from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

default_args = {
    'owner': 'equipo_etl',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 1),
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def procesar_archivo():
    # Lógica simulada de procesamiento
    print("Archivo encontrado. Iniciando procesamiento ETL...")
    # Aquí iría el código para descargar, transformar y cargar los datos.
    pass

with DAG(
    'dag_sensores_avanzado',
    default_args=default_args,
    description='DAG que espera un archivo en S3 y la finalización de otro DAG',
    schedule_interval='0 8 * * *',  # Ejecución diaria a las 8 AM
    catchup=False,
    tags=['ETL', 'S3', 'monitoreo'],
) as dag:

    inicio = DummyOperator(task_id='inicio')

    # Sensor para esperar un archivo específico en S3
    # Usa el modo 'reschedule' para condiciones de larga espera.
    esperar_archivo_s3 = S3KeySensor(
        task_id='esperar_archivo_s3',
        bucket_key='s3://mi-bucket-datos/entrada/diaria/ventas_{{ ds_nodash }}.csv',
        aws_conn_id='aws_conexion_default',
        poke_interval=300,  # 5 minutos entre verificaciones
        timeout=86400,      # Timeout de 24 horas (en segundos)
        mode='reschedule',  # Libera el worker entre pokes
        soft_fail=False,    # Si falla por timeout, la tarea falla.
    )

    # Sensor para esperar que otro DAG haya terminado exitosamente
    esperar_dag_externo = ExternalTaskSensor(
        task_id='esperar_dag_externo',
        external_dag_id='dag_limpieza_maestra',  # ID del DAG que debemos esperar
        external_task_id=None,  # None significa que esperamos que todo el DAG esté completo
        allowed_states=['success'],  # Solo procedemos si el DAG externo tuvo éxito
        execution_delta=timedelta(hours=1),  # Esperamos la ejecución de hace 1 hora (para dar margen)
        poke_interval=600,  # Verifica cada 10 minutos
        timeout=7200,       # Timeout de 2 horas
        mode='poke',
    )

    procesar = PythonOperator(
        task_id='procesar_archivo',
        python_callable=procesar_archivo,
    )

    fin = DummyOperator(task_id='fin')

    # Definición de las dependencias del DAG
    inicio >> [esperar_archivo_s3, esperar_dag_externo] >> procesar >> fin

Este DAG demuestra un patrón poderoso. Los sensores esperar_archivo_s3 y esperar_dag_externo se ejecutan en paralelo. El DAG solo procederá a la tarea procesar cuando ambas condiciones se hayan cumplido. El uso de mode='reschedule' en el sensor de S3 es clave para un recurso de larga duración, mientras que el ExternalTaskSensor en modo poke es adecuado para una espera más corta y predecible. Nota el uso de la macro {{ ds_nodash }} para hacer la ruta del archivo dinámica según la fecha de ejecución del DAG.

Errores Comunes y Cómo Evitarlos

Al trabajar con sensores, varios errores sutiles pueden causar frustración. Aquí detallamos los más frecuentes y sus soluciones.

1. Timeouts Subestimados y DAGs Atascados: El error más común es configurar un timeout menor que el tiempo real que puede tomar el evento. Si el sensor falla por timeout, todo el DAG falla. Solución: Analiza el patrón histórico del evento externo. ¿A qué hora suele llegar el archivo? Agrega un margen de seguridad generoso (ej., 50%). Para eventos muy variables, considera lógica de reintento (retries) en el sensor o implementa un mecanismo de alerta antes del timeout.

2. Modo Poke vs. Reschedule Incorrecto: Usar el modo poke por defecto para un sensor que esperará 12 horas mantendrá un worker ocupado e inactivo la mayor parte del tiempo, agotando los recursos de tu pool. Solución: Para esperas superiores a ~1 hora, usa casi siempre mode='reschedule'. Esto hace que la tarea se libere y se reprograme, siendo mucho más eficiente.

3. Conexiones No Configuradas: Sensores como S3KeySensor o SqlSensor requieren una conexión de Airflow configurada (ej., aws_conn_id). Si la conexión no existe o tiene credenciales erróneas, el sensor fallará inmediatamente. Solución: Antes de desplegar el DAG, verifica en la interfaz de Airflow (Admin -> Connections) que la conexión referenciada existe y está probada.

4. Lógica de Poke Costosa o No Idempotente: Si la función poke que escribes (o que usa el sensor interno) realiza operaciones costosas o con efectos secundarios (como insertar un registro cada vez que verifica), puedes saturar el sistema externo o corromper datos. Solución: La verificación debe ser liviana y de solo lectura. Para sensores personalizados, asegúrate de que tu método poke sea idempotente (ejecutarlo múltiples veces tiene el mismo efecto que ejecutarlo una vez).

5. No Considerar la Ejecución de Catchup: Si tu DAG tiene catchup=True (por defecto en algunas versiones) y se activa después de unos días, Airflow programará múltiples ejecuciones pasadas. Cada una de estas ejecuciones lanzará su propio sensor, lo que puede generar una avalancha de peticiones al sistema externo para verificar archivos antiguos que quizás nunca existirán. Solución: Para DAGs basados en sensores de eventos externos, casi siempre debes establecer catchup=False en los argumentos del DAG. La orquestación debe ser reactiva al evento, no a la fecha del calendario.

Checklist de Dominio

Antes de considerar que dominas la configuración de sensores en Airflow, verifica que puedes realizar y comprender cada uno de estos puntos:

  • Puedo explicar la diferencia entre el modo poke y reschedule de un sensor y seleccionar el apropiado según el tiempo de espera esperado.
  • He configurado correctamente al menos dos sensores de proveedor diferentes (e.g., S3, SQL, HDFS, ExternalTask) en un DAG funcional.
  • Sé cómo y dónde configurar las conexiones de Airflow (AWS, Base de Datos, etc.) que los sensores requieren para funcionar.
  • Puedo calcular y definir parámetros razonables de poke_interval y timeout basándome en el SLA del evento externo y la política de recursos de mi clúster.
  • Comprendo el riesgo de usar catchup=True con DAGs sensibles a eventos y sé cómo desactivarlo.
  • Puedo diagnosticar y resolver un error común de sensor, como un timeout prematuro o un fallo de conexión.
  • Sé cómo crear un sensor personalizado heredando de BaseSensorOperator para un sistema interno que no tiene un sensor preconstruido.
  • He integrado sensores en un pipeline donde múltiples dependencias externas deben cumplirse antes de proceder, manejando correctamente sus dependencias en el DAG.
Falar no WhatsApp
De lección a portfolio

Convertí esta lección en evidencia para Data Analyst.

Sumá un mini caso con datos, una conclusión de negocio y una captura del resultado. Eso pesa más que decir que viste la herramienta.

Paso 1

Publicá una consulta, dashboard o notebook con una conclusión clara.

Paso 2

Agregá contexto: problema, dato usado, decisión recomendada y limitación.

Paso 3

Guardá el enlace en tu CV, LinkedIn o portfolio antes de postular.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión

Configurar Sensores para Esperar Eventos Externos | Cursalo