Diseño del pipeline: Desde datos hasta predicciones en producción

Lectura
20 min~12 min lectura

Introducción al Pipeline de Machine Learning en Producción

La transición de un modelo de Machine Learning desde un Jupyter Notebook hasta un sistema de producción robusto y confiable es el verdadero desafío en la industria. Un pipeline de ML es precisamente la arquitectura que automatiza y orquesta este flujo, transformando datos en bruto en predicciones valiosas de manera repetible y escalable. En esta lección, nos enfocaremos en el diseño de un pipeline completo, pensado para ser contenerizado con Docker y orquestado con Kubernetes, garantizando que cada componente sea modular, testeable y despliegable de forma independiente.

Diseñar este pipeline implica mucho más que escribir código de modelado. Se trata de ingeniería de software aplicada al ML: debemos considerar la ingestión de datos, la reproducibilidad de los experimentos, el versionado de modelos, el monitoreo y la capacidad de servir predicciones a bajas latencias. Un diseño pobre lleva a lo que se conoce como "deuda técnica de ML", donde los modelos se vuelven frágiles, costosos de mantener y prácticamente imposibles de actualizar. Nuestro objetivo es estructurar un sistema que evite estos escollos, utilizando las mejores prácticas de DevOps (o MLOps) para crear un ciclo de vida del modelo eficiente.

Concepto Clave: El Pipeline como Línea de Ensamblaje Industrial

Imagina una fábrica de automóviles. La materia prima (acero, plástico, cables) llega al inicio de una larga línea de ensamblaje. En cada estación, una máquina o robot especializado realiza una tarea muy específica: estampar la chapa, pintar, instalar el motor, colocar los asientos. La pieza pasa de una estación a la siguiente de forma automatizada, y al final de la línea, sale un automóvil funcional listo para su entrega. Un pipeline de ML opera bajo el mismo principio.

En nuestra analogía, la materia prima son los datos en bruto (archivos CSV, logs, imágenes). La primera estación de trabajo es el módulo de ingestión y validación de datos, que verifica que la materia prima cumple con los estándares de calidad. La siguiente estación es el preprocesamiento y feature engineering, donde los datos se limpian, transforman y convierten en características (features) listas para el modelo. La estación central es el entrenamiento del modelo, donde el algoritmo "aprende". Luego, el modelo producido pasa a una estación de empaquetado y versionado (por ejemplo, guardándolo en un registro de modelos). Finalmente, la última estación es el servicio de predicciones (serving), donde el modelo empaquetado se despliega para responder a peticiones de usuarios o sistemas externos. Cada componente es independiente y se comunica a través de interfaces bien definidas (archivos, APIs, mensajes).

Tip: La clave de un buen diseño es la separación de responsabilidades. Cada etapa del pipeline debe tener una única tarea principal y exponer una interfaz clara. Esto permite desarrollar, probar, contenerizar y escalar cada componente por separado, facilitando enormemente el uso de Docker y Kubernetes.

Cómo Funciona en la Práctica: Un Ejemplo Paso a Paso

Vamos a detallar el flujo de un pipeline para un sistema de recomendación de películas. El objetivo es, dado un usuario, recomendarle películas que no ha visto. El pipeline se activa cada semana para reentrenar el modelo con nuevos datos de calificaciones.

Paso 1: Ingestión y Validación. Un programa programado (por ejemplo, un CronJob en Kubernetes) ejecuta el primer contenedor. Este script descarga los nuevos archivos de ratings y metadatos de películas desde un bucket de S3 o una base de datos. Inmediatamente, valida el esquema: ¿tienen las columnas esperadas? ¿Los ratings están en el rango 1-5? ¿Hay valores nulos masivos? Si la validación falla, el pipeline se detiene y notifica a los ingenieros.

Paso 2: Preprocesamiento y Feature Engineering. Los datos validados se pasan al siguiente contenedor. Aquí se realiza la limpieza (imputar valores faltantes con la mediana), transformación (normalización de características numéricas) y, lo más importante, la ingeniería de características. Para nuestro recomendador, podríamos calcular la matriz usuario-ítem, extraer embeddings de títulos con un modelo de lenguaje simple o generar características agregadas por género.

Paso 3: Entrenamiento del Modelo. Las características procesadas se alimentan al contenedor de entrenamiento. Este contiene el script que carga el algoritmo (por ejemplo, Alternating Least Squares para filtrado colaborativo), divide los datos en entrenamiento y validación, ajusta los hiperparámetros y entrena el modelo final. Al terminar, el modelo se serializa (con pickle, joblib, o formatos como ONNX) y se guarda, junto con un reporte de métricas (RMSE, precisión@k), en un almacenamiento persistente.

Paso 4: Empaquetado y Registro. Un componente crucial toma el modelo serializado y lo "empaqueta" dentro de una imagen de Docker. Esta imagen incluye no solo el archivo del modelo, sino también todo el entorno de ejecución necesario para hacer predicciones: la versión de Python, las bibliotecas específicas y el código de la API de servicio. Esta imagen se etiqueta con un número de versión (ej: v1.2.5) y se sube a un registro de contenedores como Docker Hub o un registro privado.

Paso 5: Despliegue y Servicio. El orquestador (Kubernetes) recibe la instrucción de desplegar la nueva versión de la imagen del modelo. Utilizando un recurso como un Deployment, levanta uno o más pods con el contenedor del modelo. Un recurso Service expone estos pods bajo una dirección IP estable. Finalmente, un Ingress puede proveer un endpoint público (ej: api.misistema.com/recommend). Los sistemas cliente ahora pueden enviar solicitudes HTTP con un `user_id` y recibir una lista de películas recomendadas.

Código en Acción: Esqueleto del Pipeline con Scripts Funcionales

A continuación, presentamos fragmentos de código reales que ilustran componentes clave del pipeline. Estos scripts están simplificados para la enseñanza, pero son completamente funcionales y muestran la separación de responsabilidades.

Componente 1: Validación de Datos

Este script Python lee datos crudos y valida su estructura básica. Es el guardián de la calidad de entrada.

# validate_data.py
import pandas as pd
import sys
import json
from pathlib import Path

def validate_schema(df: pd.DataFrame, expected_schema: dict) -> bool:
    """Valida tipos de columna y valores no nulos."""
    for col, dtype in expected_schema.items():
        if col not in df.columns:
            return False, f"Columna faltante: {col}"
        if not pd.api.types.is_dtype_equal(df[col].dtype, dtype):
            return False, f"Tipo incorrecto para {col}. Esperado {dtype}, obtenido {df[col].dtype}"
        if df[col].isnull().all():
            return False, f"Columna {col} está completamente vacía"
    return True, "Esquema válido"

def main(data_path):
    # 1. Cargar datos
    raw_data = pd.read_csv(data_path)
    
    # 2. Definir esquema esperado (esto podría venir de un config file)
    expected_schema = {"user_id": "int64", "movie_id": "int64", "rating": "float64", "timestamp": "int64"}
    
    # 3. Ejecutar validación
    is_valid, message = validate_schema(raw_data, expected_schema)
    
    # 4. Validación de dominio: rating entre 1 y 5
    if is_valid and not raw_data['rating'].between(1, 5).all():
        is_valid = False
        message = "Valores de rating fuera del rango permitido (1-5)"
    
    # 5. Salida para el siguiente paso (éxito/fracaso)
    validation_result = {
        "is_valid": is_valid,
        "message": message,
        "rows": raw_data.shape[0],
        "columns": list(raw_data.columns)
    }
    
    # Escribir resultado a un archivo JSON para que lo lea el siguiente paso
    with open('/pipeline/artifacts/validation_result.json', 'w') as f:
        json.dump(validation_result, f)
    
    # Terminar con código de error si la validación falla
    if not is_valid:
        print(f"ERROR: {message}", file=sys.stderr)
        sys.exit(1)
    else:
        print("VALIDACIÓN EXITOSA")
        # Opcional: Guardar datos validados para el siguiente paso
        raw_data.to_parquet('/pipeline/artifacts/validated_data.parquet', index=False)

if __name__ == "__main__":
    # La ruta del dato se pasa como argumento o variable de entorno
    data_file_path = sys.argv[1] if len(sys.argv) > 1 else "/pipeline/data/raw_ratings.csv"
    main(data_file_path)

Componente 2: Entrenamiento del Modelo (Simplificado)

Este script carga los datos procesados, entrena un modelo simple y lo serializa. En la práctica, aquí iría la lógica de hiperparámetros y experimentación.

# train_model.py
import pandas as pd
import pickle
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import json
import sys

def main():
    # 1. Cargar datos procesados (salida del paso anterior)
    processed_data_path = "/pipeline/artifacts/processed_features.parquet"
    df = pd.read_parquet(processed_data_path)
    
    # 2. Separar características (X) y target (y)
    # Supongamos que tenemos características de usuario y película
    target_col = 'rating'
    feature_cols = [c for c in df.columns if c != target_col]
    
    X = df[feature_cols]
    y = df[target_col]
    
    # 3. Dividir en train/test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # 4. Instanciar y entrenar el modelo
    print("Entrenando modelo RandomForest...")
    model = RandomForestRegressor(n_estimators=50, random_state=42, n_jobs=-1)
    model.fit(X_train, y_train)
    
    # 5. Evaluar
    y_pred = model.predict(X_test)
    rmse = mean_squared_error(y_test, y_pred, squared=False)
    print(f"RMSE en conjunto de test: {rmse:.4f}")
    
    # 6. Serializar el modelo
    model_path = "/pipeline/artifacts/movie_recommender_model.pkl"
    with open(model_path, 'wb') as f:
        pickle.dump(model, f)
    
    # 7. Guardar métricas y metadata para registro
    metadata = {
        "model_version": "1.0.0",
        "algorithm": "RandomForestRegressor",
        "feature_count": len(feature_cols),
        "performance": {"rmse": rmse},
        "training_samples": X_train.shape[0]
    }
    
    with open('/pipeline/artifacts/model_metadata.json', 'w') as f:
        json.dump(metadata, f)
    
    print("Entrenamiento completado y modelo guardado.")

if __name__ == "__main__":
    main()

Componente 3: API de Servicio con Flask (para Dockerizar)

Este es el código que irá dentro del contenedor de servicio. Expone el modelo como un endpoint REST.

# serve_model.py
from flask import Flask, request, jsonify
import pickle
import numpy as np
import pandas as pd
import os

app = Flask(__name__)

# Cargar el modelo al iniciar la app
MODEL_PATH = '/app/model/movie_recommender_model.pkl'
with open(MODEL_PATH, 'rb') as f:
    MODEL = pickle.load(f)

# Cargar metadata del modelo (ej: nombres de características)
FEATURE_ORDER = ['feature_1', 'feature_2', 'feature_3']  # Esto debería cargarse de un config

@app.route('/health', methods=['GET'])
def health():
    """Endpoint de salud para verificaciones de readiness/liveness en Kubernetes."""
    return jsonify({"status": "healthy", "model_loaded": True})

@app.route('/predict', methods=['POST'])
def predict():
    """
    Endpoint de predicción.
    Espera un JSON: {"features": [val1, val2, val3, ...]}
    """
    try:
        data = request.get_json()
        
        if not data or 'features' not in data:
            return jsonify({"error": "Se requiere un campo 'features' en el JSON"}), 400
        
        input_features = data['features']
        
        # Validar longitud de características
        if len(input_features) != len(FEATURE_ORDER):
            return jsonify({
                "error": f"Número incorrecto de características. Esperado {len(FEATURE_ORDER)}, obtenido {len(input_features)}"
            }), 400
        
        # Convertir a formato numpy para predicción
        features_array = np.array(input_features).reshape(1, -1)
        
        # Hacer la predicción
        prediction = MODEL.predict(features_array)
        
        # Formatear respuesta
        response = {
            "prediction": float(prediction[0]),
            "model_version": "1.0.0",
            "status": "success"
        }
        return jsonify(response)
    
    except Exception as e:
        # Loggear el error internamente
        app.logger.error(f"Error en predicción: {str(e)}")
        return jsonify({"error": "Error interno del servidor", "details": str(e)}), 500

if __name__ == '__main__':
    # En producción, usar un WSGI server como Gunicorn
    # Para desarrollo/debug:
    app.run(host='0.0.0.0', port=8080, debug=False)

Errores Comunes y Cómo Evitarlos

Al diseñar e implementar pipelines de ML, varios errores recurrentes pueden comprometer la estabilidad y utilidad del sistema. Identificarlos temprano es crucial.

1. Acoplamiento Excesivo entre Etapas: El error más grave es hacer que cada etapa dependa directamente de variables internas o estructuras de memoria de la etapa anterior. Esto hace imposible la ejecución independiente o el reemplazo de componentes.
Solución: Utilizar interfaces basadas en archivos (Parquet, JSON), mensajes (Kafka) o APIs REST para la comunicación entre etapas. Cada componente debe leer su entrada desde una ubicación definida y escribir su salida en otra, documentando claramente el formato.

2. Falta de Versionado de Datos y Modelos: Entrenar un nuevo modelo con datos diferentes y sobrescribir el anterior sin guardar una copia es una receta para el desastre. Si las predicciones se degradan, no hay forma de retroceder.
Solución: Implementar un sistema de versionado para conjuntos de datos (usando DVC o simplemente nombres de archivo con timestamps o hashes) y para modelos (registrando cada modelo entrenado en un sistema como MLflow o un bucket de S3 con metadatos).

3. Ignorar la Validación de Entrada en el Servicio (Serving): Asumir que los datos de predicción que llegan al endpoint en producción tendrán el mismo formato y calidad que los datos de entrenamiento.
Solución: Incluir una capa de validación robusta en el endpoint `/predict`, tal como se hizo en la etapa de ingestión. Verificar tipos, rangos, valores nulos y esquema. Rechazar solicitudes malformadas con errores HTTP claros (400 Bad Request).

4. Contenedores "Gordos" y No Especializados: Crear una única imagen de Docker gigante que contenga el código para la validación, el entrenamiento y el servicio. Esto aumenta el tamaño de la imagen, la superficie de ataque y dificulta la gestión de dependencias.
Solución: Diseñar una imagen de Docker por cada etapa del pipeline o, al menos, una para entrenamiento y otra para servicio. Cada imagen solo debe contener las bibliotecas estrictamente necesarias para su función, usando imágenes base ligeras como `python:3.9-slim`.

5. No Planificar para el Escalado Horizontal: Diseñar el servicio de predicciones como una aplicación monolítica que mantiene estado (stateful) y no puede ejecutarse en múltiples réplicas.
Solución: Asegurarse de que el contenedor de servicio sea sin estado (stateless). El modelo debe cargarse desde un almacenamiento compartido (como un volumen persistente en Kubernetes) o desde un registro al iniciar. Las solicitudes HTTP no deben depender de estado mantenido en memoria entre ellas. Esto permite que Kubernetes escale horizontalmente el número de pods según la demanda.

Checklist de Dominio

Antes de considerar que has dominado el diseño de un pipeline de ML para producción, verifica que puedes afirmar lo siguiente:

  • Puedo desglosar un problema de ML en al menos 5 etapas independientes y secuenciales (Ingestión, Validación, Procesamiento, Entrenamiento, Servicio).
  • Sé diseñar la interfaz de comunicación (formato de archivo, esquema de mensaje) entre cada par de etapas consecutivas.
  • Puedo escribir un script para cada etapa que sea autónomo, que lea de una entrada definida y escriba en una salida definida, y que falle de forma controlada.
  • Comprendo la importancia del versionado y puedo explicar una estrategia para versionar tanto conjuntos de datos como modelos entrenados.
  • Sé cómo estructurar una aplicación de servicio (como Flask/FastAPI) que sea stateless, incluya endpoints de salud (`/health`) y valide rigurosamente las solicitudes de entrada.
  • Puedo esbozar el diseño de los Dockerfiles necesarios, argumentando por qué se necesitan imágenes separadas para diferentes etapas.
  • Puedo describir cómo se orquestaría este pipeline en Kubernetes, mencionando recursos como Jobs para etapas de entrenamiento y Deployments/Services para la etapa de servicio.
  • Identifico al menos tres puntos de falla potencial en un pipeline y sé qué mecanismos (logging, alertas, reinicios) implementar para mitigarlos.
De lección a portfolio

Convertí esta lección en una habilidad visible para entrevistas.

Guardá el curso, completá los ejercicios y conectá esta habilidad con una ruta de empleo, data, IA, programación o marketing.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión