Concepto clave
El PythonOperator en Apache Airflow es una herramienta fundamental para ejecutar funciones de Python dentro de tus DAGs. Piensa en él como un trabajador especializado que puedes entrenar para realizar tareas específicas de extracción de datos, ya sea desde APIs o archivos. A diferencia de operadores predefinidos, el PythonOperator te da la flexibilidad de escribir código personalizado, lo que es esencial cuando trabajas con fuentes de datos no estandarizadas.
En el contexto de ETL, la extracción es el primer paso crítico. Imagina que estás construyendo un sistema de reportes para una tienda online: necesitas extraer datos de ventas desde una API REST y archivos CSV de inventario. El PythonOperator actúa como el puente entre Airflow y estas fuentes, permitiéndote orquestar extracciones complejas de manera confiable y programada. Su ventaja principal es que aprovechas todo el ecosistema de Python (librerías como requests, pandas, o boto3) dentro del flujo de trabajo de Airflow.
Cómo funciona en la práctica
Veamos un ejemplo paso a paso para extraer datos de una API pública y guardarlos en un archivo temporal. Supongamos que queremos obtener datos meteorológicos.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import json
def extract_weather_data():
url = "https://api.open-meteo.com/v1/forecast?latitude=40.41&longitude=-3.70&hourly=temperature_2m"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
with open('/tmp/weather_data.json', 'w') as f:
json.dump(data, f)
print("Datos extraídos exitosamente")
else:
raise Exception(f"Error en la API: {response.status_code}")
default_args = {
'owner': 'data_engineer',
'start_date': datetime(2023, 10, 1),
}
with DAG('extract_api_data', default_args=default_args, schedule_interval='@daily') as dag:
extract_task = PythonOperator(
task_id='extract_weather',
python_callable=extract_weather_data
)En este código: 1) Definimos una función extract_weather_data que usa la librería requests para llamar a la API. 2) Creamos un DAG con un PythonOperator que ejecuta esa función. 3) El operador maneja la lógica de extracción y error, integrando se perfectamente en el flujo de Airflow. Nota cómo la función se pasa como python_callable—esto es el corazón del PythonOperator.
Caso de estudio
Un data engineer en una fintech necesita extraer datos diarios de transacciones desde dos fuentes: una API bancaria y archivos CSV de socios. Implementemos esto con PythonOperator.
Objetivo: Extraer 1000 registros de transacciones diarias y consolidarlos en un solo dataset.
import pandas as pd
def extract_transaction_data():
# Extraer de API
api_url = "https://api.bank.com/transactions"
headers = {"Authorization": "Bearer TU_TOKEN"}
api_response = requests.get(api_url, headers=headers)
api_data = api_response.json()['transactions']
# Extraer de archivo CSV
csv_path = '/data/partner_transactions.csv'
csv_data = pd.read_csv(csv_path)
# Combinar
combined_data = pd.DataFrame(api_data).append(csv_data, ignore_index=True)
combined_data.to_parquet('/tmp/combined_transactions.parquet', index=False)
return '/tmp/combined_transactions.parquet'
with DAG('etl_transactions', default_args=default_args, schedule_interval='0 2 * * *') as dag:
extract_task = PythonOperator(
task_id='extract_and_combine',
python_callable=extract_transaction_data
)Este caso muestra cómo el PythonOperator permite manejar múltiples fuentes en una sola tarea, usando pandas para transformaciones simples durante la extracción. La salida es un archivo Parquet listo para la siguiente etapa del pipeline.
Errores comunes
- No manejar errores de conexión en APIs: Si la API falla, tu tarea fallará. Siempre incluye manejo de excepciones y reintentos configurados en el operador.
- Extraer datos demasiado grandes en memoria: Para archivos grandes, usa streaming o procesamiento por lotes para evitar desbordamientos de memoria.
- Olvidar limpiar archivos temporales: Los archivos creados en
/tmppueden acumularse. Considera usar hooks de Airflow o eliminar archivos después de usarlos. - No validar esquemas de datos: Extraer sin verificar la estructura puede causar errores en etapas posteriores. Añade validaciones básicas en tu función.
- Configurar mal los parámetros del PythonOperator: Por ejemplo, pasar argumentos incorrectos a
python_callable. Revisa la documentación para usarop_argsyop_kwargscorrectamente.
Checklist de dominio
- ¿Puedes escribir una función de Python que extraiga datos de una API REST y maneje errores HTTP?
- ¿Sabes configurar un PythonOperator en un DAG con parámetros como
task_idypython_callable? - ¿Eres capaz de extraer datos desde archivos CSV, JSON, o Parquet usando librerías como pandas dentro de Airflow?
- ¿Puedes combinar múltiples fuentes de datos en una sola tarea de extracción?
- ¿Entiendes cómo depurar fallos en PythonOperator revisando logs en la interfaz de Airflow?
- ¿Sabes optimizar la extracción para grandes volúmenes de datos (ej., usando paginación en APIs)?
- ¿Puedes integrar la extracción con hooks de Airflow (como S3Hook) para almacenamiento externo?
Extraer datos de una API de GitHub y un archivo JSON local
En este ejercicio, crearás un DAG en Airflow que extraiga datos de dos fuentes usando PythonOperator. Sigue estos pasos:
- Crea un nuevo archivo DAG en tu entorno de Airflow (ej.,
dags/extract_github_data.py). - Define una función llamada
extract_github_reposque:- Use la API pública de GitHub (
https://api.github.com/users/airflow/repos) para obtener una lista de repositorios. - Extraiga al menos 3 campos (ej., nombre, descripción, estrellas) de cada repositorio.
- Guarde los datos en un DataFrame de pandas.
- Use la API pública de GitHub (
- En la misma función, lee un archivo JSON local (
/data/local_projects.json) con una estructura similar y añádelo al DataFrame. - Guarda el DataFrame combinado como un archivo Parquet en
/tmp/combined_repos.parquet. - Configura un PythonOperator que ejecute esta función diariamente a las 8 AM.
- Prueba tu DAG ejecutándolo manualmente desde la interfaz de Airflow y verifica que el archivo Parquet se crea correctamente.
- Usa la librería requests para la API de GitHub y maneja el código de estado 200.
- Asegúrate de que el archivo JSON local existe en la ruta especificada o maneja el error.
- Considera usar pandas para combinar DataFrames con append o concat.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.