Concepto clave
Un DAG (Grafo Acíclico Dirigido) en Apache Airflow es la unidad fundamental que define un flujo de trabajo automatizado. Piensa en él como un plano arquitectónico para tus pipelines de datos, donde cada paso del proceso se representa como un Task (tarea) y las relaciones entre ellos como Dependencies (dependencias). La magia está en que Airflow no solo ejecuta estas tareas, sino que las Schedule (programa) automáticamente según cronogramas definidos, similar a cómo un sistema de riego inteligente activa diferentes zonas del jardín en momentos específicos, asegurando que cada planta reciba agua solo después de que se hayan cumplido ciertas condiciones.
La estructura de un DAG se compone de tres elementos esenciales: Tasks (las unidades de trabajo individuales, como extraer datos de una API), Dependencies (las reglas que determinan el orden de ejecución, como "transformar datos solo después de extraerlos"), y Scheduling (la configuración de cuándo y con qué frecuencia se ejecuta el DAG completo, por ejemplo, diariamente a medianoche). Esta combinación permite orquestar pipelines ETL complejos de manera confiable y repetible, eliminando la necesidad de intervención manual y reduciendo errores en procesos de datos críticos.
Cómo funciona en la práctica
Imagina que necesitas construir un pipeline ETL que procese ventas diarias. Primero, defines un DAG en Python con un nombre único y un intervalo de programación. Luego, creas tareas usando Operators (operadores) de Airflow, como PythonOperator para lógica personalizada o BashOperator para comandos de shell. Por ejemplo:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def extract_sales():
# Código para extraer datos de ventas
pass
def transform_sales():
# Código para transformar datos
pass
def load_sales():
# Código para cargar a una base de datos
pass
default_args = {
'owner': 'data_engineer',
'start_date': datetime(2023, 10, 1),
'retries': 2
}
dag = DAG('daily_sales_etl', default_args=default_args, schedule_interval='0 0 * * *')
extract_task = PythonOperator(task_id='extract_sales', python_callable=extract_sales, dag=dag)
transform_task = PythonOperator(task_id='transform_sales', python_callable=transform_sales, dag=dag)
load_task = PythonOperator(task_id='load_sales', python_callable=load_sales, dag=dag)
extract_task >> transform_task >> load_taskEn este ejemplo, el DAG daily_sales_etl se programa para ejecutarse diariamente a medianoche (schedule_interval='0 0 * * *'). Las tareas se definen con IDs únicos y se vinculan usando el operador >> para establecer dependencias: extract_task debe completarse antes de que transform_task comience, y así sucesivamente. Airflow gestiona automáticamente la ejecución, reintentos y monitoreo basándose en esta estructura.
Caso de estudio
Considera una empresa de e-commerce que necesita procesar datos de pedidos cada hora. El pipeline incluye: extraer pedidos recientes de una base de datos MySQL, transformarlos para calcular métricas como el valor promedio por pedido, y cargar los resultados en un data warehouse de Redshift para análisis. Usando Airflow, el equipo de ingeniería de datos implementa un DAG con las siguientes características:
| Componente | Detalle | Ejemplo en Código |
|---|---|---|
| DAG | Nombre: hourly_order_processing, programado cada hora | schedule_interval='0 * * * *' |
| Tasks | 3 tareas: extract_orders, transform_metrics, load_to_redshift | Usando MySqlOperator, PythonOperator, RedshiftOperator |
| Dependencies | Lineales: extract → transform → load | extract_task >> transform_task >> load_task |
| Datos de Ejemplo | Extrae 1000 pedidos por hora, transforma a 5 métricas clave | Tabla de salida con columnas: hora, avg_order_value, total_orders |
Este DAG se ejecuta automáticamente, reduciendo el tiempo de procesamiento manual de 30 minutos a segundos y asegurando que los datos estén siempre actualizados para el equipo de negocios. Un error común aquí sería no manejar fallos en la extracción, lo que Airflow mitiga con reintentos configurados en default_args.
Errores comunes
Al diseñar DAGs, los ingenieros de datos intermedios suelen cometer estos errores:
Programar DAGs con fechas de inicio en el futuro sin pruebas: Si definesstart_datecomo una fecha futura, Airflow no ejecutará el DAG hasta ese momento, causando retrasos inesperados. Solución: Usa fechas pasadas para pruebas y ajusta en producción.Crear dependencias circulares: Por ejemplo, hacer que una tarea A dependa de B y B dependa de A, lo que causa bucles infinitos. Airflow detectará esto, pero evítalo revisando el grafo conairflow tasks list.Ignorar el manejo de errores y reintentos: No configurarretriesoretry_delaypuede llevar a fallos permanentes en pipelines. Solución: Define valores razonables comoretries=3yretry_delay=timedelta(minutes=5)endefault_args.Sobrecargar un solo DAG con demasiadas tareas: Un DAG con 50+ tareas puede volverse difícil de depurar y mantener. Solución: Divide pipelines complejos en múltiples DAGs más pequeños y usaTriggerDagRunOperatorpara orquestarlos.No usar IDs de tarea únicos y descriptivos: IDs comotask1oprocess_datahacen que el monitoreo sea confuso. Solución: Usa nombres claros comoextract_customer_dataotransform_sales_metrics.
Checklist de dominio
Para asegurar que dominas la estructura de un DAG en Airflow, verifica que puedes:
Definir un DAG con parámetros básicos comodag_id,start_date, yschedule_interval.Crear al menos tres tareas usando diferentes operadores (e.g.,PythonOperator,BashOperator).Establecer dependencias entre tareas usando operadores como>>oset_upstream/set_downstream.Programar un DAG para ejecutarse en intervalos específicos (e.g., diario, horario) usando expresiones cron.Configurar manejo de errores con reintentos y tiempos de espera endefault_args.Visualizar la estructura del DAG en la interfaz web de Airflow para validar dependencias.Probar un DAG localmente usandoairflow tasks testantes de desplegarlo en producción.
Construye un DAG para procesar logs de servidor
En este ejercicio, crearás un DAG en Apache Airflow que automatice el procesamiento de logs de servidor. Sigue estos pasos:
- Configura el entorno: Asegúrate de tener Airflow instalado y ejecutándose localmente. Crea un nuevo archivo Python en la carpeta
dags/ de tu instalación de Airflow. - Define el DAG: Crea un DAG llamado
server_logs_etl con una start_date de hace una semana y un schedule_interval diario (ejecútalo cada día a las 2 AM). Usa default_args para configurar 2 reintentos con un retraso de 10 minutos entre ellos. - Crea las tareas: Implementa tres tareas usando
PythonOperator:extract_logs: Simula la extracción de logs de un archivo (puedes usar una función que lea un archivo de texto con datos de ejemplo).filter_errors: Filtra los logs para incluir solo entradas con nivel "ERROR".save_report: Guarda los logs filtrados en un nuevo archivo o imprime un resumen en consola.
- Establece dependencias: Ordena las tareas para que
extract_logs se ejecute primero, luego filter_errors, y finalmente save_report. - Prueba el DAG: Usa el comando
airflow tasks test server_logs_etl extract_logs 2023-10-01 para probar una tarea. Luego, activa el DAG en la interfaz web y verifica que se ejecute según lo programado.
Datos de ejemplo para los logs: Crea un archivo server_logs.txt con líneas como:
2023-10-01 10:00:00 INFO - User login successful
2023-10-01 10:05:00 ERROR - Database connection failed
2023-10-01 10:10:00 WARNING - High memory usage
Pistas- Recuerda importar los módulos necesarios de Airflow al inicio de tu archivo Python.
- Usa expresiones cron para el schedule_interval: '0 2 * * *' significa a las 2 AM diariamente.
- En las funciones de PythonOperator, asegúrate de incluir el parámetro **kwargs o context si necesitas acceder al contexto de ejecución de Airflow.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.