Práctica: Desplegar y Ejecutar el Pipeline en un Entorno Simulado

Lectura
40 min~10 min lectura
Objetivo de la lección

Introducción al Despliegue en un Entorno Simulado Llegó el momento de integrar todo el conocimiento adquirido.

Puntos de control
  • Introducción al Despliegue en un Entorno Simulado
  • Concepto Clave: Entorno Simulado vs. Producción
  • Cómo Funciona en la Práctica: Paso a Paso
  • Código en Acción: Docker Compose y DAG de Ejemplo

Introducción al Despliegue en un Entorno Simulado

Llegó el momento de integrar todo el conocimiento adquirido. Esta lección práctica te guiará en el despliegue y ejecución de un pipeline ETL complejo en un entorno simulado, un paso crucial antes de llevarlo a producción. Un entorno simulado replica las condiciones de un sistema real, pero de forma aislada y controlada, permitiéndote probar la orquestación, las dependencias, el manejo de errores y el monitoreo sin riesgo. Utilizaremos Docker Compose para levantar una instancia completa de Airflow, junto con una base de datos PostgreSQL para los metadatos y un contenedor con un sistema de archivos que simulará nuestro almacenamiento de datos de origen y destino.

El objetivo final es que puedas observar el ciclo de vida completo de un DAG: desde su carga en el scheduler, pasando por la ejecución secuencial y paralela de sus tareas, hasta la verificación de los resultados y el análisis de los logs. Este proceso te dará la confianza para diseñar y operar pipelines robustos. La simulación no solo valida la lógica, sino también la infraestructura, incluyendo la configuración de conexiones, pools y el correcto funcionamiento de los operadores personalizados que hayamos desarrollado en lecciones anteriores.

Concepto Clave: Entorno Simulado vs. Producción

Imagina que eres un director de orquesta. Antes del concierto final en el gran teatro (producción), realizas varios ensayos en una sala de ensayo (entorno simulado). En esta sala, todos los instrumentos (servicios como la base de datos, el scheduler) están presentes y son reales, pero el público no. Puedes detener la música, repetir un pasaje complicado o probar una nueva partitura sin que nadie fuera del ensayo se entere. De la misma manera, un entorno simulado para Airflow contiene los mismos componentes clave: el webserver, el scheduler, el executor (usualmente Local o Celery) y las bases de datos de soporte. La diferencia radica en que los datos son de prueba, los sistemas externos (como APIs de pago o data warehouses corporativos) son "mockeados" o reemplazados por versiones ligeras, y la carga es artificial.

La analogía se extiende al monitoreo. Durante el ensayo, el director escucha atentamente a cada sección y toma notas. En nuestro entorno simulado, usamos las vistas de Airflow, los logs detallados y herramientas como Metrics o incluso un Grafana local para observar el rendimiento. Este enfoque permite identificar cuellos de botella, como un pool de ejecución saturado, o errores de permisos en la escritura de archivos, que serían catastróficos si ocurrieran por primera vez en producción con datos reales y usuarios esperando informes.

Cómo Funciona en la Práctica: Paso a Paso

El primer paso es la preparación del entorno. Asumiremos que tienes Docker y Docker Compose instalados. Crearemos una estructura de directorios que refleje una organización profesional. La carpeta principal contendrá nuestro archivo docker-compose.yml, una carpeta dags/ donde colocaremos nuestro DAG, una carpeta logs/ para persistirlos, una carpeta plugins/ si los usamos, y una carpeta data/ que montaremos en los contenedores para simular almacenamientos como S3 o HDFS. Dentro de data/, crearemos subcarpetas como input/, output/ y processed/.

A continuación, configuraremos el archivo docker-compose.yml. Utilizaremos la imagen oficial de Apache Airflow, pero la personalizaremos para instalar dependencias Python adicionales (como pandas, boto3, o psycopg2) necesarias para nuestro pipeline. Definiremos servicios para el postgres, el webserver, el scheduler, y un worker si usamos CeleryExecutor. También definiremos volúmenes para persistir datos y variables de entorno para configurar Airflow, como el executor a usar y la conexión a la base de datos. Un paso crítico es inicializar la base de datos de Airflow con el comando airflow db init, que se ejecuta una sola vez.

Finalmente, desplegaremos los contenedores con docker-compose up -d. Una vez todos los servicios estén en estado "Up", accederemos a la interfaz web en localhost:8080. Las credenciales por defecto son airflow/airflow. Aquí cargaremos nuestro DAG, activaremos su ejecución, y usaremos la vista de árbol y de gráfico para monitorear su progreso en tiempo real. También examinaremos los logs de cada tarea para depurar cualquier problema y verificaremos que los archivos de salida se generen correctamente en la carpeta data/output/ de nuestro sistema host.

Código en Acción: Docker Compose y DAG de Ejemplo

A continuación, un archivo docker-compose.yml funcional para un entorno simulado con LocalExecutor, ideal para pruebas en una sola máquina.

version: '3'
x-airflow-common: &airflow-common
  image: apache/airflow:2.7.1
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./data:/opt/airflow/data
    - ./requirements.txt:/opt/airflow/requirements.txt
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 30s

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: on-failure

volumes:
  postgres-db-volume:

Ahora, un DAG de ejemplo que simula un pipeline ETL complejo. Este DAG lee un archivo CSV de entrada, realiza una transformación con Pandas, y escribe un archivo de salida, todo dentro de nuestro entorno simulado.

from datetime import datetime, timedelta
import pandas as pd
import os
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 2,
    'retry_delay': timedelta(minutes=3),
}

def extract_data(**context):
    """Simula la extracción de datos desde un archivo en el volumen montado."""
    input_path = '/opt/airflow/data/input/ventas.csv'
    output_path = '/opt/airflow/data/processed/ventas_raw.parquet'
    
    # Leer datos
    df = pd.read_csv(input_path, sep=';')
    # Guardar en formato intermedio (simulando un staging)
    df.to_parquet(output_path, index=False)
    # Empujar la ruta del archivo al XCom para la siguiente tarea
    context['ti'].xcom_push(key='raw_data_path', value=output_path)
    print(f"Datos extraídos y guardados en: {output_path}")

def transform_data(**context):
    """Aplica transformaciones de negocio."""
    # Pull del path desde XCom
    ti = context['ti']
    input_path = ti.xcom_pull(task_ids='extract', key='raw_data_path')
    
    df = pd.read_parquet(input_path)
    
    # Transformaciones de ejemplo
    df['fecha'] = pd.to_datetime(df['fecha'])
    df['venta_neta'] = df['monto'] - df['descuento']
    df['categoria_agrupada'] = df['categoria'].apply(lambda x: 'ALTA' if x in ['Electrónica', 'Muebles'] else 'BAJA')
    
    output_path = '/opt/airflow/data/processed/ventas_transformado.parquet'
    df.to_parquet(output_path, index=False)
    context['ti'].xcom_push(key='transformed_data_path', value=output_path)
    print(f"Datos transformados guardados en: {output_path}")

def load_data(**context):
    """Carga los datos finales a un 'data warehouse' simulado (un directorio de salida)."""
    ti = context['ti']
    input_path = ti.xcom_pull(task_ids='transform', key='transformed_data_path')
    df = pd.read_parquet(input_path)
    
    # Simular carga: generar un reporte agregado y un archivo listo para consumo
    reporte = df.groupby('categoria_agrupada').agg({'venta_neta': 'sum', 'id': 'count'}).reset_index()
    reporte_path = '/opt/airflow/data/output/reporte_ventas.csv'
    reporte.to_csv(reporte_path, index=False)
    
    datos_finales_path = '/opt/airflow/data/output/ventas_final.parquet'
    df.to_parquet(datos_finales_path, index=False)
    
    print(f"Reporte generado en: {reporte_path}")
    print(f"Dataset final en: {datos_finales_path}")

with DAG(
    'etl_complejo_simulado',
    default_args=default_args,
    description='Un pipeline ETL complejo para ejecutar en entorno simulado',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 10, 1),
    catchup=False,
    tags=['simulacion', 'etl', 'proyecto'],
) as dag:

    inicio = DummyOperator(task_id='inicio')
    
    validar_entorno = BashOperator(
        task_id='validar_entorno',
        bash_command='echo "Entorno simulado activo. Fecha: $(date)" && ls -la /opt/airflow/data/input/',
    )
    
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )
    
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )
    
    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
    )
    
    notificar_exito = BashOperator(
        task_id='notificar_exito',
        bash_command='echo "Pipeline ETL completado exitosamente el $(date)"',
    )
    
    inicio >> validar_entorno >> extract >> transform >> load >> notificar_exito

Para completar la simulación, aquí está un script bash simple para generar un archivo CSV de datos de prueba en la carpeta correcta antes de ejecutar el DAG.

#!/bin/bash
# crear_datos_prueba.sh
mkdir -p ./data/input
cat > ./data/input/ventas.csv << EOF
id;fecha;categoria;monto;descuento
1;2023-10-01;Electrónica;1000.50;50.25
2;2023-10-01;Ropa;250.00;10.00
3;2023-10-02;Muebles;1500.00;0.00
4;2023-10-02;Electrónica;750.30;30.00
5;2023-10-03;Ropa;120.00;5.50
6;2023-10-03;Muebles;2000.00;100.00
EOF
echo "Archivo de datos de prueba creado en ./data/input/ventas.csv"

Errores Comunes y Cómo Evitarlos

Error 1: Permisos de Volumen Incorrectos. El usuario dentro del contenedor Airflow (por defecto, uid 50000) no tiene permisos para escribir en las carpetas montadas del host. Esto provoca fallos silenciosos en tareas que escriben archivos. Solución: Asegúrate de que las carpetas locales (dags, logs, data) tengan permisos adecuados (chmod 777 para pruebas, o mejor, ajusta la propiedad con chown). La configuración user: "${AIRFLOW_UID:-50000}:0" en el compose ayuda a mapear el usuario.

Error 2: DAG No Aparece en la Interfaz Web. Has colocado el archivo .py en la carpeta local ./dags, pero el DAG no se lista. Solución: Verifica los logs del scheduler. Posibles causas: errores de sintaxis en el DAG que impiden su carga, el archivo no está en la ruta correcta dentro del contenedor (/opt/airflow/dags), o el scheduler aún no ha escaneado la carpeta (puede tardar unos segundos). Usa docker-compose logs airflow-scheduler para depurar.

Error 3: Dependencias Python Faltantes. Tu DAG importa la librería 'pandas', pero el contenedor Airflow base no la tiene instalada, causando ModuleNotFoundError. Solución: Crea un archivo requirements.txt en la raíz del proyecto con las librerías necesarias (ej., pandas==1.5.3) y asegúrate de que el volumen esté mapeado en el docker-compose.yml. Luego, reconstruye la imagen o usa comandos exec para instalarlas manualmente durante el desarrollo.

Error 4: Conexiones y Variables no Configuradas en el Entorno Simulado. Tu DAG en producción usa una conexión a S3 o a una base de datos, pero en la simulación falla porque esa conexión no existe. Solución: Define estas conexiones en tu entorno simulado. Puedes hacerlo mediante la interfaz web de Airflow (Admin -> Connections), usando un script de inicialización que ejecute comandos de la CLI de Airflow, o mediante variables de entorno en el docker-compose para conexiones simples.

Error 5: Falta de Limpieza en Ejecuciones Repetidas. Ejecutar el mismo DAG múltiples veces en la simulación puede llevar a duplicados de datos o errores porque los archivos de salida ya existen. Solución: Diseña tus tareas para que sean idempotentes. En nuestro ejemplo, las tareas sobrescriben los archivos. Otra estrategia es incluir una marca temporal en el nombre del archivo de salida o añadir una tarea inicial de limpieza que borre los directorios de processed/ y output/ antes de comenzar un nuevo ciclo ETL.

Tip crucial: Nunca asumas que el entorno simulado es idéntico a producción. Siempre documenta las diferencias (versiones de software, tamaños de datos, configuraciones de red) y ten un plan para validar el pipeline en un entorno de staging lo más parecido posible a producción antes del lanzamiento final.

Checklist de Dominio

Antes de considerar esta lección completa y estar listo para pasar a un entorno de staging o producción, verifica que puedes realizar y comprender cada uno de los siguientes puntos:

  • Puedo configurar y levantar un stack de Airflow funcional usando Docker Compose, con servicios para base de datos, webserver y scheduler.
  • He personalizado la imagen de Airflow para incluir dependencias Python específicas de mi proyecto, ya sea vía requirements.txt o extendiendo el Dockerfile.
  • Soy capaz de montar volúmenes correctamente para persistir DAGs, logs y datos de prueba, y he resuelto problemas de permisos entre el contenedor y el host.
  • He desplegado y ejecutado exitosamente un DAG complejo con múltiples tareas y dependencias en el entorno simulado, y he verificado la generación de los artefactos de salida.
  • Utilizo la interfaz web de Airflow para monitorear la ejecución del DAG, inspeccionar el estado de las tareas y analizar los logs de error cuando ocurren fallos.
  • He simulado y manejado errores comunes (ej., archivo de entrada faltante, error de sintaxis en una transformación) y he verificado que los mecanismos de reintento (retries) y notificación por email (configurada o simulada) funcionan.
  • He configurado al menos una conexión (por ejemplo, a un PostgreSQL local dentro del compose) o una Variable de Airflow desde la interfaz y mi DAG puede consumirla.
  • Puedo explicar la diferencia clave entre una ejecución en este entorno simulado y una en producción, listando al menos tres aspectos que deberían ser validados antes del despliegue final (ej., performance con volumen real, seguridad de credenciales, alta disponibilidad).
Falar no WhatsApp
Laboratorio de práctica

Antes de marcar esta lección como completa, escribí una evidencia breve para Apache Airflow: Orquestación de Pipelines ETL Complejos con DAGs: un ejemplo, una decisión, una captura, una mini demo o una nota que puedas reutilizar en portfolio.

Reflexión rápida

¿Qué cambiarías en tu forma de trabajar después de aplicar práctica: desplegar y ejecutar el pipeline en un entorno simulado?

De lección a portfolio

Convertí esta lección en evidencia para Data Analyst.

Sumá un mini caso con datos, una conclusión de negocio y una captura del resultado. Eso pesa más que decir que viste la herramienta.

Paso 1

Publicá una consulta, dashboard o notebook con una conclusión clara.

Paso 2

Agregá contexto: problema, dato usado, decisión recomendada y limitación.

Paso 3

Guardá el enlace en tu CV, LinkedIn o portfolio antes de postular.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión

Práctica: Desplegar y Ejecutar el Pipeline en u... | Cursalo