Práctica: Implementar un worker pool para tareas asíncronas

Lectura
35 min~12 min lectura
Objetivo de la lección

Aquí es donde el patrón Worker Pool (o grupo de trabajadores) se convierte en una herramienta indispensable.

Puntos de control
  • Introducción: La Necesidad de un Worker Pool en APIs de Alto Rendimiento
  • Concepto Clave: Desglosando el Patrón Worker Pool
  • Cómo Funciona en la Práctica: Flujo Paso a Paso
  • Código en Acción: Implementación Completa y Funcional

Introducción: La Necesidad de un Worker Pool en APIs de Alto Rendimiento

En el desarrollo de APIs de alto rendimiento con Go, uno de los desafíos más críticos es manejar eficientemente tareas que son intensivas en tiempo o recursos sin bloquear el hilo principal de ejecución que atiende las peticiones HTTP. Aquí es donde el patrón Worker Pool (o grupo de trabajadores) se convierte en una herramienta indispensable. Este patrón nos permite crear un conjunto predefinido de goroutines (los trabajadores) que permanecen en espera, listas para procesar tareas que llegan a través de un canal. En lugar de lanzar una nueva goroutine para cada tarea, lo que puede llevar a un consumo descontrolado de memoria y CPU, el worker pool establece un límite controlado de concurrencia.

Imagina que tu API recibe una solicitud para generar un reporte complejo, enviar un lote de correos electrónicos o procesar una imagen. Si realizas estas operaciones de manera síncrona dentro del manejador HTTP, la conexión se mantendrá abierta hasta que finalice, degradando la capacidad de respuesta de tu servicio y limitando el número de solicitudes concurrentes que puedes manejar. Al delegar estas tareas a un worker pool, tu manejador HTTP puede aceptar la solicitud, encolar la tarea y responder inmediatamente con un "202 Accepted" o un ID de seguimiento, mientras el trabajo pesado se realiza en segundo plano de manera ordenada y eficiente. Esta lección te guiará en la implementación práctica de un worker pool robusto y listo para producción utilizando las primitivas de concurrencia de Go.

Concepto Clave: Desglosando el Patrón Worker Pool

El patrón Worker Pool se basa en cuatro componentes fundamentales: el Job (Trabajo), el Worker (Trabajador), el Dispatcher (Despachador) o Pool (Grupo), y los Canales para la comunicación. Un Job es una unidad de trabajo que necesita ser procesada; en Go, suele ser una estructura que contiene los datos necesarios y quizás un canal para devolver el resultado. Un Worker es una goroutine que se ejecuta de forma indefinida, escuchando un canal de jobs. Cuando recibe un job, lo ejecuta. El Dispatcher es el responsable de crear el pool de workers y de proporcionar un mecanismo (normalmente un canal) para que el código exterior envíe jobs al pool. Los Canales son las arterias del sistema: el canal de jobs por donde los trabajadores reciben su trabajo, y canales opcionales para resultados, errores o para detener el pool.

Una analogía del mundo real perfecta es una cocina de restaurante. El Dispatcher es el/la host que recibe los pedidos de los clientes (las solicitudes HTTP). Los Workers son los chefs en la línea de cocina. El canal de jobs es la barra donde se colocan los tickets de los pedidos. La host no cocina; toma el pedido, lo escribe en un ticket (crea el Job) y lo coloca en la barra (canal). Los chefs (Workers) están observando constantemente la barra. Cuando un chef está libre, toma el siguiente ticket (Job) y comienza a cocinar (procesar). La cocina tiene un número fijo de chefs (el tamaño del pool), lo que evita el caos de contratar y despedir chefs para cada pedido y previene la saturación de la cocina. Este modelo es predecible, eficiente y se escala controlando el número de chefs, no aceptando infinitos pedidos a la vez.

Tip Clave: El tamaño del worker pool es un parámetro de tuning crítico. Un número muy pequeño subutilizará los recursos de la CPU, mientras que uno muy grande puede saturar la memoria o el ancho de banda. Una heurística común es comenzar con un tamaño igual al número de núcleos de CPU disponibles para tareas CPU-bound, o mayor para tareas I/O-bound. Usa runtime.NumCPU() como punto de partida.

Cómo Funciona en la Práctica: Flujo Paso a Paso

Vamos a detallar el flujo de una implementación típica, desde la llegada de una petición HTTP hasta la finalización de la tarea asíncrona. Paso 1: Inicialización. Al arrancar tu microservicio, creas e inicializas el worker pool. Esto implica crear el canal de jobs (usualmente con buffer), lanzar la cantidad deseada de workers (goroutines), y hacer que cada worker escuche en un bucle infinito el canal de jobs. En este punto, los workers están inactivos, esperando jobs. Paso 2: Recepción de la Solicitud. Tu manejador HTTP, enrutado con Gorilla/Mux, recibe una petición POST a, por ejemplo, /api/v1/tasks. El manejador valida la entrada, y en lugar de ejecutar la tarea, construye una estructura Job que encapsula todos los datos necesarios (ID de usuario, parámetros, etc.).

Paso 3: Despacho Asíncrono. El manejador envía este job al canal del worker pool. Esta operación es no bloqueante si el canal tiene buffer y no está lleno. Inmediatamente después de encolar el job, el manejador HTTP responde al cliente con un estado 202 Accepted y un cuerpo JSON que contiene un ID de tarea para seguimiento. La conexión HTTP se cierra, liberando recursos. Paso 4: Procesamiento por el Worker. Uno de los workers libres en el pool recibe el job del canal. Sale de su estado de espera y ejecuta la función de procesamiento asociada al job, que podría ser un cálculo, una llamada a una base de datos, o una solicitud a otro servicio. Paso 5: Manejo de Resultados y Errores. Una vez completado, el worker debe manejar el resultado o el error. Esto puede hacerse actualizando una base de datos con el estado, enviando el resultado a otro canal de resultados que un "monitor" esté escuchando, o escribiendo en un log estructurado. La clave es que el worker no interactúa directamente con el cliente HTTP original.

Código en Acción: Implementación Completa y Funcional

A continuación, presentamos una implementación completa y lista para integrar. Incluye tipos para Job y Result, un Pool que gestiona workers, y un ejemplo de integración con un manejador HTTP.

// worker_pool.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "runtime"
    "time"

    "github.com/gorilla/mux"
)

// Job representa la unidad de trabajo que se enviará al pool.
type Job struct {
    ID       int
    TaskData interface{} // Datos flexibles para la tarea (ej., JSON, struct específica)
    Handler  func(interface{}) (Result, error) // Función que ejecutará el worker
}

// Result representa el resultado de procesar un Job.
type Result struct {
    JobID    int
    Output   interface{}
    Error    error
    Duration time.Duration
}

// WorkerPool gestiona el pool de trabajadores.
type WorkerPool struct {
    jobQueue    chan Job
    resultQueue chan Result
    numWorkers  int
    ctx         context.Context
    cancel      context.CancelFunc
}

// NewWorkerPool crea e inicializa un nuevo WorkerPool.
func NewWorkerPool(numWorkers int, queueSize int) *WorkerPool {
    ctx, cancel := context.WithCancel(context.Background())
    return &WorkerPool{
        jobQueue:    make(chan Job, queueSize),
        resultQueue: make(chan Result, queueSize),
        numWorkers:  numWorkers,
        ctx:         ctx,
        cancel:      cancel,
    }
}

// Start lanza todos los workers del pool.
func (wp *WorkerPool) Start() {
    for i := 1; i <= wp.numWorkers; i++ {
        go wp.worker(i)
    }
    log.Printf("Worker pool iniciado con %d trabajadores\n", wp.numWorkers)
}

// worker es la función que ejecuta cada goroutine trabajadora.
func (wp *WorkerPool) worker(id int) {
    for {
        select {
        case job := <-wp.jobQueue:
            start := time.Now()
            log.Printf("Worker %d comenzando job ID %d", id, job.ID)
            output, err := job.Handler(job.TaskData)
            duration := time.Since(start)

            // Enviar resultado al canal de resultados
            wp.resultQueue <- Result{
                JobID:    job.ID,
                Output:   output,
                Error:    err,
                Duration: duration,
            }
            log.Printf("Worker %d finalizó job ID %d en %v", id, job.ID, duration)

        case <-wp.ctx.Done():
            log.Printf("Worker %d detenido", id)
            return
        }
    }
}

// SubmitJob envía un trabajo al pool para su procesamiento.
func (wp *WorkerPool) SubmitJob(job Job) error {
    select {
    case wp.jobQueue <- job:
        return nil
    default:
        return fmt.Errorf("cola de trabajos llena, no se pudo enviar job ID %d", job.ID)
    }
}

// Stop finaliza de manera ordenada todos los workers del pool.
func (wp *WorkerPool) Stop() {
    wp.cancel()
    close(wp.jobQueue)
    close(wp.resultQueue)
    log.Println("Worker pool detenido")
}

// ResultStream devuelve un canal de solo lectura para recibir resultados.
func (wp *WorkerPool) ResultStream() <-chan Result {
    return wp.resultQueue
}
// main.go - Integración con HTTP y ejemplo de tarea
package main

import (
    "encoding/json"
    "log"
    "math/rand"
    "net/http"
    "strconv"
    "sync/atomic"
    "time"
)

// Simulamos un contador atómico para generar IDs de job únicos.
var jobIDCounter int64

// taskRequest representa la estructura de la petición HTTP.
type taskRequest struct {
    UserEmail string `json:"user_email"`
    Data      string `json:"data"`
}

// taskResponse representa la respuesta HTTP inmediata.
type taskResponse struct {
    JobID       int64  `json:"job_id"`
    Status      string `json:"status"`
    Message     string `json:"message"`
    EnqueuedAt  string `json:"enqueued_at"`
}

// simulateHeavyTask es un ejemplo de función manejadora para un Job.
func simulateHeavyTask(taskData interface{}) (Result, error) {
    // Convertir los datos de la tarea
    data, ok := taskData.(taskRequest)
    if !ok {
        return Result{}, fmt.Errorf("tipo de datos de tarea inválido")
    }

    // Simular un trabajo pesado (ej., procesamiento, llamada a API externa)
    time.Sleep(time.Duration(rand.Intn(3)+1) * time.Second) // 1-3 segundos

    // Simular un resultado
    resultOutput := fmt.Sprintf("Procesado correo %s con datos: %s", data.UserEmail, data.Data)
    // Aquí iría la lógica real: generar reporte, enviar email, etc.
    return Result{Output: resultOutput}, nil
}

func main() {
    // 1. Inicializar Worker Pool
    // Usamos el doble de núcleos para tareas I/O bound simuladas.
    numWorkers := runtime.NumCPU() * 2
    pool := NewWorkerPool(numWorkers, 100) // Cola con buffer para 100 jobs
    pool.Start()
    defer pool.Stop() // Asegurar la limpieza al salir

    // 2. (Opcional) Iniciar un monitor de resultados en segundo plano
    go func() {
        for result := range pool.ResultStream() {
            if result.Error != nil {
                log.Printf("ERROR en Job %d: %v", result.JobID, result.Error)
                // Aquí podrías actualizar un estado en DB a "failed"
            } else {
                log.Printf("SUCCESS Job %d: %v (Tomó %v)", result.JobID, result.Output, result.Duration)
                // Aquí podrías actualizar un estado en DB a "completed" y almacenar el resultado
            }
        }
    }()

    // 3. Configurar el enrutador HTTP con Gorilla/Mux
    r := mux.NewRouter()
    r.HandleFunc("/api/v1/tasks", func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            http.Error(w, "Método no permitido", http.StatusMethodNotAllowed)
            return
        }

        var req taskRequest
        if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
            http.Error(w, "Cuerpo de solicitud inválido", http.StatusBadRequest)
            return
        }

        // Generar un ID único para el job
        jobID := atomic.AddInt64(&jobIDCounter, 1)

        // Crear el Job
        job := Job{
            ID:       int(jobID),
            TaskData: req,
            Handler:  simulateHeavyTask,
        }

        // Intentar enviar el job al pool
        if err := pool.SubmitJob(job); err != nil {
            log.Printf("Error al encolar job %d: %v", jobID, err)
            http.Error(w, "Servidor sobrecargado, intente más tarde", http.StatusServiceUnavailable)
            return
        }

        // Responder inmediatamente al cliente
        w.Header().Set("Content-Type", "application/json")
        w.WriteHeader(http.StatusAccepted) // 202 Accepted
        json.NewEncoder(w).Encode(taskResponse{
            JobID:      jobID,
            Status:     "enqueued",
            Message:    "La tarea ha sido aceptada y está en cola para procesamiento",
            EnqueuedAt: time.Now().UTC().Format(time.RFC3339),
        })
        log.Printf("Job %d encolado exitosamente para %s", jobID, req.UserEmail)
    }).Methods("POST")

    // 4. Iniciar el servidor HTTP
    log.Println("Servidor API iniciado en :8080")
    log.Fatal(http.ListenAndServe(":8080", r))
}

# Ejemplo de solicitud HTTP para probar el endpoint
curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{"user_email":"[email protected]","data":"parametros_del_reporte_123"}'

Errores Comunes y Cómo Evitarlos

1. No Establecer un Tamaño de Buffer Adecuado para el Canal de Jobs: Usar un canal sin buffer (make(chan Job)) hará que el despacho de jobs sea bloqueante. El manejador HTTP se quedará esperando hasta que un worker esté libre, anulando la ventaja de la asincronía. Por otro lado, un buffer demasiado grande puede enmascarar problemas de capacidad y llevar a un consumo excesivo de memoria si los jobs se acumulan. Solución: Usa un canal con buffer (make(chan Job, N)). El tamaño debe ser un equilibrio entre la capacidad de absorber picos de tráfico y los límites de memoria. Monitoriza la longitud del canal para ajustar este valor.

2. Fugas de Goroutines (Workers que Nunca Terminan): Si olvidas proporcionar un mecanismo para detener los workers, permanecerán en ejecución incluso después de que el main haya finalizado, o peor, quedarán bloqueados esperando en un canal cerrado. Solución: Implementa un cierre ordenado usando context.Context. En el método Stop() del pool, cancela el contexto y luego cierra los canales. Asegúrate de que el worker tenga una cláusula select que escuche <-ctx.Done() para salir limpiamente.

3. No Manejar los Pánicos dentro de los Workers: Si la función Handler de un job entra en pánico, la goroutine del worker se terminará, reduciendo permanentemente el tamaño de tu pool. Solución: Envuelve la ejecución del job en una función con defer y recover(). Captura el pánico, regístralo como un error en el resultado, y permite que el worker continúe procesando el siguiente job. Esto hace que tu pool sea resiliente a fallos inesperados.

4. Bloqueo por Canal de Resultados sin Lector: Si envías resultados a un canal (resultQueue) pero no hay ninguna goroutine leyendo de él, los workers se bloquearán indefinidamente una vez que el buffer se llene, deteniendo todo el procesamiento. Solución: Asegúrate de iniciar al menos una goroutine consumidora (un "monitor") que lea del canal de resultados antes de que el pool comience a trabajar. En el ejemplo, lanzamos esta goroutine justo después de pool.Start(). Alternativamente, puedes hacer que el canal de resultados tenga un buffer muy grande, pero la mejor práctica es tener un consumidor dedicado.

5. Ignorar el Backpressure y la Sobre carga: Cuando el canal de jobs está lleno, pool.SubmitJob(job) se bloqueará (o fallará, si usas un select con default como en nuestro ejemplo). Si simplemente ignoras este error y continúas aceptando peticiones HTTP, los clientes podrían experimentar timeouts sin una respuesta clara. Solución: Implementa un patrón de rechazo elegante. Como se muestra en el código, al fallar el envío al canal, respondemos con un 503 Service Unavailable. Para una solución más avanzada, considera un patrón de circuit breaker o una cola externa (como Redis) para una mayor durabilidad y capacidad.

Checklist de Dominio

Antes de considerar que dominas la implementación de un worker pool para APIs de alto rendimiento en Go, verifica que puedas realizar o explicar cada uno de los siguientes puntos:

  • Puedo explicar los cuatro componentes principales de un worker pool (Job, Worker, Dispatcher/Pool, Canales) y su analogía con un sistema del mundo real.
  • Sé cómo determinar un tamaño inicial apropiado para el pool basado en el tipo de tarea (CPU-bound vs I/O-bound) usando runtime.NumCPU().
  • He implementado un worker pool que incluye un cierre ordenado y seguro usando context.Context, evitando fugas de goroutines.
  • Puedo integrar el worker pool en un manejador HTTP (usando Gorilla/Mux o el estándar) para responder con 202 Accepted inmediatamente después de encolar la tarea.
  • Sé cómo manejar y registrar los resultados (o errores) de las tareas procesadas, típicamente usando un canal de resultados consumido por una goroutine separada.
  • He añadido manejo de recuperación de pánicos (recover) dentro de la lógica del worker para garantizar que un job fallido no derribe a todo el worker.
  • Puedo identificar y resolver el problema de backpressure, implementando un rechazo elegante (ej., HTTP 503) cuando la cola de jobs está llena.
  • Comprendo la importancia de los canales con buffer en este patrón y sé cómo elegir un tamaño de buffer apropiado para mi caso de uso específico.
Falar no WhatsApp
Laboratorio de práctica

Antes de marcar esta lección como completa, escribí una evidencia breve para Go para APIs de Alto Rendimiento: Construcción de Microservicios REST con Gorilla/Mux: un ejemplo, una decisión, una captura, una mini demo o una nota que puedas reutilizar en portfolio.

Reflexión rápida

¿Qué cambiarías en tu forma de trabajar después de aplicar práctica: implementar un worker pool para tareas asíncronas?

De lección a portfolio

Convertí esta lección en una prueba técnica visible.

Una app pequeña publicada, con README y decisiones explicadas, funciona mejor que una lista de tecnologías sueltas.

Paso 1

Creá una demo mínima que use el concepto de la lección.

Paso 2

Escribí un README corto con objetivo, stack, decisión técnica y mejora futura.

Paso 3

Publicá la demo y enlazala desde tu perfil profesional.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión

Práctica: Implementar un worker pool para tarea... | Cursalo