Introducción: Construyendo Nuestro Primer Pipeline de Datos
En esta lección práctica, pasaremos de la teoría a la acción, configurando nuestro primer DAG (Grafo Acíclico Dirigido) funcional en Apache Airflow. El objetivo es crear un pipeline automatizado que se conecte a una API pública, descargue datos en un formato estructurado y los prepare para su posterior procesamiento. Este es el núcleo de la orquestación ETL: automatizar la extracción de datos de fuentes externas de manera confiable y programada.
Utilizaremos la API pública de OpenWeatherMap como nuestro caso de estudio, descargando datos meteorológicos actuales para una lista de ciudades. Este ejemplo es ideal porque representa un escenario real: una fuente de datos externa, autenticación mediante una clave API, y datos en formato JSON que necesitan ser capturados y almacenados. Al final de esta lección, tendrás un DAG completamente operativo que puedes ejecutar, monitorear y extender.
Configuraremos el entorno paso a paso, definiremos las tareas utilizando el PythonOperator y el PythonVirtualenvOperator, y aprenderemos a manejar dependencias, conexiones y variables dentro de Airflow. Este ejercicio sentará las bases para pipelines más complejos que involucren transformación y carga de datos.
Concepto Clave: El DAG como Blueprint de Tu Pipeline
Imagina que estás construyendo una línea de ensamblaje en una fábrica. No comienzas a mover piezas al azar; primero dibujas un plano detallado que especifica el orden de las operaciones: "Primero se suelda A a B, luego se pinta, después se ensambla con C". Un DAG en Airflow es exactamente ese plano para tu pipeline de datos. Es una definición escrita en Python que describe qué se debe hacer (las tareas) y en qué orden (las dependencias), pero no ejecuta nada por sí mismo. Airflow's scheduler lee este plano y orquesta la ejecución real de las tareas según la programación definida.
Una analogía más cotidiana sería una receta de cocina. La receta (el DAG) lista los ingredientes (los datos de entrada y parámetros), los pasos a seguir (las tareas como picar, mezclar, hornear), y el orden crítico (no puedes hornear antes de mezclar). El chef (Airflow) sigue la receta al pie de la letra. La potencia de Airflow radica en que este "plano" o "receta" es código, lo que nos permite agregar lógica, manejar errores y reutilizar componentes con una flexibilidad enorme.
Dos conceptos son fundamentales aquí: Operadores y Flujos. Los operadores (como PythonOperator) son las unidades de trabajo, las acciones concretas (descargar de una API, ejecutar un query SQL). Los flujos se definen usando los operadores bitshift `>>` y `<<`, estableciendo relaciones como "la tarea A debe completarse antes de que la tarea B pueda comenzar". Esta separación entre la definición (el DAG) y la ejecución es lo que permite a Airflow gestionar el estado, reintentos y logging de manera tan robusta.
Cómo Funciona en la Práctica: Paso a Paso
Vamos a desglosar la construcción de nuestro DAG en pasos concretos. Primero, necesitamos preparar nuestro entorno de Airflow. Asegúrate de tener una instalación funcional de Airflow (por ejemplo, usando Docker o una instalación local). Nuestro DAG vivirá en la carpeta `DAGS_FOLDER` configurada en Airflow (típicamente `~/airflow/dags` o `/opt/airflow/dags`). Crearemos un nuevo archivo Python llamado `weather_api_pipeline.py` dentro de esa carpeta.
El primer paso dentro del archivo es importar los módulos necesarios. Luego, definimos argumentos por defecto que se aplicarán a todas las tareas del DAG, como el propietario, los reintentos en caso de fallo y el intervalo de reintento. Después, instanciamos el objeto DAG, dándole un identificador único (`dag_id`), una descripción y la programación (usando un cron expression o un timedelta). Para este ejemplo, lo programaremos para ejecutar diariamente a las 6 AM.
A continuación, definimos las funciones Python que contendrán la lógica de nuestras tareas. La función principal, `extract_weather_data`, se encargará de realizar la petición HTTP a la API de OpenWeatherMap, manejar la respuesta y guardar los datos en un archivo JSON con una marca de tiempo. Finalmente, creamos una tarea usando `PythonOperator` que apunta a esa función, y definimos el flujo del DAG (en este caso, un flujo lineal simple con una sola tarea). Una vez guardado el archivo, Airflow detectará automáticamente el nuevo DAG en su interfaz web en unos segundos.
Código en Acción: DAG Completo y Funcional
A continuación, se presenta el código completo para nuestro DAG de descarga de datos meteorológicos. Este código es funcional y puedes copiarlo directamente a tu carpeta de DAGs, ajustando los parámetros necesarios como la `API_KEY`. Nota el uso de `PythonVirtualenvOperator` para aislar las dependencias, una práctica recomendada.
from datetime import datetime, timedelta
import json
import requests
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator
from airflow.operators.dummy import DummyOperator
# Argumentos por defecto para el DAG
default_args = {
'owner': 'equipo_datos',
'depends_on_past': False,
'email_on_failure': True,
'email': ['[email protected]'],
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
# Definir el objeto DAG
with DAG(
dag_id='descarga_datos_climaticos',
default_args=default_args,
description='DAG que descarga datos actuales del clima para una lista de ciudades desde OpenWeatherMap API',
schedule_interval='0 6 * * *', # Ejecuta diariamente a las 6 AM
start_date=datetime(2023, 10, 1),
catchup=False, # Importante: Evita ejecuciones retroactivas
tags=['ETL', 'API', 'clima'],
) as dag:
# Función que contendrá la lógica de extracción, aislada en un entorno virtual
def extraer_datos_climaticos(**context):
"""
Función que se ejecuta en un entorno virtual aislado.
Descarga datos de clima y los guarda en un archivo JSON.
"""
# Librerías necesarias - se instalarán en el venv automáticamente
import requests
import json
from datetime import datetime
from pathlib import Path
# Configuración
API_KEY = 'tu_api_key_aqui' # ¡DEBES REEMPLAZAR ESTO!
BASE_URL = "http://api.openweathermap.org/data/2.5/weather"
CIUDADES = ["London,uk", "New York,us", "Tokyo,jp", "Madrid,es", "Sydney,au"]
DIRECTORIO_SALIDA = Path("/tmp/weather_data") # Ruta persistente en el worker
# Asegurar que el directorio de salida existe
DIRECTORIO_SALIDA.mkdir(parents=True, exist_ok=True)
datos_totales = []
fecha_ejecucion = context['ds'] # Fecha de ejecución del DAG en formato YYYY-MM-DD
for ciudad in CIUDADES:
try:
parametros = {
'q': ciudad,
'appid': API_KEY,
'units': 'metric' # Para obtener temperaturas en Celsius
}
respuesta = requests.get(BASE_URL, params=parametros, timeout=30)
respuesta.raise_for_status() # Lanza excepción para códigos 4xx/5xx
datos_ciudad = respuesta.json()
# Enriquecer los datos con metadatos útiles
datos_ciudad['_extraido_en'] = datetime.utcnow().isoformat()
datos_ciudad['_ciudad_solicitada'] = ciudad
datos_totales.append(datos_ciudad)
print(f"Datos descargados exitosamente para: {ciudad}")
except requests.exceptions.RequestException as e:
print(f"Error al descargar datos para {ciudad}: {e}")
# En un caso real, podrías decidir fallar el DAG o continuar
continue
# Guardar todos los datos en un archivo JSON con marca de tiempo
if datos_totales:
nombre_archivo = DIRECTORIO_SALIDA / f"datos_clima_{fecha_ejecucion}.json"
with open(nombre_archivo, 'w') as f:
json.dump(datos_totales, f, indent=2)
print(f"Datos guardados en: {nombre_archivo}")
return str(nombre_archivo) # Se puede pasar a tareas downstream via XCom
else:
raise ValueError("No se pudo descargar datos para ninguna ciudad.")
# Tarea de inicio (opcional, buena práctica)
inicio = DummyOperator(
task_id='inicio'
)
# Tarea principal usando PythonVirtualenvOperator para aislamiento
descargar_datos = PythonVirtualenvOperator(
task_id='descargar_datos_climaticos',
python_callable=extraer_datos_climaticos,
requirements=[
'requests==2.28.2', # Especificamos versión para reproducibilidad
],
system_site_packages=False, # Entorno limpio
op_kwargs={}, # Argumentos para la función callable
provide_context=True, # Para acceder al contexto (como 'ds')
)
# Tarea de finalización (opcional)
fin = DummyOperator(
task_id='fin'
)
# Definir el flujo del DAG
inicio >> descargar_datos >> fin
Este DAG define un flujo simple pero completo. La tarea `descargar_datos_climaticos` es la que realiza el trabajo pesado, ejecutando nuestra función `extraer_datos_climaticos` dentro de un entorno virtual Python aislado. Esto garantiza que las dependencias (`requests`) no entren en conflicto con otras tareas del sistema. Las tareas `DummyOperator` de inicio y fin son útiles para organizar visualmente el grafo y para puntos de anclaje si en el futuro agregamos más tareas paralelas o en serie.
Observa el uso del parámetro `provide_context=True`. Esto permite que nuestra función acceda al contexto de Airflow, un diccionario rico con metadatos de la ejecución, como la fecha lógica (`ds`), el ID de la ejecución (`run_id`) y más. Aquí lo usamos para nombrar el archivo de salida con la fecha de ejecución del DAG, evitando sobrescrituras y manteniendo un historial claro.
Errores Comunes y Cómo Evitarlos
Al configurar tu primer DAG, es fácil tropezar con ciertos errores recurrentes. Aquí te presentamos los más comunes y estrategias para evitarlos desde el principio.
1. No Gestionar la API_KEY de Forma Segura: Hardcodear la clave API en el código del DAG es un grave riesgo de seguridad y una mala práctica. Si el código se sube a un repositorio, la clave queda expuesta.
Solución: Utiliza las Connections de Airflow. Ve a Admin -> Connections en la interfaz web, crea una nueva conexión de tipo HTTP, y almacena la API Key en el campo "Password" o "Extra" como JSON. Luego, en tu función, recupera la conexión usando `BaseHook.get_connection('openweathermap_conn')`.
2. Olvidar `catchup=False` en DAGs Programados: Si defines un `start_date` en el pasado y una `schedule_interval`, Airflow, por defecto, intentará ejecutar ("hacer catchup") todas las ejecuciones no realizadas desde esa fecha de inicio, lo que puede saturar tu sistema.
Solución: Siempre establece `catchup=False` en la definición del DAG a menos que tengas un requerimiento explícito de procesamiento retroactivo. Puedes también configurarlo a nivel de la configuración de Airflow (`catchup_by_default = False`).
3. No Manejar Excepciones en las Peticiones HTTP: Las APIs pueden fallar por tiempo de espera, límites de tasa, o errores del servidor. Un código que no captura excepciones de `requests` fallará la tarea en el primer problema, sin oportunidad de reintento inteligente para otras ciudades.
Solución: Implementa un bloque try-except robusto alrededor de cada petición, como se muestra en el código. Registra el error, pero considera si quieres que la tarea falle completamente (`raise`) o si puede continuar con los demás elementos (como en nuestro bucle de ciudades).
4. Usar `PythonOperator` con Dependencias Complejas en Lugar de `PythonVirtualenvOperator`: Si tu función necesita un paquete con una versión específica que entra en conflicto con otras tareas, usar `PythonOperator` puede causar errores difíciles de diagnosticar.
Solución: Adopta el `PythonVirtualenvOperator` como estándar para tareas con dependencias externas. Crea un entorno virtual aislado para cada ejecución de la tarea. Aunque tiene una sobrecarga de rendimiento, la reproducibilidad y confiabilidad que ofrece son invaluables.
5. No Validar la Estructura de los Datos Descargados: Asumir que la API siempre devuelve el JSON esperado es peligroso. Los esquemas de las APIs pueden cambiar.
Solución: Agrega validación básica dentro de tu función. Verifica que la respuesta contenga las claves esenciales (ej., `'main'`, `'weather'`) antes de proceder. Puedes usar una librería como `jsonschema` para una validación más estricta en entornos de producción.
Checklist de Dominio
Antes de considerar esta lección completamente interiorizada, asegúrate de poder verificar los siguientes puntos. Cada uno representa una habilidad práctica clave para la orquestación de pipelines con Airflow.
- Puedo crear un archivo DAG desde cero que sea detectado automáticamente por la interfaz web de Airflow.
- Comprendo la diferencia entre `PythonOperator` y `PythonVirtualenvOperator` y sé cuándo es recomendable usar cada uno.
- Sé cómo definir y utilizar argumentos por defecto (`default_args`) para configurar el comportamiento de reintento y notificación de todas las tareas de un DAG.
- Puedo recuperar credenciales de forma segura utilizando las Connections de Airflow en lugar de escribirlas en el código.
- Sé cómo acceder y utilizar el contexto de ejecución (por ejemplo, `ds`, `run_id`) dentro de mis funciones Python para parametrizar el procesamiento.
- Implemento manejo de errores básico en mis tareas, capturando excepciones específicas y decidiendo si fallar la tarea o continuar.
- Puedo definir dependencias entre tareas utilizando los operadores bitshift (`>>` y `<<`) para crear flujos lineales, paralelos o condicionales.
- Sé dónde buscar los logs de mis tareas en la interfaz web de Airflow para depurar fallos.