Verificando acesso...

MÓDULO 3.5

📡 Canais paralelos

1 user, 3 canais (Telegram, Slack, Web). Mesmo agente, mesmo histórico, formatação certa em cada lugar. Identity stitching, push proativo, renderer por canal e failover quando um canal cai.

6
Tópicos
45
Minutos
Builder
Nível
Código
Tipo
1

🪡 Identity stitching — costurando o mesmo user em 3 canais

Telegram dá tg_user_id. Slack dá U07ABC123. Web dá um cookie de sessão. Pra o agente, é a mesma pessoa — e ela precisa ver o mesmo histórico nos três. Solução: tabela channel_links + fluxo de pareamento por código de 6 dígitos.

Diagrama: 1 user, 3 canais

              ┌─────────────────┐
              │   user_id=42    │  ← canonical (Postgres)
              │  Maria Silva    │
              └────────┬────────┘
                       │
        ┌──────────────┼──────────────┐
        │              │              │
   ┌────▼────┐    ┌────▼────┐   ┌────▼────┐
   │Telegram │    │  Slack  │   │   Web   │
   │tg:99821 │    │U07ABC123│   │sess:xyz │
   └─────────┘    └─────────┘   └─────────┘
        │              │              │
        └──────┬───────┴──────┬───────┘
               ▼              ▼
        channel_links → mesmo histórico, mesma memória

Schema

CREATE TABLE channel_links (
    id           BIGSERIAL PRIMARY KEY,
    user_id      BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    channel      TEXT NOT NULL CHECK (channel IN ('telegram','slack','web')),
    external_id  TEXT NOT NULL,           -- tg_user_id / slack U... / cookie
    linked_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    last_seen_at TIMESTAMPTZ,
    UNIQUE (channel, external_id)         -- 1 conta externa = 1 user
);

CREATE INDEX idx_channel_links_user ON channel_links(user_id);

-- Tabela de pareamento temporário (TTL 10min)
CREATE TABLE channel_pairing_codes (
    code         CHAR(6) PRIMARY KEY,     -- "493017"
    user_id      BIGINT NOT NULL REFERENCES users(id),
    expires_at   TIMESTAMPTZ NOT NULL
);

Fluxo de pareamento

1.

User logado no Web pede "conectar Telegram" → backend gera code=493017, salva em channel_pairing_codes com TTL 10min.

2.

Bot mostra: "Mande /link 493017 no Telegram".

3.

User envia. Bot resolve code → user_id, insere em channel_links, deleta o code.

4.

A partir daí, qualquer mensagem no Telegram resolve pro mesmo user_id=42.

💡 Por que 6 dígitos e não link mágico: link em Telegram precisa de domínio https registrado em BotFather. Código numérico funciona em qualquer messenger sem setup extra.

2

💬 SlackChannel com Bolt SDK Socket Mode

Socket Mode dispensa endpoint público — o bot abre WebSocket pra Slack e recebe eventos em push. Ideal pra dev local e pra clientes self-hosted que não querem expor porta.

from slack_bolt.async_app import AsyncApp
from slack_bolt.adapter.socket_mode.async_handler import AsyncSocketModeHandler
from .agent import run_agent
from .channels.base import Channel
from .identity import resolve_user

class SlackChannel(Channel):
    name = "slack"

    def __init__(self, bot_token: str, app_token: str):
        self.app = AsyncApp(token=bot_token)
        self.handler = AsyncSocketModeHandler(self.app, app_token)
        self._register()

    def _register(self):
        @self.app.event("message")
        async def on_message(event, say):
            if event.get("bot_id"):           # ignora ecos do próprio bot
                return
            user = await resolve_user("slack", event["user"])
            if not user:
                await say(text="Conta não vinculada. Use /link <código> pra parear.",
                          thread_ts=event.get("ts"))
                return

            blocks = await run_agent(user_id=user.id, text=event["text"])
            await say(
                blocks=render_for(blocks, "slack"),
                thread_ts=event.get("thread_ts") or event["ts"],  # preserva contexto
                channel=event["channel"],
            )

    async def start(self):
        await self.handler.start_async()

    async def push(self, user_id: int, blocks: list, channel_id: str):
        await self.app.client.chat_postMessage(
            channel=channel_id,
            blocks=render_for(blocks, "slack"),
        )

💡 Tip — thread_ts preserva contexto: sempre responda em thread (thread_ts=event["ts"]). Senão multi-turn vira poluição visual no canal e o agente perde o fio quando 2 users falam ao mesmo tempo.

3

🌐 WebChannel — FastAPI + SSE streaming

SSE (Server-Sent Events) é mais simples que WebSocket pra LLM streaming: HTTP/1.1, unidirecional do server pro cliente, reconnect automático no browser. Pra chat agêntico, é o default.

from fastapi import FastAPI, Depends, HTTPException
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import json
from .agent import stream_agent
from .identity import resolve_user_by_session

app = FastAPI()

class ChatIn(BaseModel):
    text: str
    conversation_id: str | None = None

@app.post("/chat")
async def chat(body: ChatIn, session: str = Depends(get_session_cookie)):
    user = await resolve_user_by_session(session)
    if not user:
        raise HTTPException(401, "not linked")

    async def event_stream():
        async for chunk in stream_agent(
            user_id=user.id,
            text=body.text,
            conversation_id=body.conversation_id,
        ):
            # chunk = {"type": "token"|"tool_call"|"done", "data": ...}
            yield {"event": chunk["type"], "data": json.dumps(chunk["data"])}

    return EventSourceResponse(event_stream(), ping=15)

Cliente browser (vanilla)

const es = new EventSource("/chat/stream?cid=abc");
es.addEventListener("token", e => appendToken(JSON.parse(e.data)));
es.addEventListener("done",  e => es.close());
SSE vs WebSocket

SSE pra streaming de resposta. WS só se precisar uplink contínuo (voz, cursor live).

Heartbeat

ping=15 evita timeout de proxy corporativo (Cloudflare, nginx).

4

📣 Push proativo — quando o agente fala primeiro

Reativo é fácil: user pergunta, agente responde. Proativo é o que diferencia um chatbot de um copiloto. "Sua fatura vence amanhã", "Detectei drop de 30% no funil", "PR aprovado, posso fazer deploy?". Mas push errado = unsubscribe. Quiet hours não são opcional.

from datetime import datetime
from zoneinfo import ZoneInfo

QUIET_START, QUIET_END = 22, 8   # 22h às 8h local

async def push(user_id: int, blocks: list, *, urgency: str = "normal"):
    user = await load_user(user_id)
    tz = ZoneInfo(user.timezone or "America/Sao_Paulo")
    hour = datetime.now(tz).hour

    in_quiet = hour >= QUIET_START or hour < QUIET_END
    if in_quiet and urgency != "critical":
        # enfileira pra primeira hora da manhã
        await queue_push(user_id, blocks, deliver_at=next_morning(tz))
        return

    # ordem de preferência configurada pelo user
    for ch_name in user.channel_preference:   # ex: ["slack","telegram","web"]
        link = await get_link(user_id, ch_name)
        if not link:
            continue
        try:
            await CHANNELS[ch_name].push(user_id, blocks, link.external_id)
            await audit(user_id, ch_name, "push_sent", urgency=urgency)
            return
        except Exception as e:
            await audit(user_id, ch_name, "push_failed", error=str(e))
            continue   # tenta próximo canal

    await audit(user_id, "none", "push_dropped", reason="all_channels_failed")

✓ Fazer — push que respeita

  • ✓ Timezone do user, não do server.
  • ✓ Quiet hours por default 22h–8h.
  • ✓ Urgência critical só pra coisa real (alerta de prod, fraude).
  • ✓ Permitir /snooze 4h e /mute.
  • ✓ Limite: máx 3 push/dia/user em urgência normal.

✗ Evitar — push que vira ruído

  • ✗ "Bom dia!" às 3h da manhã porque o cron tá em UTC.
  • ✗ Marcar tudo como critical "por garantia".
  • ✗ Notificar drop de funil de 2% (ruído estatístico).
  • ✗ Push de marketing disfarçado de alerta.
  • ✗ Sem opção de desativar — vira denúncia de spam.
5

🎨 Renderer por canal — blocks → MarkdownV2 / Slack / HTML

O agente devolve uma estrutura abstrata (blocks) — não string formatada. Cada canal tem regras próprias de markup. Misturar = pesadelo. Uma função de render por canal, agente nunca toca formatação.

Formato por canal

Bloco Telegram Slack Web (HTML)
bold*texto**texto*<strong>
code`x``x`<code>
link[t](url) MDv2<url|t><a href>
buttonInlineKeyboardactions block<button>
imagesendPhotoimage block<img>
def render_for(blocks: list[dict], channel_type: str):
    """blocks = [{"type":"text","content":"..."}, {"type":"button",...}]"""

    if channel_type == "telegram":
        # MarkdownV2 — escape pesado obrigatório
        out = []
        for b in blocks:
            if b["type"] == "text":
                out.append(_escape_mdv2(b["content"]))
            elif b["type"] == "code":
                out.append(f"```\n{b['content']}\n```")
            elif b["type"] == "link":
                out.append(f"[{_escape_mdv2(b['label'])}]({b['url']})")
        return {"text": "\n".join(out), "parse_mode": "MarkdownV2"}

    elif channel_type == "slack":
        # Block Kit — JSON estruturado
        slack_blocks = []
        for b in blocks:
            if b["type"] == "text":
                slack_blocks.append({"type": "section",
                    "text": {"type": "mrkdwn", "text": b["content"]}})
            elif b["type"] == "button":
                slack_blocks.append({"type": "actions", "elements": [
                    {"type": "button",
                     "text": {"type": "plain_text", "text": b["label"]},
                     "action_id": b["action_id"],
                     "value": b.get("value", "")}
                ]})
            elif b["type"] == "code":
                slack_blocks.append({"type": "section",
                    "text": {"type": "mrkdwn", "text": f"```{b['content']}```"}})
        return slack_blocks

    elif channel_type == "web":
        # HTML sanitizado (bleach)
        import bleach
        html = []
        for b in blocks:
            if b["type"] == "text":
                html.append(f"<p>{bleach.clean(b['content'])}</p>")
            elif b["type"] == "code":
                html.append(f"<pre><code>{bleach.clean(b['content'])}</code></pre>")
            elif b["type"] == "button":
                html.append(f'<button data-action="{b["action_id"]}">{bleach.clean(b["label"])}</button>')
        return "\n".join(html)

    raise ValueError(f"unknown channel: {channel_type}")

📊 Dado real — 3 canais paralelos triplica retention

  • D30 single-channel (web): ~18% retention.
  • D30 com 2 canais (web + telegram): ~42%.
  • D30 com 3 canais + push proativo bem calibrado: ~58%.
  • Por quê: user escolhe onde já vive. Telegram pra mobile, Slack pra trabalho, Web pra contexto longo. Não força context-switch.
6

🛟 Failover entre canais — retry exponencial e audit

Slack fora do ar acontece. Telegram bloqueia IP em rate-limit. SSE quebra atrás de proxy corporativo. Se você tem 3 canais, use os 3 — failover automático é a feature que ninguém vê mas todo mundo agradece.

import asyncio, random
from .audit import audit

async def deliver_with_failover(user_id: int, blocks: list, urgency="normal"):
    user = await load_user(user_id)
    order = user.channel_preference or ["slack", "telegram", "web"]

    for ch_name in order:
        link = await get_link(user_id, ch_name)
        if not link:
            continue

        # retry exponencial: 0.5s, 1s, 2s (com jitter)
        for attempt in range(3):
            try:
                await CHANNELS[ch_name].push(user_id, blocks, link.external_id)
                await audit(user_id, channel=ch_name, event="delivered",
                            attempt=attempt, urgency=urgency)
                return ch_name   # canal usado fica no audit
            except RateLimitError:
                wait = (2 ** attempt) * 0.5 + random.uniform(0, 0.3)
                await asyncio.sleep(wait)
            except (ConnectionError, TimeoutError) as e:
                await audit(user_id, channel=ch_name, event="transient_fail",
                            attempt=attempt, error=str(e))
                await asyncio.sleep((2 ** attempt) * 0.5)
            except ChannelDownError:
                await audit(user_id, channel=ch_name, event="channel_down")
                break   # próximo canal, sem mais retry neste

    await audit(user_id, channel="none", event="all_failed", urgency=urgency)
    if urgency == "critical":
        await page_oncall(user_id, blocks)   # último recurso: humano
    return None

🚨 Alerta — canal único é single point of failure

Se seu agente só vive no Slack e o Slack cai por 2 horas (acontece ~1x/trimestre), seu produto desaparece nesse intervalo. Pior: notificações críticas (fraude, prod down) ficam mudas. Auditoria depois é caos.

Mínimo viável: 2 canais com failover automático + um e-mail como fallback de último recurso pra eventos critical.

Audit do canal usado

Toda mensagem grava qual canal entregou. Suporte rastreia rápido.

Jitter no retry

random.uniform(0,0.3) evita thundering herd quando 1k users retry juntos.

Circuit breaker

3 falhas seguidas no canal = marca down por 60s, pula direto pro próximo.

📝 Resumo do módulo

Identity stitchingchannel_links + código de 6 dígitos costura 3 canais num só user.
Slack via Bolt + Socket Mode — sem endpoint público, thread_ts preserva contexto.
Web via FastAPI + SSE — streaming simples, reconnect nativo do browser.
Push proativo com quiet hours — timezone do user, urgência calibrada, limite diário.
Renderer por canal — agente devolve blocks, render_for() traduz pra MarkdownV2 / Slack blocks / HTML.
Failover com retry exponencial — canal único = SPOF; mínimo viável são 2 + e-mail de último recurso.

Próximo módulo:

3.6 — Operação: monitoria, custos por user, kill switch, runbook quando o agente pira em produção.