Concepto clave
En Apache Airflow, la integración con bases de datos es fundamental para pipelines ETL automatizados. Dos componentes esenciales son el PostgresOperator y el MySqlHook. El PostgresOperator es un operador que ejecuta comandos SQL directamente en PostgreSQL, ideal para tareas como crear tablas, insertar datos o ejecutar procedimientos almacenados. En cambio, MySqlHook es un hook (gancho) que proporciona una conexión programática a MySQL, permitiendo mayor flexibilidad para operaciones complejas o personalizadas.
Imagina que estás construyendo una línea de ensamblaje en una fábrica. El PostgresOperator sería como una máquina especializada que realiza una tarea específica (ejecutar una consulta SQL) de forma automática, mientras que MySqlHook sería como una herramienta manual que te permite intervenir y ajustar procesos según sea necesario. Ambos son complementarios: el operador para tareas estandarizadas y el hook para situaciones que requieren más control.
Cómo funciona en la práctica
Para usar PostgresOperator, primero debes configurar una conexión en Airflow. Veamos un ejemplo paso a paso:
- Define la conexión en la interfaz web de Airflow o mediante variables de entorno. Para PostgreSQL, el tipo de conexión es 'postgres'.
- Importa el operador en tu DAG:
from airflow.providers.postgres.operators.postgres import PostgresOperator. - Crea una tarea que ejecute una consulta SQL. Por ejemplo, para crear una tabla:
create_table_task = PostgresOperator(
task_id='crear_tabla_ventas',
postgres_conn_id='mi_conexion_postgres',
sql='''
CREATE TABLE IF NOT EXISTS ventas (
id SERIAL PRIMARY KEY,
fecha DATE,
monto DECIMAL(10,2)
);
'''
)Para MySqlHook, el proceso es similar pero más flexible. Primero, importa el hook: from airflow.providers.mysql.hooks.mysql import MySqlHook. Luego, úsalo en una función Python personalizada dentro de tu DAG:
def extraer_datos_mysql():
hook = MySqlHook(mysql_conn_id='mi_conexion_mysql')
connection = hook.get_conn()
cursor = connection.cursor()
cursor.execute('SELECT * FROM productos WHERE stock < 10')
resultados = cursor.fetchall()
# Procesar resultados aquí
cursor.close()
connection.close()Caso de estudio
Supongamos que trabajas en una empresa de e-commerce que necesita actualizar diariamente un dashboard de ventas. Los datos provienen de una base MySQL transaccional y se cargan en un data warehouse en PostgreSQL para análisis. Tu DAG en Airflow podría incluir:
- Una tarea con MySqlHook para extraer ventas del día desde MySQL.
- Una tarea de transformación (usando PythonOperator) para limpiar y agregar los datos.
- Una tarea con PostgresOperator para insertar los datos transformados en una tabla de PostgreSQL.
Ejemplo de datos en una tabla HTML:
| ID Venta | Fecha | Producto | Cantidad | Total |
|---|---|---|---|---|
| 101 | 2023-10-01 | Laptop | 2 | 2000.00 |
| 102 | 2023-10-01 | Mouse | 5 | 250.00 |
En este flujo, MySqlHook maneja la extracción flexible desde la fuente operacional, mientras que PostgresOperator garantiza una carga eficiente en el destino analítico.
Errores comunes
- No configurar correctamente las conexiones: Olvidar definir el
conn_iden Airflow o usar credenciales incorrectas. Solución: Verifica las conexiones en la interfaz web y prueba con herramientas como psql o mysql client. - Ejecutar consultas largas sin timeouts: En PostgresOperator, consultas complejas pueden bloquear el DAG. Solución: Usa parámetros como
execution_timeouto divide la consulta en tareas más pequeñas. - No manejar conexiones en MySqlHook: Dejar conexiones abiertas puede agotar recursos. Solución: Siempre cierra cursor y conexión con
close(), como en el ejemplo anterior. - Confundir operadores con hooks: Usar PostgresOperator para tareas que requieren lógica personalizada, o viceversa. Solución: Elige operadores para SQL directo y hooks para operaciones programáticas.
Checklist de dominio
- Configurar una conexión a PostgreSQL y MySQL en Airflow.
- Crear una tarea con PostgresOperator que ejecute una consulta CREATE o INSERT.
- Usar MySqlHook en una función Python para leer datos desde una tabla.
- Combinar ambos en un DAG que extraiga de MySQL y cargue en PostgreSQL.
- Manejar errores de conexión con retries y alertas.
- Optimizar consultas SQL para evitar timeouts en operadores.
- Documentar el flujo de datos entre bases en tu pipeline.
Construye un DAG que sincronice datos entre MySQL y PostgreSQL
En este ejercicio, crearás un DAG en Airflow que automatice la sincronización diaria de datos de productos desde MySQL a PostgreSQL. Sigue estos pasos:
- Configura dos conexiones en Airflow: una para MySQL (conn_id: 'mysql_source') y otra para PostgreSQL (conn_id: 'postgres_dest').
- Crea un DAG llamado 'sincronizar_productos' con una programación diaria.
- Define una tarea usando MySqlHook que extraiga todos los productos con stock menor a 50 desde una tabla llamada 'productos' en MySQL.
- Implementa una función de transformación (con PythonOperator) que convierta los datos a un formato adecuado para PostgreSQL.
- Usa PostgresOperator para crear una tabla 'productos_bajo_stock' en PostgreSQL si no existe, con columnas: id, nombre, stock, ultima_actualizacion.
- Agrega una tarea final que inserte los datos transformados en la tabla de PostgreSQL.
- Prueba tu DAG ejecutándolo manualmente y verifica los datos en ambas bases.
- Recuerda que MySqlHook requiere manejar la conexión explícitamente con get_conn() y close().
- Usa parámetros como 'IF NOT EXISTS' en el SQL de PostgresOperator para evitar errores si la tabla ya existe.
- Considera agregar logs en tus funciones para depurar el flujo de datos.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.