Threads, Canales y Patrones de Concurrencia

Lectura
25 min~4 min lectura

Concepto clave

En sistemas de baja latencia y alta seguridad, la concurrencia no es un lujo sino una necesidad. Threads en Rust son unidades de ejecucion independientes que comparten memoria, mientras que canales son mecanismos de comunicacion que transfieren datos entre threads sin compartir memoria. Los patrones de concurrencia son soluciones probadas para problemas comunes, como el productor-consumidor o el worker pool.

Imagina un sistema de transacciones financieras: cada thread es un cajero procesando operaciones, los canales son las bandejas donde se depositan las solicitudes, y los patrones son los protocolos que evitan que dos cajeros atiendan al mismo cliente. La clave en Rust es que el sistema de tipos garantiza en tiempo de compilacion que no habra data races, eliminando una clase entera de errores criticos.

Como funciona en la practica

Veamos un ejemplo paso a paso de un sistema de procesamiento de pedidos con alta seguridad:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Crear un canal con multiples productores, un consumidor
    let (tx, rx) = mpsc::channel();
    
    // Thread 1: Productor de pedidos
    let tx1 = tx.clone();
    thread::spawn(move || {
        let pedidos = vec!["PED001", "PED002", "PED003"];
        for pedido in pedidos {
            tx1.send(pedido).unwrap();
            thread::sleep(Duration::from_millis(10));
        }
    });
    
    // Thread 2: Validador de seguridad
    let tx2 = tx.clone();
    thread::spawn(move || {
        let pedidos_validos = vec!["PED004", "PED005"];
        for pedido in pedidos_validos {
            tx2.send(pedido).unwrap();
            thread::sleep(Duration::from_millis(15));
        }
    });
    
    // Consumidor: Procesador central
    for pedido_recibido in rx {
        println!("Procesando: {}", pedido_recibido);
    }
}

Este ejemplo muestra como multiples threads pueden enviar datos a traves de un canal hacia un consumidor comun. La clave esta en que mpsc::channel() crea un canal seguro por tipos, y tx.clone() permite multiples productores.

Caso de estudio

Sistema de trading de alta frecuencia con latencia < 100 microsegundos:

ComponenteImplementacionMetrica de performance
Receptor de market dataThread dedicado con pinning a nucleoLatencia: 15 μs
Procesador de ordenesPool de 4 workers con canalesThroughput: 50k ops/seg
Logger de auditoriaThread separado con channel boundedZero allocation en hot path
En sistemas criticos, la regla es: un thread por nucleo fisico para CPU-bound, canales para comunicacion, y evitar locks siempre que sea posible.

Implementacion clave del worker pool:

use std::sync::mpsc;
use std::thread;

struct WorkerPool {
    workers: Vec>,
    sender: mpsc::Sender,
}

impl WorkerPool {
    fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel();
        let receiver = std::sync::Arc::new(std::sync::Mutex::new(receiver));
        
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            let receiver = receiver.clone();
            workers.push(thread::spawn(move || {
                loop {
                    let job = receiver.lock().unwrap().recv().unwrap();
                    job.execute();
                }
            }));
        }
        
        WorkerPool { workers, sender }
    }
}

Errores comunes

  • Deadlocks por orden incorrecto de locks: En sistemas con multiples mutexes, adquirirlos en orden inconsistente entre threads causa deadlocks. Solucion: Establecer una jerarquia global de locks.
  • Channel starvation: Un productor rapido puede saturar a un consumidor lento. Usa sync_channel con limite de capacidad para backpressure.
  • Thread explosion: Crear threads sin limite en sistemas de larga duracion. Implementa un pool con tamaño fijo basado en nucleos fisicos.
  • False sharing: Threads en nucleos diferentes accediendo a la misma linea de cache. Usa padding o #[repr(align(64))] para estructuras criticas.
  • Panic propagation: Un thread que panic puede dejar el sistema en estado inconsistente. Usa catch_unwind en puntos criticos.

Checklist de dominio

  1. Puedo implementar un worker pool que escala con nucleos fisicos, no logicos
  2. Se cuando usar mpsc::channel() vs mpsc::sync_channel() basado en requirements de latencia
  3. Puedo demostrar ausencia de data races en un sistema multi-thread complejo
  4. Implemento backpressure adecuado para evitar out-of-memory en picos de carga
  5. Uso std::thread::sleep solo para testing, nunca en produccion de baja latencia
  6. Mido y optimizo cache locality en estructuras compartidas entre threads
  7. Documento el protocolo de shutdown para todos los threads en el sistema

Sistema de procesamiento de transacciones bancarias con garantia de consistencia

Implementa un sistema que procese transacciones bancarias concurrentemente con garantias de ACID. Sigue estos pasos:

  1. Crea una estructura Transaction con campos: id (u32), amount (f64), from_account (String), to_account (String)
  2. Implementa un worker pool de 4 threads que procesen transacciones desde un canal compartido
  3. Cada worker debe validar que: amount > 0, from_account != to_account, y simular un procesamiento con std::thread::sleep(Duration::from_micros(100))
  4. Agrega un logger de auditoria en un thread separado que reciba cada transaccion procesada exitosamente
  5. Implementa backpressure: si hay mas de 1000 transacciones pendientes, los productores deben esperar
  6. Agrega un mecanismo de shutdown graceful que procese todas las transacciones pendientes antes de terminar
  7. Mide el throughput en transacciones/segundo con 10,000 transacciones de prueba

Entrega el codigo completo y las metricas de performance obtenidas.

Pistas
  • Usa sync_channel con capacidad 1000 para implementar el backpressure
  • Para el shutdown, considera enviar un mensaje especial por el canal que indique a los workers que terminen
  • Usa std::time::Instant para medir el throughput con precision

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.