Ejecutar una sola llamada a un LLM es directo: envías un prompt, recibes una respuesta, procesas el resultado. Los sistemas de producción reales raramente se mantienen así de simples. Las tareas crecen hasta un nivel de complejidad que ninguna ventana de contexto puede contener, los requisitos de latencia empujan hacia la ejecución en paralelo y distintas subtareas se benefician de distintas capacidades de modelo. Los sistemas multi-agente resuelven estos problemas e introducen, a su vez, una clase completamente nueva de modos de fallo que la mayoría de los tutoriales ignora. Cuatro patrones aparecen en todo despliegue serio en producción: routing, descomposición de tareas, gestión de estado y recuperación ante fallos.

Routing: Despachando Tareas al Agente Correcto

El primer problema en cualquier sistema multi-agente es decidir qué agente se encarga de una entrada determinada. Hacerlo mal implica desperdiciar tokens enviando una búsqueda factual simple a un modelo de razonamiento costoso, o mandar un análisis matizado a un modelo que no puede manejarlo.

Tres enfoques dominan los sistemas en producción. El routing basado en reglas es el más predecible: un conjunto de condiciones explícitas mapea propiedades de la entrada a identificadores de agentes. Es rápido, depurable y no tiene latencia de LLM, pero se rompe cuando las categorías de entrada son difusas o se superponen. El routing por similitud de embeddings calcula un embedding vectorial de la entrada y encuentra el ejemplo etiquetado más cercano mediante similitud coseno; generaliza bien pero requiere mantener un conjunto de ejemplos etiquetados y añade una llamada al modelo de embeddings por solicitud. El routing basado en LLM usa un modelo pequeño y rápido para clasificar la intención antes de redirigir al especialista: es el más flexible de los tres, pero agrega un round-trip.

from __future__ import annotations
from dataclasses import dataclass
from typing import Callable
from openai import OpenAI
 
client = OpenAI()
 
@dataclass
class Route:
    name: str
    description: str
    handler: Callable[[str], str]
 
def llm_router(query: str, routes: list[Route]) -> str:
    """Use a cheap model to classify the query and dispatch to the correct handler."""
    route_descriptions = "\n".join(
        f"- {r.name}: {r.description}" for r in routes
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a routing classifier. Given a user query, respond with "
                    "exactly one route name from the list below. Output only the name.\n\n"
                    f"{route_descriptions}"
                ),
            },
            {"role": "user", "content": query},
        ],
        max_tokens=20,
        temperature=0,
    )
    chosen = response.choices[0].message.content.strip()
    for route in routes:
        if route.name == chosen:
            return route.handler(query)
    # Fallback: first route
    return routes[0].handler(query)

Los routers basados en LLM pueden alucinar nombres de rutas. Siempre valida el nombre de ruta devuelto contra la lista registrada antes de despachar; un fallback silencioso a la primera ruta puede enmascarar fallos de clasificación durante horas antes de que alguien lo note.

El equilibrio clave en producción: precisión del routing versus costo del routing. En sistemas de alto volumen, un router basado en reglas con un 99% de precisión que maneja el 80% de las consultas, combinado con un router LLM para el 20% restante, gana tanto en costo como en fiabilidad.

Descomposición de Tareas: Dividiendo Trabajos Complejos en Pasos del Tamaño de un Agente

Una vez que tienes routing, el siguiente desafío son las tareas demasiado grandes para que un solo agente las complete de una vez. Una solicitud de “auditar esta base de código en busca de vulnerabilidades de seguridad, resumir los hallazgos y redactar tickets de remediación” abarca múltiples dominios y fácilmente supera cualquier ventana de contexto.

El enfoque naive — pasar la tarea completa a un único agente — falla de dos maneras predecibles: el modelo se queda sin contexto a mitad de la ejecución, o produce una respuesta superficial que cubre las tres subtareas de forma deficiente. El enfoque en producción es descomponer el trabajo antes de ejecutarlo.

Existen dos estrategias. El plan estático codifica la descomposición directamente en el orquestador: se sabe de antemano que una auditoría de código siempre se descompone en escaneo de archivos, búsqueda de patrones y generación de informe. El agente planificador es más flexible: el objetivo de alto nivel va a un modelo capaz, que produce un plan de ejecución estructurado para que el orquestador lo ejecute. Los planes estáticos son más rápidos y económicos; los agentes planificadores manejan tareas cuya estructura no puede anticiparse.

import json
from anthropic import Anthropic
from pydantic import BaseModel
 
anthropic = Anthropic()
 
class Step(BaseModel):
    id: str
    description: str
    depends_on: list[str]
    agent: str  # e.g. "code-analyst", "report-writer"
 
class ExecutionPlan(BaseModel):
    goal: str
    steps: list[Step]
 
def plan_task(goal: str) -> ExecutionPlan:
    """Ask a capable model to produce a structured execution plan."""
    response = anthropic.messages.create(
        model="claude-opus-4-7",
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": (
                    f"Break this goal into a structured execution plan as JSON.\n"
                    f"Goal: {goal}\n\n"
                    "Output a JSON object matching this schema:\n"
                    '{"goal": "...", "steps": [{"id": "step-1", '
                    '"description": "...", "depends_on": [], "agent": "..."}]}'
                ),
            }
        ],
    )
    raw = response.content[0].text
    # Strip markdown code fences if present
    if raw.startswith("```"):
        raw = raw.split("```")[1].lstrip("json").strip()
    return ExecutionPlan(**json.loads(raw))
 
def execute_plan(plan: ExecutionPlan) -> dict[str, str]:
    """Execute steps in dependency order, collecting results."""
    completed: dict[str, str] = {}
    pending = list(plan.steps)
 
    while pending:
        ready = [s for s in pending if all(d in completed for d in s.depends_on)]
        if not ready:
            raise RuntimeError("Circular dependency detected in plan")
        for step in ready:
            completed[step.id] = run_agent(step.agent, step.description, completed)
            pending.remove(step)
 
    return completed
 
def run_agent(agent: str, task: str, context: dict[str, str]) -> str:
    """Placeholder — dispatch to the appropriate agent implementation."""
    context_str = "\n".join(f"{k}: {v[:200]}" for k, v in context.items())
    response = anthropic.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[
            {
                "role": "user",
                "content": f"Agent: {agent}\nTask: {task}\nContext:\n{context_str}",
            }
        ],
    )
    return response.content[0].text

La validación con Pydantic sobre la salida del planificador no es opcional. Los LLM producen JSON estructuralmente válido la mayoría del tiempo, pero “la mayoría del tiempo” no es producción. Envuelve cada parseo JSON en un try/except y ten un fallback (reintento con un prompt más estricto, o caída a un plan estático).

El costo esperado de una descomposición basada en planificador escala con el número de pasos y el tamaño del contexto de cada paso. Para un plan con nn pasos, conteo promedio de tokens por paso tit_i, y costo por token cic_i, el costo total esperado es:

E[cost]=i=1npitici\mathbb{E}[\text{cost}] = \sum_{i=1}^{n} p_i \cdot t_i \cdot c_i

donde pip_i es la probabilidad de que el paso ii realmente se ejecute (los pasos con dependencias no satisfechas pueden omitirse ante un éxito parcial). En la práctica, estima esto antes de elegir entre descomposición estática y dinámica: los agentes planificadores añaden una llamada de planificación de alto costo al principio.

Gestión de Estado: Memoria Compartida a Escala

Con múltiples agentes ejecutando en paralelo o en secuencia, el estado se convierte en el problema más difícil. Cada agente necesita acceso a los resultados de los pasos previos, pero no puedes pasar todo el contexto acumulado a cada llamada descendente: las ventanas de contexto se desbordan, la latencia se acumula y se produce el efecto “perdido en el medio”, donde los modelos ignoran la información enterrada en prompts largos.

Tres patrones dominan la gestión de estado en producción: el scratchpad almacena resultados intermedios en un objeto estructurado y pasa solo el subconjunto relevante a cada agente; un bus de mensajes compartido (frecuentemente Redis o una cola de mensajes) permite que los agentes publiquen y se suscriban a resultados sin acoplamiento estrecho; y una capa de resumen condensa periódicamente los pasos previos en un resumen compacto que los agentes descendentes usan en lugar del historial completo.

import redis
import json
from dataclasses import dataclass, field, asdict
 
@dataclass
class AgentState:
    task_id: str
    goal: str
    completed_steps: dict[str, str] = field(default_factory=dict)
    summary: str = ""
    token_budget_remaining: int = 100_000
 
class RedisStateStore:
    def __init__(self, host: str = "localhost", port: int = 6379):
        self._client = redis.Redis(host=host, port=port, decode_responses=True)
        self._ttl = 3600  # 1 hour — tasks should not run forever
 
    def save(self, state: AgentState) -> None:
        key = f"agent_state:{state.task_id}"
        self._client.setex(key, self._ttl, json.dumps(asdict(state)))
 
    def load(self, task_id: str) -> AgentState | None:
        key = f"agent_state:{task_id}"
        raw = self._client.get(key)
        if raw is None:
            return None
        return AgentState(**json.loads(raw))
 
    def update_step(self, task_id: str, step_id: str, result: str) -> None:
        state = self.load(task_id)
        if state is None:
            raise KeyError(f"No state found for task {task_id}")
        # Trim result to avoid unbounded state growth
        state.completed_steps[step_id] = result[:2000]
        # Reduce token budget estimate
        state.token_budget_remaining -= len(result.split())
        self.save(state)

El estado obsoleto es el asesino silencioso en los flujos de trabajo de larga duración: si un paso falla y se reintenta, el resultado previo (parcial) puede seguir en el almacén. Siempre escribe el estado de forma atómica: actualiza el resultado del paso y su indicador de estado (pendiente/completo/fallido) en una sola operación. Las lecturas que ven “completo” sin resultado, o un resultado sin “completo”, indican una escritura parcial; hay que tratarlas como un fallo.

A escala, el desbordamiento de la ventana de contexto se convierte en el principal impulsor de costos. En lugar de acumular salidas brutas de los pasos, un agente de resumen se ejecuta después de cada kk pasos completados y comprime el historial. Los agentes descendentes reciben el resumen más el paso inmediatamente anterior, lo que representa típicamente un 80% menos de tokens que pasar el historial completo.

Recuperación ante Fallos: Cuando los Agentes y las Redes se Rompen

Cada llamada a un agente puede fallar: la API del modelo devuelve un 500, la respuesta supera el tiempo de espera, la salida no pasa la validación, una dependencia descendente no está disponible. Los sistemas de producción necesitan manejo explícito de fallos en cada capa.

El patrón más simple es el backoff exponencial con jitter en errores transitorios. tenacity encapsula esto de forma limpia:

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)
import logging
from openai import APIError, APITimeoutError, RateLimitError
 
logger = logging.getLogger(__name__)
 
@retry(
    retry=retry_if_exception_type((APIError, APITimeoutError, RateLimitError)),
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
def call_with_retry(model: str, messages: list[dict]) -> str:
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        timeout=30,
    )
    return response.choices[0].message.content

Los reintentos manejan fallos transitorios, pero no los fallos semánticos: aquellos en que el modelo devuelve una respuesta sintácticamente válida pero que no satisface los criterios de aceptación. Para estos casos necesitas un agente de fallback: un prompt más simple y restringido que tiene menos probabilidad de producir basura, pero también es menos capaz.

from pydantic import BaseModel, ValidationError
 
class StructuredOutput(BaseModel):
    summary: str
    action_items: list[str]
    confidence: float
 
def call_with_fallback(primary_prompt: str, fallback_prompt: str) -> StructuredOutput:
    """Try the primary agent; fall back to a simpler prompt on validation failure."""
    for prompt in [primary_prompt, fallback_prompt]:
        try:
            raw = call_with_retry("gpt-4o", [{"role": "user", "content": prompt}])
            # Strip markdown fences
            if raw.startswith("```"):
                raw = raw.split("```")[1].lstrip("json").strip()
            return StructuredOutput.model_validate_json(raw)
        except (ValidationError, Exception) as exc:
            logger.warning("Agent call failed: %s — trying fallback", exc)
    # Both failed — return a safe empty result rather than raising
    return StructuredOutput(summary="", action_items=[], confidence=0.0)

La idempotencia no es negociable para los agentes que escriben en sistemas externos: asigna un task_id estable antes de que comience cualquier llamada y verifica, antes de ejecutar cada paso, si ya existe un resultado exitoso en el almacén de estado. Esto hace que el orquestador sea seguro para reintentar a nivel de tarea sin duplicar efectos secundarios, algo esencial cuando un flujo de trabajo parcial falla y debe reanudarse.

Para tareas que pueden tener éxito parcial — cinco de ocho subtareas completas antes de un fallo — la reconciliación de resultados parciales es mejor que el reintento completo. Almacena el resultado de cada paso de forma atómica a medida que se completa. En la re-ejecución, omite cualquier paso con un resultado exitoso en el almacén de estado y solo re-ejecuta los pasos fallidos o faltantes. Este patrón convierte un reintento completo (costo: ntˉcˉn \cdot \bar{t} \cdot \bar{c}) en un reintento incremental (costo: proporcional al número de pasos fallidos).

Juntándolo Todo

Estos cuatro patrones se componen. En un flujo de trabajo en producción: el router despacha la entrada al descomponedor apropiado; el descomponedor produce un plan (estático o dinámico); el orquestador ejecuta el plan, escribiendo resultados en un almacén de estado y resumiendo periódicamente; y cada llamada a un agente está envuelta con lógica de reintentos y agentes de fallback. El almacén de estado es la columna vertebral: hace que todo el sistema sea reanudable, observable e idempotente.

El orden importa: routing y descomposición primero, gestión de estado y recuperación después. Implementar los cuatro en paralelo garantiza que ninguno funcione bien. El router híbrido —reglas para el 80%, LLM para el 20%— cuesta menos y falla de forma más predecible que un router LLM puro. Y el state store no es un detalle de infraestructura: es lo que convierte un prototipo que se cae en un sistema que puede reanudarse, observarse y auditarse.