👔 Chief-of-staff pattern
Agente solo em produção falha em tarefas longas: tenta tudo no mesmo contexto, perde o fio, alucina. O chief-of-staff é um agente fino que não executa — decompõe o pedido, delega para especialistas, reconcilia o retorno. Pesquisa da Anthropic mostra +90,2% de qualidade vs. agente único no mesmo benchmark.
📊 O dado que justifica
90,2% — melhoria média do chief-of-staff multi-agente vs. agente solo (mesma família de modelos, mesmo prompt-base).
Vale para research, planejamento longo, multi-step com tools. Não vale para chat curto — aí o solo ganha em latência.
Fluxo em ASCII
user_request
│
▼
┌──────────────────┐
│ ChiefOfStaff │ ── decompose() ──▶ [task1, task2, task3]
│ (não executa) │
└──────────────────┘
│ delegate(task, subagent)
├──────────────┬──────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│Researcher│ │ Planner │ │ Executor │
│ (web,RAG)│ │ (LLM) │ │ (tools) │
└──────────┘ └──────────┘ └──────────┘
│ │ │
└──────────────┴──────────────┘
▼
reconcile(results)
│
▼
response
🧠 Esqueleto do ChiefOfStaff
class ChiefOfStaff:
def __init__(self, user: User, registry: SubagentRegistry):
self.user = user
self.registry = registry
self.llm = pick_model(user.tier) # tier define modelo
async def handle(self, request: str) -> Response:
tasks = await self.decompose(request)
results = []
for task in tasks:
sub = self.registry.pick(task.kind, tier=self.user.tier)
try:
r = await self.delegate(sub, task)
except SubagentError as e:
r = TaskFailed(task=task, error=str(e)) # isola
results.append(r)
return await self.reconcile(request, results)
async def decompose(self, request):
prompt = f"Quebre em sub-tarefas atômicas:\n{request}"
return await self.llm.complete(prompt, schema=TaskList)
async def delegate(self, sub, task):
return await sub.run(task, user=self.user)
async def reconcile(self, request, results):
prompt = build_reconcile_prompt(request, results)
return await self.llm.complete(prompt)
LLM quebra em TaskList via schema validado.
Cada task vai pro sub-agente certo. Try/except isola.
Resultado vira input do prompt final. Falhas viram contexto, não exception.
🎚️ Roteamento por tier do user
Free user não pode chamar Opus em loop — quebra unit economics. Enterprise não tolera Haiku numa decisão de M&A. O orquestrador decide modelo + tools + limites pelo tier, não pelo prompt.
🗂️ Dict TIERS
TIERS = {
"free": {
"model": "claude-haiku-4-5",
"max_subagents": 2,
"tools": ["search_web"],
"rate_per_min": 10,
"max_tokens_per_req": 4_000,
"can_approve_human": False,
},
"pro": {
"model": "claude-sonnet-4-7",
"max_subagents": 5,
"tools": ["search_web", "rag_query", "code_exec"],
"rate_per_min": 60,
"max_tokens_per_req": 32_000,
"can_approve_human": True,
},
"enterprise": {
"model": "claude-opus-4-7",
"max_subagents": 12,
"tools": ["search_web", "rag_query", "code_exec",
"db_write", "send_email", "create_ticket"],
"rate_per_min": 600,
"max_tokens_per_req": 200_000,
"can_approve_human": True,
"dedicated_circuit": True,
},
}
def pick_model(tier: str) -> LLM:
return LLM(TIERS[tier]["model"])
def allowed_tools(tier: str) -> list[str]:
return TIERS[tier]["tools"]
Tabela comparativa
| Capacidade | Free | Pro | Enterprise |
|---|---|---|---|
| Modelo | Haiku 4.5 | Sonnet 4.7 | Opus 4.7 |
| Sub-agentes paralelos | 2 | 5 | 12 |
| Tools liberadas | 1 (search) | 3 | 6 + db_write |
| Req/min | 10 | 60 | 600 |
| Max tokens/req | 4k | 32k | 200k |
| Human approval | — | ✓ | ✓ |
| Circuit dedicado | — | — | ✓ |
Nunca pergunte tier dentro do sub-agente. Decide na entrada.
User pode pagar Opus por request específico — vira override no Session.
🧩 Sub-agentes especialistas
Cada sub-agente é uma Tool pro chief, com
contrato fechado: input tipado, output tipado, timeout,
rate-limit próprio. O chief não conhece o miolo do sub — só a interface.
🔬 ResearcherSubagent como Tool
from pydantic import BaseModel
class ResearchInput(BaseModel):
query: str
max_sources: int = 5
recency_days: int | None = None
class ResearchOutput(BaseModel):
summary: str
sources: list[dict]
confidence: float
class ResearcherSubagent(Tool):
name = "researcher"
description = "Busca web + RAG e devolve summary com citações."
input_schema = ResearchInput
output_schema = ResearchOutput
timeout_s = 30
def __init__(self, llm: LLM, web: WebClient, rag: RAGClient):
self.llm, self.web, self.rag = llm, web, rag
async def run(self, inp: ResearchInput, user: User) -> ResearchOutput:
web_hits = await self.web.search(inp.query, k=inp.max_sources)
rag_hits = await self.rag.query(inp.query, user_id=user.id, k=3)
sources = web_hits + rag_hits
summary = await self.llm.complete(
build_summary_prompt(inp.query, sources),
max_tokens=2000,
)
return ResearchOutput(
summary=summary,
sources=[s.to_dict() for s in sources],
confidence=score_confidence(sources),
)
💡 Tip — isolamento de falha
Sub-agente que falha não derruba o chief. O try/except no delegate() transforma exception em TaskFailed e segue. O chief reconcilia com o que sobrou e diz na resposta: "researcher falhou, segui com planner." Usuário vê algo útil — não 500.
Web + RAG. Devolve summary + sources + confidence.
LLM puro. Recebe contexto, devolve sequência de passos.
Roda tools concretas (db, email, ticket). Único que escreve no mundo.
Fazer vs evitar
✓ Fazer
- ✓Sub-agente recebe só as tools que precisa pra função dele.
- ✓
Researchersó tem read (web, rag). Não escreve nada. - ✓
Executoré o único com write — e cada write logauser_id + request_id. - ✓Schema Pydantic em input/output. Erros viram
ValidationErrordeterminístico.
✗ Evitar
- ✗Sub-agente com toolkit completo "por garantia". Vira mini-chief, perde foco.
- ✗
Researcherque também podedb_write— caminho rápido pra deletar produção por alucinação. - ✗Sub-agente que chama outros sub-agentes sem passar pelo chief. Vira grafo intratável.
- ✗Output free-form. Sem schema o reconcile vira string parsing — frágil.
🚦 Rate limiting + circuit breaker
Sem isso, um user com loop infinito (intencional ou não) drena seu orçamento mensal em horas. TokenBucket por user e circuit breaker por sub-agente são obrigatórios — não otimização.
🚨 Alerta — loop infinito sem rate limit
Caso real: agente em loop chamando Opus a cada 200ms. Em 6h de madrugada queimou USD 14.000 antes de alguém notar. Sem TokenBucket = sua margem some.
Coloque hard cap por user/dia antes de subir o primeiro chief multi-agente.
🪣 TokenBucket por user (defaultdict)
import time
from collections import defaultdict
from dataclasses import dataclass
@dataclass
class Bucket:
capacity: float
tokens: float
refill_rate: float # tokens/segundo
last_refill: float
class TokenBucket:
def __init__(self):
# cada user_id ganha bucket sob demanda
self.buckets: dict[str, Bucket] = defaultdict(self._new)
def _new(self) -> Bucket:
return Bucket(capacity=60, tokens=60, refill_rate=1.0,
last_refill=time.monotonic())
def configure(self, user_id: str, tier: str):
rpm = TIERS[tier]["rate_per_min"]
b = self.buckets[user_id]
b.capacity = rpm
b.tokens = min(b.tokens, rpm)
b.refill_rate = rpm / 60.0
def allow(self, user_id: str, cost: float = 1.0) -> bool:
b = self.buckets[user_id]
now = time.monotonic()
b.tokens = min(b.capacity, b.tokens + (now - b.last_refill) * b.refill_rate)
b.last_refill = now
if b.tokens >= cost:
b.tokens -= cost
return True
return False
# Circuit breaker por sub-agente
class CircuitBreaker:
def __init__(self, threshold=5, cooldown_s=30):
self.fails = defaultdict(int)
self.opened_at = {}
self.threshold, self.cooldown = threshold, cooldown_s
def call_allowed(self, sub: str) -> bool:
if sub in self.opened_at:
if time.monotonic() - self.opened_at[sub] > self.cooldown:
del self.opened_at[sub]
self.fails[sub] = 0
else:
return False
return True
def record(self, sub: str, ok: bool):
if ok:
self.fails[sub] = 0
else:
self.fails[sub] += 1
if self.fails[sub] >= self.threshold:
self.opened_at[sub] = time.monotonic()
Vizinho barulhento não derruba os outros.
5 falhas → abre 30s. Para de bater num serviço caído.
Independente do bucket: USD/dia por user. Kill switch contábil.
📨 A2A protocol — envelope padrão
Quando dois agentes falam (chief → sub, ou sistema A → sistema B), envelope é obrigatório: correlation_id pra rastrear, bearer token pra autenticar, trace_id pra observabilidade.
📦 Payload A2A — exemplo JSON
{
"envelope": {
"version": "a2a/1.0",
"correlation_id": "req_01HZ8X7K3M2NRQ9V4PYJBD6S5C",
"trace_id": "trc_4f9c2b1a8e7d6c5b",
"from": "agent://chief.tenant42",
"to": "agent://researcher.tenant42",
"issued_at": "2026-05-18T20:44:11Z",
"expires_at": "2026-05-18T20:44:41Z",
"auth": {
"scheme": "Bearer",
"token": "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9..."
}
},
"user_context": {
"user_id": "u_8821",
"tenant_id": "tenant42",
"tier": "pro",
"locale": "pt-BR"
},
"task": {
"kind": "research",
"input": {
"query": "concorrentes diretos do produto X no Brasil 2026",
"max_sources": 8,
"recency_days": 90
},
"requires_approval": false,
"timeout_s": 30
},
"limits": {
"max_tokens": 8000,
"max_cost_usd": 0.50
}
}
Por que cada campo importa
- correlation_id — segue o pedido do user até as N chamadas de sub. Suporte usa isso pra reproduzir.
- trace_id — span no OpenTelemetry. Latência por hop fica visível.
- expires_at — sub-agente rejeita envelope velho. Evita replay.
- auth.token — JWT assinado pelo orquestrador. Sub valida assinatura — não confia só no IP.
- user_context — tier viaja no envelope. Sub não consulta DB.
- limits — sub aborta se ultrapassar. Cap em duas camadas.
a2a/1.0 no envelope. Breaking changes viram 1.1.
Sub valida JWT + schema. Body sem assinatura = anônimo.
🙋 Human-in-the-loop
Ações irreversíveis (enviar email a cliente, deletar dado, criar fatura) nunca rodam
sem ack humano. O orquestrador marca a task como requires_approval, congela
em estado pending e espera — com timeout de 24h.
⏳ Fluxo pending_approval
from datetime import datetime, timedelta, timezone
APPROVAL_TIMEOUT = timedelta(hours=24)
async def execute_with_approval(task, user, executor):
if not task.requires_approval:
return await executor.run(task, user)
pending = await db.create_pending(
task=task,
user_id=user.id,
state="pending_approval",
expires_at=datetime.now(timezone.utc) + APPROVAL_TIMEOUT,
)
await notify_approver(user, pending) # email/slack/webhook
decision = await wait_for_decision(
pending_id=pending.id,
timeout=APPROVAL_TIMEOUT,
)
if decision is None: # timeout
await db.update(pending.id, state="expired")
return TaskExpired(task=task)
if decision.action == "deny":
await db.update(pending.id, state="denied",
reviewer=decision.user_id, reason=decision.reason)
return TaskDenied(task=task, reason=decision.reason)
# approved
await db.update(pending.id, state="approved",
reviewer=decision.user_id)
result = await executor.run(task, user)
await db.update(pending.id, state="executed",
result_id=result.id)
return result
💡 Tip — quem aprova ≠ quem pede
Sempre. User que pediu não aprova a própria ação irreversível — vira self-service de cagada. O approver_role da task define quem pode dar ack: manager, compliance, admin. Aprovador vê o payload completo + diff do que vai acontecer.
Estados possíveis
Aguardando humano.
Ack dado. Executor roda.
Humano vetou. Audit guarda razão.
24h sem resposta. Falha segura.
Rodou. Liga result_id.
Aprovação duplicada não roda 2x. Use pending.id como chave.
Pending fica forever na DB. Suporte vê o que foi aprovado, por quem, quando.
📝 Resumo do módulo
Próximo módulo:
3.5 — Canais paralelos: como rodar múltiplos sub-agentes ao mesmo tempo sem corromper estado.