Concepto clave
Apache Airflow es una plataforma de orquestación de workflows diseñada para programar y monitorear pipelines de datos de manera programática. Imagina que estás construyendo una fábrica automatizada donde cada máquina realiza una tarea específica, como extraer datos de una base de datos, transformarlos y cargarlos en un almacén. Airflow actúa como el supervisor que coordina todas estas máquinas, asegurándose de que cada paso se ejecute en el orden correcto, en el momento adecuado y con las dependencias necesarias.
El núcleo de Airflow son los DAGs (Directed Acyclic Graphs), que representan el flujo de trabajo como una serie de tareas conectadas por dependencias. Piensa en un DAG como un mapa de ruta para tu pipeline: cada nodo es una tarea (como ejecutar un script Python o una consulta SQL) y las flechas indican el orden de ejecución. La clave es que no puede haber ciclos (de ahí "acíclico"), lo que garantiza que el pipeline siempre avance hacia un final definido.
Cómo funciona en la práctica
Para implementar un pipeline en Airflow, sigues estos pasos:
- Define un DAG en Python, especificando parámetros como el intervalo de ejecución (por ejemplo, diario a las 6 AM).
- Crea operadores que representan tareas individuales, como
PythonOperatorpara ejecutar funciones oBashOperatorpara comandos de shell. - Establece dependencias entre tareas usando métodos como
task1 >> task2para indicar que task2 depende de task1. - Despliega el DAG en Airflow, donde el scheduler lo ejecuta automáticamente según la programación.
Ejemplo básico de un DAG que extrae datos de una API y los guarda en un archivo:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def extract_data():
# Código para extraer datos
pass
def save_data():
# Código para guardar datos
pass
dag = DAG('mi_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
save_task = PythonOperator(task_id='save', python_callable=save_data, dag=dag)
extract_task >> save_taskCaso de estudio
Considera un pipeline ETL para un e-commerce que procesa ventas diarias. El objetivo es extraer datos de transacciones de una base de datos MySQL, transformarlos para calcular métricas como ingresos totales y productos más vendidos, y cargar los resultados en un data warehouse como BigQuery para análisis.
| Paso | Tarea | Operador | Dependencia |
|---|---|---|---|
| 1 | Extraer ventas del día anterior | MySqlOperator | Ninguna |
| 2 | Calcular ingresos y top productos | PythonOperator | Depende del paso 1 |
| 3 | Cargar resultados en BigQuery | BigQueryOperator | Depende del paso 2 |
En este caso, Airflow garantiza que los datos se procesen cada mañana sin intervención manual, reduciendo errores y ahorrando tiempo.
Errores comunes
- Programar DAGs con fechas de inicio incorrectas: Si defines
start_dateen el futuro o con un formato erróneo, el DAG no se ejecutará. Usadatetimede Python y verifica la zona horaria. - Olvidar dependencias entre tareas: Sin dependencias explícitas, las tareas pueden ejecutarse en paralelo o en orden aleatorio, causando fallos. Siempre usa operadores como
>>para establecer relaciones. - No manejar fallos en tareas: Si una tarea falla (por ejemplo, por un error en la conexión a la base de datos), el pipeline puede detenerse. Configura reintentos con
retriesen el operador y monitorea logs. - Sobrecargar el scheduler con DAGs complejos: DAGs con muchas tareas o intervalos muy cortos pueden ralentizar Airflow. Optimiza dividiendo DAGs grandes o ajustando
schedule_interval.
Checklist de dominio
- Entiendo qué es un DAG y cómo representa un flujo de trabajo en Airflow.
- Puedo crear un DAG básico en Python con al menos dos tareas y dependencias.
- Sé configurar parámetros clave como
start_dateyschedule_interval. - Reconozco operadores comunes como
PythonOperatoryBashOperator. - Puedo explicar el rol del scheduler en la ejecución automática de DAGs.
- Sé cómo depurar errores comunes revisando logs en la interfaz web de Airflow.
- Entiendo cómo Airflow se integra en un pipeline ETL del mundo real.
Crear un DAG para procesar logs de servidor
En este ejercicio, crearás un DAG en Airflow que simule un pipeline para procesar logs de un servidor web. Sigue estos pasos:
- Configura un entorno local de Airflow (si no lo tienes, usa un sandbox en línea o instala Docker con la imagen oficial de Airflow).
- Crea un archivo Python llamado
process_logs_dag.pyen la carpeta de DAGs de Airflow. - Define un DAG con:
- Nombre:
process_server_logs - Fecha de inicio: 1 de enero de 2023
- Intervalo de ejecución: diario (
@daily)
- Nombre:
- Añade tres tareas usando
PythonOperator:extract_logs: Simula la extracción de logs de un archivo (usa una función que imprima "Extrayendo logs").clean_logs: Simula la limpieza de datos (por ejemplo, filtrar líneas vacías; imprime "Limpiando logs").load_logs: Simula la carga a una base de datos (imprime "Cargando logs").
- Establece dependencias para que las tareas se ejecuten en orden: extraer → limpiar → cargar.
- Despliega el DAG y verifica su ejecución en la interfaz web de Airflow.
Al final, deberías ver el DAG ejecutándose automáticamente cada día, con las tareas completadas en secuencia.
Pistas- Usa el módulo
datetimede Python para definirstart_datecorrectamente. - Recuerda que las dependencias se pueden establecer con sintaxis como
extract_logs >> clean_logs >> load_logs. - Si el DAG no aparece en la interfaz web, verifica que el archivo esté en la carpeta correcta y que Airflow esté reiniciado.
Evalua tu comprension
Completa el quiz interactivo de arriba para ganar XP.