Introducción: El Pipeline API-to-Database
En el núcleo de la orquestación moderna de datos se encuentra la capacidad de conectar sistemas dispares de manera automatizada y confiable. Esta lección práctica te guiará en la construcción de un DAG de Apache Airflow que realiza una tarea fundamental: extraer datos de una API REST externa, transformarlos ligeramente para asegurar su calidad y estructura, y finalmente cargarlos en una base de datos relacional para su posterior análisis. Este patrón, conocido como ELT (Extract, Load, Transform) o ETL (Extract, Transform, Load) es la columna vertebral de la ingestión de datos.
Utilizaremos una API pública y gratuita para mantener la practicidad, específicamente la API de JSONPlaceholder, que simula datos de posts de un blog. Como base de datos, emplearemos SQLite por su simplicidad, aunque los principios aplican a PostgreSQL, MySQL, o cualquier otra base de datos compatible. El objetivo final es que comprendas no solo la sintaxis, sino la arquitectura de un DAG robusto, manejando consideraciones como el manejo de errores, la idempotencia y la conexión segura a servicios externos.
Concepto Clave: Idempotencia y Flujos de Datos Confiables
En el contexto de Airflow y los pipelines de datos, la idempotencia es un principio de diseño crítico. Una operación idempotente es aquella que puede ejecutarse múltiples veces sin cambiar el resultado más allá de la ejecución inicial. Imagina un interruptor de luz: presionarlo una vez enciende la luz, presionarlo de nuevo la apaga. No es idempotente. Ahora imagina un comando "pon la luz en ON": sin importar cuántas veces lo ejecutes, el resultado final será siempre "luz encendida". Eso es idempotencia.
En nuestro pipeline, esto se traduce en que si el DAG falla a mitad de camino y se re-ejecuta, no debería crear registros duplicados en la base de datos o causar inconsistencias. Lo logramos diseñando tareas que verifiquen el estado antes de actuar. Por ejemplo, nuestra tarea de carga podría implementar un mecanismo de "upsert" (insertar o actualizar) basado en una clave única, o podríamos diseñar el DAG para truncar la tabla de destino antes de una carga completa si el volumen de datos lo permite. La idempotencia, combinada con las capacidades de reintento de Airflow, es lo que convierte un simple script en un proceso de producción confiable.
Cómo Funciona en la Práctica: Arquitectura Paso a Paso
El DAG se estructurará en cuatro tareas principales, secuenciadas linealmente pero con lógica independiente. La primera tarea, crear_tabla, se encargará de establecer la estructura en la base de datos SQLite. Utilizará el Hook de SQLite de Airflow para ejecutar una sentencia CREATE TABLE IF NOT EXISTS. Esta es nuestra primera defensa hacia la idempotencia: la tabla se crea solo si no existe.
La segunda tarea, extraer_datos_api, consumirá la API HTTP. Usaremos el SimpleHttpOperator o, para mayor control, el PythonOperator con la librería requests. Esta tarea descargará los datos en formato JSON y los empujará al XCom de Airflow, un mecanismo de intercambio de mensajes entre tareas. Es crucial manejar aquí posibles errores HTTP (como 404 o 500) con excepciones adecuadas.
La tercera tarea, transformar_datos, recibirá el JSON crudo desde XCom. Aquí realizaremos limpieza: podríamos filtrar posts específicos, convertir tipos de datos, o aplanar estructuras anidadas. El resultado será una lista de diccionarios o tuplas lista para ser insertada en SQL. Finalmente, la tarea cargar_en_bd tomará los datos transformados y, utilizando el Hook de SQLite nuevamente, realizará una inserción eficiente, preferiblemente usando executemany para el lote completo, asegurando una transacción para mantener la integridad atómica.
Código en Acción: DAG Completo y Funcional
A continuación, se presenta la implementación completa del DAG. Asegúrate de tener instaladas las dependencias necesarias: apache-airflow-providers-http y apache-airflow-providers-sqlite. Crea un archivo Python en tu carpeta dags/ con el siguiente contenido.
Implementación del DAG Principal
from datetime import datetime, timedelta
import json
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
# Argumentos por defecto para el DAG
default_args = {
'owner': 'ingeniero_datos',
'depends_on_past': False,
'email_on_failure': True,
'email': ['[email protected]'],
'retries': 2,
'retry_delay': timedelta(minutes=3),
}
# Definición del DAG
with DAG(
'pipeline_api_a_sqlite',
default_args=default_args,
description='Un pipeline ETL que consume posts de una API y los carga en SQLite',
schedule_interval=timedelta(days=1), # Ejecución diaria
start_date=datetime(2023, 10, 1),
catchup=False, # Evita ejecuciones retroactivas
tags=['ETL', 'API', 'SQLite', 'Practica'],
) as dag:
# 1. Tarea: Crear la tabla en SQLite (idempotente)
crear_tabla = SqliteOperator(
task_id='crear_tabla',
sqlite_conn_id='sqlite_default', # Conexión definida en la UI de Airflow
sql='''
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY,
userId INTEGER NOT NULL,
title TEXT NOT NULL,
body TEXT NOT NULL,
fecha_carga TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
'''
)
# 2. Sensor: Verificar que la API esté disponible (opcional pero recomendado)
verificar_api_disponible = HttpSensor(
task_id='verificar_api_disponible',
http_conn_id='api_jsonplaceholder', # Conexión HTTP definida en Airflow
endpoint='/posts',
response_check=lambda response: response.status_code == 200,
poke_interval=5,
timeout=30,
)
# 3. Tarea: Extraer datos de la API
extraer_datos_api = SimpleHttpOperator(
task_id='extraer_datos_api',
http_conn_id='api_jsonplaceholder',
endpoint='/posts',
method='GET',
response_filter=lambda response: json.loads(response.text),
log_response=True,
)
# 4. Tarea: Transformar los datos (PythonOperator para flexibilidad)
def transformar_datos(**context):
# Extraer los datos del XCom de la tarea anterior
ti = context['ti']
datos_crudos = ti.xcom_pull(task_ids='extraer_datos_api')
datos_transformados = []
for post in datos_crudos:
# Ejemplo de transformación: seleccionar campos y limpiar título
registro_transformado = (
post['id'],
post['userId'],
post['title'].strip().capitalize(), # Limpieza básica
post['body'].replace('\n', ' '), # Quitar saltos de línea
)
datos_transformados.append(registro_transformado)
# Empujar los datos transformados al XCom para la siguiente tarea
context['ti'].xcom_push(key='posts_transformados', value=datos_transformados)
transformar = PythonOperator(
task_id='transformar_datos',
python_callable=transformar_datos,
)
# 5. Tarea: Cargar datos en SQLite
def cargar_en_bd(**context):
ti = context['ti']
datos_a_insertar = ti.xcom_pull(task_ids='transformar_datos', key='posts_transformados')
hook_sqlite = SqliteHook(sqlite_conn_id='sqlite_default')
conn = hook_sqlite.get_conn()
cursor = conn.cursor()
# Inserción por lotes. Usamos 'INSERT OR REPLACE' para idempotencia basada en clave primaria.
sql_insert = '''
INSERT OR REPLACE INTO posts (id, userId, title, body)
VALUES (?, ?, ?, ?)
'''
try:
cursor.executemany(sql_insert, datos_a_insertar)
conn.commit()
print(f"Se insertaron/actualizaron {cursor.rowcount} registros.")
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
conn.close()
cargar = PythonOperator(
task_id='cargar_en_bd',
python_callable=cargar_en_bd,
)
# Definición del flujo de tareas (dependencias)
crear_tabla >> verificar_api_disponible >> extraer_datos_api >> transformar >> cargar
Configuración de Conexiones en Airflow
Para que el DAG funcione, debes configurar las conexiones en la interfaz web de Airflow (Admin -> Connections).
# Ejemplo de cómo crear conexiones vía CLI (opcional, la UI es más común):
# airflow connections add 'sqlite_default' --conn-type 'sqlite' --conn-host '/tmp/airflow_db.sqlite'
# airflow connections add 'api_jsonplaceholder' --conn-type 'http' --conn-host 'https://jsonplaceholder.typicode.com'
Tip Importante: Nunca almacenes credenciales o URLs sensibles directamente en el código del DAG. Siempre utiliza el sistema de Connections y Variables de Airflow. Para APIs que requieran autenticación, puedes agregar login y password en la conexión HTTP, y el operador los usará automáticamente.
Errores Comunes y Cómo Evitarlos
1. Falta de Manejo de Errores en la Extracción: Asumir que la API siempre responderá con éxito es un error grave. Si la API devuelve un código 500 o se cae, tu DAG fallará.
Solución: Usa el HttpSensor para verificar disponibilidad antes de extraer. Además, encierra la lógica de requests en bloques try-except y considera el uso de retries en el DAG para errores transitorios.
2. Duplicación de Datos en la Carga (Falta de Idempotencia): Ejecutar el DAG múltiples veces puede llenar tu tabla de registros idénticos.
Solución: Diseña tu operación de carga para ser idempotente. Usa INSERT OR REPLACE (en SQLite), ON CONFLICT DO UPDATE (en PostgreSQL), o un patrón de "truncar y cargar" si es apropiado para tu caso de uso.
3. Pérdida de Datos en XCom por Tamaño Excesivo: XCom está diseñado para mensajes pequeños (por defecto, ~48KB en la base de datos de Airflow). Enviar un dataset enorme colapsará el sistema.
Solución: Para datasets grandes, evita XCom. Escribe los datos extraídos a un archivo temporal (por ejemplo, en S3 o el sistema de archivos local compartido) y pasa solo la ruta del archivo vía XCom. Las tareas siguientes leerán desde ese archivo.
4. Conexiones de Base de Datos no Gestionadas: Abrir conexiones en PythonOperator y no cerrarlas correctamente puede agotar los recursos del servidor de Airflow.
Solución: Utiliza siempre los Hooks de Airflow (como SqliteHook). Ellos manejan el ciclo de vida de la conexión de manera eficiente dentro del contexto de Airflow. En nuestro ejemplo, usamos get_conn() pero nos aseguramos de cerrar la conexión en un bloque finally.
5. Programación (Schedule) Mal Configurada con Catchup: Un start_date en el pasado combinado con catchup=True (valor por defecto) puede desencadenar una cascada de ejecuciones históricas no deseadas, saturando los recursos.
Solución: Para DAGs nuevos, establece explícitamente catchup=False en la definición del DAG. Comprende profundamente cómo Airflow calcula los execution_date y los intervalos de programación.
Checklist de Dominio
Antes de considerar esta lección completa, asegúrate de poder verificar los siguientes puntos:
- Puedo explicar la importancia de la idempotencia en un pipeline de Airflow y dar al menos un ejemplo de cómo implementarla en una tarea de carga.
- Sé configurar una Conexión de tipo HTTP y otra de tipo SQLite en la interfaz web de Airflow (o vía CLI) y referenciarlas correctamente en los operadores.
- Comprendo el rol del XCom para pasar datos entre tareas y conozco su limitación de tamaño, así como alternativas para datasets grandes.
- Puedo construir un DAG que integre al menos tres tipos diferentes de operadores (ej: SqliteOperator, PythonOperator, SimpleHttpOperator) definiendo correctamente sus dependencias.
- Sé implementar un mecanismo básico de manejo de errores en una función de PythonOperator (try-except) y aprovechar los reintentos (retries) a nivel de tarea y DAG.
- Puedo describir la diferencia entre un Sensor (como HttpSensor) y un Operador, y cuándo es apropiado usar cada uno.
- He ejecutado el DAG de ejemplo en un entorno de Airflow y he verificado que los datos aparecen correctamente en la tabla SQLite de destino.
- Sé cómo consultar los logs de las tareas en la UI de Airflow para depurar fallos durante la extracción, transformación o carga.