Quiz: Conceptos Básicos de Airflow

Quiz
10 min~4 min lectura

Quiz Interactivo

Pon a prueba tus conocimientos

Concepto clave

Apache Airflow es una plataforma de orquestación de workflows diseñada para programar y monitorear pipelines de datos de manera programática. Imagina que estás construyendo una fábrica automatizada donde cada máquina realiza una tarea específica, como extraer datos de una base de datos, transformarlos y cargarlos en un almacén. Airflow actúa como el supervisor que coordina todas estas máquinas, asegurándose de que cada paso se ejecute en el orden correcto, en el momento adecuado y con las dependencias necesarias.

El núcleo de Airflow son los DAGs (Directed Acyclic Graphs), que representan el flujo de trabajo como una serie de tareas conectadas por dependencias. Piensa en un DAG como un mapa de ruta para tu pipeline: cada nodo es una tarea (como ejecutar un script Python o una consulta SQL) y las flechas indican el orden de ejecución. La clave es que no puede haber ciclos (de ahí "acíclico"), lo que garantiza que el pipeline siempre avance hacia un final definido.

Cómo funciona en la práctica

Para implementar un pipeline en Airflow, sigues estos pasos:

  1. Define un DAG en Python, especificando parámetros como el intervalo de ejecución (por ejemplo, diario a las 6 AM).
  2. Crea operadores que representan tareas individuales, como PythonOperator para ejecutar funciones o BashOperator para comandos de shell.
  3. Establece dependencias entre tareas usando métodos como task1 >> task2 para indicar que task2 depende de task1.
  4. Despliega el DAG en Airflow, donde el scheduler lo ejecuta automáticamente según la programación.

Ejemplo básico de un DAG que extrae datos de una API y los guarda en un archivo:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def extract_data():
    # Código para extraer datos
    pass

def save_data():
    # Código para guardar datos
    pass

dag = DAG('mi_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
save_task = PythonOperator(task_id='save', python_callable=save_data, dag=dag)
extract_task >> save_task

Caso de estudio

Considera un pipeline ETL para un e-commerce que procesa ventas diarias. El objetivo es extraer datos de transacciones de una base de datos MySQL, transformarlos para calcular métricas como ingresos totales y productos más vendidos, y cargar los resultados en un data warehouse como BigQuery para análisis.

PasoTareaOperadorDependencia
1Extraer ventas del día anteriorMySqlOperatorNinguna
2Calcular ingresos y top productosPythonOperatorDepende del paso 1
3Cargar resultados en BigQueryBigQueryOperatorDepende del paso 2
En este caso, Airflow garantiza que los datos se procesen cada mañana sin intervención manual, reduciendo errores y ahorrando tiempo.

Errores comunes

  • Programar DAGs con fechas de inicio incorrectas: Si defines start_date en el futuro o con un formato erróneo, el DAG no se ejecutará. Usa datetime de Python y verifica la zona horaria.
  • Olvidar dependencias entre tareas: Sin dependencias explícitas, las tareas pueden ejecutarse en paralelo o en orden aleatorio, causando fallos. Siempre usa operadores como >> para establecer relaciones.
  • No manejar fallos en tareas: Si una tarea falla (por ejemplo, por un error en la conexión a la base de datos), el pipeline puede detenerse. Configura reintentos con retries en el operador y monitorea logs.
  • Sobrecargar el scheduler con DAGs complejos: DAGs con muchas tareas o intervalos muy cortos pueden ralentizar Airflow. Optimiza dividiendo DAGs grandes o ajustando schedule_interval.

Checklist de dominio

  1. Entiendo qué es un DAG y cómo representa un flujo de trabajo en Airflow.
  2. Puedo crear un DAG básico en Python con al menos dos tareas y dependencias.
  3. Sé configurar parámetros clave como start_date y schedule_interval.
  4. Reconozco operadores comunes como PythonOperator y BashOperator.
  5. Puedo explicar el rol del scheduler en la ejecución automática de DAGs.
  6. Sé cómo depurar errores comunes revisando logs en la interfaz web de Airflow.
  7. Entiendo cómo Airflow se integra en un pipeline ETL del mundo real.

Crear un DAG para procesar logs de servidor

En este ejercicio, crearás un DAG en Airflow que simule un pipeline para procesar logs de un servidor web. Sigue estos pasos:

  1. Configura un entorno local de Airflow (si no lo tienes, usa un sandbox en línea o instala Docker con la imagen oficial de Airflow).
  2. Crea un archivo Python llamado process_logs_dag.py en la carpeta de DAGs de Airflow.
  3. Define un DAG con:
    • Nombre: process_server_logs
    • Fecha de inicio: 1 de enero de 2023
    • Intervalo de ejecución: diario (@daily)
  4. Añade tres tareas usando PythonOperator:
    • extract_logs: Simula la extracción de logs de un archivo (usa una función que imprima "Extrayendo logs").
    • clean_logs: Simula la limpieza de datos (por ejemplo, filtrar líneas vacías; imprime "Limpiando logs").
    • load_logs: Simula la carga a una base de datos (imprime "Cargando logs").
  5. Establece dependencias para que las tareas se ejecuten en orden: extraer → limpiar → cargar.
  6. Despliega el DAG y verifica su ejecución en la interfaz web de Airflow.

Al final, deberías ver el DAG ejecutándose automáticamente cada día, con las tareas completadas en secuencia.

Pistas
  • Usa el módulo datetime de Python para definir start_date correctamente.
  • Recuerda que las dependencias se pueden establecer con sintaxis como extract_logs >> clean_logs >> load_logs.
  • Si el DAG no aparece en la interfaz web, verifica que el archivo esté en la carpeta correcta y que Airflow esté reiniciado.

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.