Implementación del microservicio de órdenes con concurrencia

Lectura
25 min~11 min lectura

Introducción al Microservicio de Órdenes Concurrente

En esta lección, culminaremos nuestro proyecto integrador de e-commerce abordando el corazón de cualquier transacción en línea: el procesamiento de órdenes. El microservicio de órdenes es crítico, ya que coordina inventario, pagos y envíos, operaciones que deben ser consistentes y rápidas incluso bajo una carga masiva de usuarios. Implementaremos este servicio en Go, explotando su modelo de concurrencia basado en goroutines y canales para manejar múltiples solicitudes de creación de órdenes de manera eficiente y segura.

La concurrencia no es solo hacer las cosas en paralelo; es estructurar nuestro programa para gestionar múltiples tareas de forma que se maximice el uso de recursos y se minimicen los tiempos de espera. En el contexto de un e-commerce, un usuario no debería experimentar lentitud porque otros miles de usuarios están comprando al mismo tiempo. Aquí, diseñaremos una arquitectura que utilice goroutines para tareas de validación y notificación, canales (channels) para la comunicación segura entre componentes, y mutexes o transacciones de base de datos para proteger los datos compartidos, como el inventario.

El objetivo final es construir un endpoint /orders que, al recibir una solicitud POST, inicie un pipeline concurrente para validar los ítems, reservar inventario, registrar la orden en la base de datos y emitir eventos asíncronos, todo ello manteniendo la integridad de los datos y proporcionando una respuesta rápida al cliente. Este enfoque contrasta con un modelo secuencial bloqueante, que se convertiría rápidamente en un cuello de botella.

Concepto Clave: Concurrencia vs. Paralelismo en Procesamiento de Órdenes

Es fundamental distinguir entre concurrencia y paralelismo, ya que Go está diseñado para la concurrencia, que puede o no ejecutarse en paralelo. La concurrencia se trata de la composición de procesos de ejecución independientes (goroutines), permitiendo que un programa avance en múltiples tareas de manera entrelazada, incluso en una sola CPU. El paralelismo, en cambio, implica la ejecución simultánea de múltiples tareas en múltiples núcleos de CPU. En nuestro microservicio, diseñamos un flujo concurrente: mientras una goroutine valida la dirección de envío, otra puede estar verificando el stock. El scheduler de Go decide si estas goroutines se ejecutan en paralelo en diferentes núcleos o se intercalan en uno solo.

Una analogía del mundo real sería una cocina de un restaurante muy concurrido (nuestro servidor). La concurrencia es el chef que maneja varias sartenes en un solo fuego, atendiendo cada una por turnos breves para que nada se queme. El paralelismo sería tener varios chefs, cada uno en su propio fuego, cocinando platos simultáneamente. Nuestro microservicio debe estar preparado para operar eficientemente tanto en un entorno de un solo núcleo (concurrencia) como en uno multi-núcleo (paralelismo), y el modelo de Go nos permite hacerlo de manera transparente.

Los mecanismos clave que implementaremos son: Goroutines para la unidad de ejecución ligera, Canales (Channels) para la comunicación y sincronización segura entre goroutines (evitando condiciones de carrera), y el Patrón Worker Pool para controlar el número máximo de goroutines procesando órdenes simultáneamente y evitar sobrecargar el sistema. Este patrón es como tener un equipo fijo de cajeros en una tienda; si llegan más clientes, forman una cola, pero nunca hay más cajeros trabajando de los que la tienda puede soportar, manteniendo el servicio estable.

Cómo Funciona en la Práctica: Pipeline Concurrente de una Orden

Vamos a desglosar el flujo paso a paso para una sola solicitud de creación de orden. Cuando un cliente hace POST a /orders con un payload JSON, el handler no ejecuta toda la lógica de forma secuencial. En su lugar, coordina un pipeline de etapas, algunas de las cuales se ejecutan concurrentemente.

Paso 1: Recepción y Deserialización. El handler principal, enlazado con gorilla/mux, decodifica el JSON en una estructura de orden. Inmediatamente después, valida la estructura básica (campos requeridos). Luego, en lugar de proceder secuencialmente, lanza goroutines para tareas independientes. Por ejemplo, puede lanzar una goroutine para validar la información del cliente contra el microservicio de usuarios (vía una llamada HTTP concurrente) y otra para iniciar la validación de los ítems del carrito.

Paso 2: Validación y Reserva Concurrente. Este es el núcleo. Para cada ítem en la orden, necesitamos verificar su disponibilidad en el inventario y reservarlo. Hacer esto secuencialmente para 10 ítems implicaría 10 llamadas de red o consultas de base de datos consecutivas. En su lugar, creamos una goroutine por ítem (o un pool de workers) que consulta concurrentemente el servicio de inventario. Utilizamos un canal para recolectar los resultados (éxito o error) de todas estas goroutines. El handler principal espera (usando una declaración select o un sync.WaitGroup) a que todas las validaciones terminen. Si alguna falla, se cancela toda la operación y se liberan las reservas ya hechas.

Paso 3: Consolidación y Persistencia. Una vez que todas las validaciones concurrentes son exitosas, una única goroutine (generalmente la del handler principal o una dedicada) procede a persistir la orden completa en la base de datos. Esta operación debe ser atómica, por lo que usamos una transacción de base de datos. Tras el commit exitoso, se lanzan goroutines para tareas asíncronas de "segundo plano": enviar un correo de confirmación al cliente, notificar al servicio de envíos y registrar un evento en un sistema de mensajería (como Kafka) para que otros microservicios reaccionen. El handler puede entonces responder al cliente con un 201 Created, sin esperar a que estas notificaciones secundarias finalicen.

Código en Acción: Implementación del Handler Concurrente

A continuación, presentamos una implementación simplificada pero funcional del endpoint de creación de órdenes. Este código asume estructuras de datos definidas y una capa de acceso a datos (repository).


package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
    "time"

    "github.com/gorilla/mux"
    "github.com/google/uuid"
)

// Estructuras de datos
type OrderItemRequest struct {
    ProductID string `json:"product_id"`
    Quantity  int    `json:"quantity"`
}

type OrderRequest struct {
    UserID string              `json:"user_id"`
    Items  []OrderItemRequest `json:"items"`
}

type OrderResponse struct {
    OrderID   string    `json:"order_id"`
    Status    string    `json:"status"`
    CreatedAt time.Time `json:"created_at"`
}

// InventoryServiceClient simula una llamada a un microservicio de inventario
func InventoryServiceClient(productID string, quantity int) (bool, error) {
    // Simula latencia de red
    time.Sleep(50 * time.Millisecond)
    // Lógica de reserva ficticia. En la realidad, aquí iría una HTTP request.
    return true, nil // Siempre disponible para el ejemplo
}

func createOrderHandler(w http.ResponseWriter, r *http.Request) {
    var orderReq OrderRequest
    if err := json.NewDecoder(r.Body).Decode(&orderReq); err != nil {
        http.Error(w, "Invalid request payload", http.StatusBadRequest)
        return
    }

    // Validación básica
    if orderReq.UserID == "" || len(orderReq.Items) == 0 {
        http.Error(w, "UserID and Items are required", http.StatusBadRequest)
        return
    }

    // **Paso 2: Validación Concurrente de Inventario**
    var wg sync.WaitGroup
    errChan := make(chan error, len(orderReq.Items)) // Canal con buffer
    var mu sync.Mutex
    var reservedItems []string // Slice compartido para trackear reservas (requiere mutex)

    for _, item := range orderReq.Items {
        wg.Add(1)
        go func(item OrderItemRequest) {
            defer wg.Done()
            available, err := InventoryServiceClient(item.ProductID, item.Quantity)
            if err != nil {
                errChan <- fmt.Errorf("inventory service error for product %s: %v", item.ProductID, err)
                return
            }
            if !available {
                errChan <- fmt.Errorf("product %s is out of stock", item.ProductID)
                return
            }
            // Simular reserva (en realidad, sería un UPDATE en DB)
            mu.Lock()
            reservedItems = append(reservedItems, item.ProductID)
            mu.Unlock()
        }(item) // Pasar item como argumento para evitar closure sobre la variable de bucle
    }

    // Esperar a que todas las goroutines de validación terminen
    wg.Wait()
    close(errChan) // Cerrar el canal para poder iterar

    // Revisar si hubo errores
    var validationErrors []string
    for err := range errChan {
        validationErrors = append(validationErrors, err.Error())
    }
    if len(validationErrors) > 0 {
        // IMPORTANTE: Aquí se debería llamar a una función para liberar las reservas hechas (rollback)
        http.Error(w, fmt.Sprintf("Validation failed: %v", validationErrors), http.StatusUnprocessableEntity)
        return
    }

    // **Paso 3: Persistencia de la Orden (Transaccional)**
    newOrderID := uuid.New().String()
    // Aquí iría la lógica de base de datos transaccional.
    // orderRepo.CreateInTransaction(orderReq, newOrderID, reservedItems)
    // Por simplicidad, simulamos una persistencia exitosa.
    time.Sleep(20 * time.Millisecond)

    // **Tareas Asíncronas (Fire-and-Forget)**
    // Notificación por email
    go func(orderID, userID string) {
        // Simular envío de email
        time.Sleep(100 * time.Millisecond)
        fmt.Printf("Email de confirmación enviado para la orden %s al usuario %s\n", orderID, userID)
    }(newOrderID, orderReq.UserID)

    // Notificación al servicio de envíos
    go func(orderID string, items []OrderItemRequest) {
        time.Sleep(80 * time.Millisecond)
        fmt.Printf("Orden %s notificada al servicio de envíos\n", orderID)
    }(newOrderID, orderReq.Items)

    // Respuesta al cliente
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(OrderResponse{
        OrderID:   newOrderID,
        Status:    "PROCESSING",
        CreatedAt: time.Now().UTC(),
    })
}

func main() {
    r := mux.NewRouter()
    r.HandleFunc("/orders", createOrderHandler).Methods("POST")
    fmt.Println("Servicio de órdenes escuchando en puerto 8080...")
    http.ListenAndServe(":8080", r)
}

Este código ilustra el patrón fundamental: lanzar goroutines para operaciones de I/O (como las llamadas al servicio de inventario), sincronizar su finalización con WaitGroup, recoger errores a través de un canal, y finalmente persistir y responder. Las tareas de notificación se lanzan como goroutines independientes después de la confirmación, en un patrón "fire-and-forget" donde no nos interesa su resultado inmediato para la respuesta al cliente.

Tip Crítico: Siempre pasa los parámetros necesarios a la goroutine como argumentos de la función anónima. Evita cerrar (closure) sobre la variable de iteración del bucle directamente, ya que su valor puede cambiar antes de que la goroutine se ejecute, llevando a un comportamiento errático. Fíjate en go func(item OrderItemRequest) {...}(item) en el ejemplo.

Implementando un Worker Pool para Control de Carga

Lanzar una goroutine por ítem de orden puede ser peligroso si un carrito tiene cientos de ítems o bajo tráfico masivo. Un Worker Pool limita el número de goroutines concurrentes. Aquí hay un esquema básico:


func validateItemsWithWorkerPool(items []OrderItemRequest, numWorkers int) []error {
    jobs := make(chan OrderItemRequest, len(items))
    results := make(chan error, len(items))
    var wg sync.WaitGroup

    // Iniciar workers
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for item := range jobs {
                // Cada worker procesa ítems del canal 'jobs'
                _, err := InventoryServiceClient(item.ProductID, item.Quantity)
                results <- err
            }
        }(w)
    }

    // Enviar trabajos al pool
    for _, item := range items {
        jobs <- item
    }
    close(jobs) // Cerramos el canal para que los workers terminen

    // Esperar a que todos los workers terminen
    wg.Wait()
    close(results)

    // Recoger errores
    var errs []error
    for err := range results {
        if err != nil {
            errs = append(errs, err)
        }
    }
    return errs
}

Errores Comunes y Cómo Evitarlos

Al implementar concurrencia, los errores suelen ser sutiles y difíciles de depurar. Aquí los principales peligros:

1. Condiciones de Carrera (Race Conditions): Ocurren cuando múltiples goroutines acceden y modifican una variable compartida (como un contador, un slice o un mapa) sin sincronización, llevando a resultados impredecibles. En nuestro ejemplo, el slice reservedItems se protege con un sync.Mutex.

Cómo evitarlo: Usa canales para comunicar resultados en lugar de compartir memoria. Si debes compartir memoria, emplea primitivas de sincronización como mutexes (sync.Mutex o sync.RWMutex) o aprovecha los tipos seguros para concurrencia del paquete sync/atomic. La regla de oro es: "No compartas memoria por comunicación; comunícate compartiendo memoria (pero de forma segura)".

2. Fugas de Goroutines (Goroutine Leaks): Suceden cuando lanzas una goroutine que nunca termina, consumiendo recursos indefinidamente. Esto puede pasar si una goroutine se bloquea esperando en un canal que nunca recibe un valor o si se queda en un bucle infinito.

Cómo evitarlo: Siempre diseña una vía de finalización clara. Usa canales con contexto (context.Context) para permitir la cancelación. Por ejemplo, pasa un context.Context a tus goroutines y escucha su señal Done(). En nuestro worker pool, el cierre del canal jobs permite que los workers terminen elegantemente.

3. Uso Incorrecto de Cierres (Closures) en Bucles: Como se mencionó en el tip, si lanzas una goroutine dentro de un bucle y haces referencia a la variable de iteración, todas las goroutines probablemente verán el valor final de esa variable, no el valor en el momento de su lanzamiento.

Cómo evitarlo: Pasa la variable como parámetro a la función anónima de la goroutine: go func(param Tipo) { ... }(variableDelBucle). Esto crea una copia local para cada ejecución.

4. Bloqueo por Falta de Buffer en Canales o Deadlocks: Un deadlock ocurre cuando un conjunto de goroutines se bloquean esperando unas a otras indefinidamente. Un caso común es enviar a un canal sin buffer desde la goroutine principal sin que haya otra goroutine lista para recibir, o viceversa.

Cómo evitarlo: Para canales de recolección de resultados, usa buffers (como hicimos con errChan). Diagrama el flujo de comunicación. Usa la sentencia select con default para operaciones no bloqueantes cuando sea apropiado. Herramientas como el race detector de Go (go run -race) y un análisis cuidadoso del flujo son esenciales.

5. Ignorar el Manejo de Errores en Goroutines: Los errores que ocurren dentro de una goroutine lanzada de forma "fire-and-forget" se pierden silenciosamente si no se capturan. Esto puede ocultar fallos críticos en el sistema, como el envío de emails o notificaciones.

Cómo evitarlo: Para tareas críticas asíncronas, implementa un patrón de "supervisor" o un canal de errores dedicado. Incluso en tareas menos críticas, registra (log) los errores dentro de la propia goroutine usando un logger estructurado.

Checklist de Dominio

Antes de considerar esta lección completa, asegúrate de poder verificar los siguientes puntos:

  • Puedo explicar la diferencia entre concurrencia y paralelismo usando una analogía relacionada con microservicios.
  • He implementado un endpoint HTTP en Go que lanza goroutines para tareas de I/O independientes (como validaciones) y las sincroniza correctamente usando sync.WaitGroup.
  • Sé cómo usar canales con buffer y sin buffer para comunicar resultados y errores desde un conjunto de goroutines a la goroutine principal.
  • Puedo identificar y prevenir una condición de carrera en un escenario donde múltiples goroutines actualizan un slice o mapa compartido, utilizando un sync.Mutex.
  • He implementado o comprendo la estructura de un Worker Pool para limitar la concurrencia máxima en el procesamiento de elementos de una solicitud.
  • Sé cómo pasar parámetros a una goroutine dentro de un bucle para evitar el problema del cierre sobre la variable de iteración.
  • Puedo describir al menos dos estrategias para evitar fugas de goroutines (goroutine leaks) en un servicio de larga duración.
  • Comprendo la importancia de las transacciones de base de datos en el paso de persistencia para garantizar la atomicidad, incluso después de una validación concurrente exitosa.
De lección a portfolio

Convertí esta lección en una habilidad visible para entrevistas.

Guardá el curso, completá los ejercicios y conectá esta habilidad con una ruta de empleo, data, IA, programación o marketing.

Newsletter Cursalo

Recibí rutas y cursos nuevos

Sumate para recibir recursos orientados a empleo y portfolio.

  • Rutas de empleo
  • Cursos prácticos
  • Portfolio y entrevistas

Sin spam. También podés entrar con tu cuenta para guardar progreso. Iniciá sesión