Introducción al Diseño de Extracción de Múltiples Fuentes
En el núcleo de cualquier pipeline ETL robusto se encuentra la fase de extracción, una etapa crítica donde la calidad, confiabilidad y estructura de los datos obtenidos sientan las bases para todo el proceso posterior. En un proyecto integrador con Apache Airflow, esta fase se vuelve significativamente más compleja cuando debemos orquestar la recolección de información desde orígenes heterogéneos, como APIs RESTful con límites de tasa, archivos planos en servidores remotos (SFTP), bases de datos y feeds en tiempo real. El diseño de esta extracción no es simplemente una cuestión de escribir scripts; es una cuestión de arquitectura que debe considerar la idempotencia, el manejo de errores, la eficiencia en el uso de recursos y la trazabilidad absoluta de cada dato.
Esta lección se enfoca en el diseño e implementación de un módulo de extracción dentro de un DAG de Airflow, capaz de consumir datos de manera concurrente o secuencial desde múltiples fuentes. Abordaremos estrategias para aislar la lógica de extracción en componentes reutilizables (como PythonOperators y TaskGroups), diseñar mecanismos de reintento inteligentes y estructurar el flujo de datos para que las etapas posteriores de transformación y carga (T y L del ETL) puedan proceder sin contratiempos. El objetivo final es crear un sistema de extracción que no solo funcione, sino que sea resiliente, mantenible y monitorizable.
Concepto Clave: Abstracción y Aislamiento en la Extracción
El principio fundamental para manejar múltiples fuentes es la abstracción. En lugar de tener un bloque monolítico de código que intente hablar con una API, luego con un servidor SFTP y después con una base de datos, debemos crear componentes aislados, cada uno especializado en una única fuente. Piensa en esto como un equipo de especialistas en una fábrica: tienes un experto en comunicación por satélite (APIs), un experto en logística de paquetes físicos (archivos) y un experto en archivos históricos (bases de datos). Cada uno sabe exactamente cómo interactuar con su sistema asignado, siguiendo protocolos estrictos. Airflow actúa como el gerente de planta que coordina a estos especialistas, asegurándose de que cada uno realice su trabajo en el momento correcto y en la secuencia adecuada, sin que un problema en un área detenga toda la producción.
Esta abstracción se logra en la práctica mediante el uso de funciones Python puras para cada tarea de extracción, encapsuladas dentro de PythonOperator. Cada función es responsable de una sola fuente, manejando su propia autenticación, paginación, descarga y escritura temporal de datos. El aislamiento garantiza que un fallo en la extracción de la API de Twitter no afecte la descarga del archivo de ventas diarias del servidor FTP. Además, permite probar, modificar y desplegar cada extractor de forma independiente, un aspecto crucial para el mantenimiento a largo plazo.
Cómo Funciona en la Práctica: Un Ejemplo Paso a Paso
Imaginemos que nuestro pipeline debe alimentar un dashboard de análisis de medios. Necesitamos datos de: 1) La API de NewsAPI para titulares, 2) Un archivo CSV alojado en un servidor SFTP con métricas de nuestro sitio web, y 3) Un archivo JSON estático desde una URL pública con información de geolocalización. El DAG se ejecutará diariamente a las 6 AM. El primer paso es definir las conexiones y variables dentro de la interfaz de Airflow (Admin -> Connections/Variables) para almacenar credenciales como claves de API, hosts SFTP y contraseñas, evitando así codificarlas.
El diseño del DAG creará tres tareas paralelas, cada una representando una fuente. Utilizaremos TaskGroup para agruparlas lógicamente bajo un nodo "Extracción" en la interfaz gráfica, mejorando la claridad. Cada tarea llamará a una función dedicada. Por ejemplo, extract_from_newsapi manejará la paginación, transformará la respuesta a un formato tabular (lista de diccionarios) y guardará los datos en un archivo temporal con una marca de tiempo en su nombre (ej., /tmp/news_headlines_20231027.json). La ruta de este archivo se pasará al siguiente paso (transformación) mediante XCom, pero con cuidado de no pasar los datos en sí, sino solo la ruta de referencia, para no sobrecargar la metabase de Airflow.
Un paso crítico posterior a todas las extracciones es una tarea de sensor o una tarea de verificación (check_extraction_success) que confirme que todos los archivos temporales esperados existen y tienen un tamaño mayor a cero. Esta tarea actuará como un punto de sincronización y control de calidad antes de proceder a la fase de transformación, implementando el patrón de barrera dentro del flujo del DAG.
Código en Acción: Implementación de un DAG de Extracción
A continuación, se presenta un esqueleto funcional y extensible de un DAG que implementa el diseño descrito. Nota la estructura de imports, la definición de argumentos por defecto, y el uso de context para obtener la fecha de ejecución.
from datetime import datetime, timedelta
import json
import pandas as pd
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.utils.task_group import TaskGroup
# Argumentos por defecto para el DAG
default_args = {
'owner': 'data_engineering',
'depends_on_past': False,
'email_on_failure': True,
'email': ['[email protected]'],
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
def extract_from_newsapi(**context):
"""
Extrae titulares de NewsAPI.
Los datos se guardan en un archivo temporal.
La ruta del archivo se empuja a XCom.
"""
import os
from airflow.hooks.base_hook import BaseHook
import requests
# 1. Obtener conexión desde Airflow
newsapi_conn = BaseHook.get_connection('newsapi_conn')
api_key = newsapi_conn.password # Se asume que la clave está en 'password'
execution_date = context['execution_date']
date_str = execution_date.strftime('%Y-%m-%d')
# 2. Configurar parámetros y hacer la solicitud
url = 'https://newsapi.org/v2/everything'
params = {
'q': 'tecnología',
'from': date_str,
'sortBy': 'publishedAt',
'apiKey': api_key,
'pageSize': 100,
'page': 1
}
all_articles = []
response = requests.get(url, params=params)
data = response.json()
if data.get('status') != 'ok':
raise ValueError(f"Error en API NewsAPI: {data.get('message')}")
all_articles.extend(data.get('articles', []))
# 3. Guardar datos en un archivo temporal
output_dir = '/tmp/airflow_extract'
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f'newsapi_{execution_date.isoformat()}.json')
with open(output_path, 'w') as f:
json.dump(all_articles, f)
# 4. Pasar la ruta del archivo vía XCom para la siguiente tarea
context['ti'].xcom_push(key='newsapi_file_path', value=output_path)
print(f"Datos de NewsAPI guardados en: {output_path}")
def extract_from_sftp_csv(**context):
"""
Descarga un archivo CSV desde un servidor SFTP.
"""
import os
from airflow.providers.sftp.hooks.sftp import SFTPHook
execution_date = context['execution_date']
date_str = execution_date.strftime('%Y%m%d')
# 1. Conectar al SFTP usando el hook (la conexión 'sftp_default' debe existir en Airflow)
sftp_hook = SFTPHook(ftp_conn_id='sftp_default')
remote_path = f'/incoming/metrics_website_{date_str}.csv'
local_dir = '/tmp/airflow_extract'
os.makedirs(local_dir, exist_ok=True)
local_path = os.path.join(local_dir, f'metrics_{execution_date.isoformat()}.csv')
# 2. Descargar el archivo
sftp_hook.retrieve_file(remote_path, local_path)
# 3. Pasar la ruta local vía XCom
context['ti'].xcom_push(key='sftp_file_path', value=local_path)
print(f"Archivo SFTP descargado a: {local_path}")
def extract_from_public_json(**context):
"""
Descarga un archivo JSON desde una URL pública.
"""
import os
import requests
execution_date = context['execution_date']
url = 'https://public-data.source.com/geolocation/countries.json'
response = requests.get(url)
response.raise_for_status() # Lanza error para códigos 4xx/5xx
data = response.json()
output_dir = '/tmp/airflow_extract'
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f'geodata_{execution_date.isoformat()}.json')
with open(output_path, 'w') as f:
json.dump(data, f)
context['ti'].xcom_push(key='public_json_file_path', value=output_path)
print(f"JSON público guardado en: {output_path}")
def verify_extraction(**context):
"""
Tarea de verificación: Confirma que todos los archivos extraídos existen.
"""
import os
ti = context['ti']
# Recuperar las rutas de los archivos desde XCom.
# Asumiendo que las tareas anteriores se llaman 'newsapi', 'sftp', 'public_json' dentro del TaskGroup.
paths = [
ti.xcom_pull(task_ids='extraction.newsapi', key='newsapi_file_path'),
ti.xcom_pull(task_ids='extraction.sftp', key='sftp_file_path'),
ti.xcom_pull(task_ids='extraction.public_json', key='public_json_file_path')
]
missing_files = []
for p in paths:
if not os.path.exists(p):
missing_files.append(p)
elif os.path.getsize(p) == 0:
missing_files.append(f"{p} (vacío)")
if missing_files:
raise FileNotFoundError(f"Fallo en la extracción. Archivos faltantes o vacíos: {missing_files}")
print("¡Verificación de extracción exitosa! Todos los archivos están presentes.")
# Definición del DAG
with DAG(
'proyecto_integrador_extraccion',
default_args=default_args,
description='DAG para extraer datos de múltiples fuentes (API, SFTP, URL)',
schedule_interval='0 6 * * *', # Diario a las 6 AM
start_date=datetime(2023, 10, 1),
catchup=False,
tags=['proyecto', 'etl', 'extraccion'],
) as dag:
with TaskGroup('extraction', tooltip='Grupo de tareas de extracción') as extraction_group:
task_newsapi = PythonOperator(
task_id='newsapi',
python_callable=extract_from_newsapi,
provide_context=True,
)
task_sftp = PythonOperator(
task_id='sftp',
python_callable=extract_from_sftp_csv,
provide_context=True,
)
task_public_json = PythonOperator(
task_id='public_json',
python_callable=extract_from_public_json,
provide_context=True,
)
# Las tareas dentro del grupo son paralelas por defecto.
# No definimos dependencias internas aquí, se ejecutan concurrentemente.
# Tarea de verificación que depende de la finalización de TODO el grupo de extracción.
verification_task = PythonOperator(
task_id='verify_extraction',
python_callable=verify_extraction,
provide_context=True,
)
# Establecer dependencias: El grupo completo se ejecuta primero, luego la verificación.
extraction_group >> verification_task
Errores Comunes y Cómo Evitarlos
1. No Manejar Límites de Tasa (Rate Limiting) en APIs: Muchas APIs públicas y privadas tienen límites estrictos de solicitudes por minuto/hora. Lanzar solicitudes en bucle sin pausas resultará en errores HTTP 429 (Too Many Requests) y el bloqueo potencial de tu clave. Cómo evitarlo: Implementa lógica de backoff exponencial en tus solicitudes, usando bibliotecas como tenacity o backoff. Respeta siempre los headers de respuesta como X-RateLimit-Remaining y Retry-After.
2. Almacenar Credenciales en el Código del DAG: Hardcodear claves de API, usuarios y contraseñas en los scripts de Python es un grave riesgo de seguridad y una pesadilla para la gestión. Cómo evitarlo: Utiliza siempre el sistema de Connections y Variables de Airflow. Accede a ellas en tu código mediante BaseHook.get_connection('conn_id') o Variable.get('var_name'). Esto centraliza y cifra la gestión de secretos.
3. Sobrecargar XCom con Grandes Volúmenes de Datos: XCom está diseñado para mensajes pequeños (como rutas de archivo, IDs, o banderas), no para datasets completos. Pasar un DataFrame de 500MB a través de XCom colapsará la base de datos metadatos de Airflow. Cómo evitarlo: Sigue el patrón demostrado: guarda los datos extraídos en un sistema de archivos temporal compartido (por ejemplo, /tmp en una ejecución local) o, mejor aún, en un almacenamiento intermedio como S3, GCS o HDFS, y pasa solo la ruta o URI del archivo a través de XCom.
4. Falta de Idempotencia en la Extracción: Un DAG puede reintentarse o reprogramarse. Si tu tarea de extracción simplemente descarga un archivo y lo sobrescribe, podrías perder datos o crear duplicados. Cómo evitarlo: Diseña tus funciones de extracción para que sean idempotentes. Usa la fecha de ejecución (execution_date) en el nombre del archivo de salida. Antes de descargar, verifica si el archivo destino ya existe y tiene el contenido esperado. Para APIs, utiliza parámetros de rango de fechas basados en execution_date para garantizar que siempre extraes el mismo conjunto de datos en un reintento.
5. No Aislar Fallos entre Fuentes Independientes: Si estructuras tu DAG como una secuencia lineal donde la extracción de la API debe terminar antes de iniciar la del SFTP, un fallo en la primera detendrá todo el pipeline, incluso si los datos del SFTP son críticos y estaban disponibles. Cómo evitarlo: Paraleliza las tareas de extracción que son independientes entre sí usando TaskGroups o simplemente definiendo dependencias adecuadas. Considera el uso de trigger_rules como 'all_done' o 'one_success' en tareas de agrupación posteriores para controlar el flujo incluso cuando algunas extracciones fallen.
Checklist de Dominio
Antes de considerar completado el diseño de tu módulo de extracción, verifica que cumples con los siguientes criterios:
- He definido y configurado todas las conexiones (APIs, SFTP, Bases de Datos) en la interfaz de Airflow, sin credenciales en el código.
- Cada fuente de datos tiene su propia función Python dedicada, manejando autenticación, errores y formato específico.
- Las tareas de extracción para fuentes no dependientes están configuradas para ejecutarse en paralelo, optimizando el tiempo total de ejecución.
- Los datos extraídos se escriben en archivos o almacenamiento temporal con nombres que incluyen la execution_date para garantizar idempotencia.
- Utilizo XCom solo para pasar metadatos (rutas, conteos, estados), nunca datasets completos.
- He implementado una tarea de verificación o sensor que actúa como barrera, confirmando la integridad de todos los datos extraídos antes de proceder a la transformación.
- Mi DAG incluye mecanismos de reintento (retries y retry_delay) configurados a nivel de tarea o DAG, y manejo explícito de excepciones comunes (conexión, timeout, formato).
- He considerado y, si es necesario, implementado lógica para manejar límites de tasa (rate limiting) y paginación en las APIs consumidas.
Tip de Producción: Para extracciones de archivos muy grandes o APIs con respuestas masivas, considera usar el PythonVirtualenvOperator o DockerOperator para aislar las dependencias de bibliotecas específicas (como SDKs de proveedores de nube) y evitar conflictos con el entorno principal de Airflow. Además, monitorea el uso de memoria de tus tareas de extracción; si son muy intensivas, podrían afectar al scheduler y otros DAGs en ejecución.