Verificando acesso...

MÓDULO 3.6 · ÚLTIMO DA TRILHA

📟 Operação: o turno de plantão do seu OS

Dashboard, audit log, custo por user, alertas e kill switch. O que separa um agente que "tá rodando" de um sistema que você dorme tranquilo operando.

6
Tópicos
45
Minutos
Avançado
Nível
Prática
Tipo
1

📊 Dashboard MVP: 4 cards e você dorme melhor

Antes de Datadog, Grafana, OTel: 1 página HTML com 4 números. DAU, custo do dia, erros 24h, latência p95. Se um fica vermelho, alguém olha. Pronto.

🖥️ Mockup do dashboard (ASCII honesto)

┌─────────────────────┬─────────────────────┐
│  DAU                │  CUSTO HOJE         │
│  1.247 ▲ 8%         │  R$ 84,30 ▼ 3%      │
│  vs ontem           │  budget R$ 120      │
├─────────────────────┼─────────────────────┤
│  ERROS 24h          │  LATÊNCIA p95       │
│  12 ⚠ (3 críticos)  │  2.1s ▲ 0.4s        │
│  threshold: 20      │  SLO: < 3.0s        │
└─────────────────────┴─────────────────────┘

Endpoint FastAPI que alimenta os 4 cards

# app/routes/dashboard.py
from fastapi import APIRouter, Depends
from datetime import datetime, timedelta
import asyncpg

router = APIRouter(prefix="/dashboard")

@router.get("/stats")
async def dashboard_stats(db: asyncpg.Connection = Depends(get_db)):
    today = datetime.utcnow().date()
    yesterday = today - timedelta(days=1)

    row = await db.fetchrow("""
        SELECT
          (SELECT COUNT(DISTINCT user_id)
             FROM requests
            WHERE day = $1)                                    AS dau_today,
          (SELECT COUNT(DISTINCT user_id)
             FROM requests
            WHERE day = $2)                                    AS dau_yesterday,
          (SELECT COALESCE(SUM(cost_usd), 0)
             FROM usage_daily
            WHERE day = $1)                                    AS cost_today,
          (SELECT COUNT(*) FROM errors
            WHERE ts > NOW() - INTERVAL '24 hours')            AS errors_24h,
          (SELECT COUNT(*) FROM errors
            WHERE ts > NOW() - INTERVAL '24 hours'
              AND severity = 'critical')                       AS crit_24h,
          (SELECT percentile_cont(0.95)
                  WITHIN GROUP (ORDER BY latency_ms)
             FROM requests
            WHERE day = $1)                                    AS p95_ms
    """, today, yesterday)

    return {
        "dau":     {"today": row["dau_today"], "delta_pct": _delta(row)},
        "cost":    {"today_brl": round(row["cost_today"] * 5.0, 2)},
        "errors":  {"total": row["errors_24h"], "critical": row["crit_24h"]},
        "latency": {"p95_s": round((row["p95_ms"] or 0) / 1000, 2)},
    }
DAU

Usuários únicos no dia. Vaza churn antes do CSAT.

Custo dia

USD → BRL convertido. Comparado a budget/30.

Erros 24h

Total + críticos. Crit > 0 = paginado.

p95

Percentil 95. Mediana mente; p95 não.

2

📝 Audit log estruturado: JSON lines, retenção 90d, PII redigida

Log de texto livre é log que ninguém grep. 1 evento = 1 linha JSON. Campos obrigatórios: ts, user_id, request_id, action, cost_usd, latency_ms. PII passa por redact antes de tocar disco.

✗ Evitar: log de texto livre

2026-05-18 14:23 user nei@email.com asked
about pricing, replied with R$ 99/mo plan,
used 1247 tokens, took 2.3s, OK.

Impossível agregar. Email vaza. grep é jogo de adivinhação.

✓ Fazer: JSON line

{"ts":"2026-05-18T14:23:11Z","user_id":"u_8a3f",
"request_id":"r_91c2","action":"chat.reply",
"tokens_in":412,"tokens_out":835,"cost_usd":0.0042,
"latency_ms":2310,"model":"claude-opus-4-7",
"status":"ok"}

jq, agregação SQL, retenção automática. Sem PII no log.

Audit writer com redact + retention

# app/audit.py
import json, re, time
from pathlib import Path

EMAIL_RE = re.compile(r'[\w.+-]+@[\w-]+\.[\w.-]+')
CPF_RE   = re.compile(r'\d{3}\.\d{3}\.\d{3}-\d{2}')
LOG_DIR  = Path("/var/log/agent/audit")
RETENTION_DAYS = 90

def _redact(s: str) -> str:
    s = EMAIL_RE.sub("[email]", s)
    s = CPF_RE.sub("[cpf]", s)
    return s

def audit(event: dict) -> None:
    # nunca confiar — re-aplica redact em strings
    for k, v in list(event.items()):
        if isinstance(v, str):
            event[k] = _redact(v)
    event["ts"] = event.get("ts") or time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    day_file = LOG_DIR / f"{event['ts'][:10]}.jsonl"
    with day_file.open("a") as f:
        f.write(json.dumps(event, ensure_ascii=False) + "\n")

def purge_expired() -> int:
    """Cron diário. Remove arquivos > 90 dias."""
    cutoff = time.time() - RETENTION_DAYS * 86400
    removed = 0
    for f in LOG_DIR.glob("*.jsonl"):
        if f.stat().st_mtime < cutoff:
            f.unlink()
            removed += 1
    return removed
Formato

JSON Lines. 1 linha = 1 evento atômico.

Retenção

90 dias default. LGPD pede limite e propósito.

Redact

Email, CPF, telefone, token. Antes do disco.

3

💰 Custo por user: tabela usage_daily + PRICING versionada

Se você não sabe quanto cada user custou ontem, não tem modelo de negócio — tem voluntariado. Duas peças: tabela usage_daily com PK composta (user_id, day, model), e dict PRICING versionado por data.

-- migrations/004_usage_daily.sql
CREATE TABLE usage_daily (
    user_id      TEXT          NOT NULL,
    day          DATE          NOT NULL,
    model        TEXT          NOT NULL,
    tokens_in    BIGINT        NOT NULL DEFAULT 0,
    tokens_out   BIGINT        NOT NULL DEFAULT 0,
    requests     INT           NOT NULL DEFAULT 0,
    cost_usd     NUMERIC(10,6) NOT NULL DEFAULT 0,
    pricing_ver  TEXT          NOT NULL,
    updated_at   TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    PRIMARY KEY (user_id, day, model)
);

CREATE INDEX idx_usage_day        ON usage_daily (day);
CREATE INDEX idx_usage_user_month ON usage_daily (user_id, date_trunc('month', day));

💵 Tabela PRICING (versionada, USD por 1M tokens)

Modelo Input / 1M Output / 1M Vigente desde
claude-opus-4-7$15.00$75.002026-03-01
claude-sonnet-4-7$3.00$15.002026-03-01
claude-haiku-4-7$0.80$4.002026-04-15
# app/pricing.py
from datetime import date

PRICING = {
    "2026-03-01": {
        "claude-opus-4-7":   {"in": 15.00, "out": 75.00},
        "claude-sonnet-4-7": {"in":  3.00, "out": 15.00},
    },
    "2026-04-15": {
        "claude-opus-4-7":   {"in": 15.00, "out": 75.00},
        "claude-sonnet-4-7": {"in":  3.00, "out": 15.00},
        "claude-haiku-4-7":  {"in":  0.80, "out":  4.00},
    },
}

def pricing_for(day: date) -> tuple[str, dict]:
    """Retorna (version, table) vigente em 'day'. Nunca interpola."""
    versions = sorted(PRICING.keys())
    active = max(v for v in versions if date.fromisoformat(v) <= day)
    return active, PRICING[active]

def cost_usd(model: str, tin: int, tout: int, day: date) -> tuple[float, str]:
    ver, table = pricing_for(day)
    p = table[model]
    return (tin * p["in"] + tout * p["out"]) / 1_000_000, ver

📊 Por que pricing_ver na linha

Quando Anthropic baixar preço em julho e você reprocessar histórico de junho, o número antigo continua reproduzível. Auditoria fiscal não aceita "rodei de novo, deu diferente".

4

🚨 Alertas: cron 5min, snooze, dedup

Alerta sem dedup é alerta que vira mute. 3 regras: checa a cada 5 min, não re-dispara mesmo alerta em < 1h, snooze por chave quando você sabe que algo está sendo investigado.

💡 Cron de 5 minutos é o sweet spot

< 1 min: ruído + custo de polling. > 15 min: você descobre o incêndio pelo Slack do cliente. 5 min cobre 99% dos casos sem queimar CPU nem confiança.

# app/alerts.py
from datetime import datetime, timedelta

DEDUP_WINDOW = timedelta(hours=1)

async def check_alerts(db) -> list[dict]:
    fired = []
    now = datetime.utcnow()

    rules = [
        ("errors_critical_24h",
         "SELECT COUNT(*) FROM errors WHERE severity='critical' "
         "AND ts > NOW() - INTERVAL '24 hours'",
         lambda n: n > 0,
         "{n} erros críticos nas últimas 24h"),

        ("cost_over_budget",
         "SELECT COALESCE(SUM(cost_usd),0) FROM usage_daily WHERE day=CURRENT_DATE",
         lambda n: n > 25.0,
         "Custo do dia: ${n:.2f} (budget diário: $25)"),

        ("p95_latency",
         "SELECT percentile_cont(0.95) WITHIN GROUP (ORDER BY latency_ms) "
         "FROM requests WHERE ts > NOW() - INTERVAL '15 minutes'",
         lambda n: (n or 0) > 3000,
         "p95 latência {n:.0f}ms (SLO 3000ms)"),
    ]

    for key, sql, predicate, msg in rules:
        # snooze ativo? pula.
        snoozed = await db.fetchval(
            "SELECT until FROM alert_snooze WHERE key=$1 AND until > NOW()", key)
        if snoozed:
            continue

        n = await db.fetchval(sql)
        if not predicate(n):
            continue

        # dedup: não disparou nesta janela?
        last = await db.fetchval(
            "SELECT MAX(fired_at) FROM alert_log WHERE key=$1", key)
        if last and (now - last) < DEDUP_WINDOW:
            continue

        await db.execute(
            "INSERT INTO alert_log (key, fired_at, payload) VALUES ($1, $2, $3)",
            key, now, msg.format(n=n))
        fired.append({"key": key, "message": msg.format(n=n)})

    return fired

async def snooze(db, key: str, hours: int = 4):
    until = datetime.utcnow() + timedelta(hours=hours)
    await db.execute("""
        INSERT INTO alert_snooze (key, until) VALUES ($1, $2)
        ON CONFLICT (key) DO UPDATE SET until = EXCLUDED.until
    """, key, until)
Cron 5min

Equilíbrio entre ruído e atraso.

Dedup 1h

Mesma chave não re-dispara na janela.

Snooze 4h

Investigando? Cala o alerta com prazo.

5

🛑 Kill switch: system_flags + is_killed() em todo request

Dois modos: killall (desliga tudo) e kill_user_X (bloqueia 1 cliente abusando). Tabela única system_flags, 1 leitura por request com cache de 30s.

⚠️ Sem kill switch = noites sem dormir

Agente entrou em loop, queimou R$ 4.000 em tokens em 20 min, vazou contexto do cliente A pro B, começou a responder em japonês — todos cenários reais. Sem kill switch você precisa abrir SSH, achar o processo, matar, explicar ao cliente. Com kill switch é 1 UPDATE SQL.

-- migrations/005_system_flags.sql
CREATE TABLE system_flags (
    key      TEXT PRIMARY KEY,
    value    JSONB       NOT NULL,
    reason   TEXT,
    set_by   TEXT        NOT NULL,
    set_at   TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- killall:      key='killall',     value='true'
-- kill_user_X:  key='kill:u_8a3f', value='true'
-- read_only:    key='read_only',   value='true'
# app/killswitch.py
import time

_CACHE_TTL = 30  # segundos
_cache = {"ts": 0, "flags": {}}

async def _refresh(db):
    rows = await db.fetch("SELECT key, value FROM system_flags")
    _cache["flags"] = {r["key"]: r["value"] for r in rows}
    _cache["ts"] = time.time()

async def is_killed(db, user_id: str) -> tuple[bool, str | None]:
    if time.time() - _cache["ts"] > _CACHE_TTL:
        await _refresh(db)
    flags = _cache["flags"]
    if flags.get("killall") is True:
        return True, "system halted (killall)"
    if flags.get(f"kill:{user_id}") is True:
        return True, "user temporarily blocked"
    return False, None

# uso em CADA request handler
@router.post("/chat")
async def chat(req: ChatRequest, db = Depends(get_db)):
    killed, reason = await is_killed(db, req.user_id)
    if killed:
        audit({"action": "killed", "user_id": req.user_id, "reason": reason})
        raise HTTPException(503, detail=reason)
    return await process(req, db)
killall

Pânico geral. 1 UPDATE, 30s para propagar via cache.

kill:user_X

Cirúrgico. Cliente em loop não derruba o resto.

6

📈 Métricas que importam: 4 números, não 40

Tem time que mede 40 métricas e olha 0. Mede estes 4 e o resto é luxo.

📊 Os 4 que pagam o aluguel

  • DAU — Daily Active Users. Único número que diz se o produto importa. Cresceu = bom. Caiu 2 dias seguidos = investigar.
  • Custo / user / dia — Divide custo total pelo DAU. Se passar do ticket médio, modelo de negócio quebra. Acompanhe junto com churn.
  • 👍 / 👎 ratio — Botão de feedback após cada resposta. Meta: > 90% 👍. Quando cai abaixo de 80%, há regressão de qualidade.
  • Time-to-first-token — Da chegada do request ao 1º token streamed. UX percebida vive aqui. Meta < 800ms, vermelho > 2s.

✅ Olhar todo dia

  • DAU e delta vs ontem
  • Custo/user/dia
  • 👍/👎 das últimas 24h
  • Time-to-first-token p95

📅 Olhar toda semana

  • Top 10 users por custo
  • Erros agrupados por action
  • Mix de modelos (opus/sonnet/haiku %)
  • Snoozes ativos sem follow-up
DAU

Produto importa?

$/user/dia

Modelo fecha?

👍/👎

Qualidade hoje.

TTFT

UX percebida.

🏁 Trilha 3 concluída

Você fechou Multi-user. Agora sabe rodar um agente que aguenta 2, 200, 2.000 clientes — sem virar pesadelo de plantão.

📝 Resumo do módulo

Dashboard MVP — 4 cards (DAU, custo, erros, p95) batem 80% das perguntas operacionais.
Audit log — JSON lines, 90d retenção, PII redigida antes do disco.
Custo por user — usage_daily com PK composta + PRICING versionado por data.
Alertas — cron 5min, dedup 1h, snooze 4h. Sweet spot sem mute fatigue.
Kill switch — system_flags + is_killed() em cada request. killall + kill_user_X.
4 métricas que pagam — DAU, custo/user, 👍/👎, time-to-first-token.

Próxima trilha:

Trilha 4 — iAmasters OS: escala organizacional, governança multi-time, plataforma compartilhada.