🎯 Por que abstrair (Dependency Inversion na prática)
Toda dependência externa — LLM, canal de mensagem, banco, ferramenta — entra atrás de uma classe-base abstrata. O resto do código trabalha contra essa interface, nunca contra a implementação. Resultado: você troca o modelo (Claude → GPT → Gemini) em 1 arquivo, não na aplicação inteira.
📌 A regra de ouro
Se você não consegue trocar o LLM em menos de 1 dia, suas abstrações estão erradas. Não é exigência teórica — é teste prático de saúde arquitetural.
✓ Com abstração certa
- ✓Testes usam
FakeProvider— sem chamar API. - ✓A/B de modelo é flag no config, não refactor.
- ✓Onboarding de dev novo: lê 4 arquivos
base.pye entende. - ✓Fallback automático (Claude caiu → GPT) é 5 linhas.
✗ Sem abstração
- ✗Teste depende de chave Anthropic — CI custa dinheiro.
- ✗Trocar modelo = grep + sed em 40 arquivos.
- ✗Quando Anthropic muda response shape, app inteira quebra.
- ✗Vendor lock-in disfarçado de "simplicidade".
💡 Teste prático
Pegue seu agente atual. Tente trocar de Claude pra GPT-4o por 1 hora. Se não der, você sabe onde investir esta semana.
D — Dependency Inversion.
abc.ABC + abstractmethod.
dataclass frozen=True.
~80 linhas a mais. Paga em 1 sprint.
🤖 Provider — providers/base.py
Interface que normaliza chamadas a qualquer LLM. Independente de quem está por trás — OpenAI, Anthropic, Ollama, OpenRouter — o resto do código vê a mesma assinatura.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
@dataclass(frozen=True)
class ToolCall:
name: str
arguments: dict
call_id: str
@dataclass(frozen=True)
class LLMResponse:
content: str
tool_calls: list[ToolCall] = field(default_factory=list)
finish_reason: str = "stop" # "stop" | "tool_calls" | "length" | "filter"
tokens_in: int = 0
tokens_out: int = 0
model: str = ""
class Provider(ABC):
@abstractmethod
async def chat(
self,
messages: list[dict],
tools: list[dict] | None = None,
model: str | None = None,
temperature: float = 0.7,
max_tokens: int = 4096,
) -> LLMResponse: ...
📊 Implementações típicas
- OpenRouterProvider: 1 chave → 147 modelos. Default pra prototipagem.
- AnthropicProvider: SDK oficial. Use quando custo importa (Claude direct é mais barato que via router).
- OllamaProvider: 100% local, offline. Ótimo pra dev e dados sensíveis.
- FakeProvider: retorna scripted. Só pra teste.
⚠️ 3 erros comuns
- 1. Expor objetos do SDK (ex:
anthropic.types.Message) na response. Outras implementações não conseguem produzir. - 2. Esquecer
finish_reasonnormalizado. Cada provider chama diferente; mapeie pra enum próprio. - 3. Não contar tokens. Sem isso, não tem custo por user (capítulo 3.6 sofre).
Toda chamada é async. Sync = throughput morto.
60s pra chat normal, 5min para reasoning models.
3 tentativas com backoff exponencial em 5xx.
📡 Channel — channels/base.py
Abstração do canal de entrada/saída. Telegram, Slack, Discord, WhatsApp, Web — todos viram irmãos da mesma classe-base. Adicionar canal novo = 1 arquivo, sem tocar lógica de agente.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Callable, Awaitable
@dataclass(frozen=True)
class IncomingMessage:
user_id: str # ID externo (telegram chat_id, slack user, etc)
text: str
timestamp: float
metadata: dict = field(default_factory=dict) # canal-específico
@dataclass(frozen=True)
class OutgoingMessage:
user_id: str
text: str
parse_mode: str = "markdown" # "markdown" | "html" | "plain"
reply_to: str | None = None # message_id pra resposta
class Channel(ABC):
@abstractmethod
async def start(self, on_message: Callable[[IncomingMessage], Awaitable[None]]) -> None:
"""Inicia listener. Chama on_message a cada mensagem recebida."""
...
@abstractmethod
async def send(self, msg: OutgoingMessage) -> str:
"""Envia mensagem. Retorna message_id externo."""
...
@abstractmethod
async def stop(self) -> None: ...
💡 Por que dataclass frozen
Imutáveis evitam que callback modifique o objeto e cause race entre handlers async. Bug raro mas brutal de debugar.
✓ Idempotência
Toda mensagem tem ID externo único. Chamar send() 2 vezes com mesma reply_to não duplica visualmente — implementação deduplica.
📦 Metadata canal-específica
Telegram coloca chat_id, Slack põe thread_ts. Agente não precisa saber — só passa adiante.
Stitching pra user interno acontece depois.
Channel não bloqueia esperando.
Cancela polling, fecha conexões.
Channel adapta pro formato nativo.
💾 Memory — memory/base.py
Store com 4 operações: save, search, forget, recent. Implementações: SQLiteFTS5, Pinecone, Chroma, Markdown. Sem contrato, memória vira bagunça de "vamos colocar num dict global por enquanto".
@dataclass(frozen=True)
class MemoryEntry:
id: str
content: str
category: str # 'fact' | 'preference' | 'decision' | 'interaction'
user_id: str
created_at: float
metadata: dict = field(default_factory=dict)
class MemoryStore(ABC):
@abstractmethod
async def save(self, content: str, category: str, user_id: str, metadata: dict | None = None) -> str:
"""Retorna ID da entry."""
...
@abstractmethod
async def search(self, query: str, user_id: str, limit: int = 10) -> list[MemoryEntry]:
"""Busca textual ou semântica. SEMPRE filtra por user_id."""
...
@abstractmethod
async def forget(self, entry_id: str, user_id: str) -> bool:
"""Hard delete. Audit log obrigatório no chamador."""
...
@abstractmethod
async def recent(self, user_id: str, limit: int = 20) -> list[MemoryEntry]: ...
🚨 user_id obrigatório em TODAS as operações
Inclui forget() — sem isso, usuário A pode tentar deletar memória de B passando o ID. Defesa em profundidade.
📊 Categorias
- fact: "O nome do filho do Marco é Davi." Verificável.
- preference: "Sempre responda em pt-BR formal." Estilo.
- decision: "Escolhemos AWS sobre GCP em jan/2026." Auditável.
- interaction: Log de Q&A. Para fine-tuning futuro.
recent = ordem temporal. search = relevância.
LGPD obriga. Cliente confia mais.
source_msg_id, confidence, etc.
Não sequencial — evita enumeration.
🔧 Tool — tools/base.py
Classe com 3 atributos de metadata + 1 método execute. Metadata vira JSON Schema que o LLM lê pra decidir quando e como chamar.
@dataclass(frozen=True)
class ToolResult:
output: str | None # texto que volta pro LLM
error: str | None # mensagem se falhou
metadata: dict | None = None # latência, custo, etc
class Tool(ABC):
name: str # snake_case, verbo: search_web, send_email
description: str # 1-3 frases. Inclui exemplo + quando NÃO usar
parameters: dict # JSON Schema. required estrito.
requires_approval: bool = False # human-in-the-loop
@abstractmethod
async def execute(self, session: "Session", **kwargs) -> ToolResult: ...
def to_openai_schema(self) -> dict:
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters
}
}
💡 Por que session no execute
Tool precisa saber quem está chamando para enforce permissões, escopar workspace, logar audit. Sem session, vira variável global → bug multi-user (T3.1).
✓ Boa description
"""Busca arquivos no workspace do user por nome ou regex. Ex: pattern='*.md' retorna paths. Não use para buscar conteúdo (use grep_files)."""
✗ Description vaga
"""Lida com arquivos.""" → LLM nunca sabe quando usar → Ou chama errado o tempo todo
search_web ✓ — web ✗
Se passa, divida em 2 tools.
Optional só com default no schema.
Tool nunca raise — sempre ToolResult.
🧩 Wire-up no agent.py — 50 linhas
Aqui os 4 contratos se encontram. Loop principal recebe mensagem do channel, busca contexto na memory, chama provider com tools registradas, executa tool calls, repete até resposta final. 50 linhas que qualquer dev novo lê e entende.
class Agent:
def __init__(self, provider, channel, memory, tools, system_prompt):
self.provider = provider
self.channel = channel
self.memory = memory
self.tools = {t.name: t for t in tools}
self.system_prompt = system_prompt
async def start(self):
await self.channel.start(self.handle)
async def handle(self, msg: IncomingMessage):
user = await load_user_by_external(msg.user_id, channel="telegram")
session = Session(user=user)
log.info("turn_start", request_id=session.request_id, user_id=user.id)
history = await self.memory.recent(user.id, limit=10)
messages = [
{"role": "system", "content": self.system_prompt},
*[{"role": "user", "content": h.content} for h in history],
{"role": "user", "content": msg.text}
]
for turn in range(5): # max 5 tool turns
resp = await self.provider.chat(
messages,
tools=[t.to_openai_schema() for t in self.tools.values()]
)
if resp.finish_reason != "tool_calls":
break
for call in resp.tool_calls:
tool = self.tools[call.name]
result = await tool.execute(session=session, **call.arguments)
messages.append({"role": "tool", "tool_call_id": call.call_id,
"content": result.output or f"ERROR: {result.error}"})
await self.memory.save(f"Q: {msg.text}\nA: {resp.content}",
category="interaction", user_id=user.id)
await self.channel.send(OutgoingMessage(user_id=msg.user_id, text=resp.content))
log.info("turn_end", request_id=session.request_id,
tokens_in=resp.tokens_in, tokens_out=resp.tokens_out)
🧠 Decisões importantes nesta classe
- max 5 tool turns: impede loop infinito (custo + UX).
- history=10: contexto suficiente sem inflar input tokens.
- tool error como string: LLM aprende a se ajustar, não raise.
- log no início e fim: request_id rastreia tudo (T3.6 audit).
⚠️ O que falta nesta versão (e que aparece nas próximas trilhas)
- • Rate limit por user — T3.4
- • requires_approval check — T3.4
- • Streaming — UX boa pede SSE, fica fora deste módulo
- • Compactação automática quando history fica longo — exercício
~50. Cabe no PR review.
Apenas as 4 ABCs + Session/User.
Provider, Channel, Memory: hot-swap.
FakeProvider + FakeChannel = e2e em 100ms.
📝 Resumo do módulo
Próximo módulo:
2.3 — Canal Telegram em 50 linhas (primeira implementação concreta de Channel)