Práctica: Crear tu Primer DAG que Ejecuta un Script Python

Lectura
25 min~3 min lectura

Concepto clave

Un DAG (Grafo Acíclico Dirigido) en Apache Airflow es la unidad fundamental para definir workflows de datos. Piensa en él como un plano arquitectónico para tu pipeline ETL: especifica qué tareas deben ejecutarse, en qué orden y bajo qué condiciones, pero sin definir el momento exacto de ejecución (eso lo maneja el scheduler).

En la práctica, un DAG es un archivo Python que describe dependencias entre tareas usando operadores. La analogía perfecta es una receta de cocina: los ingredientes (datos) y pasos (tareas) están definidos, pero el cocinero (Airflow) decide cuándo y cómo ejecutarlos según los recursos disponibles.

Cómo funciona en la práctica

Para crear tu primer DAG, sigue estos pasos fundamentales:

  1. Define el objeto DAG con parámetros básicos como dag_id, schedule_interval y start_date
  2. Crea tareas usando operadores (como PythonOperator para scripts Python)
  3. Establece dependencias entre tareas con los operadores >> o <<
  4. Guarda el archivo en la carpeta dags/ de tu instalación Airflow

Ejemplo básico:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def mi_funcion():
    print("Ejecutando mi script Python")

dag = DAG(
    dag_id="mi_primer_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily"
)

tarea = PythonOperator(
    task_id="ejecutar_script",
    python_callable=mi_funcion,
    dag=dag
)

Caso de estudio

Imagina que trabajas en un e-commerce y necesitas procesar diariamente las ventas del día anterior. Tu DAG podría incluir:

TareaOperadorPropósito
Extraer ventasPythonOperatorLeer datos de la base de datos SQL
Transformar datosPythonOperatorCalcular métricas como ingresos totales
Cargar reportePythonOperatorGuardar resultados en un data warehouse

En producción, este DAG se ejecutaría automáticamente cada día a las 2 AM, procesando las ventas del día anterior sin intervención manual.

Errores comunes

  • Start date en el futuro: Si defines start_date con una fecha futura, Airflow no ejecutará el DAG hasta que llegue esa fecha. Usa fechas pasadas para testing.
  • Dependencias circulares: Crear un ciclo en las dependencias de tareas (A → B → A) causará errores. Verifica que tu grafo sea realmente acíclico.
  • Scripts con errores de importación: Si tu script Python usa librerías no instaladas en el entorno de Airflow, fallará. Usa entornos virtuales o contenedores Docker.
  • Permisos incorrectos: El archivo DAG debe tener permisos de lectura para el usuario que ejecuta Airflow (normalmente airflow:airflow).

Checklist de dominio

  • ✓ Puedo crear un DAG básico con al menos dos tareas
  • ✓ Entiendo la diferencia entre dag_id y task_id
  • ✓ Sé configurar correctamente start_date y schedule_interval
  • ✓ Puedo definir dependencias entre tareas usando >>
  • ✓ Verifico que mi archivo DAG esté en la carpeta correcta (AIRFLOW_HOME/dags)
  • ✓ Comprendo cómo Airflow detecta automáticamente nuevos DAGs
  • ✓ Sé revisar los logs cuando una ejecución falla

Crear un DAG para procesamiento diario de logs

Objetivo

Crea un DAG que simule el procesamiento diario de logs de una aplicación web.

Pasos

  1. Crea un archivo Python llamado procesar_logs_dag.py en tu carpeta dags/
  2. Define un DAG con ID procesamiento_logs_diario que se ejecute diariamente a partir del 1 de enero de 2024
  3. Implementa tres funciones Python:
    • extraer_logs(): Simula la extracción de logs (puede imprimir un mensaje)
    • filtrar_errores(): Simula filtrar logs con nivel ERROR
    • generar_reporte(): Simula crear un reporte con estadísticas
  4. Crea tres tareas usando PythonOperator, una para cada función
  5. Establece las dependencias: extraer → filtrar → generar
  6. Activa el DAG en la interfaz web de Airflow y verifica su ejecución

Requisitos técnicos

  • Usa schedule_interval="0 2 * * *" para ejecución diaria a las 2 AM
  • Asegúrate de que cada tarea tenga un task_id único y descriptivo
  • Incluye manejo básico de errores en las funciones (try-except)
Pistas
  • Recuerda importar todos los módulos necesarios al inicio del archivo
  • Verifica que las funciones estén definidas antes de crear los PythonOperator
  • Usa dag.test() para probar tu DAG sin necesidad del scheduler

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.