Verificando acesso...

MÓDULO 3.4

🎼 Orquestrador multi-tenant

Chief-of-staff que decompõe pedido, roteia para sub-agentes especialistas, isola falhas, respeita tier do user e pede aprovação humana quando o risco passa do limite.

6
Tópicos
45
Minutos
Builder
Nível
Código
Tipo
1

👔 Chief-of-staff pattern

Agente solo em produção falha em tarefas longas: tenta tudo no mesmo contexto, perde o fio, alucina. O chief-of-staff é um agente fino que não executa — decompõe o pedido, delega para especialistas, reconcilia o retorno. Pesquisa da Anthropic mostra +90,2% de qualidade vs. agente único no mesmo benchmark.

📊 O dado que justifica

90,2% — melhoria média do chief-of-staff multi-agente vs. agente solo (mesma família de modelos, mesmo prompt-base).

Vale para research, planejamento longo, multi-step com tools. Não vale para chat curto — aí o solo ganha em latência.

Fluxo em ASCII

  user_request
       │
       ▼
  ┌──────────────────┐
  │   ChiefOfStaff   │  ── decompose() ──▶  [task1, task2, task3]
  │  (não executa)   │
  └──────────────────┘
       │ delegate(task, subagent)
       ├──────────────┬──────────────┐
       ▼              ▼              ▼
  ┌──────────┐   ┌──────────┐   ┌──────────┐
  │Researcher│   │ Planner  │   │ Executor │
  │ (web,RAG)│   │ (LLM)    │   │ (tools)  │
  └──────────┘   └──────────┘   └──────────┘
       │              │              │
       └──────────────┴──────────────┘
                      ▼
              reconcile(results)
                      │
                      ▼
                 response

🧠 Esqueleto do ChiefOfStaff

class ChiefOfStaff:
    def __init__(self, user: User, registry: SubagentRegistry):
        self.user = user
        self.registry = registry
        self.llm = pick_model(user.tier)  # tier define modelo

    async def handle(self, request: str) -> Response:
        tasks = await self.decompose(request)
        results = []
        for task in tasks:
            sub = self.registry.pick(task.kind, tier=self.user.tier)
            try:
                r = await self.delegate(sub, task)
            except SubagentError as e:
                r = TaskFailed(task=task, error=str(e))  # isola
            results.append(r)
        return await self.reconcile(request, results)

    async def decompose(self, request):
        prompt = f"Quebre em sub-tarefas atômicas:\n{request}"
        return await self.llm.complete(prompt, schema=TaskList)

    async def delegate(self, sub, task):
        return await sub.run(task, user=self.user)

    async def reconcile(self, request, results):
        prompt = build_reconcile_prompt(request, results)
        return await self.llm.complete(prompt)
Decompose

LLM quebra em TaskList via schema validado.

Delegate

Cada task vai pro sub-agente certo. Try/except isola.

Reconcile

Resultado vira input do prompt final. Falhas viram contexto, não exception.

2

🎚️ Roteamento por tier do user

Free user não pode chamar Opus em loop — quebra unit economics. Enterprise não tolera Haiku numa decisão de M&A. O orquestrador decide modelo + tools + limites pelo tier, não pelo prompt.

🗂️ Dict TIERS

TIERS = {
    "free": {
        "model": "claude-haiku-4-5",
        "max_subagents": 2,
        "tools": ["search_web"],
        "rate_per_min": 10,
        "max_tokens_per_req": 4_000,
        "can_approve_human": False,
    },
    "pro": {
        "model": "claude-sonnet-4-7",
        "max_subagents": 5,
        "tools": ["search_web", "rag_query", "code_exec"],
        "rate_per_min": 60,
        "max_tokens_per_req": 32_000,
        "can_approve_human": True,
    },
    "enterprise": {
        "model": "claude-opus-4-7",
        "max_subagents": 12,
        "tools": ["search_web", "rag_query", "code_exec",
                  "db_write", "send_email", "create_ticket"],
        "rate_per_min": 600,
        "max_tokens_per_req": 200_000,
        "can_approve_human": True,
        "dedicated_circuit": True,
    },
}

def pick_model(tier: str) -> LLM:
    return LLM(TIERS[tier]["model"])

def allowed_tools(tier: str) -> list[str]:
    return TIERS[tier]["tools"]

Tabela comparativa

Capacidade Free Pro Enterprise
ModeloHaiku 4.5Sonnet 4.7Opus 4.7
Sub-agentes paralelos2512
Tools liberadas1 (search)36 + db_write
Req/min1060600
Max tokens/req4k32k200k
Human approval
Circuit dedicado
Decisão no orquestrador

Nunca pergunte tier dentro do sub-agente. Decide na entrada.

Upgrade dinâmico

User pode pagar Opus por request específico — vira override no Session.

3

🧩 Sub-agentes especialistas

Cada sub-agente é uma Tool pro chief, com contrato fechado: input tipado, output tipado, timeout, rate-limit próprio. O chief não conhece o miolo do sub — só a interface.

🔬 ResearcherSubagent como Tool

from pydantic import BaseModel

class ResearchInput(BaseModel):
    query: str
    max_sources: int = 5
    recency_days: int | None = None

class ResearchOutput(BaseModel):
    summary: str
    sources: list[dict]
    confidence: float

class ResearcherSubagent(Tool):
    name = "researcher"
    description = "Busca web + RAG e devolve summary com citações."
    input_schema = ResearchInput
    output_schema = ResearchOutput
    timeout_s = 30

    def __init__(self, llm: LLM, web: WebClient, rag: RAGClient):
        self.llm, self.web, self.rag = llm, web, rag

    async def run(self, inp: ResearchInput, user: User) -> ResearchOutput:
        web_hits = await self.web.search(inp.query, k=inp.max_sources)
        rag_hits = await self.rag.query(inp.query, user_id=user.id, k=3)
        sources = web_hits + rag_hits
        summary = await self.llm.complete(
            build_summary_prompt(inp.query, sources),
            max_tokens=2000,
        )
        return ResearchOutput(
            summary=summary,
            sources=[s.to_dict() for s in sources],
            confidence=score_confidence(sources),
        )

💡 Tip — isolamento de falha

Sub-agente que falha não derruba o chief. O try/except no delegate() transforma exception em TaskFailed e segue. O chief reconcilia com o que sobrou e diz na resposta: "researcher falhou, segui com planner." Usuário vê algo útil — não 500.

Researcher

Web + RAG. Devolve summary + sources + confidence.

Planner

LLM puro. Recebe contexto, devolve sequência de passos.

Executor

Roda tools concretas (db, email, ticket). Único que escreve no mundo.

Fazer vs evitar

✓ Fazer

  • Sub-agente recebe as tools que precisa pra função dele.
  • Researcher só tem read (web, rag). Não escreve nada.
  • Executor é o único com write — e cada write loga user_id + request_id.
  • Schema Pydantic em input/output. Erros viram ValidationError determinístico.

✗ Evitar

  • Sub-agente com toolkit completo "por garantia". Vira mini-chief, perde foco.
  • Researcher que também pode db_write — caminho rápido pra deletar produção por alucinação.
  • Sub-agente que chama outros sub-agentes sem passar pelo chief. Vira grafo intratável.
  • Output free-form. Sem schema o reconcile vira string parsing — frágil.
4

🚦 Rate limiting + circuit breaker

Sem isso, um user com loop infinito (intencional ou não) drena seu orçamento mensal em horas. TokenBucket por user e circuit breaker por sub-agente são obrigatórios — não otimização.

🚨 Alerta — loop infinito sem rate limit

Caso real: agente em loop chamando Opus a cada 200ms. Em 6h de madrugada queimou USD 14.000 antes de alguém notar. Sem TokenBucket = sua margem some.

Coloque hard cap por user/dia antes de subir o primeiro chief multi-agente.

🪣 TokenBucket por user (defaultdict)

import time
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class Bucket:
    capacity: float
    tokens: float
    refill_rate: float  # tokens/segundo
    last_refill: float

class TokenBucket:
    def __init__(self):
        # cada user_id ganha bucket sob demanda
        self.buckets: dict[str, Bucket] = defaultdict(self._new)

    def _new(self) -> Bucket:
        return Bucket(capacity=60, tokens=60, refill_rate=1.0,
                      last_refill=time.monotonic())

    def configure(self, user_id: str, tier: str):
        rpm = TIERS[tier]["rate_per_min"]
        b = self.buckets[user_id]
        b.capacity = rpm
        b.tokens = min(b.tokens, rpm)
        b.refill_rate = rpm / 60.0

    def allow(self, user_id: str, cost: float = 1.0) -> bool:
        b = self.buckets[user_id]
        now = time.monotonic()
        b.tokens = min(b.capacity, b.tokens + (now - b.last_refill) * b.refill_rate)
        b.last_refill = now
        if b.tokens >= cost:
            b.tokens -= cost
            return True
        return False

# Circuit breaker por sub-agente
class CircuitBreaker:
    def __init__(self, threshold=5, cooldown_s=30):
        self.fails = defaultdict(int)
        self.opened_at = {}
        self.threshold, self.cooldown = threshold, cooldown_s

    def call_allowed(self, sub: str) -> bool:
        if sub in self.opened_at:
            if time.monotonic() - self.opened_at[sub] > self.cooldown:
                del self.opened_at[sub]
                self.fails[sub] = 0
            else:
                return False
        return True

    def record(self, sub: str, ok: bool):
        if ok:
            self.fails[sub] = 0
        else:
            self.fails[sub] += 1
            if self.fails[sub] >= self.threshold:
                self.opened_at[sub] = time.monotonic()
Bucket por user

Vizinho barulhento não derruba os outros.

Breaker por sub

5 falhas → abre 30s. Para de bater num serviço caído.

Hard cap diário

Independente do bucket: USD/dia por user. Kill switch contábil.

5

📨 A2A protocol — envelope padrão

Quando dois agentes falam (chief → sub, ou sistema A → sistema B), envelope é obrigatório: correlation_id pra rastrear, bearer token pra autenticar, trace_id pra observabilidade.

📦 Payload A2A — exemplo JSON

{
  "envelope": {
    "version": "a2a/1.0",
    "correlation_id": "req_01HZ8X7K3M2NRQ9V4PYJBD6S5C",
    "trace_id": "trc_4f9c2b1a8e7d6c5b",
    "from": "agent://chief.tenant42",
    "to":   "agent://researcher.tenant42",
    "issued_at": "2026-05-18T20:44:11Z",
    "expires_at": "2026-05-18T20:44:41Z",
    "auth": {
      "scheme": "Bearer",
      "token": "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9..."
    }
  },
  "user_context": {
    "user_id": "u_8821",
    "tenant_id": "tenant42",
    "tier": "pro",
    "locale": "pt-BR"
  },
  "task": {
    "kind": "research",
    "input": {
      "query": "concorrentes diretos do produto X no Brasil 2026",
      "max_sources": 8,
      "recency_days": 90
    },
    "requires_approval": false,
    "timeout_s": 30
  },
  "limits": {
    "max_tokens": 8000,
    "max_cost_usd": 0.50
  }
}

Por que cada campo importa

  • correlation_id — segue o pedido do user até as N chamadas de sub. Suporte usa isso pra reproduzir.
  • trace_id — span no OpenTelemetry. Latência por hop fica visível.
  • expires_at — sub-agente rejeita envelope velho. Evita replay.
  • auth.token — JWT assinado pelo orquestrador. Sub valida assinatura — não confia só no IP.
  • user_context — tier viaja no envelope. Sub não consulta DB.
  • limits — sub aborta se ultrapassar. Cap em duas camadas.
Sempre versione

a2a/1.0 no envelope. Breaking changes viram 1.1.

Nunca confie no body

Sub valida JWT + schema. Body sem assinatura = anônimo.

6

🙋 Human-in-the-loop

Ações irreversíveis (enviar email a cliente, deletar dado, criar fatura) nunca rodam sem ack humano. O orquestrador marca a task como requires_approval, congela em estado pending e espera — com timeout de 24h.

Fluxo pending_approval

from datetime import datetime, timedelta, timezone

APPROVAL_TIMEOUT = timedelta(hours=24)

async def execute_with_approval(task, user, executor):
    if not task.requires_approval:
        return await executor.run(task, user)

    pending = await db.create_pending(
        task=task,
        user_id=user.id,
        state="pending_approval",
        expires_at=datetime.now(timezone.utc) + APPROVAL_TIMEOUT,
    )
    await notify_approver(user, pending)  # email/slack/webhook

    decision = await wait_for_decision(
        pending_id=pending.id,
        timeout=APPROVAL_TIMEOUT,
    )

    if decision is None:                  # timeout
        await db.update(pending.id, state="expired")
        return TaskExpired(task=task)
    if decision.action == "deny":
        await db.update(pending.id, state="denied",
                        reviewer=decision.user_id, reason=decision.reason)
        return TaskDenied(task=task, reason=decision.reason)

    # approved
    await db.update(pending.id, state="approved",
                    reviewer=decision.user_id)
    result = await executor.run(task, user)
    await db.update(pending.id, state="executed",
                    result_id=result.id)
    return result

💡 Tip — quem aprova ≠ quem pede

Sempre. User que pediu não aprova a própria ação irreversível — vira self-service de cagada. O approver_role da task define quem pode dar ack: manager, compliance, admin. Aprovador vê o payload completo + diff do que vai acontecer.

Estados possíveis

pending

Aguardando humano.

approved

Ack dado. Executor roda.

denied

Humano vetou. Audit guarda razão.

expired

24h sem resposta. Falha segura.

executed

Rodou. Liga result_id.

Idempotente

Aprovação duplicada não roda 2x. Use pending.id como chave.

Reabrir histórico

Pending fica forever na DB. Suporte vê o que foi aprovado, por quem, quando.

📝 Resumo do módulo

Chief-of-staff bate solo em 90,2% — decompose / delegate / reconcile.
TIERS dirige modelo + tools + limites — free/pro/enterprise no orquestrador, nunca no sub.
Sub-agentes têm tools mínimas — researcher só lê, executor só escreve, schema Pydantic.
TokenBucket + CircuitBreaker são obrigatórios — sem isso, um loop queima USD 14k de madrugada.
A2A envelope com correlation_id + JWT — rastreabilidade ponta-a-ponta e auth por hop.
Human-in-the-loop com timeout 24h — pending/approved/denied/expired/executed, aprovador ≠ requisitante.

Próximo módulo:

3.5 — Canais paralelos: como rodar múltiplos sub-agentes ao mesmo tempo sem corromper estado.