Práctica: Implementar un DAG con Manejo de Errores y Notificaciones

Lectura
30 min~11 min lectura

Práctica: Implementar un DAG con Manejo de Errores y Notificaciones

En esta lección práctica, integraremos todos los conceptos teóricos sobre robustez en Apache Airflow para construir un DAG de producción. Un pipeline ETL no es útil si falla silenciosamente en mitad de la noche. Por lo tanto, la capacidad de manejar errores de forma elegante y notificar a los equipos responsables es tan crítica como la lógica de transformación de datos en sí. Pasaremos de un DAG básico a uno resiliente, que no solo realiza un trabajo, sino que también se comunica su estado y se recupera de contratiempos de manera predefinida.

Implementaremos un caso de uso realista: un pipeline que extrae datos de ventas diarias de una API simulada, los transforma calculando métricas agregadas, los carga en un sistema de destino (simulado) y finalmente genera un reporte. Cada uno de estos pasos tiene puntos de fallo potenciales: la API puede no estar disponible, los datos pueden llegar en un formato inesperado, la conexión al almacén de destino puede caerse. Nuestro objetivo es anticipar estos escenarios, definir respuestas claras (como reintentos, rutas alternativas o fallos controlados) y asegurar que los humanos sean informados a través de canales como Slack o email cuando su intervención sea necesaria.

Concepto Clave: La Pirámide de Resiliencia en un DAG

Piensa en un DAG robusto como un sistema de seguridad de un edificio de alta tecnología. No solo tiene cerraduras en las puertas (la lógica básica), sino también sensores de movimiento (monitoreo), alarmas que suenan ante una intrusión (notificaciones), y protocolos para que el personal de seguridad sepa exactamente qué hacer y a quién llamar si la alarma se activa (manejo de errores). La resiliencia no es una característica única, sino una pirámide construida con varias capas. En la base están los reintentos y timeouts, que manejan fallos transitorios automáticamente. La siguiente capa son los callbacks, funciones que se disparan en eventos específicos como éxito o fallo, actuando como los sensores centrales. En la cúspide están las notificaciones externas y la lógica condicional dentro del DAG (como BranchPythonOperator), que permiten una respuesta compleja y humana al error.

La analogía clave es la diferencia entre un empleado que, ante un problema, se queda bloqueado mirando la pantalla, y un empleado que sigue un manual de procedimientos: primero intenta solucionarlo él mismo tres veces (reintentos), si no puede, registra el incidente en un sistema (log de error), activa una alerta para su supervisor (callback de fallo), y si es crítico, envía un mensaje al celular del responsable del área (notificación Slack/Email). Nuestro DAG debe ser ese empleado protocolizado y automatizado.

Cómo Funciona en la Práctica: Flujo de un DAG a Prueba de Fallos

Vamos a desglosar el flujo de ejecución de nuestro DAG práctico, paso a paso, destacando dónde y cómo se implementa la resiliencia. El proceso comienza con el scheduler de Airflow activando el DAG según su programación. Inmediatamente, se evalúan las dependencias y se pone en cola la primera tarea, extract_sales_data. Esta tarea tiene configurados retries=3 y un retry_delay de 5 minutos. Si la llamada a la API falla por un error de conexión (transitorio), Airflow no marcará la tarea como fallida inmediatamente; en su lugar, la reintentará hasta tres veces, esperando 5 minutos entre cada intento. Solo si los tres reintentos fracasan, la tarea pasa al estado 'failed'.

El cambio de estado a 'failed' es un evento crítico. Aquí es donde entran en juego los callbacks. Hemos definido una función personalizada alert_on_failure en el nivel del DAG, que se adjunta al parámetro on_failure_callback. Cuando cualquier tarea del DAG falle, Airflow ejecutará automáticamente esta función. Su trabajo es recopilar información del contexto (como el ID de la tarea, la fecha de ejecución y el log de error) y enviarla a un canal de Slack usando un Webhook. Paralelamente, el DAG puede tener configurados alertas por email a nivel de tarea o DAG, proporcionando una segunda capa de notificación.

Pero el manejo de errores no solo se trata de fallos catastróficos. ¿Qué pasa si la extracción es exitosa, pero el archivo de datos viene vacío? Para esto, implementamos una comprobación de calidad de datos dentro de la propia tarea de transformación. Si los datos no superan un umbral mínimo (por ejemplo, menos de 10 registros), la tarea lanzará una excepción de tipo ValueError de manera intencionada. Esto provocará un fallo controlado, que a su vez activará los mecanismos de reintento y notificación, alertándonos de un problema en los datos de origen, no en la infraestructura. Este es un patrón esencial: usar excepciones para gobernar el flujo de negocio ante condiciones inaceptables.

Código en Acción: DAG de Ventas con Slack Alerts

A continuación, presentamos el DAG completo, listo para ser desplegado. Incluye operadores Python, configuración de reintentos, callbacks, y la integración con Slack. Asegúrate de tener configurada la conexión de Slack (conn_id: 'slack_default') en tu entorno de Airflow previamente.


from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.exceptions import AirflowException
import requests
import pandas as pd
import json
import logging

# Configuración de alerta por fallo (Callback)
def alert_on_failure(context):
    """Envía una notificación a Slack cuando una tarea falla."""
    slack_msg = f"""
    :red_circle: *Tarea Fallida en DAG de Ventas*
    *DAG:* {context.get('task_instance').dag_id}
    *Tarea:* {context.get('task_instance').task_id}
    *Fecha de Ejecución:* {context.get('execution_date')}
    *Log URL:* {context.get('task_instance').log_url}
    """
    failed_alert = SlackWebhookOperator(
        task_id='slack_failed_alert',
        slack_webhook_conn_id='slack_default',
        message=slack_msg
    )
    return failed_alert.execute(context=context)

# Argumentos por defecto para el DAG
default_args = {
    'owner': 'equipo_etl',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 1),
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': alert_on_failure  # Callback a nivel de DAG
}

# Definición del DAG
with DAG(
    'ventas_diarias_robusto',
    default_args=default_args,
    description='ETL diario de ventas con manejo de errores y notificaciones',
    schedule_interval='0 2 * * *',  # Ejecución diaria a las 2 AM
    catchup=False,
    tags=['ventas', 'produccion'],
) as dag:

    # Tarea 1: Extracción
    def extract(**kwargs):
        logging.info("Iniciando extracción de datos de ventas...")
        api_url = "https://api.ejemplo.com/v1/sales"
        try:
            response = requests.get(api_url, timeout=30)
            response.raise_for_status()  # Lanza excepción para códigos 4xx/5xx
            data = response.json()
            if not data.get('sales'):
                raise ValueError("La API respondió con una lista de ventas vacía.")
            # Empujamos los datos al XCom para la siguiente tarea
            kwargs['ti'].xcom_push(key='sales_data', value=data)
            logging.info(f"Extracción exitosa. {len(data['sales'])} registros obtenidos.")
        except requests.exceptions.RequestException as e:
            logging.error(f"Error de conexión con la API: {e}")
            raise AirflowException(f"Fallo en extracción: {e}")
        except ValueError as e:
            logging.error(f"Error en los datos recibidos: {e}")
            raise

    task_extract = PythonOperator(
        task_id='extract_sales_data',
        python_callable=extract,
        retries=2,  # Sobreescribe los 3 globales, solo 2 reintentos para esta tarea
        retry_delay=timedelta(minutes=2),
        execution_timeout=timedelta(seconds=45),  # Timeout específico para esta tarea
    )

    # Tarea 2: Transformación con Validación
    def transform(**kwargs):
        ti = kwargs['ti']
        raw_data = ti.xcom_pull(task_ids='extract_sales_data', key='sales_data')
        df = pd.DataFrame(raw_data['sales'])

        # VALIDACIÓN CRÍTICA DE CALIDAD DE DATOS
        if df.empty:
            raise ValueError("El DataFrame de ventas está completamente vacío. No se puede proceder.")
        if df['amount'].isnull().sum() > 0:
            logging.warning("Se encontraron valores nulos en 'amount'. Serán rellenados con 0.")
            df['amount'].fillna(0, inplace=True)

        # Lógica de transformación
        df['sale_date'] = pd.to_datetime(df['timestamp'])
        daily_metrics = df.groupby(df['sale_date'].dt.date).agg(
            total_ventas=('amount', 'sum'),
            promedio_venta=('amount', 'mean'),
            transacciones=('transaction_id', 'count')
        ).reset_index()

        # Otra validación: volumen mínimo de datos
        if len(daily_metrics) < 1:
            raise ValueError("Las métricas diarias no generaron resultados. Posible fallo en la agrupación.")

        metrics_dict = daily_metrics.to_dict('records')
        ti.xcom_push(key='daily_metrics', value=metrics_dict)
        logging.info(f"Transformación completada. Métricas para {len(metrics_dict)} días calculadas.")

    task_transform = PythonOperator(
        task_id='transform_and_validate',
        python_callable=transform,
        on_failure_callback=alert_on_failure,  # Callback adicional a nivel de tarea
    )

    # Tarea 3: Carga
    def load(**kwargs):
        ti = kwargs['ti']
        metrics = ti.xcom_pull(task_ids='transform_and_validate', key='daily_metrics')
        logging.info(f"Simulando carga de {len(metrics)} registros en el data warehouse...")
        # Simulación de una carga que puede fallar aleatoriamente
        import random
        if random.random() < 0.1:  # 10% de probabilidad de fallo simulado
            raise ConnectionError("Error simulado de conexión al warehouse.")
        logging.info("Carga simulada exitosa.")

    task_load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load,
        retries=1,
    )

    # Tarea 4: Notificación de Éxito (Slack)
    def success_notification(**kwargs):
        ti = kwargs['ti']
        metrics = ti.xcom_pull(task_ids='transform_and_validate', key='daily_metrics')
        total_ventas = sum(day['total_ventas'] for day in metrics)
        slack_msg = f"""
        :white_check_mark: *DAG de Ventas Completado Exitosamente*
        *Fecha de Ejecución:* {kwargs['execution_date']}
        *Total Ventas del Día:* ${total_ventas:,.2f}
        *Días Procesados:* {len(metrics)}
        """
        success_alert = SlackWebhookOperator(
            task_id='slack_success_alert',
            slack_webhook_conn_id='slack_default',
            message=slack_msg
        )
        return success_alert.execute(context=kwargs)

    task_notify_success = PythonOperator(
        task_id='notify_on_success',
        python_callable=success_notification,
        trigger_rule='all_success',  # Solo se ejecuta si TODAS las tareas anteriores tienen éxito
    )

    # Definición de las dependencias del DAG
    task_extract >> task_transform >> task_load >> task_notify_success

Este DAG demuestra una arquitectura de defensa en profundidad. La tarea de extracción tiene su propio manejo de excepciones para errores de API y timeouts. La transformación incluye validaciones de negocio que pueden provocar un fallo intencionado. La carga tiene una probabilidad de fallo simulada para probar los reintentos. Y, lo más importante, tanto el éxito como el fracasso activan notificaciones relevantes en Slack, manteniendo al equipo informado sin necesidad de consultar la UI de Airflow constantemente.

Errores Comunes y Cómo Evitarlos

Al implementar estos mecanismos, es fácil caer en ciertas trampas que pueden reducir su efectividad o incluso causar nuevos problemas. Aquí detallamos los más frecuentes:

1. Callbacks que Fallan Silenciosamente: Un error común es asumir que la función de callback (como alert_on_failure) siempre funcionará. Si esta función tiene un bug (por ejemplo, una referencia a una variable no definida) o la conexión de Slack no está configurada, el callback mismo fallará. Esta falla no detendrá el DAG, pero dejará al equipo sin la notificación crítica. Cómo evitarlo: Siempre incluye un bloque try-except robusto dentro de tu función de callback y registra (log) cualquier error interno. Considera enviar un email de último recurso si el canal principal (Slack) falla.

2. Reintentos Infinitos o Mal Configurados: Configurar retries sin un retry_delay adecuado o un execution_timeout puede llevar a tareas que consumen recursos indefinidamente. Por ejemplo, una tarea atascada en un bucle interno podría reiniciarse una y otra vez durante horas. Cómo evitarlo: Usa siempre un execution_timeout para limitar el tiempo máximo de ejecución de una tarea por intento. Combina un número de reintentos razonable (3-5) con un delay exponencial (usando retry_exponential_backoff) para no saturar sistemas externos que pueden estar teniendo problemas.

3. No Diferenciar entre Fallos Transitorios y Permanentes: Tratar un error de sintaxis en el código (permanente) de la misma manera que un timeout de red (transitorio) es ineficaz. El primero nunca se solucionará con reintentos, solo generará ruido. Cómo evitarlo: En tu lógica Python, captura excepciones específicas. Usa requests.exceptions.ConnectionError o Timeout para errores de red que justifiquen un reintento. Para errores de lógica o datos (ValueError, KeyError), decide si un reintento tiene sentido (a veces no lo tiene) o si debes fallar inmediatamente y notificar.

4. Olvidar el Trigger Rule en Notificaciones de Éxito: En nuestro DAG, la tarea notify_on_success usa trigger_rule='all_success'. Si se omite esta regla y una tarea anterior falla pero tiene un camino de bypass (por ejemplo, un BranchOperator), la notificación de éxito podría ejecutarse incorrectamente. Cómo evitarlo: Siempre especifica explícitamente el trigger_rule para las tareas de notificación. Para notificaciones de éxito, usa 'all_success'. Para notificaciones de fallo que deben enviarse incluso si algunas tareas se saltaron, podrías necesitar 'all_done'.

5. Sobrecarga de Alertas (Alert Fatigue): Configurar notificaciones por email para cada fallo de cada tarea en un DAG complejo puede inundar la bandeja de entrada del equipo, llevándolos a ignorar las alertas. Cómo evitarlo: Estratifica tus alertas. Usa callbacks a nivel de DAG para fallos críticos que detengan todo el pipeline. Para fallos en tareas individuales menos críticas, considera notificaciones solo después de que se agoten todos los reintentos, o agrega un sistema de agregación que envíe un reporte de salud diario en lugar de alertas en tiempo real por cada incidente menor.

Checklist de Dominio

Antes de considerar esta lección completamente asimilada y lista para aplicar en producción, verifica que puedes afirmar y demostrar cada uno de los siguientes puntos:

  • Puedo configurar parámetros de reintento (retries, retry_delay) tanto a nivel de DAG (default_args) como a nivel de tarea individual, y explicar cuál tiene precedencia.
  • Sé definir y adjuntar una función de callback (usando on_failure_callback o on_success_callback) que capture el contexto de ejecución y envíe una notificación útil a un canal externo como Slack o Email.
  • Comprendo la importancia de establecer un execution_timeout para las tareas y puedo implementarlo para prevenir tareas "zombie" que consumen recursos indefinidamente.
  • Puedo escribir código dentro de una PythonOperator que realice validaciones de calidad de datos (como verificar que un DataFrame no esté vacío) y lance excepciones estándar de Python (ValueError, AirflowException) para forzar un fallo controlado del task.
  • Sé configurar correctamente el parámetro trigger_rule en una tarea de notificación para asegurar que se ejecute solo bajo las condiciones deseadas (por ejemplo, solo si todas las tareas anteriores tuvieron éxito).
  • Puedo explicar la diferencia entre un error transitorio (como un timeout de red) y uno permanente (como un error de sintaxis en el código) y diseñar la estrategia de reintentos y notificaciones apropiada para cada caso.
  • He probado mi DAG en un entorno controlado, forzando fallos deliberados (por ejemplo, desconectando la red o proporcionando datos corruptos) para verificar que los mecanismos de alerta y reintento funcionan como se espera.
  • Sé dónde y cómo revisar los logs detallados de Airflow cuando una tarea falla, para diagnosticar la causa raíz más allá de la notificación inicial.

Tip Crítico: La resiliencia no es "set and forget". Los mecanismos de manejo de errores y notificaciones deben ser probados periódicamente. Programa una "prueba de fuego" mensual en un entorno de staging donde puedas simular fallos en APIs, bases de datos y red para asegurar que las alertas llegan al equipo correcto y la documentación de respuesta a incidentes está actualizada. Un sistema de alertas que no se prueba, eventualmente fallará cuando más lo necesites.

De lección a portfolio

Convertí esta lección en una habilidad visible para entrevistas.

Guardá el curso, completá los ejercicios y conectá esta habilidad con una ruta de empleo, data, IA, programación o marketing.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión