Concepto clave
En Apache Airflow, los operadores son las unidades básicas de trabajo que ejecutan tareas específicas, mientras que los sensores monitorean condiciones externas antes de permitir que el flujo continúe. Para un pipeline ETL que extrae datos de una API, necesitas combinar ambos: un sensor para verificar que la API esté disponible y operadores para realizar la extracción y transformación.
Imagina que estás construyendo una línea de ensamblaje en una fábrica. Los operadores son como las máquinas que realizan acciones concretas (cortar, soldar, pintar), y los sensores son como inspectores que verifican que cada pieza esté lista antes de pasar a la siguiente estación. En nuestro caso, la "pieza" son los datos de la API, y necesitamos asegurarnos de que estén accesibles antes de extraerlos.
Cómo funciona en la práctica
Vamos a construir un DAG que extrae datos de una API pública y los guarda en un archivo CSV. Sigue estos pasos:
- Define un sensor HTTP para verificar que la API esté respondiendo (por ejemplo, con un código de estado 200).
- Usa un operador Python para hacer la solicitud HTTP y extraer los datos en formato JSON.
- Transforma los datos si es necesario (por ejemplo, filtrando campos o convirtiendo tipos).
- Utiliza otro operador Python para guardar los datos en un archivo CSV en un directorio específico.
Ejemplo de código para el operador de extracción:
def extract_from_api(**kwargs):
import requests
response = requests.get('https://api.example.com/data')
response.raise_for_status()
return response.json()Caso de estudio
Supongamos que trabajas para una empresa de comercio electrónico que necesita monitorear precios de competidores. La API de un competidor proporciona datos en tiempo real sobre productos, pero solo está disponible durante ciertas horas. Construyes un pipeline con Airflow que:
- Usa un HttpSensor para verificar cada hora si la API está activa.
- Extrae los datos con un PythonOperator que llama a la API y obtiene un JSON con productos y precios.
- Transforma los datos para incluir solo productos relevantes y calcular promedios.
- Guarda el resultado en un CSV que luego se carga a una base de datos para análisis.
Datos de ejemplo en una tabla:
| Producto | Precio | Disponibilidad |
|---|---|---|
| Laptop X | 1200 | En stock |
| Smartphone Y | 800 | Agotado |
Nota: En un escenario real, podrías manejar miles de productos, por lo que la automatización con Airflow es crucial para eficiencia y confiabilidad.
Errores comunes
- No manejar errores de API: Si la API falla, el pipeline puede detenerse sin notificación. Usa excepciones y reintentos en los operadores.
- Olvidar timeouts en sensores: Un sensor sin timeout puede bloquear el DAG indefinidamente. Configura un tiempo máximo de espera.
- Guardar datos en rutas incorrectas: Asegúrate de que el directorio de salida para el CSV exista y tenga permisos adecuados.
- No validar el formato de datos: La API podría devolver datos inesperados. Incluye validaciones en el código de extracción.
Checklist de dominio
- ¿Puedes configurar un HttpSensor para monitorear una API específica?
- ¿Sabes escribir un operador Python que extraiga datos de una API y los devuelva en un formato usable?
- ¿Eres capaz de transformar datos JSON a un DataFrame de pandas para facilitar el guardado en CSV?
- ¿Puedes manejar errores comunes como timeouts o respuestas vacías de la API?
- ¿Has probado el pipeline completo en un entorno de desarrollo antes de llevarlo a producción?
- ¿Entiendes cómo programar el DAG para ejecutarse en intervalos regulares (por ejemplo, cada hora)?
- ¿Sabes verificar los logs de Airflow para depurar problemas en la extracción o guardado de datos?
Construye un pipeline que extraiga datos de una API de clima y los guarde en CSV
En este ejercicio, crearás un DAG en Apache Airflow que automatice la extracción de datos de una API pública de clima y los guarde en un archivo CSV. Sigue estos pasos:
- Configura un entorno de Airflow local o usa un sandbox disponible.
- Crea un nuevo archivo Python para el DAG, por ejemplo,
clima_pipeline.py. - Define un HttpSensor que verifique la disponibilidad de la API
https://api.open-meteo.com/v1/forecast(puedes usar otra API pública si prefieres). - Implementa un PythonOperator que haga una solicitud GET a la API para obtener el pronóstico del clima de una ciudad específica (por ejemplo, Madrid).
- Transforma la respuesta JSON para extraer solo temperatura, humedad y fecha.
- Usa otro PythonOperator para guardar estos datos en un archivo CSV en
/tmp/clima_data.csv. - Programa el DAG para ejecutarse diariamente a las 8 AM.
- Prueba el DAG activándolo y verificando que el archivo CSV se genere correctamente.
- Usa la biblioteca requests en el operador Python para hacer la solicitud HTTP.
- Asegúrate de manejar posibles errores en la respuesta de la API, como códigos de estado no 200.
- Para guardar en CSV, considera usar pandas DataFrame o el módulo csv de Python.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.