Práctica: Implementar y Ejecutar el Pipeline Completo en tu Entorno Local

Lectura
40 min~10 min lectura

Práctica: Implementar y Ejecutar el Pipeline Completo en tu Entorno Local

Has llegado al momento culminante del curso. En esta lección, pasarás de los conceptos teóricos y los ejercicios fragmentados a la construcción y ejecución de un pipeline ETL completo utilizando Apache Airflow en tu propia máquina. Este proyecto integrador simulará un escenario realista: procesar datos de ventas de una tienda online, enriquecerlos con información de clientes, realizar agregaciones y finalmente cargar los resultados en un sistema de reportes. El objetivo es que experimentes de primera mano el flujo de trabajo end-to-end, desde la definición del DAG hasta la monitorización de su ejecución, consolidando todo el conocimiento adquirido.

Implementarás un DAG que orquestará múltiples tareas interdependientes, manejará errores, utiliza XComs para comunicación entre tareas y se apoya en Hooks para interactuar con sistemas externos de manera eficiente. Configurarás conexiones en la interfaz web de Airflow y validarás cada paso del proceso. Esta práctica está diseñada para el nivel intermedio, asumiendo que ya tienes un entorno de Airflow funcionando localmente (por ejemplo, usando el Astro CLI, Docker Compose, o una instalación directa) y comprendes los fundamentos de Python, DAGs y operadores.

Concepto Clave: Orquestación como un Director de Orquesta

En el núcleo de esta práctica está el concepto de orquestación. Airflow no es una herramienta de procesamiento de datos per se (como Spark o Pandas), sino un orquestador. Su función principal es programar, ejecutar y monitorizar un conjunto de tareas, asegurándose de que se ejecuten en el orden correcto, con las dependencias adecuadas, en el momento programado y manejando los reintentos en caso de fallo. Piensa en Airflow como el director de una gran orquesta sinfónica.

Cada músico (una tarea como extraer_datos, transformar_archivo o cargar_en_redshift) es un experto en su instrumento. El director (Airflow) no le dice al violinista cómo tocar cada nota, pero sí cuándo debe entrar la sección de cuerdas, coordina su tempo con los vientos y se asegura de que todos sigan la partitura (el DAG). Si un trompetista falla (una tarea falla), el director puede indicarle que reintente el pasaje (retry). El resultado final es una sinfonía armoniosa (el pipeline ETL exitoso), producto de una coordinación impecable.

En nuestro proyecto, el DAG es la partitura que define la melodía del pipeline. Los operadores son los músicos individuales. Los sensores y hooks son los asistentes que esperan a que el escenario esté listo. Comprender esta distinción es vital: tu código en las tareas hace el trabajo pesado, pero Airflow se encarga de la logística compleja de ejecutarlo de manera fiable y programada.

Cómo Funciona en la Práctica: Paso a Paso del Pipeline

Vamos a desglosar el flujo de nuestro pipeline complejo. Primero, necesitamos un objetivo claro: "Generar un reporte diario de ventas por categoría de producto y segmento de cliente". Los datos brutos residen en dos fuentes: un archivo CSV simulado de ventas en un servidor SFTP y una tabla en una base de datos PostgreSQL con información de clientes. El destino será una tabla en esa misma PostgreSQL para consumo de un dashboard.

El proceso paso a paso es el siguiente: 1) Una tarea sensora verifica que el archivo de ventas del día esté disponible en el SFTP. 2) Una vez disponible, una tarea de extracción descarga ese archivo CSV a un directorio local temporal. 3) En paralelo, otra tarea extrae los datos de clientes de PostgreSQL. 4) Una tarea de transformación recibe ambos conjuntos de datos (usando XComs), los limpia, une (join) y agrega (group by) para calcular las ventas por categoría y segmento. 5) Una tarea de validación comprueba la calidad de los datos transformados (por ejemplo, que no haya valores negativos). 6) Finalmente, una tarea de carga inserta los resultados en la tabla de reportes. Todo esto está envuelto en un DAG con una programación diaria.

La magia de Airflow se ve en cómo maneja este flujo. Si la extracción de PostgreSQL falla, la tarea de transformación no se ejecutará porque depende de ella. Si la validación falla, la carga se omitirá. Airflow mantiene el estado de cada tarea (éxito, fallo, en ejecución) y proporciona logs detallados para cada una. Al finalizar, puedes ver en la interfaz una gráfica de dependencias (Graph View) que muestra el camino verde del éxito a través de todos los nodos, una confirmación visual de que tu orquesta tocó a la perfección.

Código en Acción: El DAG Completo del Proyecto Integrador

A continuación, presentamos el código completo del DAG para nuestro pipeline. Este es un ejemplo funcional que puedes adaptar y ejecutar en tu entorno. Necesitarás configurar previamente las conexiones de Airflow para sftp_default y postgres_default.


from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.sftp.hooks.sftp import SFTPHook
import pandas as pd
import os

# Argumentos por defecto para el DAG
default_args = {
    'owner': 'equipo_etl',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# Definición del DAG
with DAG(
    'proyecto_integrador_pipeline_ventas',
    default_args=default_args,
    description='Pipeline ETL diario para reporte de ventas',
    schedule_interval='0 2 * * *',  # Ejecución diaria a las 2 AM
    start_date=datetime(2023, 10, 1),
    catchup=False,
    tags=['ventas', 'reporte', 'etl'],
) as dag:

    inicio = DummyOperator(task_id='inicio')
    fin = DummyOperator(task_id='fin')

    # 1. Sensor para archivo en SFTP
    esperar_archivo = SFTPSensor(
        task_id='esperar_archivo_ventas',
        sftp_conn_id='sftp_default',
        path='/datos_entrada/ventas_{{ ds_nodash }}.csv',  # ds_nodash es la fecha de ejecución en formato YYYYMMDD
        mode='reschedule',
        poke_interval=60,  # Revisa cada 60 segundos
        timeout=3600,      # Timeout después de 1 hora
    )

    # 2. Función para extraer datos del SFTP
    def extraer_datos_sftp(**context):
        sftp_hook = SFTPHook(ftp_conn_id='sftp_default')
        fecha_exec = context['ds_nodash']
        archivo_remoto = f'/datos_entrada/ventas_{fecha_exec}.csv'
        archivo_local = f'/tmp/ventas_{fecha_exec}.csv'
        sftp_hook.retrieve_file(archivo_remoto, archivo_local)
        context['ti'].xcom_push(key='ruta_archivo_ventas', value=archivo_local)
        return archivo_local

    tarea_extraer_sftp = PythonOperator(
        task_id='extraer_datos_sftp',
        python_callable=extraer_datos_sftp,
        provide_context=True,
    )

    # 3. Función para extraer datos de clientes de PostgreSQL
    def extraer_datos_clientes(**context):
        pg_hook = PostgresHook(postgres_conn_id='postgres_default')
        sql = "SELECT id_cliente, nombre, email, segmento FROM clientes WHERE estado = 'activo';"
        connection = pg_hook.get_conn()
        df_clientes = pd.read_sql(sql, connection)
        archivo_clientes = f'/tmp/clientes_{context["ds_nodash"]}.parquet'
        df_clientes.to_parquet(archivo_clientes, index=False)
        context['ti'].xcom_push(key='ruta_archivo_clientes', value=archivo_clientes)
        return archivo_clientes

    tarea_extraer_clientes = PythonOperator(
        task_id='extraer_datos_clientes',
        python_callable=extraer_datos_clientes,
        provide_context=True,
    )

    # 4. Función para transformar y unir datos
    def transformar_y_unir(**context):
        ti = context['ti']
        ruta_ventas = ti.xcom_pull(task_ids='extraer_datos_sftp', key='ruta_archivo_ventas')
        ruta_clientes = ti.xcom_pull(task_ids='extraer_datos_clientes', key='ruta_archivo_clientes')

        df_ventas = pd.read_csv(ruta_ventas)
        df_clientes = pd.read_parquet(ruta_clientes)

        # Limpieza y transformación
        df_ventas['fecha'] = pd.to_datetime(df_ventas['fecha'])
        df_ventas['total'] = df_ventas['cantidad'] * df_ventas['precio_unitario']

        # Unión (join) y agregación
        df_completo = pd.merge(df_ventas, df_clientes, on='id_cliente', how='inner')
        df_agregado = df_completo.groupby(['categoria_producto', 'segmento'], as_index=False).agg({
            'total': 'sum',
            'id_venta': 'count'
        }).rename(columns={'id_venta': 'numero_ventas', 'total': 'venta_total'})

        # Guardar resultado intermedio
        archivo_resultado = f'/tmp/reporte_agregado_{context["ds_nodash"]}.parquet'
        df_agregado.to_parquet(archivo_resultado, index=False)
        ti.xcom_push(key='ruta_reporte_agregado', value=archivo_resultado)
        return archivo_resultado

    tarea_transformar = PythonOperator(
        task_id='transformar_y_unir_datos',
        python_callable=transformar_y_unir,
        provide_context=True,
    )

    # 5. Función para validar datos
    def validar_datos(**context):
        ti = context['ti']
        ruta_reporte = ti.xcom_pull(task_ids='transformar_y_unir_datos', key='ruta_reporte_agregado')
        df = pd.read_parquet(ruta_reporte)

        # Reglas de validación
        errores = []
        if (df['venta_total'] < 0).any():
            errores.append("Existen valores negativos en 'venta_total'.")
        if df.isnull().values.any():
            errores.append("Existen valores nulos en el dataset.")
        if df.empty:
            errores.append("El DataFrame resultante está vacío.")

        if errores:
            raise ValueError(f"Validación fallida: {' '.join(errores)}")
        else:
            print("Validación superada exitosamente.")
            return "Datos válidos"

    tarea_validar = PythonOperator(
        task_id='validar_datos_transformados',
        python_callable=validar_datos,
        provide_context=True,
    )

    # 6. Función para cargar a PostgreSQL
    def cargar_a_postgres(**context):
        ti = context['ti']
        ruta_reporte = ti.xcom_pull(task_ids='transformar_y_unir_datos', key='ruta_reporte_agregado')
        df = pd.read_parquet(ruta_reporte)

        pg_hook = PostgresHook(postgres_conn_id='postgres_default')
        engine = pg_hook.get_sqlalchemy_engine()

        # Cargar en la tabla de reportes. 'replace' para idempotencia diaria.
        df.to_sql('reporte_ventas_diario', engine, if_exists='append', index=False, method='multi')
        print(f"Datos cargados exitosamente. {len(df)} registros insertados.")

    tarea_cargar = PythonOperator(
        task_id='cargar_a_postgres',
        python_callable=cargar_a_postgres,
        provide_context=True,
    )

    # Definición de las dependencias del DAG
    inicio >> esperar_archivo >> tarea_extraer_sftp
    inicio >> tarea_extraer_clientes

    [tarea_extraer_sftp, tarea_extraer_clientes] >> tarea_transformar >> tarea_validar >> tarea_cargar >> fin

Este DAG integra múltiples componentes clave. Observa el uso de provide_context=True para pasar el contexto de ejecución, que incluye variables como ds_nodash (la fecha de ejecución) y el objeto ti (task instance) para manejar XComs. Las tareas de extracción de SFTP y PostgreSQL se ejecutan en paralelo después del inicio, gracias a la definición de dependencias. El sensor SFTPSensor es crucial para esperar de manera eficiente a que el archivo fuente esté disponible sin consumir recursos constantemente.

Errores Comunes y Cómo Evitarlos

Al implementar un pipeline de esta complejidad, es habitual encontrarse con ciertos obstáculos. Identificarlos de antemano te ahorrará horas de depuración.

1. Conexiones no configuradas: El error "airflow.exceptions.AirflowNotFoundException: The conn_id `sftp_default` is not defined" es el más común. Airflow necesita que las conexiones a bases de datos, SFTP, APIs, etc., estén definidas en su metadatos. Cómo evitarlo: Antes de ejecutar el DAG, ve a la interfaz web de Airflow (Admin -> Connections) y crea las conexiones necesarias con los parámetros correctos (host, login, puerto, contraseña). Usa el mismo conn_id que en tu código.

2. Problemas con rutas de archivos y permisos: Las tareas que escriben en /tmp/ pueden fallar si el directorio no existe o el usuario bajo el que corre Airflow no tiene permisos de escritura. Cómo evitarlo: Usa rutas absolutas y verifica los permisos del usuario de Airflow. Considera usar un volumen compartido si ejecutas Airflow en Docker. Siempre limpia los archivos temporales en tareas posteriores para no llenar el disco.

3. Mal manejo de XComs para datos grandes: XComs está diseñado para mensajes pequeños (como rutas de archivo o flags), no para DataFrames completos. Si intentas hacer xcom_push con un DataFrame de 1GB, excederás el límite de la base de datos de Airflow y la tarea fallará. Cómo evitarlo: Sigue el patrón del ejemplo: usa XComs solo para pasar rutas de archivo o parámetros clave. Los datos pesados deben persistirse en sistemas de archivos o almacenamiento de objetos (S3, GCS) y solo la referencia viaja por XCom.

4. Falta de idempotencia: Un pipeline debe poder ejecutarse múltiples veces sobre los mismos datos de entrada y producir el mismo resultado sin duplicados o efectos secundarios. Un error común es usar INSERT sin una clave que evite duplicados, lo que genera registros repetidos en cada re-ejecución. Cómo evitarlo: Diseña tus tareas para ser idempotentes. En la carga, puedes usar TRUNCATE + INSERT o INSERT ... ON CONFLICT en PostgreSQL. En las transformaciones, asegúrate de que los archivos de salida se sobrescriban.

5. No considerar el tiempo de zona horaria en schedule_interval: Si defines start_date como datetime(2023, 10, 1) y schedule_interval='@daily', Airflow, por defecto, usa UTC. Si tu negocio opera en otra zona horaria, las ejecuciones se programarán en horarios inesperados. Cómo evitarlo: Sé explícito con las zonas horarias. Usa pendulum para definir start_date con zona horaria: pendulum.datetime(2023, 10, 1, tz="America/Mexico_City"). También puedes ajustar la hora en el schedule_interval cron, por ejemplo, '0 2 * * *' para las 2 AM UTC.

Checklist de Dominio

Antes de considerar esta lección completamente interiorizada y el proyecto integrador como exitoso, verifica que puedes realizar y comprender cada uno de los siguientes puntos:

  • Configurar correctamente al menos dos conexiones diferentes (ej., PostgreSQL y SFTP) en la interfaz web de Airflow.
  • Explicar la diferencia entre un operador y un sensor, y cuándo usar cada uno en un caso práctico.
  • Implementar un DAG que ejecute tareas en paralelo y luego converja en una tarea dependiente de todas ellas.
  • Utilizar XComs para pasar información entre tareas, entendiendo sus limitaciones de tamaño y mejores prácticas.
  • Manejar correctamente el contexto de ejecución (ds_nodash, ti) en funciones PythonOperator.
  • Escribir una tarea de validación de datos que falle explícitamente si no se cumplen ciertas reglas de calidad.
  • Ejecutar el DAG completo desde la interfaz, interpretar la Graph View y analizar los logs de una tarea fallida para diagnosticar el problema.
  • Modificar el DAG para hacerlo idempotente, asegurando que una re-ejecución manual no corrompa los datos de destino.

Tip Crítico: No subestimes el poder de los logs de Airflow. Cuando una tarea falle, haz clic en ella en la Graph View y luego en "Log". Los mensajes de error de Python y los prints de tu código estarán allí. Es tu principal herramienta de depuración. Además, activa el email_on_failure en los default_args para recibir alertas inmediatas en producción.

La implementación exitosa de este pipeline en tu entorno local es un hito significativo. Demuestra que posees las habilidades para orquestar flujos de trabajo de datos complejos, robustos y programados. Has pasado de entender piezas sueltas a ensamblar una máquina funcional. Este es el núcleo del valor de Apache Airflow: permitirte enfocarte en la lógica de negocio de cada tarea, mientras él se encarga de la confiabilidad operacional. El siguiente paso natural es llevar este conocimiento a un entorno de pre-producción, explorando conceptos como pools, colas de ejecución, y despliegue en servidores remotos o en la nube.

De lección a portfolio

Convertí esta lección en una habilidad visible para entrevistas.

Guardá el curso, completá los ejercicios y conectá esta habilidad con una ruta de empleo, data, IA, programación o marketing.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión