Patrones de concurrencia en APIs (ej., worker pools)

Video
30 min~5 min lectura

Reproductor de video

Concepto clave

Los patrones de concurrencia son soluciones probadas para manejar múltiples tareas simultáneamente en aplicaciones Go. En APIs de alto rendimiento, el patrón worker pool (grupo de trabajadores) es fundamental: creas un conjunto fijo de goroutines (trabajadores) que procesan tareas desde una cola compartida. Imagina una cocina de restaurante: en lugar de que cada pedido cree un nuevo chef (lo que saturaría la cocina), tienes un equipo fijo de chefs que toman pedidos de una mesa central. Esto controla el consumo de recursos y evita la sobrecarga del sistema.

En APIs REST con gorilla/mux, los worker pools son ideales para procesar tareas pesadas como: validaciones complejas, transformaciones de datos, llamadas a bases de datos, o integraciones con servicios externos. Sin este patrón, un pico de tráfico podría generar miles de goroutines simultáneas, agotando memoria o CPU. El worker pool actúa como un regulador: limita el paralelismo máximo, mantiene la estabilidad y mejora la previsibilidad del rendimiento.

Cómo funciona en la práctica

Implementar un worker pool en una API con gorilla/mux sigue estos pasos:

  1. Crear la cola de tareas: Usa un canal (chan) de Go para almacenar las tareas pendientes. Este canal actúa como buffer entre los handlers HTTP y los workers.
  2. Inicializar los workers: Lanza N goroutines al inicio de la aplicación. Cada worker escucha permanentemente el canal de tareas.
  3. Encolar tareas desde los handlers: Cuando llega una petición HTTP, el handler no procesa la tarea directamente. En su lugar, crea una estructura con los datos necesarios y la envía al canal.
  4. Procesamiento asíncrono: Los workers toman tareas del canal y las ejecutan. El handler HTTP puede responder inmediatamente (ej., con "202 Accepted") mientras el trabajo continúa en segundo plano.
  5. Gestión de finalización: Usa sync.WaitGroup o canales de señal para cerrar los workers de forma ordenada al apagar la API.

Código en acción

Primero, veamos un handler sin worker pool que procesa imágenes de forma síncrona:

// Handler SIN worker pool - problema: bloquea la petición
func uploadImageHandler(w http.ResponseWriter, r *http.Request) {
    file, _, err := r.FormFile("image")
    if err != nil {
        http.Error(w, "Error al leer imagen", http.StatusBadRequest)
        return
    }
    defer file.Close()
    
    // Procesamiento pesado (ejemplo: redimensionar)
    processedImage := processImage(file) // Esta función tarda 2 segundos
    
    // Guardar en base de datos
    saveToDB(processedImage)
    
    w.WriteHeader(http.StatusOK)
    w.Write([]byte("Imagen procesada"))
}

// Con alta concurrencia, cada petición crea una goroutine que bloquea recursos.

Ahora, implementamos un worker pool para el mismo caso:

// Estructura para la tarea
type ImageTask struct {
    File     multipart.File
    UserID   string
    Response chan<- string
}

// Worker pool
func StartImageWorkers(numWorkers int, taskQueue chan ImageTask) {
    for i := 0; i < numWorkers; i++ {
        go func(workerID int) {
            for task := range taskQueue {
                log.Printf("Worker %d procesando imagen para usuario %s", workerID, task.UserID)
                processed := processImage(task.File)
                saveToDB(processed)
                task.Response <- "Imagen procesada exitosamente"
            }
        }(i)
    }
}

// Handler CON worker pool
func uploadImageHandlerAsync(w http.ResponseWriter, r *http.Request) {
    file, _, err := r.FormFile("image")
    if err != nil {
        http.Error(w, "Error al leer imagen", http.StatusBadRequest)
        return
    }
    defer file.Close()
    
    // Crear tarea
    responseChan := make(chan string, 1)
    task := ImageTask{
        File:     file,
        UserID:   r.FormValue("user_id"),
        Response: responseChan,
    }
    
    // Enviar a la cola (no bloqueante si hay buffer)
    select {
    case taskQueue <- task:
        // Aceptado para procesamiento
        w.WriteHeader(http.StatusAccepted)
        w.Write([]byte("Imagen en cola de procesamiento"))
    default:
        // Cola llena - rechazar
        http.Error(w, "Servidor sobrecargado", http.StatusServiceUnavailable)
    }
    
    // Opcional: esperar respuesta asíncrona (en otra endpoint)
    // go func() { result := <-responseChan; /* notificar al usuario */ }()
}

// Inicialización en main.go
taskQueue := make(chan ImageTask, 100) // Buffer de 100 tareas
StartImageWorkers(10, taskQueue) // 10 workers
mux := mux.NewRouter()
mux.HandleFunc("/upload", uploadImageHandlerAsync).Methods("POST")

Errores comunes

  • No dimensionar el buffer del canal: Un canal sin buffer (make(chan Task)) bloquea al productor hasta que un worker tome la tarea. En APIs, esto puede causar timeouts HTTP. Solución: usar buffer adecuado al tráfico esperado (ej., 50-200 tareas).
  • Olvidar cerrar workers al apagar: Si la API se reinicia sin cerrar goroutines, pueden quedar leaks. Solución: capturar señales de sistema (os.Interrupt) y cerrar el canal para que los workers terminen con range.
  • Worker pools demasiado grandes o pequeños: Más workers que núcleos de CPU causa context switching excesivo; muy pocos workers subutilizan recursos. Solución: empezar con workers = núcleos de CPU * 2 y ajustar con métricas.
  • No manejar panics en workers: Un panic en un worker mata esa goroutine, reduciendo el pool. Solución: envolver el código del worker en defer con recuperación (recover()).
  • Bloquear en canales sin timeout: Esperar indefinidamente en <-responseChan puede bloquear goroutines. Solución: usar select con time.After para timeouts.

Checklist de dominio

  • Puedo explicar la diferencia entre concurrencia (worker pool) y paralelismo (goroutines ilimitadas).
  • He implementado un worker pool con canales, sync.WaitGroup, y cierre ordenado.
  • Sé dimensionar el número de workers y el buffer del canal según métricas de mi API.
  • Uso el patrón en handlers gorilla/mux para tareas como: procesamiento de archivos, envío de emails, o llamadas a APIs externas.
  • Manejo errores y panics dentro de los workers para mantener el pool estable.
  • Puedo decidir cuándo responder inmediatamente (202 Accepted) vs. esperar resultado síncrono.
  • Monitorizo métricas como: longitud de la cola, tiempo de procesamiento promedio, y workers activos.

Implementa un worker pool para procesamiento de pedidos en una API de e-commerce

En este ejercicio, construirás un worker pool para una API REST que procesa pedidos de un e-commerce. Los pedidos requieren validación de inventario y generación de factura, operaciones que simulan latencia.

  1. Crea la estructura del proyecto:
    Archivo main.go con gorilla/mux, un struct OrderTask que contenga: OrderID (string), Items ([]string), y un canal para respuesta.
  2. Implementa el worker pool:
    Función StartOrderWorkers(numWorkers int, taskQueue chan OrderTask) que lance goroutines. Cada worker debe:
    • Recibir tareas del canal
    • Simular validación (time.Sleep(100ms))
    • Simular generación de factura (time.Sleep(200ms))
    • Enviar resultado al canal de respuesta
  3. Crea el handler HTTP:
    /order (POST) que:
    • Acepte JSON con {"order_id": "123", "items": ["item1", "item2"]}
    • Encole la tarea en el worker pool
    • Responda inmediatamente con 202 Accepted y un task_id
    • Si la cola está llena, responda 503 Service Unavailable
  4. Añade endpoint de consulta:
    /order/status/{task_id} (GET) que espere (con timeout) la respuesta del worker y devuelva el resultado.
  5. Prueba con concurrencia:
    Usa go test o herramientas como hey para enviar 50 peticiones simultáneas y verifica que el worker pool las maneje sin bloquear.
Pistas
  • Usa un mapa sincronizado (sync.Map) para almacenar canales de respuesta asociados a task_id.
  • Implementa un timeout en el endpoint de status usando select con time.After(5*time.Second).
  • Para simular carga, varía los time.Sleep en los workers entre 50ms y 300ms.

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.