📊 Dashboard MVP: 4 cards e você dorme melhor
Antes de Datadog, Grafana, OTel: 1 página HTML com 4 números. DAU, custo do dia, erros 24h, latência p95. Se um fica vermelho, alguém olha. Pronto.
🖥️ Mockup do dashboard (ASCII honesto)
┌─────────────────────┬─────────────────────┐ │ DAU │ CUSTO HOJE │ │ 1.247 ▲ 8% │ R$ 84,30 ▼ 3% │ │ vs ontem │ budget R$ 120 │ ├─────────────────────┼─────────────────────┤ │ ERROS 24h │ LATÊNCIA p95 │ │ 12 ⚠ (3 críticos) │ 2.1s ▲ 0.4s │ │ threshold: 20 │ SLO: < 3.0s │ └─────────────────────┴─────────────────────┘
Endpoint FastAPI que alimenta os 4 cards
# app/routes/dashboard.py
from fastapi import APIRouter, Depends
from datetime import datetime, timedelta
import asyncpg
router = APIRouter(prefix="/dashboard")
@router.get("/stats")
async def dashboard_stats(db: asyncpg.Connection = Depends(get_db)):
today = datetime.utcnow().date()
yesterday = today - timedelta(days=1)
row = await db.fetchrow("""
SELECT
(SELECT COUNT(DISTINCT user_id)
FROM requests
WHERE day = $1) AS dau_today,
(SELECT COUNT(DISTINCT user_id)
FROM requests
WHERE day = $2) AS dau_yesterday,
(SELECT COALESCE(SUM(cost_usd), 0)
FROM usage_daily
WHERE day = $1) AS cost_today,
(SELECT COUNT(*) FROM errors
WHERE ts > NOW() - INTERVAL '24 hours') AS errors_24h,
(SELECT COUNT(*) FROM errors
WHERE ts > NOW() - INTERVAL '24 hours'
AND severity = 'critical') AS crit_24h,
(SELECT percentile_cont(0.95)
WITHIN GROUP (ORDER BY latency_ms)
FROM requests
WHERE day = $1) AS p95_ms
""", today, yesterday)
return {
"dau": {"today": row["dau_today"], "delta_pct": _delta(row)},
"cost": {"today_brl": round(row["cost_today"] * 5.0, 2)},
"errors": {"total": row["errors_24h"], "critical": row["crit_24h"]},
"latency": {"p95_s": round((row["p95_ms"] or 0) / 1000, 2)},
}
Usuários únicos no dia. Vaza churn antes do CSAT.
USD → BRL convertido. Comparado a budget/30.
Total + críticos. Crit > 0 = paginado.
Percentil 95. Mediana mente; p95 não.
📝 Audit log estruturado: JSON lines, retenção 90d, PII redigida
Log de texto livre é log que ninguém grep. 1 evento = 1 linha JSON.
Campos obrigatórios: ts,
user_id,
request_id,
action,
cost_usd,
latency_ms. PII passa por redact antes de tocar disco.
✗ Evitar: log de texto livre
2026-05-18 14:23 user nei@email.com asked about pricing, replied with R$ 99/mo plan, used 1247 tokens, took 2.3s, OK.
Impossível agregar. Email vaza. grep é jogo de adivinhação.
✓ Fazer: JSON line
{"ts":"2026-05-18T14:23:11Z","user_id":"u_8a3f",
"request_id":"r_91c2","action":"chat.reply",
"tokens_in":412,"tokens_out":835,"cost_usd":0.0042,
"latency_ms":2310,"model":"claude-opus-4-7",
"status":"ok"}
jq, agregação SQL, retenção automática. Sem PII no log.
Audit writer com redact + retention
# app/audit.py
import json, re, time
from pathlib import Path
EMAIL_RE = re.compile(r'[\w.+-]+@[\w-]+\.[\w.-]+')
CPF_RE = re.compile(r'\d{3}\.\d{3}\.\d{3}-\d{2}')
LOG_DIR = Path("/var/log/agent/audit")
RETENTION_DAYS = 90
def _redact(s: str) -> str:
s = EMAIL_RE.sub("[email]", s)
s = CPF_RE.sub("[cpf]", s)
return s
def audit(event: dict) -> None:
# nunca confiar — re-aplica redact em strings
for k, v in list(event.items()):
if isinstance(v, str):
event[k] = _redact(v)
event["ts"] = event.get("ts") or time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
day_file = LOG_DIR / f"{event['ts'][:10]}.jsonl"
with day_file.open("a") as f:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
def purge_expired() -> int:
"""Cron diário. Remove arquivos > 90 dias."""
cutoff = time.time() - RETENTION_DAYS * 86400
removed = 0
for f in LOG_DIR.glob("*.jsonl"):
if f.stat().st_mtime < cutoff:
f.unlink()
removed += 1
return removed
JSON Lines. 1 linha = 1 evento atômico.
90 dias default. LGPD pede limite e propósito.
Email, CPF, telefone, token. Antes do disco.
💰 Custo por user: tabela usage_daily + PRICING versionada
Se você não sabe quanto cada user custou ontem, não tem modelo de negócio — tem voluntariado.
Duas peças: tabela usage_daily com PK composta
(user_id, day, model), e dict
PRICING versionado por data.
-- migrations/004_usage_daily.sql
CREATE TABLE usage_daily (
user_id TEXT NOT NULL,
day DATE NOT NULL,
model TEXT NOT NULL,
tokens_in BIGINT NOT NULL DEFAULT 0,
tokens_out BIGINT NOT NULL DEFAULT 0,
requests INT NOT NULL DEFAULT 0,
cost_usd NUMERIC(10,6) NOT NULL DEFAULT 0,
pricing_ver TEXT NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (user_id, day, model)
);
CREATE INDEX idx_usage_day ON usage_daily (day);
CREATE INDEX idx_usage_user_month ON usage_daily (user_id, date_trunc('month', day));
💵 Tabela PRICING (versionada, USD por 1M tokens)
| Modelo | Input / 1M | Output / 1M | Vigente desde |
|---|---|---|---|
claude-opus-4-7 | $15.00 | $75.00 | 2026-03-01 |
claude-sonnet-4-7 | $3.00 | $15.00 | 2026-03-01 |
claude-haiku-4-7 | $0.80 | $4.00 | 2026-04-15 |
# app/pricing.py
from datetime import date
PRICING = {
"2026-03-01": {
"claude-opus-4-7": {"in": 15.00, "out": 75.00},
"claude-sonnet-4-7": {"in": 3.00, "out": 15.00},
},
"2026-04-15": {
"claude-opus-4-7": {"in": 15.00, "out": 75.00},
"claude-sonnet-4-7": {"in": 3.00, "out": 15.00},
"claude-haiku-4-7": {"in": 0.80, "out": 4.00},
},
}
def pricing_for(day: date) -> tuple[str, dict]:
"""Retorna (version, table) vigente em 'day'. Nunca interpola."""
versions = sorted(PRICING.keys())
active = max(v for v in versions if date.fromisoformat(v) <= day)
return active, PRICING[active]
def cost_usd(model: str, tin: int, tout: int, day: date) -> tuple[float, str]:
ver, table = pricing_for(day)
p = table[model]
return (tin * p["in"] + tout * p["out"]) / 1_000_000, ver
📊 Por que pricing_ver na linha
Quando Anthropic baixar preço em julho e você reprocessar histórico de junho, o número antigo continua reproduzível. Auditoria fiscal não aceita "rodei de novo, deu diferente".
🚨 Alertas: cron 5min, snooze, dedup
Alerta sem dedup é alerta que vira mute. 3 regras: checa a cada 5 min, não re-dispara mesmo alerta em < 1h, snooze por chave quando você sabe que algo está sendo investigado.
💡 Cron de 5 minutos é o sweet spot
< 1 min: ruído + custo de polling. > 15 min: você descobre o incêndio pelo Slack do cliente. 5 min cobre 99% dos casos sem queimar CPU nem confiança.
# app/alerts.py
from datetime import datetime, timedelta
DEDUP_WINDOW = timedelta(hours=1)
async def check_alerts(db) -> list[dict]:
fired = []
now = datetime.utcnow()
rules = [
("errors_critical_24h",
"SELECT COUNT(*) FROM errors WHERE severity='critical' "
"AND ts > NOW() - INTERVAL '24 hours'",
lambda n: n > 0,
"{n} erros críticos nas últimas 24h"),
("cost_over_budget",
"SELECT COALESCE(SUM(cost_usd),0) FROM usage_daily WHERE day=CURRENT_DATE",
lambda n: n > 25.0,
"Custo do dia: ${n:.2f} (budget diário: $25)"),
("p95_latency",
"SELECT percentile_cont(0.95) WITHIN GROUP (ORDER BY latency_ms) "
"FROM requests WHERE ts > NOW() - INTERVAL '15 minutes'",
lambda n: (n or 0) > 3000,
"p95 latência {n:.0f}ms (SLO 3000ms)"),
]
for key, sql, predicate, msg in rules:
# snooze ativo? pula.
snoozed = await db.fetchval(
"SELECT until FROM alert_snooze WHERE key=$1 AND until > NOW()", key)
if snoozed:
continue
n = await db.fetchval(sql)
if not predicate(n):
continue
# dedup: não disparou nesta janela?
last = await db.fetchval(
"SELECT MAX(fired_at) FROM alert_log WHERE key=$1", key)
if last and (now - last) < DEDUP_WINDOW:
continue
await db.execute(
"INSERT INTO alert_log (key, fired_at, payload) VALUES ($1, $2, $3)",
key, now, msg.format(n=n))
fired.append({"key": key, "message": msg.format(n=n)})
return fired
async def snooze(db, key: str, hours: int = 4):
until = datetime.utcnow() + timedelta(hours=hours)
await db.execute("""
INSERT INTO alert_snooze (key, until) VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET until = EXCLUDED.until
""", key, until)
Equilíbrio entre ruído e atraso.
Mesma chave não re-dispara na janela.
Investigando? Cala o alerta com prazo.
🛑 Kill switch: system_flags + is_killed() em todo request
Dois modos: killall (desliga tudo) e
kill_user_X (bloqueia 1 cliente abusando).
Tabela única system_flags, 1 leitura por request com cache de 30s.
⚠️ Sem kill switch = noites sem dormir
Agente entrou em loop, queimou R$ 4.000 em tokens em 20 min, vazou contexto do cliente A pro B, começou a responder em japonês — todos cenários reais. Sem kill switch você precisa abrir SSH, achar o processo, matar, explicar ao cliente. Com kill switch é 1 UPDATE SQL.
-- migrations/005_system_flags.sql
CREATE TABLE system_flags (
key TEXT PRIMARY KEY,
value JSONB NOT NULL,
reason TEXT,
set_by TEXT NOT NULL,
set_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- killall: key='killall', value='true'
-- kill_user_X: key='kill:u_8a3f', value='true'
-- read_only: key='read_only', value='true'
# app/killswitch.py
import time
_CACHE_TTL = 30 # segundos
_cache = {"ts": 0, "flags": {}}
async def _refresh(db):
rows = await db.fetch("SELECT key, value FROM system_flags")
_cache["flags"] = {r["key"]: r["value"] for r in rows}
_cache["ts"] = time.time()
async def is_killed(db, user_id: str) -> tuple[bool, str | None]:
if time.time() - _cache["ts"] > _CACHE_TTL:
await _refresh(db)
flags = _cache["flags"]
if flags.get("killall") is True:
return True, "system halted (killall)"
if flags.get(f"kill:{user_id}") is True:
return True, "user temporarily blocked"
return False, None
# uso em CADA request handler
@router.post("/chat")
async def chat(req: ChatRequest, db = Depends(get_db)):
killed, reason = await is_killed(db, req.user_id)
if killed:
audit({"action": "killed", "user_id": req.user_id, "reason": reason})
raise HTTPException(503, detail=reason)
return await process(req, db)
Pânico geral. 1 UPDATE, 30s para propagar via cache.
Cirúrgico. Cliente em loop não derruba o resto.
📈 Métricas que importam: 4 números, não 40
Tem time que mede 40 métricas e olha 0. Mede estes 4 e o resto é luxo.
📊 Os 4 que pagam o aluguel
- DAU — Daily Active Users. Único número que diz se o produto importa. Cresceu = bom. Caiu 2 dias seguidos = investigar.
- Custo / user / dia — Divide custo total pelo DAU. Se passar do ticket médio, modelo de negócio quebra. Acompanhe junto com churn.
- 👍 / 👎 ratio — Botão de feedback após cada resposta. Meta: > 90% 👍. Quando cai abaixo de 80%, há regressão de qualidade.
- Time-to-first-token — Da chegada do request ao 1º token streamed. UX percebida vive aqui. Meta < 800ms, vermelho > 2s.
✅ Olhar todo dia
- DAU e delta vs ontem
- Custo/user/dia
- 👍/👎 das últimas 24h
- Time-to-first-token p95
📅 Olhar toda semana
- Top 10 users por custo
- Erros agrupados por
action - Mix de modelos (opus/sonnet/haiku %)
- Snoozes ativos sem follow-up
Produto importa?
Modelo fecha?
Qualidade hoje.
UX percebida.
🏁 Trilha 3 concluída
Você fechou Multi-user. Agora sabe rodar um agente que aguenta 2, 200, 2.000 clientes — sem virar pesadelo de plantão.
📝 Resumo do módulo
Próxima trilha:
Trilha 4 — iAmasters OS: escala organizacional, governança multi-time, plataforma compartilhada.