Crear tu Primer DAG para un Pipeline Simple

Video
25 min~5 min lectura

Reproductor de video

Concepto clave

Un DAG (Directed Acyclic Graph) en Apache Airflow es la unidad fundamental para definir workflows de datos. Imagina que estás organizando una cadena de producción en una fábrica: cada paso (como cortar, ensamblar, pintar) es una tarea, y el DAG es el plano que define el orden y las dependencias entre ellas, asegurando que no haya ciclos (como pintar antes de ensamblar). En el contexto de pipelines ETL, un DAG orquesta extracciones, transformaciones y cargas de datos de manera automatizada y programable.

La clave de Airflow es que los DAGs son definidos en código Python, lo que permite versionamiento, pruebas y reutilización. A diferencia de herramientas visuales donde arrastras bloques, aquí escribes la lógica del pipeline como un script, dando control total sobre la ejecución. Esto es especialmente útil para pipelines complejos que requieren manejo de errores, reintentos y monitoreo detallado.

Cómo funciona en la práctica

Para crear tu primer DAG, sigue estos pasos en un entorno de Airflow configurado:

  1. Define un archivo Python en la carpeta dags/ de Airflow (por ejemplo, mi_primer_dag.py).
  2. Importa los módulos necesarios: from airflow import DAG y from airflow.operators.python import PythonOperator.
  3. Crea un objeto DAG con parámetros como dag_id, schedule_interval (ej., @daily), y start_date.
  4. Define funciones Python que representen tareas, como una función que simule extraer datos de una API.
  5. Instancia operadores (como PythonOperator) para cada tarea, vinculándolos al DAG y a las funciones.
  6. Establece dependencias usando el operador >> (ej., tarea1 >> tarea2) para indicar el flujo.

Ejemplo básico de código:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extraer_datos():
    print("Extrayendo datos...")

def transformar_datos():
    print("Transformando datos...")

with DAG(
    dag_id="primer_pipeline",
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False
) as dag:
    tarea_extraer = PythonOperator(
        task_id="extraer",
        python_callable=extraer_datos
    )
    tarea_transformar = PythonOperator(
        task_id="transformar",
        python_callable=transformar_datos
    )
    tarea_extraer >> tarea_transformar

Caso de estudio

Supongamos que trabajas en una empresa de e-commerce y necesitas un pipeline diario para analizar ventas. El DAG debe:

  1. Extraer datos de ventas de una base de datos SQL (ej., PostgreSQL) cada día a las 6 AM.
  2. Transformar los datos calculando métricas como ventas totales y promedio por producto.
  3. Cargar los resultados en un almacén de datos (ej., Amazon Redshift) para reportes.

En la práctica, esto se implementa con operadores específicos (como PostgresOperator para extracción y RedshiftOperator para carga), pero el concepto de DAG permanece igual. Un error común aquí es no manejar conexiones de base de datos correctamente en Airflow, lo que se soluciona configurando Connections en la interfaz web.

En proyectos reales, los DAGs suelen manejar terabytes de datos, por lo que la eficiencia en las tareas es crítica. Usa operadores nativos de Airflow cuando sea posible para aprovechar características como reintentos automáticos.

Errores comunes

  • Definir start_date en el futuro o de forma dinámica: Esto puede causar que el DAG no se ejecute. Usa fechas fijas pasadas y evita funciones como datetime.now().
  • Olvidar catchup=False: Si no se establece, Airflow intentará ejecutar todas las instancias pasadas desde start_date, saturando el sistema. Configúralo según la necesidad del pipeline.
  • No probar tareas localmente: Antes de desplegar, ejecuta airflow tasks test para verificar que cada tarea funcione sin errores de sintaxis o lógica.
  • Usar recursos intensivos en el scheduler: Evita tareas pesadas en la definición del DAG (fuera de los operadores), ya que el scheduler lee el archivo frecuentemente, lo que puede ralentizar Airflow.
  • Ignorar el manejo de dependencias externas: Si una tarea depende de un archivo o API externa, incluye verificaciones o use sensores (Sensor operators) para esperar disponibilidad.

Checklist de dominio

  • Puedo crear un archivo DAG básico en Python con al menos dos tareas y dependencias.
  • Entiendo cómo configurar schedule_interval para ejecuciones periódicas (ej., diarias o horarias).
  • Sé cómo probar una tarea individual usando el comando airflow tasks test en la terminal.
  • Puedo identificar y evitar ciclos en las dependencias de tareas (asegurando que el grafo sea acíclico).
  • Conozco la diferencia entre operadores (como PythonOperator vs. BashOperator) y cuándo usar cada uno.
  • Sé cómo monitorear la ejecución del DAG en la interfaz web de Airflow (ej., ver logs y estados).
  • Puedo explicar el impacto de catchup en la ejecución histórica de pipelines.

Implementa un DAG para procesar logs de servidor

En este ejercicio, crearás un DAG simple que simula un pipeline ETL para logs de servidor. Sigue estos pasos:

  1. Asegúrate de tener Apache Airflow instalado y corriendo localmente (puedes usar Docker o una instalación directa).
  2. En la carpeta dags/ de tu instalación de Airflow, crea un nuevo archivo Python llamado procesar_logs.py.
  3. Define un DAG con ID procesamiento_logs, programado para ejecutarse cada hora (schedule_interval='@hourly'), con start_date fijado a ayer y catchup=False.
  4. Crea tres tareas usando PythonOperator:
    • leer_logs: Una función que imprime "Leyendo archivo de logs..." y simula leer un archivo de texto.
    • filtrar_errores: Una función que imprime "Filtrando errores del log..." y simula extraer líneas con la palabra 'ERROR'.
    • guardar_resultados: Una función que imprime "Guardando resultados en base de datos..." y simula insertar datos en SQL.
  5. Establece las dependencias para que las tareas se ejecuten en orden: primero leer_logs, luego filtrar_errores, y finalmente guardar_resultados.
  6. Guarda el archivo y verifica en la interfaz web de Airflow que el DAG aparezca y esté activo. Ejecuta una instancia manualmente para probarlo.

Entrega: Proporciona el código completo del archivo procesar_logs.py y una captura de pantalla de la ejecución exitosa en Airflow.

Pistas
  • Recuerda importar los módulos necesarios al inicio del archivo, como DAG y PythonOperator.
  • Usa el operador >> para definir dependencias entre tareas, por ejemplo: tarea1 >> tarea2 >> tarea3.
  • Si el DAG no aparece en la interfaz web, verifica que el archivo esté en la carpeta correcta y que Airflow esté recargando DAGs (default cada 30 segundos).

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.