🆔 User model mínimo
Antes de qualquer coisa, defina o que é um user no seu sistema — com schema, tipos e constraints. Sem isso, "user" vira string solta em prompt e você passa a debugar identidade no log. 5 campos cobrem 95% dos casos.
🗄️ Schema canônico (SQLite/Postgres)
CREATE TABLE users ( id TEXT PRIMARY KEY, -- UUID v4 (str) name TEXT NOT NULL, email TEXT UNIQUE NOT NULL, roles TEXT NOT NULL DEFAULT 'user',-- CSV: 'user,admin' created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, deleted_at TIMESTAMP -- soft delete (LGPD) ); CREATE INDEX idx_users_email ON users(email) WHERE deleted_at IS NULL;
✓ Fazer: UUID v4 como ID
- ✓ Não enumerável — atacante não adivinha o próximo.
- ✓ Geração client-side ou server-side, mesma forma.
- ✓ Funciona em sistema distribuído sem coordenação.
- ✓ Aceito direto como nome de pasta no FS.
✗ Evitar: autoincrement
- ✗ ID enumeration attack: /user/1, /user/2, /user/3 vaza base inteira.
- ✗ Migração entre bancos vira pesadelo (colisão de IDs).
- ✗ Não dá pra criar user offline (precisa do servidor pra alocar ID).
- ✗ Revela quantos clientes você tem ("sou o user 47").
🚨 Alerta: ID enumeration attack
Em 2024 um app de IA brasileiro vazou 12 mil prompts privados
porque a rota GET /sessions/{id} usava autoincrement.
Bot rodou de 1 a 12000 em 4 minutos. Com UUID v4, o espaço de busca é ~10³⁸ — força-bruta inviável.
UUID v4. Imutável. Chave de tudo.
Unique + soft-delete aware index.
CSV simples. RBAC vem depois.
Soft delete pra LGPD art. 18.
📁 Workspace por user
Cada user precisa do seu próprio diretório raiz —
para tokens, cache, anexos, logs. Convenção simples: ~/.app/users/<uuid>/.
Sem isso, você acaba com tudo num /tmp que vaza entre clientes na próxima reinicialização.
🌳 Estrutura recomendada
~/.app/
├── db.sqlite # users, sessions, audit
└── users/
└── 7c3a-...-9f2b/ # UUID do user
├── auth.json # tokens OAuth encriptados
├── memory/ # vector store privado
│ └── facts.db
├── uploads/ # anexos do user
└── logs/
└── 2026-05.jsonl # audit por mês
🐍 Resolver workspace (com guard-rail)
from pathlib import Path
import re
UUID_RE = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$")
def user_workspace(user_id: str) -> Path:
"""Retorna ~/.app/users/<uuid>/, criando se necessário.
Levanta ValueError se user_id não for UUID v4 — bloqueia path traversal."""
if not UUID_RE.match(user_id):
raise ValueError(f"user_id inválido: {user_id!r}")
ws = Path.home() / ".app" / "users" / user_id
ws.mkdir(parents=True, exist_ok=True, mode=0o700) # só o dono lê
return ws
🚨 Alerta: path traversal
Sem validar UUID, user_id = "../../etc/passwd"
vira ~/.app/users/../../etc/passwd. O regex acima rejeita qualquer coisa que não seja
UUID v4 canônico — é a primeira linha de defesa.
Backup, delete e migração ficam triviais.
Só o processo do app lê. Não confie no FS.
Bloqueia traversal antes do mkdir.
du -sh no cron evita user "esponja".
🔐 Auth-profiles (lição openclaw)
Um user pode ter N identidades externas: Google pessoal, Google corporativo, Notion do cliente A, Notion do cliente B. Cada um vira um auth-profile. Lição que aprendemos no openclaw: nunca achatar tudo num token só — vira impossível trocar de cliente sem reautenticar.
📄 auth.json — estrutura aninhada por provider/profile
{
"version": 1,
"user_id": "7c3a-...-9f2b",
"providers": {
"google": {
"profiles": {
"pessoal": {
"email": "nei@gmail.com",
"access_token_enc": "gAAAAABm...",
"refresh_token_enc": "gAAAAABm...",
"scopes": ["calendar.readonly", "gmail.send"],
"expires_at": "2026-05-18T22:00:00Z"
},
"corp": {
"email": "nei@empresa.com.br",
"access_token_enc": "gAAAAABm...",
"refresh_token_enc": "gAAAAABm...",
"scopes": ["drive.file"],
"expires_at": "2026-05-18T22:00:00Z"
}
},
"default_profile": "pessoal"
},
"notion": { "profiles": { "...": "..." } }
}
}
🔒 Encriptar tokens com Fernet
from cryptography.fernet import Fernet
import json, os
KEY = os.environ["AUTH_FERNET_KEY"].encode() # rotação por env
fernet = Fernet(KEY)
def save_token(user_id: str, provider: str, profile: str, token: dict) -> None:
path = user_workspace(user_id) / "auth.json"
data = json.loads(path.read_text()) if path.exists() else {"version": 1, "providers": {}}
enc = {
"email": token["email"],
"access_token_enc": fernet.encrypt(token["access_token"].encode()).decode(),
"refresh_token_enc": fernet.encrypt(token["refresh_token"].encode()).decode(),
"scopes": token["scopes"],
"expires_at": token["expires_at"],
}
data["providers"].setdefault(provider, {"profiles": {}, "default_profile": profile})
data["providers"][provider]["profiles"][profile] = enc
path.write_text(json.dumps(data, indent=2))
os.chmod(path, 0o600) # só o dono lê
📚 Dado: por que perfis múltiplos importam
- ~63% dos usuários B2B do openclaw têm 2+ contas Google ativas (pessoal + corp).
- ~28% precisam alternar entre clientes (consultores, freelas) — auth-profile vira diferenciador.
- 1 token só = forçar logout/login a cada troca = churn imediato.
Bom pra começar. Em produção sénior, migrar pra AWS KMS ou Vault.
Defesa em camadas: encriptado + permissão de FS.
UX: quando o user não escolhe, qual usar.
Suporte a múltiplas keys ativas (MultiFernet).
🧵 Sessão por request (contextvars)
Cada requisição precisa de um objeto Session imutável que carrega
user, request_id, profile escolhido e timestamps. contextvars da stdlib
propaga isso por toda a chain async sem passar parâmetro manualmente em cada função.
🐍 Session dataclass + contextvars
from dataclasses import dataclass
from contextvars import ContextVar
from datetime import datetime
from uuid import uuid4
@dataclass(frozen=True, slots=True)
class Session:
user_id: str
request_id: str
provider: str | None = None
profile: str | None = None
started_at: datetime = None
_current: ContextVar[Session | None] = ContextVar("session", default=None)
def current_session() -> Session:
s = _current.get()
if s is None:
raise RuntimeError("Nenhuma sessão ativa — chame within_session() antes.")
return s
async def within_session(user_id: str, handler, *args, **kwargs):
sess = Session(
user_id=user_id,
request_id=str(uuid4()),
started_at=datetime.utcnow(),
)
token = _current.set(sess)
try:
return await handler(*args, **kwargs)
finally:
_current.reset(token)
💡 Tip: frozen=True evita race em async
@dataclass(frozen=True) bloqueia mutação após a criação.
Em código async, qualquer tarefa que tente sess.user_id = "outro" levanta
FrozenInstanceError. Combinado com contextvars (que isola por task),
você ganha imunidade a vazamento de estado entre clientes — sem mutex, sem lock.
🔌 Uso no handler de mensagem
async def handle_message(msg):
await within_session(msg.user_id, _process, msg)
async def _process(msg):
sess = current_session()
workspace = user_workspace(sess.user_id)
log.info("request", extra={"request_id": sess.request_id, "user_id": sess.user_id})
return await llm.run(msg.text, ctx=sess)
Imutável. Sem race em async.
Isola por asyncio.Task automaticamente.
Cola logs, métricas e traces da mesma req.
Sem vazar sessão pra próxima task no pool.
🚀 Onboarding via /start
Primeiro contato é onde 40% dos users desistem. Um wizard inline guiado por state machine resolve: cada mensagem do user avança 1 estado, o agente sempre sabe a próxima pergunta. Sem prompt gigante "me conte tudo sobre você".
🤖 State machine do /start
from enum import Enum
class OnboardingState(str, Enum):
NEW = "new"
ASKED_NAME = "asked_name"
ASKED_EMAIL = "asked_email"
ASKED_AUTH = "asked_auth"
DONE = "done"
PROMPTS = {
OnboardingState.NEW: "👋 Oi! Pra começar, como você se chama?",
OnboardingState.ASKED_NAME: "Prazer, {name}. Qual seu email?",
OnboardingState.ASKED_EMAIL: "Quer conectar o Google agora? (sim/depois)",
OnboardingState.ASKED_AUTH: "✅ Tudo pronto. Tente: 'liste meus eventos de hoje'.",
}
async def on_start(user_id: str, text: str) -> str:
state = await db.get_onboarding_state(user_id) or OnboardingState.NEW
if state == OnboardingState.NEW:
await db.set_state(user_id, OnboardingState.ASKED_NAME)
return PROMPTS[OnboardingState.NEW]
if state == OnboardingState.ASKED_NAME:
await db.update_user(user_id, name=text.strip())
await db.set_state(user_id, OnboardingState.ASKED_EMAIL)
return PROMPTS[OnboardingState.ASKED_NAME].format(name=text.strip())
if state == OnboardingState.ASKED_EMAIL:
if "@" not in text:
return "Hmm, isso não parece um email. Tente de novo."
await db.update_user(user_id, email=text.strip().lower())
await db.set_state(user_id, OnboardingState.ASKED_AUTH)
return PROMPTS[OnboardingState.ASKED_EMAIL]
if state == OnboardingState.ASKED_AUTH:
await db.set_state(user_id, OnboardingState.DONE)
if text.lower().startswith("sim"):
return f"Abre este link: {oauth_url(user_id)}"
return PROMPTS[OnboardingState.ASKED_AUTH]
return PROMPTS[OnboardingState.ASKED_AUTH]
✓ Por que state machine
- ✓ Cada turno é idempotente — se cair, retoma do mesmo estado.
- ✓ Validação local (email tem @?) antes de gastar token de LLM.
- ✓ Onboarding fica auditável: você vê em qual passo o user desistiu.
- ✓ A/B testar 1 passo de cada vez é trivial.
✗ Evite: prompt "conte tudo"
- ✗ LLM inventa campos que o user não disse.
- ✗ Sem checkpoint — se o user fechar, perde tudo.
- ✗ Difícil cobrar campo obrigatório que veio vazio.
- ✗ Latência alta no primeiro turno = abandono.
Não em memória. Sobrevive a restart.
Mesma msg duas vezes = mesmo resultado.
Conte users em cada state diariamente.
Comando reseta o wizard se quiser refazer.
🗑️ Logout & delete account (LGPD)
Logout é fácil (apaga session). Delete account é caro — mas obrigatório. LGPD art. 18 dá ao titular o direito de eliminação dos dados pessoais. Padrão correto: soft delete imediato + hard delete em 15 dias (janela de undo + auditoria).
📚 Dado: LGPD art. 18, §3º
O controlador tem 15 dias para responder a um pedido de eliminação. A ANPD vem multando empresas que ignoram o prazo — multa pode chegar a 2% do faturamento (até R$ 50 milhões por infração).
Fonte: Lei 13.709/2018, art. 18, II + IV; Resolução CD/ANPD nº 4/2023 sobre sanções.
🐍 Delete em duas fases
import shutil
from datetime import datetime, timedelta
SOFT_DELETE_TTL = timedelta(days=15) # janela de undo + auditoria
async def delete_account(user_id: str, reason: str = "user_request") -> None:
"""Fase 1: soft delete imediato. User não consegue mais logar."""
await db.execute(
"UPDATE users SET deleted_at = ?, email = ? WHERE id = ?",
(datetime.utcnow(), f"deleted+{user_id}@example.invalid", user_id),
)
await db.execute(
"INSERT INTO audit_log (user_id, event, reason) VALUES (?, ?, ?)",
(user_id, "soft_delete", reason),
)
# invalida sessões ativas
await sessions.revoke_all(user_id)
async def purge_expired() -> int:
"""Fase 2: roda em cron diário. Hard delete passados 15 dias."""
cutoff = datetime.utcnow() - SOFT_DELETE_TTL
rows = await db.fetch_all(
"SELECT id FROM users WHERE deleted_at IS NOT NULL AND deleted_at < ?",
(cutoff,),
)
purged = 0
for (user_id,) in rows:
# 1. apaga workspace inteiro (auth.json, memória, uploads)
ws = Path.home() / ".app" / "users" / user_id
if ws.exists():
shutil.rmtree(ws)
# 2. apaga registro do user
await db.execute("DELETE FROM users WHERE id = ?", (user_id,))
# 3. mantém só linha de audit (anonimizada)
await db.execute(
"INSERT INTO audit_log (user_id, event) VALUES (?, ?)",
(user_id, "hard_delete"),
)
purged += 1
return purged
✓ Fazer: soft → hard
- ✓ Soft delete imediato: bloqueia acesso já.
- ✓ Email anonimizado evita UNIQUE conflict se user voltar.
- ✓ 15 dias de janela = compliance + undo + auditoria.
- ✓ Audit log mantém apenas o ID (sem PII).
✗ Evite: DELETE direto
- ✗ User clica errado e perde tudo na hora.
- ✗ Sem audit trail = você não prova que cumpriu LGPD.
- ✗ FK em cascata pode apagar dado de outro user por engano.
- ✗ Sem janela = suporte não consegue reverter fraude.
💡 Tip: cron de purge é seu amigo
Rode purge_expired() 1x por dia (cron 03:00).
Logue quantos foram purgados. Se um dia o número explodir, você tem um alerta antes do
incidente virar manchete. Bonus: a métrica de "users em soft delete" é um early warning de churn.
deleted_at = now(). Sem mais acesso.
rmtree do workspace + DELETE do row.
Só o UUID. Sem nome, email, conteúdo.
Métrica observável. Falha = alerta.
📝 Resumo do módulo
Próximo módulo:
3.3 — Memória compartilhada vs privada (como decidir o que cada user vê dos outros)