Práctica: Implementar un sistema de procesamiento en lote con goroutines

Lectura
30 min~10 min lectura

Práctica: Implementar un sistema de procesamiento en lote con goroutines

En esta lección práctica, nos sumergiremos en la construcción de un componente fundamental para APIs de alto rendimiento: un sistema de procesamiento en lote (batch processing) concurrente. Moveremos datos de manera eficiente, transformaremos registros y gestionaremos recursos, todo utilizando el poder de las goroutines y los channels de Go. Este patrón es esencial para tareas como la ingestión de datos, el procesamiento de colas de mensajes, la transformación de registros antes de su inserción en una base de datos, o la generación de reportes agregados. Aprenderás a diseñar una canalización (pipeline) concurrente que maximice el throughput mientras mantiene un consumo de memoria controlado y maneja errores de forma elegante.

El objetivo final es pasar de un modelo secuencial y lento a uno concurrente y eficiente. Comenzaremos con un problema clásico: leer miles de registros de una fuente (como un archivo JSON, una API o una cola), aplicar una transformación o validación costosa a cada uno, y finalmente escribir los resultados en un destino. Hacer esto de uno en uno es inaceptablemente lento en sistemas de alto rendimiento. Nuestra solución utilizará un patrón de worker pool (grupo de trabajadores) para paralelizar el trabajo de procesamiento, canales con búfer para desacoplar las etapas y WaitGroups para la sincronización segura.

Concepto clave: Pipelines concurrentes y Worker Pools

Imagina una línea de ensamblaje en una fábrica de automóviles. El chasis entra por un extremo y sale un coche terminado por el otro. En lugar de que un solo robot haga todas las tareas (pintar, instalar el motor, colocar los asientos) de forma secuencial sobre un coche antes de pasar al siguiente, la línea tiene estaciones de trabajo especializadas que operan en paralelo. Mientras el coche A está siendo pintado en la estación 1, el coche B puede estar recibiendo su motor en la estación 2. Esto es una pipeline concurrente. En nuestro contexto, las estaciones son goroutines, y los coches son nuestros datos (por ejemplo, pedidos, logs, transacciones).

Un Worker Pool es una implementación concreta de este concepto para una etapa particularmente costosa de la pipeline. Siguiendo la analogía, si pintar un coche es la operación más lenta, tendremos no una, sino varias cabinas de pintura idénticas (los workers) operando al mismo tiempo. Un sistema de gestión (el canal de jobs) asigna un chasis a la primera cabina disponible. Esto evita que una tarea lenta congele toda la línea de producción. En Go, el pool se implementa lanzando un número fijo de goroutines (los workers) que escuchan un canal de trabajos (jobs channel). Una goroutine productora envía los trabajos a este canal, y los workers los consumen y procesan concurrentemente, enviando los resultados a otro canal.

Tip de Diseño: El tamaño del worker pool (número de goroutines) y el buffer de los canales son parámetros críticos para el rendimiento. Un pool muy pequeño no aprovecha los núcleos de la CPU; uno muy grande puede saturar la memoria o los recursos externos (como conexiones de base de datos). Comienza con un tamaño igual al número de núcleos lógicos (runtime.NumCPU()) y ajústalo mediante pruebas de carga.

Cómo funciona en la práctica: Diseñando nuestra canalización de lote

Vamos a desglosar nuestro sistema en etapas claramente definidas. Primero, la etapa de lectura (Producer). Esta goroutine es responsable de obtener los datos brutos de la fuente. Podría estar leyendo líneas de un archivo, consumiendo mensajes de Kafka, o paginando resultados de una API REST. Su trabajo es colocar cada unidad de trabajo (por ejemplo, un struct que representa un pedido sin procesar) en un canal, conocido como el canal de trabajos (jobs channel). Para evitar que el productor inunde la memoria si los workers son lentos, usaremos un canal con un búfer de tamaño definido. Cuando el productor termina de leer todos los datos, cierra el canal, señalando que no hay más trabajos.

La segunda etapa es el procesamiento concurrente (Worker Pool). Aquí es donde reside la magia de la concurrencia. Iniciamos un número predefinido de goroutines workers. Cada worker ejecuta un bucle infinito que intenta recibir un trabajo del jobs channel. Cuando recibe uno, ejecuta la operación costosa: podría ser validar el pedido, enriquecerlo con datos de otra API, calcular impuestos, o transformar su formato. Una vez procesado, el worker envía el resultado a un tercer canal: el canal de resultados (results channel). Si el jobs channel se cierra y no quedan más elementos, el worker sale de su bucle.

Finalmente, la etapa de escritura (Consumer/Collector). Una goroutine (o varias) se encarga de recibir los resultados procesados del results channel y persistirlos en el destino final: escribirlos en un nuevo archivo, insertarlos en una base de datos, o enviarlos a otro servicio. Esta etapa debe ser consciente de la finalización: necesita saber cuándo todos los workers han terminado su trabajo y no habrá más resultados. Para coordinar esto, utilizaremos un sync.WaitGroup que los workers señalarán cuando terminen, permitiendo que el collector cierre el canal de resultados de forma segura y luego proceda con cualquier operación final (como hacer commit de una transacción).

Código en acción: Sistema de procesamiento de pedidos

A continuación, implementaremos un sistema que simula el procesamiento de un lote de pedidos desde un archivo JSON. Cada pedido será validado y enriquecido con información de impuestos de forma concurrente. Este es un ejemplo completo y funcional que puedes adaptar.

Estructuras y configuración inicial

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "os"
    "sync"
    "time"
)

// Order representa un pedido entrante.
type Order struct {
    ID        string  `json:"id"`
    CustomerID string `json:"customer_id"`
    Amount    float64 `json:"amount"`
    TaxRate   float64 `json:"-"` // Se calculará y enriquecerá
    Total     float64 `json:"-"` // Se calculará
    Valid     bool    `json:"-"` // Resultado de la validación
}

// Config holds the pipeline configuration.
type Config struct {
    InputFile     string
    WorkerCount   int
    JobQueueSize  int // Buffer del canal de jobs
    ResultQueueSize int // Buffer del canal de resultados
}

// Simula la obtención de la tasa de impuesto para un cliente (llamada costosa).
func fetchTaxRate(customerID string) float64 {
    time.Sleep(time.Millisecond * time.Duration(rand.Intn(50)+10)) // Simula latencia de red
    // En la realidad, aquí habría una llamada HTTP o una consulta a DB.
    // Asignamos una tasa ficticia basada en el ID.
    return 0.21 // 21% de IVA para el ejemplo
}

// Valida el pedido (reglas de negocio simples).
func validateOrder(order *Order) bool {
    return order.ID != "" && order.CustomerID != "" && order.Amount > 0
}

Implementación de la Pipeline Concurrente

func main() {
    rand.Seed(time.Now().UnixNano())
    config := Config{
        InputFile:       "orders.json",
        WorkerCount:     5,
        JobQueueSize:    100,
        ResultQueueSize: 100,
    }

    // CANALES: El sistema circulatorio de nuestra pipeline.
    jobsChan := make(chan Order, config.JobQueueSize)
    resultsChan := make(chan Order, config.ResultQueueSize)

    // WAITGROUP: Para saber cuándo todos los workers han terminado.
    var wg sync.WaitGroup

    // --- ETAPA 1: Producer (Lectura) ---
    go func() {
        defer close(jobsChan) // Señal crucial: no hay más trabajos.
        file, err := os.Open(config.InputFile)
        if err != nil {
            log.Fatalf("No se pudo abrir el archivo: %v", err)
        }
        defer file.Close()

        decoder := json.NewDecoder(file)
        // Lee el archivo como un array de órdenes.
        _, err = decoder.Token() // Lee el '['
        if err != nil {
            log.Fatalf("Error leyendo el array: %v", err)
        }

        for decoder.More() {
            var order Order
            err := decoder.Decode(&order)
            if err != nil {
                log.Printf("Error decodificando orden, se omite: %v", err)
                continue
            }
            jobsChan <- order // Envía la orden al canal de trabajos.
            fmt.Printf("Productor: Orden %s enviada a procesar.\n", order.ID)
        }
        fmt.Println("Productor: Finalizada la lectura de órdenes.")
    }()

    // --- ETAPA 2: Worker Pool (Procesamiento) ---
    for i := 0; i < config.WorkerCount; i++ {
        wg.Add(1) // Añade un worker al contador del WaitGroup.
        workerID := i + 1
        go func(id int) {
            defer wg.Done() // Señaliza que este worker ha terminado.
            fmt.Printf("Worker %d: Iniciado.\n", id)
            for job := range jobsChan { // Recibe trabajos hasta que el canal se cierre.
                fmt.Printf("Worker %d: Procesando orden %s.\n", id, job.ID)
                
                // 1. Validación
                job.Valid = validateOrder(&job)
                if !job.Valid {
                    fmt.Printf("Worker %d: Orden %s inválida.\n", id, job.ID)
                    resultsChan <- job
                    continue
                }
                
                // 2. Enriquecimiento (operación "costosa" simulada)
                job.TaxRate = fetchTaxRate(job.CustomerID)
                job.Total = job.Amount * (1 + job.TaxRate)
                
                // 3. Envío del resultado
                resultsChan <- job
                fmt.Printf("Worker %d: Orden %s procesada. Total: %.2f\n", id, job.ID, job.Total)
            }
            fmt.Printf("Worker %d: Finalizado (canal de jobs cerrado).\n", id)
        }(workerID)
    }

    // --- ETAPA 3: Collector (Escritura/Consumo) ---
    // Una goroutine que espera a que los workers terminen y luego cierra resultsChan.
    go func() {
        wg.Wait()          // Espera BLOQUEANTE hasta que todos los workers llamen a Done().
        close(resultsChan) // Señal crucial: no habrá más resultados.
        fmt.Println("Collector: Todos los workers han terminado. Cerrando canal de resultados.")
    }()

    // --- CONSUMIDOR FINAL (en el hilo principal) ---
    var processedOrders []Order
    for result := range resultsChan { // Recibe resultados hasta que el canal se cierre.
        processedOrders = append(processedOrders, result)
        fmt.Printf("Collector: Recibida orden %s (Válida: %v).\n", result.ID, result.Valid)
    }

    // --- POST-PROCESAMIENTO ---
    fmt.Println("\n--- Procesamiento de Lote Completado ---")
    fmt.Printf("Total de órdenes procesadas: %d\n", len(processedOrders))
    validCount := 0
    var totalRevenue float64
    for _, o := range processedOrders {
        if o.Valid {
            validCount++
            totalRevenue += o.Total
        }
    }
    fmt.Printf("Órdenes válidas: %d\n", validCount)
    fmt.Printf("Ingreso total calculado: %.2f\n", totalRevenue)
    // Aquí se podrían escribir los resultados procesados a un archivo o DB.
}

Archivo de datos de ejemplo (orders.json)

[
  {"id": "ORD001", "customer_id": "CUST100", "amount": 150.50},
  {"id": "ORD002", "customer_id": "CUST101", "amount": 75.25},
  {"id": "ORD003", "customer_id": "", "amount": 200.00},
  {"id": "ORD004", "customer_id": "CUST102", "amount": -50.00},
  {"id": "ORD005", "customer_id": "CUST103", "amount": 300.75},
  {"id": "ORD006", "customer_id": "CUST104", "amount": 89.99}
]

Errores comunes y cómo evitarlos

1. Fugas de Goroutines (Goroutine Leaks): El error más grave. Ocurre cuando lanzas una goroutine que se bloquea indefinidamente, por ejemplo, esperando en un canal que nunca será leído o escrito. Esto consume recursos y puede llevar al agotamiento de memoria.
Solución: Siempre diseña con una vía de salida clara. Usa context.Context con cancelación para operaciones de larga duración. Asegúrate de que los canales sean cerrados apropiadamente (generalmente por la goroutine productora) y que los bucles for range sobre canales terminen cuando el canal se cierre.

2. Cerrar canales prematuramente o múltiples veces: Cerrar un canal que ya está cerrado causará un panic. Enviar a un canal cerrado también causa panic.
Solución: La responsabilidad de cerrar un canal debe estar claramente asignada, idealmente a una única goroutine (la productora). Usa sync.WaitGroup o un patrón de "coordinador" para asegurar que todas las operaciones de envío hayan terminado antes de proceder al cierre. El patrón de usar una goroutine separada para esperar al wg.Wait() y luego cerrar el canal de resultados, como en nuestro ejemplo, es una práctica robusta.

3. No manejar el backpressure: Si el productor es mucho más rápido que los workers y el canal de jobs no tiene buffer (o es pequeño), el productor se bloqueará constantemente, ralentizando todo el sistema. Por el contrario, un buffer enorme puede enmascarar problemas y consumir mucha memoria.
Solución: Perfila tu pipeline para encontrar cuellos de botella. Ajusta el tamaño del buffer (JobQueueSize) y el número de workers (WorkerCount) según las características de tu carga de trabajo. Considera patrones más avanzados como canales de canales o tasa limitada (rate limiting) si es necesario.

4. Acceso concurrente a datos compartidos sin sincronización: Si múltiples goroutines leen y escriben en la misma variable (como un slice o un map) sin protección, se producirán condiciones de carrera (data races) y resultados impredecibles.
Solución: El mantra de Go es "No te comuniques compartiendo memoria; comparte memoria comunicándote". Usa canales para pasar la propiedad de los datos entre goroutines. Si debes compartir un estado, utiliza los primitivos de sincronización del paquete sync, como sync.Mutex o sync.RWMutex. En nuestro ejemplo, cada orden es procesada por un solo worker y enviada por el canal, evitando compartir datos.

5. Ignorar los errores en goroutines: Si una goroutine falla silenciosamente (por ejemplo, por un panic no recuperado o un error no registrado), el sistema puede continuar en un estado inconsistente o perder datos.
Solución: Implementa un mecanismo robusto de manejo de errores dentro de cada goroutine. Puedes usar un canal dedicado de errores que recopile fallos desde todas las goroutines. Siempre usa defer y recover() en el inicio de goroutines que puedan hacer panic (aunque lo ideal es evitar los panic). Registra los errores de forma centralizada.

Checklist de dominio

Antes de considerar dominada esta lección, asegúrate de poder verificar los siguientes puntos:

  • Puedo explicar la analogía de la línea de ensamblaje y cómo se mapea a una pipeline de goroutines y canales.
  • He implementado un Worker Pool desde cero, configurando el número de workers, un canal de jobs con buffer, y coordinando su cierre y finalización.
  • Uso sync.WaitGroup correctamente para esperar la finalización de un grupo de goroutines, llamando a Add(), Done(), y Wait() en el orden adecuado y desde las goroutines correctas.
  • Sé cuándo y quién debe cerrar un canal en una pipeline de múltiples etapas, y entiendo las consecuencias de cerrarlo mal (panic) o no cerrarlo (goroutine leak).
  • Puedo ajustar los parámetros de rendimiento (número de workers, tamaño de buffer) basándome en el perfil de la carga de trabajo (operaciones I/O-bound vs CPU-bound).
  • He integrado un sistema de procesamiento en lote en un contexto de API, por ejemplo, para procesar asincrónicamente archivos subidos o colas de mensajes.
  • Puedo diagnosticar y evitar fugas de goroutines y condiciones de carrera usando las herramientas del runtime de Go (como el race detector: go run -race).
  • He diseñado un manejo de errores básico para la pipeline, asegurando que los fallos en una goroutine no colapsen silenciosamente el proceso completo.
De lección a portfolio

Convertí esta lección en una habilidad visible para entrevistas.

Guardá el curso, completá los ejercicios y conectá esta habilidad con una ruta de empleo, data, IA, programación o marketing.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión