Práctica: Desarrollar un API con Multiples Bases de Datos y Transacciones

Lectura
25 min~5 min lectura

Concepto clave

En el desarrollo de APIs empresariales avanzadas, es común enfrentarse a escenarios donde una aplicación necesita interactuar con múltiples bases de datos simultáneamente. Esto puede deberse a requisitos de segregación de datos (ej: base de datos principal para usuarios y otra para logs), integración con sistemas heredados, o necesidades de escalabilidad horizontal. FastAPI, combinado con SQLAlchemy, proporciona herramientas robustas para manejar estas situaciones manteniendo la coherencia de datos a través de transacciones distribuidas.

Imagina un sistema bancario moderno: cuando realizas una transferencia entre cuentas, el sistema debe actualizar tanto la cuenta de origen (restar fondos) como la de destino (sumar fondos). Si una de estas operaciones falla, ambas deben revertirse para evitar inconsistencias. Esto es precisamente lo que logramos con transacciones en múltiples bases de datos: atomicidad (todas las operaciones se completan o ninguna) y consistencia (los datos permanecen válidos en todos los sistemas).

Cómo funciona en la práctica

Para implementar esta arquitectura en FastAPI, seguimos un patrón de diseño basado en sesiones de base de datos independientes y gestión manual de transacciones. Primero, configuramos conexiones separadas para cada base de datos usando SQLAlchemy. Luego, creamos un contexto de transacción que coordina las operaciones en todas las bases de datos involucradas.

Paso a paso:

  1. Configurar engines de SQLAlchemy para cada base de datos (ej: PostgreSQL para datos principales, MySQL para logs)
  2. Crear sesiones independientes para cada engine
  3. Implementar un gestor de contexto que inicie transacciones en todas las sesiones
  4. Ejecutar operaciones dentro del contexto, manejando commit/rollback coordinados
  5. Garantizar el cierre adecuado de conexiones incluso en caso de errores

Código en acción

Configuración inicial de múltiples bases de datos:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager

# Configurar engines para dos bases de datos diferentes
main_engine = create_engine(
    "postgresql://user:pass@localhost/main_db",
    pool_pre_ping=True,
    echo=False
)

logs_engine = create_engine(
    "mysql://user:pass@localhost/logs_db",
    pool_pre_ping=True,
    echo=False
)

# Crear sessionmakers independientes
MainSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=main_engine)
LogsSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=logs_engine)

@contextmanager
def get_multidb_session():
    """Context manager para manejar sesiones de múltiples bases de datos"""
    main_session = MainSessionLocal()
    logs_session = LogsSessionLocal()
    
    try:
        # Iniciar transacciones en ambas sesiones
        main_session.begin()
        logs_session.begin()
        
        yield {"main": main_session, "logs": logs_session}
        
        # Commit solo si todo fue exitoso
        main_session.commit()
        logs_session.commit()
        
    except Exception as e:
        # Rollback en ambas bases de datos si hay error
        main_session.rollback()
        logs_session.rollback()
        raise e
        
    finally:
        # Cerrar sesiones
        main_session.close()
        logs_session.close()

Implementación de un endpoint con transacciones distribuidas:

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Dict

router = APIRouter()

class TransferRequest(BaseModel):
    from_account: str
    to_account: str
    amount: float
    description: str

def create_log_entry(session, action: str, details: Dict):
    """Función auxiliar para crear registros de auditoría"""
    from models.logs import AuditLog
    
    log_entry = AuditLog(
        action=action,
        details=details,
        timestamp=datetime.utcnow()
    )
    session.add(log_entry)
    return log_entry.id

@router.post("/transfer")
async def transfer_funds(
    request: TransferRequest,
    sessions: Dict = Depends(get_multidb_session)
):
    """Endpoint que transfiere fondos entre cuentas y registra la operación"""
    main_session = sessions["main"]
    logs_session = sessions["logs"]
    
    try:
        # 1. Validar y actualizar cuentas en base de datos principal
        from models.main import Account
        
        from_account = main_session.query(Account).filter(
            Account.number == request.from_account
        ).with_for_update().first()
        
        to_account = main_session.query(Account).filter(
            Account.number == request.to_account
        ).with_for_update().first()
        
        if not from_account or not to_account:
            raise HTTPException(status_code=404, detail="Cuenta no encontrada")
        
        if from_account.balance < request.amount:
            raise HTTPException(status_code=400, detail="Fondos insuficientes")
        
        # Realizar transferencia
        from_account.balance -= request.amount
        to_account.balance += request.amount
        
        main_session.add(from_account)
        main_session.add(to_account)
        
        # 2. Registrar operación en base de datos de logs
        log_details = {
            "from_account": request.from_account,
            "to_account": request.to_account,
            "amount": request.amount,
            "description": request.description
        }
        
        log_id = create_log_entry(logs_session, "TRANSFER", log_details)
        
        # 3. Confirmar cambios (commit implícito al salir del contexto)
        return {
            "message": "Transferencia exitosa",
            "transaction_id": log_id,
            "new_balance": from_account.balance
        }
        
    except HTTPException:
        # Re-lanzar excepciones HTTP
        raise
        
    except Exception as e:
        # Cualquier otro error causará rollback automático
        raise HTTPException(
            status_code=500,
            detail=f"Error en la transferencia: {str(e)}"
        )

Errores comunes

1. Deadlocks por orden incorrecto de bloqueos: Al acceder a múltiples recursos en diferentes bases de datos, siempre sigue un orden consistente (ej: siempre bloquear recurso A antes que B) para evitar deadlocks.

2. No manejar timeouts de conexión: Configura timeouts apropiados en los engines y considera implementar retry logic para operaciones transaccionales.

3. Olvidar el cierre de sesiones: Usa context managers (with) o bloques try-finally para garantizar que las sesiones se cierren incluso en caso de errores.

4. Asumir consistencia inmediata: En sistemas distribuidos, puede haber latencia entre commits. Considera patrones como eventual consistency cuando sea apropiado.

5. No monitorear transacciones largas: Implementa logging y métricas para identificar transacciones que consumen demasiado tiempo y podrían causar bloqueos.

Checklist de dominio

  • ✓ Configurar múltiples engines de SQLAlchemy con parámetros optimizados para producción
  • ✓ Implementar un gestor de contexto que coordine transacciones en todas las bases de datos
  • ✓ Usar with_for_update() para operaciones que requieren bloqueo de filas
  • ✓ Manejar rollback coordinado cuando ocurren errores en cualquier base de datos
  • ✓ Implementar logging de auditoría en base de datos separada
  • ✓ Probar escenarios de fallo (timeouts, deadlocks, desconexiones)
  • ✓ Documentar el flujo transaccional para el equipo de desarrollo

Implementar un Sistema de Pedidos con Inventario y Logística

Desarrolla un API RESTful para un sistema de e-commerce que maneje pedidos, actualice inventario y registre eventos de logística en bases de datos separadas. Sigue estos pasos:

  1. Configuración inicial: Crea tres bases de datos PostgreSQL: orders_db (pedidos), inventory_db (inventario), logistics_db (logística). Configura los engines correspondientes en FastAPI.
  2. Modelos de datos: Define modelos SQLAlchemy para:
    • Order (orders_db): id, user_id, items, total, status
    • Product (inventory_db): id, name, stock, price
    • Shipment (logistics_db): id, order_id, status, tracking_number
  3. Endpoint principal: Implementa POST /orders que:
    • Reciba items y user_id
    • Verifique disponibilidad en inventory_db (reduciendo stock)
    • Cree el pedido en orders_db
    • Genere un registro de envío en logistics_db
    • Use transacciones coordinadas en las tres bases de datos
  4. Manejo de errores: Implementa rollback coordinado si:
    • No hay stock suficiente
    • Error al crear el pedido
    • Falla el registro de logística
  5. Endpoint de cancelación: Implementa POST /orders/{order_id}/cancel que revierta todas las operaciones.

Entrega: Código completo con tests que verifiquen la atomicidad de las transacciones.

Pistas
  • Usa el patrón de context manager para manejar múltiples sesiones
  • Considera usar with_for_update() al verificar/actualizar inventario para evitar condiciones de carrera
  • Implementa un mecanismo de retry para manejar deadlocks transitorios

Evalua tu comprension

Completa el quiz interactivo de arriba para ganar XP.