Uso de Operadores PythonOperator y BashOperator en ETL

Lectura
20 min~4 min lectura

Concepto clave

En Apache Airflow, los operadores son los componentes fundamentales que ejecutan tareas especificas dentro de un DAG. Piensalos como las herramientas en una caja de un ingeniero de datos: cada una tiene una funcion unica. El PythonOperator te permite ejecutar codigo Python arbitrario, ideal para transformaciones de datos complejas o integraciones con APIs. El BashOperator ejecuta comandos de shell, perfecto para mover archivos, ejecutar scripts externos o interactuar con sistemas de archivos.

Una analogia del mundo real seria un restaurante automatizado. El BashOperator seria como el robot que trae ingredientes crudos de la nevera (ejecutando comandos como cp o wget). El PythonOperator seria el chef que cocina esos ingredientes (transformando datos con pandas o logica de negocio). Juntos, orquestan la preparacion de un plato completo (tu pipeline ETL).

Como funciona en la practica

Veamos un ejemplo paso a paso de un DAG simple que usa ambos operadores. Supongamos que necesitas descargar un archivo CSV, procesarlo y guardar los resultados.

Paso 1: Definir el DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def procesar_datos():
    import pandas as pd
    df = pd.read_csv('/tmp/datos.csv')
    df['nueva_columna'] = df['valor'] * 2
    df.to_csv('/tmp/datos_procesados.csv', index=False)

default_args = {
    'owner': 'data_engineer',
    'start_date': datetime(2023, 1, 1)
}

dag = DAG(
    'etl_ejemplo',
    default_args=default_args,
    schedule_interval='@daily'
)

Paso 2: Agregar tareas con operadores

descargar = BashOperator(
    task_id='descargar_archivo',
    bash_command='wget -O /tmp/datos.csv https://ejemplo.com/datos.csv',
    dag=dag
)

procesar = PythonOperator(
    task_id='procesar_datos',
    python_callable=procesar_datos,
    dag=dag
)

guardar = BashOperator(
    task_id='guardar_resultados',
    bash_command='aws s3 cp /tmp/datos_procesados.csv s3://bucket/datos/',
    dag=dag
)

Paso 3: Establecer dependencias

descargar >> procesar >> guardar

Este DAG ejecutara secuencialmente: descarga un archivo, lo procesa en Python y sube el resultado a S3.

Caso de estudio

Imagina que trabajas para un e-commerce que necesita actualizar diariamente las recomendaciones de productos. Tu pipeline ETL debe:

  1. Descargar logs de ventas del dia anterior desde un servidor SFTP.
  2. Procesar los logs para calcular productos mas vendidos.
  3. Actualizar una base de datos de recomendaciones.

Implementacion con operadores:

TareaOperadorImplementacion
Descargar logsBashOperatorsftp usuario@servidor:/logs/ventas_$(date -d yesterday +%Y%m%d).csv /tmp/
Procesar datosPythonOperatorFuncion Python que lee el CSV, agrupa por producto y calcula metricas
Actualizar DBPythonOperatorFuncion que usa SQLAlchemy para insertar en PostgreSQL
En produccion, siempre usa variables de Airflow para credenciales y rutas, nunca las codifiques directamente.

Errores comunes

  • No manejar errores en PythonOperator: Si tu funcion Python lanza una excepcion no capturada, la tarea fallara. Siempre incluye try-except y logging.
  • BashOperator con comandos complejos sin pruebas: Un comando bash con errores de sintaxis fallara silenciosamente. Prueba primero en una terminal.
  • Olvidar dependencias de Python: Si tu funcion PythonOperator usa pandas, asegurate de que este instalado en el entorno de Airflow.
  • No limpiar archivos temporales: BashOperator puede crear archivos en /tmp que acumulan espacio. Agrega tareas de limpieza o usa directorios con timestamp.
  • Mal manejo de fechas en BashOperator: Usar $(date) directamente puede causar inconsistencias. Mejor usa las macros de Airflow como {{ ds }}.

Checklist de dominio

  1. Puedo crear un DAG que combine al menos un PythonOperator y un BashOperator con dependencias.
  2. Se como pasar parametros entre tareas usando XComs cuando es necesario.
  3. Entiendo cuando usar cada operador: Bash para comandos de sistema, Python para logica compleja.
  4. Puedo manejar errores en ambos operadores (excepciones en Python, exit codes en Bash).
  5. Se configurar el entorno para que PythonOperator tenga acceso a las librerias necesarias.
  6. Puedo usar variables de Airflow en lugar de valores hardcodeados en los operadores.
  7. Entiendo el impacto en rendimiento: PythonOperator en el scheduler vs BashOperator en workers.

Construye un pipeline ETL para datos meteorologicos

Crea un DAG que automatice la descarga y procesamiento de datos meteorologicos diarios.

  1. Define un DAG llamado etl_clima que se ejecute diariamente.
  2. Agrega una tarea con BashOperator que descargue un archivo CSV desde https://datos-ejemplo.com/clima_$(date +%Y-%m-%d).csv a /tmp/clima.csv.
  3. Agrega una tarea con PythonOperator que:
    • Lea el archivo CSV descargado
    • Calcule la temperatura promedio del dia
    • Guarde el resultado en un archivo JSON en /tmp/clima_promedio.json
  4. Agrega una tarea final con BashOperator que mueva el archivo JSON a un directorio de backup: /backup/clima/.
  5. Establece las dependencias para que las tareas se ejecuten en orden.
  6. Prueba tu DAG ejecutandolo manualmente desde la UI de Airflow.
Pistas
  • Usa pandas en la funcion Python para leer el CSV y calcular promedios
  • Asegurate de que el directorio /backup/clima/ exista antes de ejecutar
  • Considera usar try-except en tu funcion Python para manejar archivos faltantes

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.