Concepto clave
Un DAG (Grafo Acíclico Dirigido) en Apache Airflow es la unidad fundamental para definir workflows de datos. Piensa en él como un plano arquitectónico para tu pipeline ETL: especifica qué tareas deben ejecutarse, en qué orden y bajo qué condiciones, pero sin definir el momento exacto de ejecución (eso lo maneja el scheduler).
En la práctica, un DAG es un archivo Python que describe dependencias entre tareas usando operadores. La analogía perfecta es una receta de cocina: los ingredientes (datos) y pasos (tareas) están definidos, pero el cocinero (Airflow) decide cuándo y cómo ejecutarlos según los recursos disponibles.
Cómo funciona en la práctica
Para crear tu primer DAG, sigue estos pasos fundamentales:
- Define el objeto DAG con parámetros básicos como
dag_id,schedule_intervalystart_date - Crea tareas usando operadores (como
PythonOperatorpara scripts Python) - Establece dependencias entre tareas con los operadores
>>o<< - Guarda el archivo en la carpeta
dags/de tu instalación Airflow
Ejemplo básico:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def mi_funcion():
print("Ejecutando mi script Python")
dag = DAG(
dag_id="mi_primer_dag",
start_date=datetime(2024, 1, 1),
schedule_interval="@daily"
)
tarea = PythonOperator(
task_id="ejecutar_script",
python_callable=mi_funcion,
dag=dag
)Caso de estudio
Imagina que trabajas en un e-commerce y necesitas procesar diariamente las ventas del día anterior. Tu DAG podría incluir:
| Tarea | Operador | Propósito |
|---|---|---|
| Extraer ventas | PythonOperator | Leer datos de la base de datos SQL |
| Transformar datos | PythonOperator | Calcular métricas como ingresos totales |
| Cargar reporte | PythonOperator | Guardar resultados en un data warehouse |
En producción, este DAG se ejecutaría automáticamente cada día a las 2 AM, procesando las ventas del día anterior sin intervención manual.
Errores comunes
- Start date en el futuro: Si defines
start_datecon una fecha futura, Airflow no ejecutará el DAG hasta que llegue esa fecha. Usa fechas pasadas para testing. - Dependencias circulares: Crear un ciclo en las dependencias de tareas (A → B → A) causará errores. Verifica que tu grafo sea realmente acíclico.
- Scripts con errores de importación: Si tu script Python usa librerías no instaladas en el entorno de Airflow, fallará. Usa entornos virtuales o contenedores Docker.
- Permisos incorrectos: El archivo DAG debe tener permisos de lectura para el usuario que ejecuta Airflow (normalmente
airflow:airflow).
Checklist de dominio
- ✓ Puedo crear un DAG básico con al menos dos tareas
- ✓ Entiendo la diferencia entre
dag_idytask_id - ✓ Sé configurar correctamente
start_dateyschedule_interval - ✓ Puedo definir dependencias entre tareas usando
>> - ✓ Verifico que mi archivo DAG esté en la carpeta correcta (
AIRFLOW_HOME/dags) - ✓ Comprendo cómo Airflow detecta automáticamente nuevos DAGs
- ✓ Sé revisar los logs cuando una ejecución falla
Crear un DAG para procesamiento diario de logs
Objetivo
Crea un DAG que simule el procesamiento diario de logs de una aplicación web.
Pasos
- Crea un archivo Python llamado
procesar_logs_dag.pyen tu carpetadags/ - Define un DAG con ID
procesamiento_logs_diarioque se ejecute diariamente a partir del 1 de enero de 2024 - Implementa tres funciones Python:
extraer_logs(): Simula la extracción de logs (puede imprimir un mensaje)filtrar_errores(): Simula filtrar logs con nivel ERRORgenerar_reporte(): Simula crear un reporte con estadísticas
- Crea tres tareas usando
PythonOperator, una para cada función - Establece las dependencias: extraer → filtrar → generar
- Activa el DAG en la interfaz web de Airflow y verifica su ejecución
Requisitos técnicos
- Usa
schedule_interval="0 2 * * *"para ejecución diaria a las 2 AM - Asegúrate de que cada tarea tenga un
task_idúnico y descriptivo - Incluye manejo básico de errores en las funciones (try-except)
- Recuerda importar todos los módulos necesarios al inicio del archivo
- Verifica que las funciones estén definidas antes de crear los PythonOperator
- Usa
dag.test()para probar tu DAG sin necesidad del scheduler
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.