Implementar el DAG con Extracción de Múltiples Fuentes

Lectura
25 min~11 min lectura
Objetivo de la lección

Introducción a la Extracción de Múltiples Fuentes en un DAG de Airflow En lecciones anteriores, hemos construido DAGs que interactúan con una sola fuente de datos.

Puntos de control
  • Introducción a la Extracción de Múltiples Fuentes en un DAG de Airflow
  • Concepto Clave: Orquestación de Fuentes Heterogéneas
  • Cómo Funciona en la Práctica: Un Ejemplo Paso a Paso
  • Código en Acción: DAG para Extracción de Tres Fuentes

Introducción a la Extracción de Múltiples Fuentes en un DAG de Airflow

En lecciones anteriores, hemos construido DAGs que interactúan con una sola fuente de datos. Sin embargo, la realidad de los pipelines de datos en producción es considerablemente más compleja. Un pipeline ETL complejo típicamente consume información de diversos orígenes: bases de datos SQL y NoSQL, APIs RESTful, archivos planos en sistemas cloud storage, streams de datos, y más. La orquestación efectiva de estas extracciones concurrentes o secuenciales es donde Apache Airflow demuestra su verdadero valor.

Esta lección se centra en la implementación de un DAG diseñado para manejar la extracción de múltiples fuentes de manera robusta, eficiente y monitoreable. Abordaremos estrategias para estructurar las tareas, manejar dependencias, gestionar errores de forma granular y asegurar que el proceso completo sea tolerante a fallos. El objetivo es pasar de un DAG académico a uno listo para enfrentar las inconsistencias y latencias del mundo real, sentando las bases para las etapas de transformación y carga que cubriremos en el proyecto integrador.

La complejidad no reside solo en ejecutar múltiples tareas PythonOperator, sino en diseñar un flujo lógico que optimice el tiempo de ejecución, maneje credenciales de forma segura, y proporcione puntos de observabilidad claros. Un diseño pobre puede llevar a cuellos de botella, dependencias ocultas y una pesadilla en el debugging. Por ello, priorizaremos la claridad, la modularidad y las buenas prácticas de la comunidad.

Concepto Clave: Orquestación de Fuentes Heterogéneas

Imagina que eres el director de una orquesta sinfónica. Tu trabajo no es tocar cada instrumento (el violín, el oboe, el timbal), sino asegurarte de que cada sección comience en el momento preciso, con la partitura correcta y al tempo adecuado. Si la sección de vientos tiene un problema con sus partituras, tú, como director, no vas a arreglar las anotaciones en ese momento, pero sí debes decidir si la orquesta puede continuar sin ellos o si es necesario detener la ejecución. En este símil, Apache Airflow es el director, y cada tarea de extracción de una fuente (una consulta SQL, una llamada a una API, la lectura de un archivo CSV de S3) es una sección de la orquesta.

El concepto fundamental aquí es la orquestación declarativa. Nosotros, como ingenieros, declaramos en el código del DAG: "Estas son las fuentes, estas son las tareas que extraen de ellas, y este es el orden o las condiciones en que deben ejecutarse". Airflow se encarga de ejecutar esa partitura, supervisando cada "instrumento". Algunas extracciones pueden ejecutarse en paralelo (como los violines y los violonchelos tocando al mismo tiempo) si no dependen entre sí. Otras deben ser secuenciales (el solo de flauta que viene después de un crescendo general). La dependencia de datos es la regla que rige esta secuencia.

Gestionar múltiples fuentes añade capas de complejidad operativa: distintos tiempos de espera, formatos de error, requisitos de autenticación y ventanas de disponibilidad. Un buen DAG debe estar diseñado para aislar los fallos, de modo que un problema en la API del tiempo no bloquee la extracción de datos de ventas de la base de datos transaccional, a menos que la lógica de negocio lo exija explícitamente. Esta tolerancia a fallos granular se configura mediante mecanismos como retries, branching y el manejo adecuado de excepciones dentro de cada tarea.

Cómo Funciona en la Práctica: Un Ejemplo Paso a Paso

Vamos a desglosar el proceso de implementación para un caso concreto: un pipeline que alimenta un dashboard de análisis de negocio. Necesitamos datos de: 1) Una base de datos PostgreSQL de ventas, 2) Una API REST de un servicio de métricas de marketing, y 3) Un archivo JSON que llega diariamente a un bucket de Amazon S3 con datos de logística. El objetivo es extraer todos estos datos, depositarlos en una zona de landing o raw (por ejemplo, en S3 en formato Parquet) y luego proceder con la transformación unificada.

Paso 1: Diseño del Grafo de Dependencias. Primero, analizamos las dependencias. La extracción de la API de marketing y del archivo de S3 no dependen de nada más y pueden ejecutarse en paralelo. La extracción de PostgreSQL, aunque independiente, podría ser pesada y optamos por ejecutarla después de las dos primeras para no saturar recursos. Definimos el flujo: tarea_inicio -> [extraer_api, extraer_s3] -> extraer_postgres -> tarea_fin. Usaremos EmptyOperator para los puntos de sincronización.

Paso 2: Configuración de Conexiones y Hooks. En la interfaz web de Airflow, preconfiguramos las conexiones (Connections): una para PostgreSQL (tipo Postgres, con host, login, schema), otra para la API (tipo HTTP, con host y posiblemente auth), y otra para S3 (tipo S3, con credenciales). En nuestro código, usaremos los Hooks (PostgresHook, HttpHook, S3Hook). Los Hooks abstraen la gestión de conexiones y sesiones, son seguros y aprovechan el pool de conexiones de Airflow.

Paso 3: Implementación de Funciones Python Callable. Cada tarea de extracción será un PythonOperator que llama a una función dedicada. Dentro de cada función: inicializamos el Hook adecuado, realizamos la operación (consulta, llamada GET, descarga de archivo), aplicamos un preprocesamiento mínimo (como convertir el resultado de la API a DataFrame de Pandas) y guardamos el resultado en el sistema de archivos temporal de Airflow o lo subimos directamente a la zona de landing. Es crucial que cada función sea idempotente (ejecutarla múltiples veces produce el mismo resultado) y maneje sus propios errores de forma elegante, registrando información útil.

Paso 4: Construcción del DAG con Paralelismo y Manejo de Errores. En el cuerpo principal del script del DAG, instanciamos los operadores y definimos sus dependencias usando los operadores bitshift (>> y <<). Configuramos retries, retry_delay y email_on_failure a nivel de DAG o de tarea. Consideramos si alguna tarea es tan crítica que su fallo debe detener el pipeline (trigger_rule='all_done' por defecto es adecuado). Finalmente, probamos el DAG en el entorno de desarrollo mediante la vista Tree o ejecutando tareas manualmente.

Código en Acción: DAG para Extracción de Tres Fuentes

A continuación, presentamos un DAG funcional y completo que implementa el escenario descrito. Este código asume que las conexiones de Airflow 'postgres_warehouse', 'http_marketing_api' y 'aws_s3_landing' están correctamente configuradas. Utilizamos la TaskFlow API (introducida en Airflow 2.x) por su simplicidad y legibilidad, aunque el mismo flujo puede construirse con la API tradicional.


from datetime import datetime, timedelta
import json
import pandas as pd
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.exceptions import AirflowException

# Argumentos por defecto para el DAG
default_args = {
    'owner': 'data_engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 2,
    'retry_delay': timedelta(minutes=3),
}

# Definición del DAG
with DAG(
    dag_id='proyecto_integrador_extraccion_multifuente',
    default_args=default_args,
    description='DAG de extracción desde PostgreSQL, API HTTP y S3',
    schedule_interval='@daily',
    start_date=datetime(2023, 10, 1),
    catchup=False,
    tags=['proyecto_integrador', 'etl', 'extraccion'],
) as dag:

    @task(task_id='extraer_datos_postgres')
    def extraer_postgres(**context):
        """Extrae datos de ventas diarias desde PostgreSQL."""
        hook = PostgresHook(postgres_conn_id='postgres_warehouse')
        sql = """
            SELECT order_id, customer_id, product_id, quantity, amount, order_date
            FROM sales.orders
            WHERE order_date = CURRENT_DATE - INTERVAL '1 day'
        """
        try:
            df = hook.get_pandas_df(sql)
            execution_date = context['logical_date']
            file_path = f'/tmp/ventas_{execution_date.strftime("%Y%m%d")}.parquet'
            df.to_parquet(file_path, index=False)
            
            # Subir a S3 (zona raw)
            s3_hook = S3Hook(aws_conn_id='aws_s3_landing')
            s3_key = f'raw/ventas/{execution_date.strftime("%Y/%m/%d")}/datos.parquet'
            s3_hook.load_file(
                filename=file_path,
                key=s3_key,
                bucket_name='mi-bucket-datalake',
                replace=True
            )
            return {'status': 'success', 'rows_extracted': len(df), 's3_key': s3_key}
        except Exception as e:
            raise AirflowException(f"Fallo en extracción PostgreSQL: {str(e)}")

    @task(task_id='extraer_datos_api_marketing')
    def extraer_api(**context):
        """Extrae métricas de campañas desde una API REST externa."""
        http_hook = HttpHook(http_conn_id='http_marketing_api', method='GET')
        execution_date = context['logical_date']
        endpoint = f'/v1/metrics?date={execution_date.strftime("%Y-%m-%d")}'
        
        try:
            response = http_hook.run(endpoint)
            response.raise_for_status()
            data = response.json()
            
            # Normalizar la respuesta JSON anidada
            df = pd.json_normalize(data, 'campaigns', ['report_date'])
            file_path = f'/tmp/marketing_{execution_date.strftime("%Y%m%d")}.parquet'
            df.to_parquet(file_path, index=False)
            
            # Subir a S3
            s3_hook = S3Hook(aws_conn_id='aws_s3_landing')
            s3_key = f'raw/marketing/{execution_date.strftime("%Y/%m/%d")}/datos.parquet'
            s3_hook.load_file(
                filename=file_path,
                key=s3_key,
                bucket_name='mi-bucket-datalake',
                replace=True
            )
            return {'status': 'success', 'campaigns': len(df), 's3_key': s3_key}
        except Exception as e:
            raise AirflowException(f"Fallo en extracción de API: {str(e)}")

    @task(task_id='extraer_archivo_json_s3')
    def extraer_json_s3(**context):
        """Descarga y procesa un archivo JSON de logística subido a un bucket S3 fuente."""
        s3_hook = S3Hook(aws_conn_id='aws_s3_landing')
        execution_date = context['logical_date']
        source_key = f'incoming/logistics/logistics_data_{execution_date.strftime("%Y%m%d")}.json'
        local_path = f'/tmp/logistics_{execution_date.strftime("%Y%m%d")}.json'
        
        try:
            # Descargar desde el bucket fuente
            s3_hook.download_file(
                key=source_key,
                bucket_name='fuente-externa-logistica',
                local_path=local_path
            )
            
            with open(local_path, 'r') as f:
                data = json.load(f)
            
            df = pd.DataFrame(data['shipments'])
            parquet_path = local_path.replace('.json', '.parquet')
            df.to_parquet(parquet_path, index=False)
            
            # Subir a nuestro bucket de landing/raw
            target_key = f'raw/logistics/{execution_date.strftime("%Y/%m/%d")}/datos.parquet'
            s3_hook.load_file(
                filename=parquet_path,
                key=target_key,
                bucket_name='mi-bucket-datalake',
                replace=True
            )
            return {'status': 'success', 'shipments': len(df), 's3_key': target_key}
        except Exception as e:
            raise AirflowException(f"Fallo en extracción de JSON desde S3: {str(e)}")

    @task(task_id='validar_extracciones')
    def validar_extracciones(postgres_result, api_result, s3_result):
        """Tarea de validación que consolida los metadatos de las extracciones."""
        # Lógica de validación simple: verificar que cada extracción reportó 'success'
        results = [postgres_result, api_result, s3_result]
        for res in results:
            if res.get('status') != 'success':
                raise AirflowException(f"Una extracción falló: {res}")
        
        # Registrar un resumen
        print(f"Resumen de extracción:")
        print(f"  - Ventas (PostgreSQL): {postgres_result.get('rows_extracted')} filas.")
        print(f"  - Marketing (API): {api_result.get('campaigns')} campañas.")
        print(f"  - Logística (S3): {s3_result.get('shipments')} envíos.")
        return {'status': 'validated', 'message': 'Todas las extracciones fueron exitosas.'}

    # Orquestación del flujo: ejecutar extracciones en paralelo y luego validar
    datos_postgres = extraer_postgres()
    datos_api = extraer_api()
    datos_s3 = extraer_json_s3()
    
    # La tarea de validación se ejecuta sólo cuando las tres extracciones han finalizado (éxito o fallo).
    resultado_validacion = validar_extracciones(datos_postgres, datos_api, datos_s3)

Este DAG define claramente tres tareas de extracción independientes que se ejecutan en paralelo. Cada función utiliza el Hook apropiado, maneja errores específicos y carga los datos procesados (en formato Parquet) a una zona raw en S3, organizada por fecha y fuente. La tarea final validar_extracciones recoge los resultados de las tres tareas previas (gracias al paso de argumentos automático de la TaskFlow API) y realiza una comprobación de integridad básica. Si alguna extracción falla, su excepción será propagada y la validación fallará, marcando el DAG como fallido.

Tip Importante: Observa el uso de context['logical_date'] para obtener la fecha de ejecución programada del DAG. Es fundamental usar este parámetro en lugar de datetime.now() para garantizar la idempotencia y el correcto funcionamiento durante los backfills. Además, la organización de las claves en S3 con el patrón raw/<fuente>/<año>/<mes>/<dia>/ es una buena práctica para la gobernanza de datos y el procesamiento posterior.

Errores Comunes y Cómo Evitarlos

1. No Aislar los Fallos entre Fuentes: Un error común es envolver toda la extracción en un gran bloque try-except o hacer que el fallo de una fuente detenga inmediatamente las demás sin que la lógica de negocio lo requiera. Cómo evitarlo: Diseña cada tarea de extracción como una unidad independiente con su propio manejo de errores. Usa trigger_rule adecuadas en tareas aguas abajo (como 'all_done' o 'one_success') para controlar el flujo posterior a los fallos.

2. Credenciales y Conexiones Hardcodeadas en el Código: Incluir contraseñas, API keys o URLs completas en el script del DAG es un grave riesgo de seguridad y una pesadilla de mantenimiento. Cómo evitarlo: Utiliza siempre el mecanismo de Connections de Airflow para almacenar credenciales. En tu código, referencia solo el conn_id. Para variables adicionales, usa Airflow Variables.

3. No Considerar la Idempotencia: Si un DAG se re-ejecuta (backfill o retry), una extracción no idempotente podría duplicar datos o causar inconsistencias. Cómo evitarlo: Asegúrate de que tus operaciones de extracción y escritura sean idempotentes. Usa claves únicas en los destinos, emplea operaciones de "upsert" o "replace", y diseña consultas y llamadas a API que, para una misma fecha de ejecución, siempre devuelvan el mismo conjunto de datos.

4. Paralelismo Excesivo que Satura los Sistemas Fuente: Lanzar 50 conexiones concurrentes a una base de datos operacional puede degradar su rendimiento. Cómo evitarlo: Utiliza los parámetros de pool de Airflow para limitar la concurrencia a un mismo recurso. Asigna un pool específico (ej: 'postgres_extraccion') con, por ejemplo, 5 slots, y asígnalo a las tareas que consulten esa base de datos.

5. Mala Gestión de los Archivos Temporales: Las tareas que crean archivos grandes en /tmp pueden llenar el disco del worker. Cómo evitarlo: Limpia siempre los archivos temporales después de subirlos al almacenamiento definitivo (S3, GCS, HDFS). Puedes hacerlo en un bloque finally o crear una pequeña función de limpieza. Considera usar el almacenamiento temporal proporcionado por el contexto (context['ti'].xcom_push para datos pequeños) o sistemas de archivos efímeros montados.

Checklist de Dominio

Antes de considerar esta lección completa, asegúrate de poder verificar los siguientes puntos en tu implementación:

  • Puedo diseñar un grafo de dependencias para al menos tres fuentes de datos distintas, combinando ejecución en paralelo y en serie según requiera el caso de uso.
  • Sé configurar y utilizar correctamente Hooks de Airflow (PostgresHook, HttpHook, S3Hook/GCSHook) para abstraer la gestión de conexiones dentro de mis tareas.
  • Implemento funciones de extracción idempotentes que manejan sus excepciones de forma específica y registran información útil para el debugging.
  • Organizo los datos extraídos en una estructura de almacenamiento persistente (como S3) con un patrón de nombres que incluya fuente, fecha y que facilite el consumo posterior.
  • Utilizo el parámetro context['logical_date'] en mis consultas y rutas de archivo para garantizar la correcta ejecución en backfills.
  • He configurado retries, retry_delay y notificaciones por correo electrónico a nivel de tarea o DAG para manejar fallos transitorios.
  • Comprendo cómo usar pools para limitar la carga concurrente sobre un sistema fuente y he asignado pools a mis tareas donde sea necesario.
  • Puedo explicar la diferencia entre trigger_rule='all_success' y trigger_rule='all_done' y sé cuál aplicar en una tarea de validación o consolidación.
Falar no WhatsApp
Laboratorio de práctica

Antes de marcar esta lección como completa, escribí una evidencia breve para Apache Airflow: Orquestación de Pipelines ETL Complejos con DAGs: un ejemplo, una decisión, una captura, una mini demo o una nota que puedas reutilizar en portfolio.

Reflexión rápida

¿Qué cambiarías en tu forma de trabajar después de aplicar implementar el dag con extracción de múltiples fuentes?

De lección a portfolio

Convertí esta lección en evidencia para Data Analyst.

Sumá un mini caso con datos, una conclusión de negocio y una captura del resultado. Eso pesa más que decir que viste la herramienta.

Paso 1

Publicá una consulta, dashboard o notebook con una conclusión clara.

Paso 2

Agregá contexto: problema, dato usado, decisión recomendada y limitación.

Paso 3

Guardá el enlace en tu CV, LinkedIn o portfolio antes de postular.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión

Implementar el DAG con Extracción de Múltiples... | Cursalo