Conectar Airflow a Bases de Datos (PostgreSQL, MySQL)

Lectura
20 min~10 min lectura
Objetivo de la lección

Introducción: La Importancia de la Conexión a Bases de Datos en Airflow Apache Airflow, en su esencia, es un orquestador.

Puntos de control
  • Introducción: La Importancia de la Conexión a Bases de Datos en Airflow
  • Concepto Clave: Connections, Hooks y Providers - La Trinidad de la Conexión
  • Cómo Funciona en la Práctica: Configuración y Flujo Paso a Paso
  • Código en Acción: DAG de Ejemplo Completo con PostgreSQL y MySQL

Introducción: La Importancia de la Conexión a Bases de Datos en Airflow

Apache Airflow, en su esencia, es un orquestador. Su poder no reside en procesar datos por sí mismo, sino en coordinar de manera elegante y confiable dónde, cuándo y cómo se ejecutan las tareas que sí lo hacen. En el contexto de los pipelines ETL y ELT, el destino y origen más común de esos datos son las bases de datos relacionales. Por lo tanto, la capacidad de conectar Airflow de forma segura, eficiente y manejable a sistemas como PostgreSQL y MySQL no es una característica más; es la piedra angular sobre la que se construyen la mayoría de los flujos de trabajo de datos complejos.

En esta lección, nos sumergiremos más allá del simple uso de un operador PythonOperator con una librería como psycopg2. Exploraremos el paradigma nativo de Airflow para gestionar conexiones y hooks, que proporciona una capa de abstracción, seguridad y reutilización fundamental para entornos de producción. Aprenderás a configurar conexiones en la interfaz web, a utilizar los hooks específicos para cada base de datos dentro de tus tareas, y a diseñar DAGs que interactúen con tus bases de datos de manera robusta y mantenible. Dominar este patrón es lo que separa a un usuario básico de Airflow de un ingeniero capaz de construir orquestaciones empresariales.

Concepto Clave: Connections, Hooks y Providers - La Trinidad de la Conexión

Para entender cómo Airflow interactúa con sistemas externos, es crucial internalizar tres conceptos entrelazados: Connections, Hooks y Providers. Imagina que estás construyendo una red de tuberías en una gran planta industrial (tu DAG). Las Connections son los puntos de acoplamiento estandarizados en la pared, cada uno con una etiqueta (un `conn_id`) que define el tipo de fluido (PostgreSQL, MySQL), la presión (host, puerto) y las credenciales de acceso (usuario, contraseña). Tú, como ingeniero, no soldas una tubería nueva cada vez; usas ese acoplamiento.

Los Hooks son las mangueras inteligentes que se conectan a esos puntos. Un hook para PostgreSQL (PostgresHook) sabe exactamente cómo acoplarse al punto "PostgreSQL", abrir la válvula (establecer la sesión) y manejar el flujo de datos (ejecutar consultas, cargar datos). Te abstraen de los detalles de bajo nivel de la librería cliente. Finalmente, los Providers son los paquetes de software que suministran estos acoplamientos y mangueras especializadas. Para bases de datos, el paquete apache-airflow-providers-postgres y apache-airflow-providers-mysql contienen los hooks y operadores específicos. Esta separación de responsabilidades es genial: los ingenieros de plataforma configuran los Connections (secretos) de forma segura, y los desarrolladores de DAGs usan los Hooks en su código sin tocar nunca una credencial.

Tip Clave: Nunca codifiques credenciales (host, usuario, contraseña) directamente en el código de tu DAG. Siempre utiliza el mecanismo de Connections de Airflow. Esto es seguridad básica y permite que tu DAG funcione en diferentes entornos (desarrollo, staging, producción) solo cambiando la conexión a la que apunta.

Cómo Funciona en la Práctica: Configuración y Flujo Paso a Paso

El proceso para integrar una base de datos en tu DAG sigue un patrón claro. Primero, debes asegurarte de que el provider necesario está instalado en tu entorno de Airflow. Esto se hace típicamente con pip, por ejemplo, pip install apache-airflow-providers-postgres. Luego, a través de la interfaz web de Airflow (en Admin -> Connections), creas una nueva conexión. Seleccionas el tipo (Postgres o MySQL), y rellenas los campos críticos: Host, Schema (base de datos), Login (usuario), Password y Port. Asignas un Conn ID único y descriptivo, como `mi_postgres_warehouse`.

Una vez configurada la conexión, el flujo dentro de tu DAG es sencillo. En la función de una tarea (usando PythonOperator, TaskFlow API, o dentro de un operador dedicado), instancias el hook correspondiente (PostgresHook o MySqlHook), pasándole el `conn_id`. Este hook se encarga automáticamente de recuperar las credenciales del metastore de Airflow y establecer la conexión. A través del hook, puedes obtener un objeto de conexión nativo (como un objeto psycopg2) para ejecutar SQL complejo, o usar métodos helper del hook como `get_records`, `run` o `bulk_load`. Al final de la tarea, Airflow y el hook se encargan de cerrar la conexión adecuadamente, gestionando los recursos.

Para operaciones muy comunes, como ejecutar una consulta SQL específica, Airflow ofrece operadores dedicados como PostgresOperator o MySqlOperator. Estos operadores encapsulan la lógica del hook y te permiten definir la SQL directamente en el DAG, lo que es muy declarativo. La elección entre usar un operador dedicado o un PythonOperator con un hook depende de la complejidad de la lógica: para SQL simple, el operador; para lógica de negocio compleja con Python, el hook dentro de un PythonOperator.

Código en Acción: DAG de Ejemplo Completo con PostgreSQL y MySQL

A continuación, un DAG funcional que demuestra múltiples patrones. Este DAG simula un proceso simple de extracción de datos de una base de datos transaccional MySQL, una transformación básica en Python, y la carga en una tabla de un almacén de datos PostgreSQL. Nota cómo se utilizan tanto operadores específicos como hooks dentro de PythonOperator.


from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
import pandas as pd

default_args = {
    'owner': 'equipo_datos',
    'retries': 2,
    'retry_delay': timedelta(minutes=3),
}

def extract_from_mysql(**context):
    """
    Extrae datos de MySQL usando un Hook.
    """
    mysql_hook = MySqlHook(mysql_conn_id='mi_mysql_oltp')
    sql = "SELECT customer_id, order_date, amount FROM sales WHERE order_date >= %s"
    # Nota: Los hooks manejan el escape de parámetros de forma segura.
    min_date = context['data_interval_start'].date()  # Fecha de inicio de la ejecución del DAG
    records = mysql_hook.get_records(sql, parameters=(min_date,))
    
    # Convertir a DataFrame para facilitar la transformación. En un caso real, evalúa el volumen.
    df = pd.DataFrame(records, columns=['customer_id', 'order_date', 'amount'])
    
    # Empujar el DataFrame a XCom para que la siguiente tarea lo use.
    # ¡Cuidado con el tamaño de los datos en XCom! Solo para ejemplos pequeños.
    context['ti'].xcom_push(key='extracted_data', value=df.to_json(orient='records'))

def transform_data(**context):
    """
    Recibe datos de XCom, aplica una transformación y prepara para carga.
    """
    ti = context['ti']
    extracted_data_json = ti.xcom_pull(key='extracted_data', task_ids='extract_task')
    df = pd.read_json(extracted_data_json, orient='records')
    
    # Transformación de ejemplo: Agregar una columna con el importe con impuesto.
    df['amount_with_tax'] = df['amount'] * 1.21
    
    # Agrupar por cliente para el resumen diario.
    df_daily = df.groupby(['customer_id', 'order_date']).agg({
        'amount': 'sum',
        'amount_with_tax': 'sum'
    }).reset_index()
    
    # Preparar lista de tuplas para inserción masiva.
    records_to_insert = list(df_daily.itertuples(index=False, name=None))
    context['ti'].xcom_push(key='transformed_records', value=records_to_insert)

def load_to_postgres(**context):
    """
    Carga los datos transformados en PostgreSQL usando el Hook para inserción masiva.
    """
    postgres_hook = PostgresHook(postgres_conn_id='mi_postgres_warehouse')
    ti = context['ti']
    records = ti.xcom_pull(key='transformed_records', task_ids='transform_task')
    
    # Definir los nombres de las columnas de la tabla destino.
    target_fields = ['customer_id', 'order_date', 'daily_amount', 'daily_amount_with_tax']
    
    # Usar bulk_load (COPY) para máxima eficiencia. Requiere permisos y acceso al filesystem.
    # Alternativa: usar run() con INSERT múltiple.
    sql_insert = """
        INSERT INTO public.daily_customer_sales (customer_id, sale_date, amount, amount_with_tax)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (customer_id, sale_date) DO UPDATE SET
            amount = EXCLUDED.amount,
            amount_with_tax = EXCLUDED.amount_with_tax;
    """
    # Ejecutar para cada lote de registros. En producción, usa ejecución por lotes.
    if records:
        postgres_hook.run(sql_insert, parameters=records)

with DAG(
    dag_id='etl_database_integration',
    default_args=default_args,
    description='Un DAG que integra MySQL y PostgreSQL',
    start_date=datetime(2023, 10, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['etl', 'postgres', 'mysql'],
) as dag:

    # 1. Crear la tabla de destino en PostgreSQL si no existe (usando PostgresOperator).
    create_table = PostgresOperator(
        task_id='create_target_table',
        postgres_conn_id='mi_postgres_warehouse',
        sql="""
            CREATE TABLE IF NOT EXISTS public.daily_customer_sales (
                customer_id INT NOT NULL,
                sale_date DATE NOT NULL,
                amount DECIMAL(10,2),
                amount_with_tax DECIMAL(10,2),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (customer_id, sale_date)
            );
        """
    )

    # 2. Extraer datos de MySQL (usando PythonOperator con Hook).
    extract_task = PythonOperator(
        task_id='extract_task',
        python_callable=extract_from_mysql,
    )

    # 3. Transformar los datos (usando PythonOperator).
    transform_task = PythonOperator(
        task_id='transform_task',
        python_callable=transform_data,
    )

    # 4. Cargar datos a PostgreSQL (usando PythonOperator con Hook).
    load_task = PythonOperator(
        task_id='load_task',
        python_callable=load_to_postgres,
    )

    # 5. Tarea de validación (usando PostgresOperator para una consulta simple).
    validate_load = PostgresOperator(
        task_id='validate_load',
        postgres_conn_id='mi_postgres_warehouse',
        sql="""
            SELECT
                COUNT(*) as total_registros,
                MIN(sale_date) as fecha_minima,
                MAX(sale_date) as fecha_maxima
            FROM public.daily_customer_sales
            WHERE sale_date = '{{ ds }}';
        """,
        do_xcom_push=True  # Para enviar el resultado a XCom y poder inspeccionarlo.
    )

    # Definición del flujo de tareas.
    create_table >> extract_task >> transform_task >> load_task >> validate_load

Errores Comunes y Cómo Evitarlos

Al trabajar con conexiones a bases de datos en Airflow, varios errores recurrentes pueden interrumpir tus pipelines. El primero y más común es el "Connection not found" o errores de autenticación. Esto sucede cuando el `conn_id` escrito en el código no coincide exactamente con el configurado en la interfaz web, o cuando los parámetros (host, puerto, usuario, contraseña) son incorrectos. La solución es verificar meticulosamente la ortografía del `conn_id` y probar la conexión con el botón "Test" en la UI de Airflow.

El segundo error es el agotamiento de conexiones en la base de datos. Cada hook, por defecto, crea y cierra una conexión. En DAGs con muchas tareas paralelas que usan el mismo `conn_id`, puedes saturar el límite de conexiones máximas de tu BD. Para evitarlo, reutiliza la conexión dentro de una misma tarea si es posible, y considera ajustar los parámetros de pool de conexiones en la configuración de la base de datos y en el lado del hook (algunos permiten configurar un pool).

Un tercer error sutil es el manejo incorrecto de los contextos y la inyección de parámetros SQL. No concatenes directamente valores variables en un string SQL; esto abre la puerta a inyecciones SQL y a errores de sintaxis. Siempre utiliza el parámetro `parameters` de los métodos `run()` o `get_records()` de los hooks, que realiza el escape seguro. En el ejemplo anterior, vimos `parameters=(min_date,)`.

Finalmente, el cuarto error es el uso abusivo de XCom para transferir grandes volúmenes de datos entre tareas, como se hizo de forma didáctica en el ejemplo. XCom está diseñado para metadatos pequeños (IDs, rutas, conteos), no para datasets completos. Para grandes volúmenes, utiliza almacenamiento intermedio (como un archivo en S3 o GCS) o pasa solo referencias. Transferir un DataFrame de 1GB a través de XCom colapsará la base de datos metastore de Airflow.

Checklist de Dominio

Para verificar que has comprendido y puedes aplicar los conceptos de esta lección, asegúrate de poder marcar las siguientes afirmaciones:

  • Puedo explicar la diferencia y el propósito de una Connection, un Hook y un Provider en Airflow.
  • He configurado con éxito al menos una conexión a PostgreSQL y otra a MySQL en la interfaz web de Airflow y la he probado con el botón "Test".
  • Puedo escribir una tarea usando PythonOperator que utilice PostgresHook o MySqlHook para ejecutar una consulta SELECT y procesar sus resultados.
  • He utilizado al menos una vez los operadores específicos PostgresOperator y MySqlOperator en un DAG para ejecutar sentencias SQL DDL o DML.
  • Sé cómo y por qué debo usar el argumento `parameters` en los métodos de los hooks para pasar valores a las consultas SQL de forma segura.
  • Puedo describir dos estrategias para evitar el agotamiento de conexiones en la base de datos cuando muchas tareas de Airflow acceden concurrentemente.
  • Reconozco el riesgo de usar XCom para datos grandes y puedo proponer una alternativa para pasar datos voluminosos entre tareas (e.g., rutas en S3).
  • He depurado un error común de conexión verificando el `conn_id`, los parámetros de la conexión y los logs de ejecución de la tarea en Airflow.
Falar no WhatsApp
Laboratorio de práctica

Antes de marcar esta lección como completa, escribí una evidencia breve para Apache Airflow: Orquestación de Pipelines ETL Complejos con DAGs: un ejemplo, una decisión, una captura, una mini demo o una nota que puedas reutilizar en portfolio.

Reflexión rápida

¿Qué cambiarías en tu forma de trabajar después de aplicar conectar airflow a bases de datos (postgresql, mysql)?

De lección a portfolio

Convertí esta lección en evidencia para Data Analyst.

Sumá un mini caso con datos, una conclusión de negocio y una captura del resultado. Eso pesa más que decir que viste la herramienta.

Paso 1

Publicá una consulta, dashboard o notebook con una conclusión clara.

Paso 2

Agregá contexto: problema, dato usado, decisión recomendada y limitación.

Paso 3

Guardá el enlace en tu CV, LinkedIn o portfolio antes de postular.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión

Conectar Airflow a Bases de Datos (PostgreSQL,... | Cursalo