🪡 Identity stitching — costurando o mesmo user em 3 canais
Telegram dá tg_user_id.
Slack dá U07ABC123.
Web dá um cookie de sessão. Pra o agente, é a mesma pessoa — e ela precisa ver o mesmo histórico nos três.
Solução: tabela channel_links + fluxo de pareamento por código de 6 dígitos.
Diagrama: 1 user, 3 canais
┌─────────────────┐
│ user_id=42 │ ← canonical (Postgres)
│ Maria Silva │
└────────┬────────┘
│
┌──────────────┼──────────────┐
│ │ │
┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│Telegram │ │ Slack │ │ Web │
│tg:99821 │ │U07ABC123│ │sess:xyz │
└─────────┘ └─────────┘ └─────────┘
│ │ │
└──────┬───────┴──────┬───────┘
▼ ▼
channel_links → mesmo histórico, mesma memória
Schema
CREATE TABLE channel_links (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
channel TEXT NOT NULL CHECK (channel IN ('telegram','slack','web')),
external_id TEXT NOT NULL, -- tg_user_id / slack U... / cookie
linked_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_seen_at TIMESTAMPTZ,
UNIQUE (channel, external_id) -- 1 conta externa = 1 user
);
CREATE INDEX idx_channel_links_user ON channel_links(user_id);
-- Tabela de pareamento temporário (TTL 10min)
CREATE TABLE channel_pairing_codes (
code CHAR(6) PRIMARY KEY, -- "493017"
user_id BIGINT NOT NULL REFERENCES users(id),
expires_at TIMESTAMPTZ NOT NULL
);
Fluxo de pareamento
User logado no Web pede "conectar Telegram" → backend gera code=493017, salva em channel_pairing_codes com TTL 10min.
Bot mostra: "Mande /link 493017 no Telegram".
User envia. Bot resolve code → user_id, insere em channel_links, deleta o code.
A partir daí, qualquer mensagem no Telegram resolve pro mesmo user_id=42.
💡 Por que 6 dígitos e não link mágico: link em Telegram precisa de domínio https registrado em BotFather. Código numérico funciona em qualquer messenger sem setup extra.
💬 SlackChannel com Bolt SDK Socket Mode
Socket Mode dispensa endpoint público — o bot abre WebSocket pra Slack e recebe eventos em push. Ideal pra dev local e pra clientes self-hosted que não querem expor porta.
from slack_bolt.async_app import AsyncApp
from slack_bolt.adapter.socket_mode.async_handler import AsyncSocketModeHandler
from .agent import run_agent
from .channels.base import Channel
from .identity import resolve_user
class SlackChannel(Channel):
name = "slack"
def __init__(self, bot_token: str, app_token: str):
self.app = AsyncApp(token=bot_token)
self.handler = AsyncSocketModeHandler(self.app, app_token)
self._register()
def _register(self):
@self.app.event("message")
async def on_message(event, say):
if event.get("bot_id"): # ignora ecos do próprio bot
return
user = await resolve_user("slack", event["user"])
if not user:
await say(text="Conta não vinculada. Use /link <código> pra parear.",
thread_ts=event.get("ts"))
return
blocks = await run_agent(user_id=user.id, text=event["text"])
await say(
blocks=render_for(blocks, "slack"),
thread_ts=event.get("thread_ts") or event["ts"], # preserva contexto
channel=event["channel"],
)
async def start(self):
await self.handler.start_async()
async def push(self, user_id: int, blocks: list, channel_id: str):
await self.app.client.chat_postMessage(
channel=channel_id,
blocks=render_for(blocks, "slack"),
)
💡 Tip — thread_ts preserva contexto: sempre responda em thread (thread_ts=event["ts"]). Senão multi-turn vira poluição visual no canal e o agente perde o fio quando 2 users falam ao mesmo tempo.
🌐 WebChannel — FastAPI + SSE streaming
SSE (Server-Sent Events) é mais simples que WebSocket pra LLM streaming: HTTP/1.1, unidirecional do server pro cliente, reconnect automático no browser. Pra chat agêntico, é o default.
from fastapi import FastAPI, Depends, HTTPException
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import json
from .agent import stream_agent
from .identity import resolve_user_by_session
app = FastAPI()
class ChatIn(BaseModel):
text: str
conversation_id: str | None = None
@app.post("/chat")
async def chat(body: ChatIn, session: str = Depends(get_session_cookie)):
user = await resolve_user_by_session(session)
if not user:
raise HTTPException(401, "not linked")
async def event_stream():
async for chunk in stream_agent(
user_id=user.id,
text=body.text,
conversation_id=body.conversation_id,
):
# chunk = {"type": "token"|"tool_call"|"done", "data": ...}
yield {"event": chunk["type"], "data": json.dumps(chunk["data"])}
return EventSourceResponse(event_stream(), ping=15)
Cliente browser (vanilla)
const es = new EventSource("/chat/stream?cid=abc");
es.addEventListener("token", e => appendToken(JSON.parse(e.data)));
es.addEventListener("done", e => es.close());
SSE pra streaming de resposta. WS só se precisar uplink contínuo (voz, cursor live).
ping=15 evita timeout de proxy corporativo (Cloudflare, nginx).
📣 Push proativo — quando o agente fala primeiro
Reativo é fácil: user pergunta, agente responde. Proativo é o que diferencia um chatbot de um copiloto. "Sua fatura vence amanhã", "Detectei drop de 30% no funil", "PR aprovado, posso fazer deploy?". Mas push errado = unsubscribe. Quiet hours não são opcional.
from datetime import datetime
from zoneinfo import ZoneInfo
QUIET_START, QUIET_END = 22, 8 # 22h às 8h local
async def push(user_id: int, blocks: list, *, urgency: str = "normal"):
user = await load_user(user_id)
tz = ZoneInfo(user.timezone or "America/Sao_Paulo")
hour = datetime.now(tz).hour
in_quiet = hour >= QUIET_START or hour < QUIET_END
if in_quiet and urgency != "critical":
# enfileira pra primeira hora da manhã
await queue_push(user_id, blocks, deliver_at=next_morning(tz))
return
# ordem de preferência configurada pelo user
for ch_name in user.channel_preference: # ex: ["slack","telegram","web"]
link = await get_link(user_id, ch_name)
if not link:
continue
try:
await CHANNELS[ch_name].push(user_id, blocks, link.external_id)
await audit(user_id, ch_name, "push_sent", urgency=urgency)
return
except Exception as e:
await audit(user_id, ch_name, "push_failed", error=str(e))
continue # tenta próximo canal
await audit(user_id, "none", "push_dropped", reason="all_channels_failed")
✓ Fazer — push que respeita
- ✓ Timezone do user, não do server.
- ✓ Quiet hours por default 22h–8h.
- ✓ Urgência critical só pra coisa real (alerta de prod, fraude).
- ✓ Permitir
/snooze 4he/mute. - ✓ Limite: máx 3 push/dia/user em urgência normal.
✗ Evitar — push que vira ruído
- ✗ "Bom dia!" às 3h da manhã porque o cron tá em UTC.
- ✗ Marcar tudo como critical "por garantia".
- ✗ Notificar drop de funil de 2% (ruído estatístico).
- ✗ Push de marketing disfarçado de alerta.
- ✗ Sem opção de desativar — vira denúncia de spam.
🎨 Renderer por canal — blocks → MarkdownV2 / Slack / HTML
O agente devolve uma estrutura abstrata (blocks) — não string formatada. Cada canal tem regras próprias de markup. Misturar = pesadelo. Uma função de render por canal, agente nunca toca formatação.
Formato por canal
| Bloco | Telegram | Slack | Web (HTML) |
|---|---|---|---|
| bold | *texto* | *texto* | <strong> |
| code | `x` | `x` | <code> |
| link | [t](url) MDv2 | <url|t> | <a href> |
| button | InlineKeyboard | actions block | <button> |
| image | sendPhoto | image block | <img> |
def render_for(blocks: list[dict], channel_type: str):
"""blocks = [{"type":"text","content":"..."}, {"type":"button",...}]"""
if channel_type == "telegram":
# MarkdownV2 — escape pesado obrigatório
out = []
for b in blocks:
if b["type"] == "text":
out.append(_escape_mdv2(b["content"]))
elif b["type"] == "code":
out.append(f"```\n{b['content']}\n```")
elif b["type"] == "link":
out.append(f"[{_escape_mdv2(b['label'])}]({b['url']})")
return {"text": "\n".join(out), "parse_mode": "MarkdownV2"}
elif channel_type == "slack":
# Block Kit — JSON estruturado
slack_blocks = []
for b in blocks:
if b["type"] == "text":
slack_blocks.append({"type": "section",
"text": {"type": "mrkdwn", "text": b["content"]}})
elif b["type"] == "button":
slack_blocks.append({"type": "actions", "elements": [
{"type": "button",
"text": {"type": "plain_text", "text": b["label"]},
"action_id": b["action_id"],
"value": b.get("value", "")}
]})
elif b["type"] == "code":
slack_blocks.append({"type": "section",
"text": {"type": "mrkdwn", "text": f"```{b['content']}```"}})
return slack_blocks
elif channel_type == "web":
# HTML sanitizado (bleach)
import bleach
html = []
for b in blocks:
if b["type"] == "text":
html.append(f"<p>{bleach.clean(b['content'])}</p>")
elif b["type"] == "code":
html.append(f"<pre><code>{bleach.clean(b['content'])}</code></pre>")
elif b["type"] == "button":
html.append(f'<button data-action="{b["action_id"]}">{bleach.clean(b["label"])}</button>')
return "\n".join(html)
raise ValueError(f"unknown channel: {channel_type}")
📊 Dado real — 3 canais paralelos triplica retention
- D30 single-channel (web): ~18% retention.
- D30 com 2 canais (web + telegram): ~42%.
- D30 com 3 canais + push proativo bem calibrado: ~58%.
- Por quê: user escolhe onde já vive. Telegram pra mobile, Slack pra trabalho, Web pra contexto longo. Não força context-switch.
🛟 Failover entre canais — retry exponencial e audit
Slack fora do ar acontece. Telegram bloqueia IP em rate-limit. SSE quebra atrás de proxy corporativo. Se você tem 3 canais, use os 3 — failover automático é a feature que ninguém vê mas todo mundo agradece.
import asyncio, random
from .audit import audit
async def deliver_with_failover(user_id: int, blocks: list, urgency="normal"):
user = await load_user(user_id)
order = user.channel_preference or ["slack", "telegram", "web"]
for ch_name in order:
link = await get_link(user_id, ch_name)
if not link:
continue
# retry exponencial: 0.5s, 1s, 2s (com jitter)
for attempt in range(3):
try:
await CHANNELS[ch_name].push(user_id, blocks, link.external_id)
await audit(user_id, channel=ch_name, event="delivered",
attempt=attempt, urgency=urgency)
return ch_name # canal usado fica no audit
except RateLimitError:
wait = (2 ** attempt) * 0.5 + random.uniform(0, 0.3)
await asyncio.sleep(wait)
except (ConnectionError, TimeoutError) as e:
await audit(user_id, channel=ch_name, event="transient_fail",
attempt=attempt, error=str(e))
await asyncio.sleep((2 ** attempt) * 0.5)
except ChannelDownError:
await audit(user_id, channel=ch_name, event="channel_down")
break # próximo canal, sem mais retry neste
await audit(user_id, channel="none", event="all_failed", urgency=urgency)
if urgency == "critical":
await page_oncall(user_id, blocks) # último recurso: humano
return None
🚨 Alerta — canal único é single point of failure
Se seu agente só vive no Slack e o Slack cai por 2 horas (acontece ~1x/trimestre), seu produto desaparece nesse intervalo. Pior: notificações críticas (fraude, prod down) ficam mudas. Auditoria depois é caos.
Mínimo viável: 2 canais com failover automático + um e-mail como fallback de último recurso pra eventos critical.
Toda mensagem grava qual canal entregou. Suporte rastreia rápido.
random.uniform(0,0.3) evita thundering herd quando 1k users retry juntos.
3 falhas seguidas no canal = marca down por 60s, pula direto pro próximo.
📝 Resumo do módulo
channel_links + código de 6 dígitos costura 3 canais num só user.Próximo módulo:
3.6 — Operação: monitoria, custos por user, kill switch, runbook quando o agente pira em produção.