Crear tu Primer DAG con Operadores Básicos

Video
25 min~4 min lectura

Reproductor de video

Concepto clave

Un DAG (Grafo Acíclico Dirigido) en Apache Airflow es la unidad fundamental de orquestación que define un flujo de trabajo de datos. Piensa en él como un plano arquitectónico para tu pipeline ETL: especifica qué tareas deben ejecutarse, en qué orden y bajo qué condiciones. La clave está en "acíclico" - las tareas no pueden crear bucles infinitos, asegurando que el flujo siempre avance hacia una conclusión.

Los operadores son los componentes que ejecutan el trabajo real dentro de un DAG. Cada operador representa una tarea única, como ejecutar un script Python, consultar una base de datos o copiar archivos. Airflow viene con operadores predefinidos para tareas comunes, pero también puedes crear los tuyos. La magia ocurre cuando conectas estos operadores con dependencias, creando un flujo de datos automatizado y confiable.

Cómo funciona en la práctica

Vamos a construir un DAG simple que descarga datos de una API, los procesa y los carga a una base de datos. Primero, necesitas entender la estructura básica:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def descargar_datos():
    # Lógica para descargar datos
    print("Datos descargados")

def procesar_datos():
    # Lógica para procesar
    print("Datos procesados")

def cargar_datos():
    # Lógica para cargar
    print("Datos cargados")

with DAG(
    dag_id="mi_primer_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily"
) as dag:
    
    t1 = PythonOperator(
        task_id="descargar",
        python_callable=descargar_datos
    )
    
    t2 = PythonOperator(
        task_id="procesar",
        python_callable=procesar_datos
    )
    
    t3 = PythonOperator(
        task_id="cargar",
        python_callable=cargar_datos
    )
    
    t1 >> t2 >> t3

Este DAG se ejecutará diariamente desde el 1 de enero de 2024. Las tareas se ejecutan en secuencia: primero descarga, luego procesa, finalmente carga. Los operadores PythonOperator ejecutan funciones Python, pero Airflow tiene docenas de operadores para diferentes tecnologías.

Caso de estudio

Imagina que trabajas en un e-commerce que necesita actualizar diariamente el inventario. Cada noche, un sistema externo genera un archivo CSV con las ventas del día. Tu pipeline debe:

  1. Descargar el archivo CSV desde un servidor SFTP
  2. Validar que el archivo tenga el formato correcto
  3. Transformar los datos para calcular inventario actualizado
  4. Cargar los resultados a la base de datos de producción
  5. Enviar un reporte por email si hay productos con stock bajo

Con Airflow, crearías un DAG con estos operadores:

TareaOperadorDescripción
descargar_archivoSFTPOperatorDescarga CSV desde servidor remoto
validar_formatoPythonOperatorVerifica estructura del CSV
calcular_inventarioPythonOperatorProcesa datos y calcula nuevo inventario
actualizar_bdPostgresOperatorInserta datos en base de datos
enviar_alertaEmailOperatorEnvía email si stock < 10 unidades
En producción, siempre agrega manejo de errores y logging detallado. Un pipeline sin monitoreo es como volar a ciegas.

Errores comunes

  • Olvidar el start_date: Airflow necesita una fecha de inicio para programar ejecuciones. Si usas datetime.now(), el DAG nunca se ejecutará porque la fecha siempre está en el futuro.
  • Dependencias circulares: Crear tareas que dependen unas de otras en ciclo. Ejemplo: t1 >> t2 >> t3 >> t1. Airflow lo detectará y fallará.
  • No definir schedule_interval: Si no especificas cuándo debe ejecutarse el DAG, solo correrá manualmente. Para ejecución automática, define intervalos como "@daily" o "0 2 * * *" (cron).
  • Operadores bloqueantes: Usar operadores que no liberan recursos. En producción, usa operadores asíncronos o configura tiempos de espera apropiados.
  • Ignorar idempotencia: Las tareas deben poder ejecutarse múltiples veces sin efectos secundarios. Siempre diseña tus pipelines para ser idempotentes.

Checklist de dominio

  1. Puedo crear un DAG básico con al menos 3 tareas conectadas
  2. Entiendo la diferencia entre DAG, operador y tarea
  3. Sé cómo definir schedule_interval para ejecución automática
  4. Puedo usar al menos 2 tipos diferentes de operadores (PythonOperator y otro)
  5. Comprendo cómo Airflow maneja las dependencias entre tareas
  6. Sé dónde colocar mis DAGs para que Airflow los detecte
  7. Puedo verificar la validez de un DAG usando airflow dags list

Construye un DAG para procesamiento de logs diarios

En este ejercicio, crearás un DAG que procesa logs de aplicación diarios. Sigue estos pasos:

  1. Crea un nuevo archivo Python en la carpeta dags/ de tu instalación de Airflow
  2. Define un DAG llamado "procesar_logs_diarios" que se ejecute diariamente a las 3 AM
  3. Implementa tres tareas usando PythonOperator:
    • extraer_logs: Simula extraer logs del día anterior (puede ser una función que imprima "Extrayendo logs")
    • filtrar_errores: Filtra solo los logs de tipo ERROR (función que imprima "Filtrando errores")
    • generar_reporte: Crea un resumen de errores (función que imprima "Generando reporte")
  4. Establece las dependencias para que las tareas se ejecuten en el orden correcto
  5. Verifica tu DAG ejecutando: airflow dags list | grep procesar_logs_diarios
  6. Prueba una ejecución manual con: airflow dags test procesar_logs_diarios 2024-01-01

Una vez completado, tu DAG debería aparecer en la interfaz web de Airflow y poder ejecutarse automáticamente cada día.

Pistas
  • Recuerda importar los módulos necesarios: DAG, PythonOperator y datetime
  • Usa el operador >> para definir dependencias: tarea1 >> tarea2 >> tarea3
  • Asegúrate de que el dag_id en el DAG coincida con el nombre del archivo sin la extensión .py

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.