Práctica: Crear un Pipeline End-to-end con Airflow y Kubernetes

Video
30 min~5 min lectura

Reproductor de video

Concepto clave

Un pipeline de ML end-to-end es como una cadena de montaje en una fábrica: cada etapa procesa datos y pasa el resultado a la siguiente, desde la ingesta hasta el despliegue. En nuestro contexto, Airflow orquesta estas etapas como un director de orquesta, mientras que Kubernetes proporciona los recursos escalables donde cada tarea se ejecuta en contenedores Docker. Esto permite automatizar flujos complejos de ML, como entrenar modelos periódicamente con nuevos datos, sin intervención manual.

La integración con Kubernetes transforma Airflow de un orquestador local a uno cloud-native: en lugar de ejecutar tareas en máquinas fijas, las lanza como Pods efímeros en un clúster. Esto ofrece escalabilidad automática, alta disponibilidad y mejor uso de recursos. Imagina que tu pipeline es un viaje en tren: Airflow define el horario y las estaciones (tareas), Docker empaqueta cada vagón (código y dependencias), y Kubernetes gestiona las vías y locomotoras (infraestructura).

Cómo funciona en la práctica

Vamos a construir un pipeline que entrena un modelo de clasificación cada semana. Sigue estos pasos:

  1. Diseña el DAG en Airflow: Define tareas como descarga de datos, preprocesamiento, entrenamiento y evaluación.
  2. Containeriza cada tarea: Crea imágenes Docker con el código y entorno necesario (por ejemplo, scikit-learn).
  3. Configura el KubernetesExecutor: Modifica Airflow para que use Kubernetes en lugar de ejecutores locales.
  4. Despliega en Kubernetes: Implementa Airflow en el clúster usando Helm o manifiestos YAML.
  5. Programa y monitoriza: Activa el DAG y revisa logs en la UI de Airflow.

Un ejemplo real: una empresa de e-commerce usa este enfoque para reentrenar modelos de recomendación cada noche, procesando millones de registros en contenedores que Kubernetes escala según la carga.

Código en acción

Aquí tienes un DAG de Airflow que define un pipeline simple. Nota cómo cada tarea usa el KubernetesPodOperator para ejecutarse en contenedores:

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_scientist',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 1),
    'retries': 1
}

dag = DAG(
    'ml_pipeline_weekly',
    default_args=default_args,
    schedule_interval='@weekly'
)

download_task = KubernetesPodOperator(
    namespace='airflow',
    image='myregistry/data-downloader:latest',
    cmds=['python', 'download.py'],
    name='download-data',
    task_id='download_data',
    dag=dag
)

train_task = KubernetesPodOperator(
    namespace='airflow',
    image='myregistry/model-trainer:latest',
    cmds=['python', 'train.py'],
    name='train-model',
    task_id='train_model',
    dag=dag
)

download_task >> train_task

Ahora, mira cómo refactorizamos una tarea para hacerla más eficiente. Antes, el contenedor descargaba datos en cada ejecución:

FROM python:3.9
COPY download.py .
RUN pip install requests pandas
CMD ["python", "download.py"]

Después, optimizamos con volúmenes persistentes en Kubernetes para cachear datos:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi

Y en el DAG, agregamos configuración al KubernetesPodOperator:

download_task = KubernetesPodOperator(
    namespace='airflow',
    image='myregistry/data-downloader:optimized',
    cmds=['python', 'download.py'],
    volume_mounts=[{
        'mountPath': '/data',
        'name': 'data-volume'
    }],
    volumes=[{
        'name': 'data-volume',
        'persistentVolumeClaim': {'claimName': 'data-pvc'}
    }],
    name='download-data',
    task_id='download_data',
    dag=dag
)

Errores comunes

  • Olvidar límites de recursos en Kubernetes: Sin definir CPU/memoria, los Pods pueden consumir todos los recursos del nodo. Solución: siempre especifica resources en el KubernetesPodOperator, como limits={'cpu': '1', 'memory': '2Gi'}.
  • Imágenes Docker demasiado grandes: Incluir datasets o librerías innecesarias ralentiza el despliegue. Usa imágenes base ligeras (por ejemplo, python:3.9-slim) y multi-stage builds.
  • No manejar fallos en el pipeline: Si una tarea falla, el pipeline puede detenerse sin limpieza. Configura retries y timeouts en Airflow, y usa hooks para notificaciones.
  • Ignorar la seguridad de imágenes: Descargar imágenes de repositorios no verificados introduce riesgos. Usa registros privados y escanea imágenes con herramientas como Trivy.

Checklist de dominio

  1. ¿Puedes diseñar un DAG de Airflow con al menos 3 tareas secuenciales?
  2. ¿Sabes construir y subir una imagen Docker para una tarea de ML (por ejemplo, entrenamiento con scikit-learn)?
  3. ¿Has configurado el KubernetesExecutor en Airflow para ejecutar tareas en un clúster?
  4. ¿Puedes desplegar Airflow en Kubernetes usando Helm o manifiestos YAML?
  5. ¿Entiendes cómo monitorizar logs y métricas de Pods en Kubernetes durante la ejecución?
  6. ¿Has optimizado un pipeline usando volúmenes persistentes o configmaps?
  7. ¿Puedes explicar cómo escalar automáticamente tareas pesadas con Horizontal Pod Autoscaler?

Implementa un pipeline de ML para predecir ventas con Airflow y Kubernetes

En este ejercicio, crearás un pipeline que descarga datos históricos de ventas, entrena un modelo de regresión y guarda los resultados. Sigue estos pasos:

  1. Prepara el entorno: Asegúrate de tener un clúster de Kubernetes funcionando (puedes usar Minikube localmente) y Airflow instalado con el KubernetesExecutor.
  2. Crea imágenes Docker: Construye dos imágenes:
    • sales-downloader: Contiene un script Python que descarga datos de una URL pública (por ejemplo, un CSV de Kaggle) y los guarda en /data.
    • sales-trainer: Contiene un script que lee los datos, entrena un modelo simple con scikit-learn (por ejemplo, LinearRegression) y exporta métricas a un archivo JSON.
    Usa Dockerfiles multi-stage para mantener las imágenes bajo 500MB.
  3. Escribe el DAG en Airflow: Define un DAG llamado sales_prediction_pipeline con dos tareas usando KubernetesPodOperator. La primera tarea ejecuta sales-downloader, la segunda sales-trainer. Programa el DAG para ejecutarse cada día a las 2 AM.
  4. Configura recursos en Kubernetes: Añade un PersistentVolumeClaim de 5Gi para almacenar los datos entre tareas. En el KubernetesPodOperator, monta este volumen en /data y define límites de recursos (por ejemplo, 1 CPU y 1Gi de memoria por Pod).
  5. Prueba y monitoriza: Despliega el DAG, ejecútalo manualmente desde la UI de Airflow y verifica que los Pods se creen en Kubernetes. Revisa los logs para asegurarte de que el modelo se entrene correctamente.

Entrega: Un repositorio Git con los Dockerfiles, scripts Python, el DAG y un README que explique cómo desplegar.

Pistas
  • Usa kubectl get pods para verificar que los Pods se ejecuten en el namespace de Airflow.
  • Si el download falla, revisa que la URL sea accesible y que el contenedor tenga permisos de red.
  • Para depurar, ejecuta los contenedores localmente con docker run antes de integrarlos en Airflow.

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.