Monitoreo con Logs y Métricas en la Interfaz Web

Video
15 min~12 min lectura

Reproductor de video

Monitoreo con Logs y Métricas en la Interfaz Web de Airflow

La capacidad de monitorear el comportamiento de tus DAGs y tasks es lo que separa un pipeline experimental de uno listo para producción. Apache Airflow no solo automatiza la ejecución de flujos de trabajo, sino que proporciona una interfaz web rica en herramientas de observabilidad. Esta lección se adentra en el uso práctico de los logs y métricas integrados, enseñándote a transformar datos crudos de ejecución en información accionable para diagnosticar problemas, optimizar rendimiento y garantizar la confiabilidad de tus procesos ETL.

Mientras que la configuración y programación de DAGs establecen el "qué" y "cuándo", el monitoreo responde al "cómo" y "por qué". ¿Por qué falló esta tarea? ¿Cómo está afectando el rendimiento de mi base de datos al pipeline? La interfaz web de Airflow centraliza esta información, ofreciendo vistas detalladas de logs de ejecución, métricas de duración, estados en tiempo real y un histórico completo. Dominar esta interfaz te permitirá pasar de reaccionar ante fallos a predecirlos y prevenirlos, asegurando que tus pipelines de datos sean robustos y transparentes.

Concepto Clave: Los Pilares de la Observabilidad en Airflow

En el contexto de Airflow, la observabilidad se sustenta en tres pilares principales: logs, métricas y el estado. Los logs son el registro textual detallado de lo que sucedió durante la ejecución de una tarea. Son tu primera línea de defensa para la depuración, mostrando mensajes de error de Python, salidas de comandos de shell, y prints de tu propio código. Las métricas son datos numéricos agregados sobre el comportamiento del sistema y los DAGs, como el tiempo de ejecución, la latencia o el número de tareas exitosas. Finalmente, el estado (success, failed, running, queued) proporciona una visión general instantánea de la salud del pipeline.

Una analogía útil es pensar en un coche de carreras. El estado es el tablero principal: te dice si el motor está encendido, la velocidad y las revoluciones. Las métricas son el análisis post-carrera: el tiempo por vuelta, la velocidad media, el consumo de combustible. Los logs son la grabación detallada de la caja negra y las comunicaciones por radio del equipo: cada instrucción del ingeniero, cada sonido del motor, cada ajuste realizado. Para entender por qué no ganaste la carrera (o por qué falló tu DAG), necesitas combinar las tres perspectivas: el estado te alertó del problema, las métricas te muestran su impacto en el rendimiento, y los logs te revelan la causa raíz.

Tip del Experto: No confundas monitoreo con alertas. El monitoreo es la capacidad de observar; las alertas son la notificación proactiva basada en umbrales definidos en ese monitoreo. Primero domina la interfaz de observación, luego automatiza las alertas.

Cómo Funciona en la Práctica: Un Recorrido por la Interfaz

La interfaz web de Airflow (accesible normalmente en http://localhost:8080) está organizada en menús superiores. Para el monitoreo, los más críticos son DAGs, Browse y Admin. En la vista principal de DAGs, obtienes una visión de conjunto: estado del último run, frecuencia de ejecución y un gráfico de latencia. Al hacer clic en un DAG específico, accedes a su vista detallada con pestañas clave: Tree View (una representación cronológica de todas las ejecuciones), Graph View (la dependencia lógica de las tareas para una ejecución concreta), Code (para revisar la definición actual) y Gantt (que visualiza la duración de cada tarea, identificando cuellos de botella).

El verdadero poder diagnóstico se encuentra en la vista de tareas individuales. Desde el Tree View o el Graph View, haz clic en un cuadro de tarea. Esto abre un menú contextual con acciones como "Log", "Task Instance", "Rendered Template" y "Run". Al seleccionar Log, se cargará la interfaz de logs. Aquí podrás ver los logs generados por el ejecutor, desglosados por intento (si la tarea se reintentó). Puedes cambiar el nivel de log (INFO, WARNING, ERROR) y la visualización (por archivo plano o con saltos de línea). Para métricas más avanzadas, como el uso de CPU o memoria durante la ejecución de una tarea, necesitarás integrar Airflow con herramientas externas como StatsD/Prometheus o utilizar hooks de logging personalizados.

Código en Acción: Instrumentando un DAG para un Monitoreo Efectivo

Un monitoreo efectivo comienza con un código bien instrumentado. Airflow captura automáticamente logs estándar, pero puedes enriquecerlos enormemente utilizando el contexto de la tarea y emitiendo logs estructurados. El siguiente ejemplo muestra un DAG ETL que extrae datos de una API, los transforma y los carga en una base de datos, incorporando buenas prácticas de logging y generando métricas personalizadas.


import json
import requests
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.log.logging_mixin import LoggingMixin
import logging

# Configurar logger específico para el DAG
task_logger = logging.getLogger("airflow.task")

default_args = {
    'owner': 'equipo_etl',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 1),
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def extract_data(**context):
    """
    Tarea de extracción con logging detallado y métrica de conteo.
    """
    task_logger.info("Iniciando extracción de datos desde API externa.")
    api_url = "https://api.ejemplo.com/data"
    try:
        response = requests.get(api_url, timeout=30)
        response.raise_for_status()  # Lanza error para códigos 4xx/5xx
        data = response.json()
        record_count = len(data)
        task_logger.info(f"Extracción exitosa. Se obtuvieron {record_count} registros.")
        
        # Log estructurado para fácil parsing (ej. para sistemas como ELK)
        task_logger.info(json.dumps({
            "event": "extraction_complete",
            "record_count": record_count,
            "api_url": api_url,
            "dag_run_id": context['run_id']
        }))
        
        # Empujar el dato y el conteo al XCom para uso en downstream tasks
        context['ti'].xcom_push(key='raw_data', value=data)
        context['ti'].xcom_push(key='record_count', value=record_count)
        
    except requests.exceptions.RequestException as e:
        task_logger.error(f"Fallo en la extracción de la API: {e}", exc_info=True)
        raise  # Relanzar la excepción para que Airflow marque la tarea como failed

def transform_data(**context):
    """
    Tarea de transformación con logging de progreso y detección de anomalías.
    """
    task_logger.info("Iniciando transformación de datos.")
    raw_data = context['ti'].xcom_pull(task_ids='extract', key='raw_data')
    
    if not raw_data:
        task_logger.error("No se recibieron datos para transformar. Fallando tarea.")
        raise ValueError("Datos de entrada vacíos.")
    
    transformed_data = []
    for i, record in enumerate(raw_data):
        # Ejemplo de transformación: limpieza y cálculo
        try:
            record['valor_limpio'] = float(record.get('valor', 0))
            record['fecha_procesada'] = datetime.now().isoformat()
            transformed_data.append(record)
        except (ValueError, KeyError) as e:
            task_logger.warning(f"Registro {i} con ID {record.get('id')} no pudo ser transformado: {e}")
            continue  # Skipea registros problemáticos, decide según tu caso
    
    final_count = len(transformed_data)
    dropped_records = len(raw_data) - final_count
    task_logger.info(f"Transformación completada. {final_count} registros procesados, {dropped_records} descartados.")
    
    # Métrica de calidad de datos
    if dropped_records > len(raw_data) * 0.1:  # Si se descarta más del 10%
        task_logger.warning(f"Alta tasa de descarte: {dropped_records/len(raw_data):.1%}. Revisar calidad de fuente.")
    
    context['ti'].xcom_push(key='transformed_data', value=transformed_data)

with DAG(
    'etl_monitoreo_avanzado',
    default_args=default_args,
    description='Un DAG ETL con logging y métricas robustas',
    schedule_interval='@daily',
    catchup=False,
    tags=['etl', 'monitoring'],
) as dag:

    create_staging_table = PostgresOperator(
        task_id='create_staging_table',
        postgres_conn_id='mi_postgres',
        sql="""
            CREATE TABLE IF NOT EXISTS staging_ventas (
                id SERIAL PRIMARY KEY,
                producto_id VARCHAR,
                valor_limpio DECIMAL,
                fecha_procesada TIMESTAMP,
                fecha_carga TIMESTAMP DEFAULT NOW()
            );
        """
    )

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
        provide_context=True,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
        provide_context=True,
    )

    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='mi_postgres',
        sql="""
            INSERT INTO staging_ventas (producto_id, valor_limpio, fecha_procesada)
            VALUES (
                '{{ ti.xcom_pull(task_ids='transform', key='transformed_data')[0]['producto_id'] }}',
                {{ ti.xcom_pull(task_ids='transform', key='transformed_data')[0]['valor_limpio'] }},
                '{{ ti.xcom_pull(task_ids='transform', key='transformed_data')[0]['fecha_procesada'] }}'
            );
            -- NOTA: En un caso real, iterarías sobre todos los registros en transformed_data.
        """
    )

    create_staging_table >> extract >> transform >> load

Este DAG demuestra varias técnicas clave. Primero, utiliza un logger específico (task_logger) en lugar de prints simples. Segundo, incluye logs estructurados en JSON, que son invaluable para sistemas centralizados de logs. Tercero, valida la entrada en las tareas y registra advertencias (logging.warning) para anomalías sin fallar inmediatamente. Cuarto, utiliza XCom para pasar no solo los datos, sino también métricas como el conteo de registros entre tareas, lo que puede ser inspeccionado en la interfaz web bajo la pestaña "XCom" de la instancia de la tarea.

Navegando y Interpretando los Logs en la Interfaz Web

Una vez que el DAG se ejecuta, la riqueza de la información generada por nuestro código se materializa en la interfaz. Supongamos que la tarea transform emite una advertencia por alta tasa de descarte. Para investigar, iríamos al Tree View del DAG, haríamos clic en el cuadrado correspondiente a la tarea transform en la ejecución deseada, y seleccionaríamos "Log". La vista mostrará los logs agrupados por intento. Veríamos nuestras líneas INFO con los mensajes de inicio y finalización, y la línea WARNING destacada.

La interfaz permite cambiar la visualización a "Mostrar en formato de archivo plano", útil para copiar y pegar, y también permite descargar el log completo. Un patrón crítico a buscar son los tracebacks de Python, que aparecen en nivel ERROR. Airflow los formatea claramente, mostrando el archivo, línea y el stack completo de la excepción. Además, si has configurado un backend remoto de logs (como S3, GCS, o Elasticsearch), la interfaz integrará un enlace para acceder a los logs completos almacenados allí, lo cual es esencial para logs muy largos o para retención a largo plazo.


# Ejemplo de cómo se vería un error crítico en los logs (extraído de la vista web):
"""
[2023-10-05, 10:15:33 UTC] {taskinstance.py:1135} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/etl_dag.py", line 42, in extract_data
    response.raise_for_status()
  File "/usr/local/lib/python3.9/site-packages/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 404 Client Error: Not Found for url: https://api.ejemplo.com/data
"""

Métricas y Visualizaciones: Más Allá de los Logs de Texto

Los logs te dicen el "qué", pero las métricas te ayudan a entender el "cuánto" y la tendencia. La interfaz web de Airflow ofrece varias vistas preconstruidas de métricas. La pestaña Gantt en la vista del DAG es una de las más poderosas. Visualiza la duración de cada tarea en una ejecución específica como una barra horizontal. De un vistazo, puedes identificar tareas que son consistentemente más lentas que otras (cuellos de botella), o detectar si una tarea comenzó con un retraso inusual porque su dependencia tardó más de lo esperado.

Otra fuente valiosa es la vista DAG Runs bajo el menú "Browse". Esta tabla muestra todas las ejecuciones de un DAG, incluyendo su estado, fecha de inicio/fin y duración total. Puedes ordenar por duración para encontrar las ejecuciones más lentas y correlacionarlas con eventos externos. Para métricas en tiempo real del scheduler y los ejecutores, Airflow puede integrarse con StatsD. Cuando está configurado, métricas como dagbag.size, scheduler.heartbeat, y executor.open_slots se emiten y pueden ser recogidas por Prometheus y visualizadas en Grafana, dando una visión de la salud de la plataforma Airflow en sí misma, no solo de los DAGs.


# Ejemplo de configuración en airflow.cfg para habilitar métricas StatsD (fragmento)
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

Errores Comunes y Cómo Evitarlos

1. Logs excesivamente verbosos o vacíos: Un error común es loguear cada iteración de un bucle sobre miles de elementos, generando logs ilegibles de megabytes, o por el contrario, no loguear nada y solo confiar en las excepciones. Solución: Implementa un nivel de log adecuado. Usa DEBUG para detalles finos (activado solo durante troubleshooting), INFO para eventos importantes del proceso (inicio, fin, conteos), WARNING para situaciones anómalas recuperables, y ERROR solo para fallos. Emplea logueo estructurado para eventos complejos.

2. No contextualizar los logs: Encontrar un mensaje de error como "Connection failed" en un log sin saber a qué conexión se refiere o en qué ejecución del DAG ocurrió es frustrante. Solución: Siempre incluye variables de contexto clave en tus mensajes de log. Usa el run_id, el task_id, nombres de archivos, IDs de registros o parámetros de conexión. El contexto de Airflow está disponible en el parámetro **context de las funciones PythonOperator.

3. Ignorar las métricas de duración (Gantt Chart): Muchos ingenieros solo revisan los logs cuando algo falla, pero pasan por alto la pestaña Gantt. Solución: Haz del Gantt una parte regular de tu revisión post-ejecución. Establece una línea base de duración para tus tareas críticas. Si una tarea comienza a desviarse consistentemente de su duración habitual (más lenta o más rápida), investiga. Puede ser un indicador de degradación de rendimiento en la fuente de datos, la red o la base de datos destino.

4. No configurar un backend de logs remoto: Confiar únicamente en los logs locales del contenedor o pod de Airflow es un riesgo. Si el contenedor se reinicia o la tarea se re-ejecuta en un worker diferente, los logs del intento anterior pueden perderse. Solución: Configura un backend de logs remoto desde el inicio, incluso en desarrollo. Usa S3, Google Cloud Storage, Azure Blob Storage o Elasticsearch. Esto garantiza persistencia, centralización y facilita el análisis a largo plazo.

5. No utilizar etiquetas (Tags) en los DAGs: En un entorno con decenas o cientos de DAGs, encontrar los relevantes para monitorear un proceso de negocio específico se vuelve difícil. Solución: Asigna etiquetas estratégicas a tus DAGs (ej: ['finance', 'monthly_report', 'postgres']). Luego, en la vista principal de DAGs, puedes filtrar por estas etiquetas para crear "vistas" lógicas de monitoreo agrupadas por dominio, criticidad o tecnología.

Checklist de Dominio

Antes de considerar que dominas el monitoreo con logs y métricas en la interfaz web de Airflow, verifica que puedes realizar las siguientes acciones de manera consistente:

  • Localizar y descargar los logs de cualquier tarea fallida en una ejecución histórica de un DAG, identificando la línea de código y la excepción que causó el error.
  • Interpretar un gráfico de Gantt para identificar el cuello de botella de rendimiento en una ejecución de DAG y proponer una optimización basada en esa visualización.
  • Configurar y utilizar un logger personalizado dentro de un PythonOperator para emitir mensajes en niveles DEBUG, INFO, WARNING y ERROR, incluyendo datos estructurados en JSON.
  • Explicar la diferencia entre la pestaña "Log" y la pestaña "XCom" de una instancia de tarea, y saber extraer información valiosa de ambas para la depuración.
  • Filtrar la vista de DAGs por etiquetas para aislar un subconjunto de pipelines relacionados para su monitoreo.
  • Configurar correctamente el parámetro provide_context=True en un PythonOperator y utilizar al menos dos variables del contexto (como run_id o ds) dentro de los mensajes de log.
  • Describir al menos un beneficio de configurar un backend de logs remoto (S3, GCS, Elasticsearch) frente a depender solo de los logs locales del worker.
De lección a portfolio

Convertí esta lección en una habilidad visible para entrevistas.

Guardá el curso, completá los ejercicios y conectá esta habilidad con una ruta de empleo, data, IA, programación o marketing.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión