Concepto clave
Los operadores de bases de datos SQL en Apache Airflow son componentes especializados que permiten ejecutar comandos SQL en sistemas de bases de datos relacionales como PostgreSQL, MySQL, o SQL Server. Piensa en ellos como conectores inteligentes que traducen las tareas de tu DAG en instrucciones específicas que la base de datos puede entender y ejecutar.
En el contexto de un pipeline ETL, estos operadores actúan como puentes automatizados entre Airflow y tus almacenes de datos. Imagina que estás construyendo una cadena de montaje donde cada estación necesita comunicarse con un sistema central de inventario: los operadores SQL son los trabajadores especializados que saben exactamente cómo consultar y actualizar ese inventario sin que tengas que escribir el código de comunicación manualmente cada vez.
Cómo funciona en la práctica
Para usar operadores SQL en Airflow, primero debes configurar una conexión en la interfaz web o mediante variables de entorno. Esta conexión almacena credenciales y parámetros de conexión de forma segura. Luego, en tu DAG, importas el operador específico para tu base de datos y lo configuras con la conexión y la consulta SQL a ejecutar.
Ejemplo básico con PostgreSQL:
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
default_args = {
'owner': 'data_engineer',
'start_date': datetime(2023, 1, 1)
}
with DAG('ejemplo_sql', default_args=default_args, schedule_interval='@daily') as dag:
crear_tabla = PostgresOperator(
task_id='crear_tabla_ventas',
postgres_conn_id='mi_postgres',
sql="""
CREATE TABLE IF NOT EXISTS ventas_diarias (
fecha DATE,
producto VARCHAR(50),
cantidad INT,
total DECIMAL(10,2)
);
"""
)
insertar_datos = PostgresOperator(
task_id='insertar_ventas',
postgres_conn_id='mi_postgres',
sql="""
INSERT INTO ventas_diarias (fecha, producto, cantidad, total)
VALUES ('2023-10-01', 'Laptop', 5, 7500.00);
"""
)
crear_tabla >> insertar_datosEn este ejemplo, postgres_conn_id referencia la conexión configurada previamente, y el parámetro sql contiene las instrucciones a ejecutar. Airflow se encarga de establecer la conexión, ejecutar el SQL, y manejar los resultados o errores.
Caso de estudio
Considera un escenario real: una empresa de retail necesita actualizar diariamente su tabla de inventario consolidado a partir de múltiples fuentes. El pipeline ETL utiliza Airflow con los siguientes operadores SQL:
- PostgresOperator para extraer datos de ventas de la base de datos transaccional
- MySQLOperator para obtener niveles de stock del sistema de almacén
- MsSqlOperator para consultar datos de proveedores en SQL Server
- Un operador final que consolida todos los datos en una tabla de PostgreSQL para reportes
Tabla de ejemplo de datos procesados:
| Producto | Ventas 24h | Stock Actual | Proveedor | Reordenar |
|---|---|---|---|---|
| Laptop X1 | 15 | 42 | TecnoSupply | NO |
| Mouse Gamer | 87 | 23 | PeriTech | SI |
| Monitor 4K | 12 | 8 | DisplayPro | SI |
En pipelines de producción, los operadores SQL suelen combinarse con sensores para esperar datos disponibles, y con hooks para operaciones más complejas que requieren manipulación programática de resultados.
Errores comunes
- No gestionar conexiones correctamente: Usar credenciales hardcodeadas en el DAG en lugar de conexiones configuradas. Solución: Siempre usa el sistema de conexiones de Airflow y almacena secretos en herramientas como HashiCorp Vault o AWS Secrets Manager.
- Consultas SQL sin manejo de errores: Ejecutar operaciones DDL sin verificar si objetos existen. Solución: Usa patrones como
CREATE TABLE IF NOT EXISTSo implementa tareas de verificación previa. - Problemas de rendimiento con grandes volúmenes: Ejecutar consultas que devuelven millones de filas directamente en memoria. Solución: Usa paginación, limita resultados, o considera operadores específicos como
PostgresToGCSOperatorpara exportaciones grandes. - Falta de transaccionalidad: En operaciones multi-step, no considerar rollback automático. Solución: Para operaciones críticas, usa bloques transaccionales en SQL o implementa mecanismos de compensación en el DAG.
- Ignorar la idempotencia: Pipelines que fallan y al reintentar crean datos duplicados. Solución: Diseña consultas idempotentes usando claves únicas o marcas temporales.
Checklist de dominio
- Puedo configurar una conexión de base de datos en Airflow usando la UI o variables de entorno
- Sé seleccionar el operador correcto para cada sistema SQL (PostgreSQL, MySQL, SQL Server, etc.)
- Puedo escribir consultas SQL parametrizadas para usar con operadores
- Entiendo cómo manejar resultados de consultas SELECT dentro de un DAG
- Sé implementar patrones de idempotencia en mis operaciones SQL
- Puedo diagnosticar y resolver errores comunes de conexión y sintaxis SQL
- Conozco las limitaciones de rendimiento y cómo optimizar consultas en operadores
Implementar un pipeline ETL con múltiples operadores SQL
En este ejercicio, crearás un DAG que extrae datos de una tabla de ventas, los transforma, y los carga en una tabla de reportes. Sigue estos pasos:
- Configuración inicial: Crea una conexión en Airflow llamada 'ejercicio_postgres' con credenciales para una base de datos PostgreSQL (puedes usar un contenedor Docker local).
- Crear tablas de origen y destino: Implementa dos tareas usando PostgresOperator:
- Task 1: Crear tabla
ventas_rawcon columnas: id (SERIAL), fecha (DATE), producto_id (INT), cantidad (INT), precio_unitario (DECIMAL) - Task 2: Crear tabla
ventas_diariascon columnas: fecha (DATE), total_ventas (DECIMAL), productos_vendidos (INT)
- Task 1: Crear tabla
- Poblar datos de prueba: Agrega una tarea que inserte 5 registros de ejemplo en
ventas_rawcon datos coherentes. - Transformación y carga: Implementa la tarea principal que:
- Lee de
ventas_rawagrupando por fecha - Calcula total_ventas = SUM(cantidad * precio_unitario)
- Calcula productos_vendidos = SUM(cantidad)
- Inserta los resultados en
ventas_diarias
- Lee de
- Verificación: Añade una tarea final que consulte
ventas_diariasy registre el número de filas procesadas. - Ejecución y prueba: Ejecuta el DAG y verifica que todas las tareas completen exitosamente.
Requisitos técnicos: Usa al menos 3 operadores PostgresOperator diferentes, implementa dependencias entre tareas, y maneja posibles errores de conexión.
Pistas- Recuerda que puedes usar parámetros SQL como {{ ds }} para la fecha de ejecución del DAG
- Para la transformación, considera usar una CTE (Common Table Expression) en tu consulta SQL para claridad
- Prueba tu DAG con 'airflow tasks test' antes de la ejecución completa para verificar sintaxis
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.