Concepto clave
Los sensores en Apache Airflow son operadores especializados que esperan hasta que se cumple una condición externa antes de continuar con la ejecución del DAG. Piensa en ellos como guardianes que verifican si un archivo ha llegado a un bucket de S3, si una API externa está disponible, o si una base de datos tiene nuevos datos. Sin sensores, tus pipelines ETL podrían fallar porque intentan procesar datos que aún no existen.
En el mundo real, imagina un sistema de logística que solo carga un camión cuando todos los paquetes están listos en el almacén. El sensor sería el empleado que revisa constantemente si los paquetes han llegado, y solo cuando están todos, da la señal para cargar. En Airflow, esto evita errores y optimiza recursos, ya que las tareas no se ejecutan innecesariamente.
Cómo funciona en la práctica
Para usar un sensor, primero importas la clase correspondiente, como FileSensor para archivos o HttpSensor para APIs. Configuras parámetros clave: poke_interval (cada cuánto verificar, por ejemplo, 60 segundos), timeout (cuánto tiempo esperar antes de fallar, como 3600 segundos), y mode (modo de operación, como 'poke' o 'reschedule').
Ejemplo paso a paso para esperar un archivo CSV en un directorio:
- Importa el sensor:
from airflow.sensors.filesystem import FileSensor - Define la tarea en tu DAG:
wait_for_file = FileSensor(task_id='esperar_archivo', filepath='/data/entrada/datos.csv', poke_interval=30, timeout=1800) - Conecta el sensor a otras tareas usando dependencias, como
wait_for_file >> process_data
Airflow verificará cada 30 segundos si el archivo existe, hasta 30 minutos. Si aparece, continúa; si no, falla después del timeout.
Caso de estudio
Supongamos que eres Data Engineer en una empresa de e-commerce. Cada día a las 2 AM, un sistema externo genera un archivo JSON con las ventas del día anterior en un bucket de S3. Tu pipeline ETL debe procesar este archivo, pero no puede empezar hasta que esté disponible.
Solución con Airflow:
- Usa
S3KeySensorpara esperar el archivo:s3_sensor = S3KeySensor(task_id='esperar_ventas', bucket_key='s3://ventas-diarias/2023-10-01.json', poke_interval=60, timeout=7200) - Configura el DAG para ejecutarse diariamente a las 2:30 AM, dando margen para la generación del archivo.
- Si el archivo llega a las 2:15 AM, el sensor lo detecta y el pipeline continúa con la transformación y carga a Redshift.
En este caso, el sensor asegura que el pipeline no falle por datos faltantes, mejorando la confiabilidad del sistema en un 40% según estudios de la industria.
Errores comunes
- Timeout muy corto: Configurar un timeout menor que el tiempo real de llegada de datos causa fallos frecuentes. Solución: Analiza patrones históricos y ajusta el timeout con un margen del 20%.
- Poke_interval muy frecuente: Verificar cada pocos segundos puede saturar recursos. Solución: Usa intervalos de 30-60 segundos para balancear eficiencia y responsividad.
- No manejar modos de ejecución: Usar modo 'poke' (por defecto) mantiene un slot de worker ocupado. Solución: Para sensores largos, usa mode='reschedule' para liberar recursos entre verificaciones.
- Olvidar dependencias: No conectar el sensor a tareas posteriores hace que se ejecute en paralelo sin efecto. Solución: Usa operadores como
>>para definir el flujo claramente. - Ignorar logs: No revisar logs del sensor dificulta debuggear por qué no se cumple la condición. Solución: Monitorea logs de Airflow para ajustar parámetros basado en errores comunes como permisos de archivo.
Checklist de dominio
- ¿Puedes explicar la diferencia entre un sensor y un operador normal en Airflow?
- ¿Has configurado correctamente poke_interval y timeout para un caso real?
- ¿Sabes cuándo usar mode='reschedule' en lugar de 'poke'?
- ¿Puedes integrar un sensor en un DAG existente sin romper el flujo?
- ¿Has debuggeado un sensor que falla usando logs de Airflow?
- ¿Puedes elegir el sensor adecuado (ej., FileSensor vs S3KeySensor) para diferentes fuentes de datos?
- ¿Has optimizado el rendimiento ajustando parámetros del sensor basado en métricas?
Implementa un sensor para esperar datos de una API externa
En este ejercicio, crearás un DAG en Airflow que use un sensor para esperar que una API externa esté disponible antes de procesar datos. Sigue estos pasos:
- Crea un nuevo archivo Python para el DAG, por ejemplo,
sensor_api_dag.py. - Importa los módulos necesarios:
from airflow import DAG,from airflow.sensors.http_sensor import HttpSensor,from airflow.operators.python import PythonOperator, yfrom datetime import datetime. - Define una función simple para procesar datos, como
def procesar_datos(): print('Datos procesados exitosamente'). - Configura el DAG con parámetros:
dag = DAG('sensor_api_ejemplo', start_date=datetime(2023, 10, 1), schedule_interval='@daily'). - Crea un sensor HttpSensor que espere a que la API
https://jsonplaceholder.typicode.com/posts/1responda con código 200. Usapoke_interval=10ytimeout=300. - Crea un PythonOperator que ejecute la función de procesamiento.
- Establece la dependencia: sensor >> procesar.
- Prueba el DAG en tu entorno de Airflow y verifica que el sensor espere correctamente antes de procesar.
- Asegúrate de que la URL de la API sea accesible desde tu entorno de Airflow.
- Usa el parámetro response_check en HttpSensor para validar la respuesta, por ejemplo,
response_check=lambda response: response.status_code == 200. - Si el sensor falla, revisa los logs para ver si hay errores de conexión o timeout.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.