34 - 多通道统一适配架构

飞书/钉钉/QQ 机器人 + 企微出站推送 + Voice 语音通道、AbstractBotRegistry模板方法、Redis去重、卡片化回复

一、为什么需要统一通道框架

没有统一框架时的痛点:

  ┌─────────────────────────────────────────────────────────┐
  │ 每接入一个新平台,都要从头写一遍:                       │
  │                                                         │
  │ 飞书Bot:  消息接收 → 验签 → 解析 → 调Agent → 回复     │
  │ 钉钉Bot:  消息接收 → 验签 → 解析 → 调Agent → 回复     │
  │ 企微Webhook: 主动发送 → Markdown/Text 消息推送       │
  │ QQ Bot:   消息接收 → 验签 → 解析 → 调Agent → 回复     │
  │                                                         │
  │ 问题:                                                   │
  │ · 多数逻辑重复 (去重、会话管理、Agent调用、错误处理)  │
  │ · 每个平台各自维护,Bug要修多次                       │
  │ · 新人接手困难,代码风格不统一                         │
  │ · 新增平台成本高(2000+行代码)                        │
  └─────────────────────────────────────────────────────────┘

  统一框架后:

  ┌─────────────────────────────────────────────────────────┐
  │ 共享层 (AbstractBotRegistry + ChannelServiceSupport):   │
  │ · 消息去重 (Redis SETNX)                               │
  │ · 会话管理 (Redis Session)                             │
  │ · Agent路由与调用                                      │
  │ · 错误处理与降级                                       │
  │ · 限流与熔断                                           │
  │ · 对话历史持久化                                       │
  └─────────────────────────┬───────────────────────────────┘
                            │ (只需实现差异部分)
        ┌───────────┬───────┼───────┬───────────┐
        ▼           ▼       ▼       ▼           ▼
  ┌──────────┐┌──────────┐┌────┐┌──────────┐┌───────┐
  │飞书:     ││钉钉:     ││QQ: ││企微:     ││Voice: │
  │完成卡片  ││Stream    ││WS  ││出站推送  ││STT/TTS│
  │~200行    ││~150行    ││~180││~200行    ││~250行 │
  └──────────┘└──────────┘└────┘└──────────┘└───────┘

  新增平台: 只需 ~200行平台特有代码!
  └─────────────────────────────────────────────────────────┘

二、AbstractBotRegistry 模板方法模式

使用模板方法模式定义Bot注册和消息处理的骨架,子类只需实现平台差异化的部分:

public abstract class AbstractBotRegistry<TBot, TEvent> {

    // ===== 模板方法:定义处理骨架 =====

    /** 处理入站消息的完整流程 (final=不可覆盖) */
    public final void handleInboundMessage(TEvent event) {
        // 1. 验证签名 (每个平台验签方式不同)
        if (!verifySignature(event)) {
            log.warn("签名验证失败: {}", extractEventId(event));
            return;
        }

        // 2. 提取标准化消息
        InboundMessage message = extractMessage(event);

        // 3. 去重检查 (共享实现)
        if (dedupeStore.isDuplicate(message.getMessageId())) {
            log.debug("重复消息已忽略: {}", message.getMessageId());
            return;
        }

        // 4. 获取/创建会话 (共享实现)
        ChannelSession session = sessionStore.getOrCreate(
            message.getChannelType(), message.getUserId());

        // 5. 路由到Agent并执行
        AgentResponse response = channelService.routeAndExecute(session, message);

        // 6. 发送回复 (每个平台发送方式不同)
        sendReply(event, response);
    }

    // ===== 抽象方法:子类必须实现 =====

    /** 验证平台签名 */
    protected abstract boolean verifySignature(TEvent event);

    /** 从平台事件中提取标准消息 */
    protected abstract InboundMessage extractMessage(TEvent event);

    /** 发送回复到平台 */
    protected abstract void sendReply(TEvent event, AgentResponse response);

    /** 提取事件ID (用于日志) */
    protected abstract String extractEventId(TEvent event);

    // ===== 可选钩子:子类可覆盖 =====

    /** 消息预处理 (如@机器人提取、指令解析) */
    protected InboundMessage preProcess(InboundMessage msg) { return msg; }

    /** 回复后处理 (如消息已读回执) */
    protected void postReply(TEvent event) { /* no-op */ }
}
模板方法模式类图:

  ┌──────────────────────────────────────────┐
  │  AbstractBotRegistry<TBot, TEvent>       │
  │  ─────────────────────────────────────── │
  │  + handleInboundMessage(event)  [final]  │
  │  # verifySignature(event)       [abstract]│
  │  # extractMessage(event)        [abstract]│
  │  # sendReply(event, response)   [abstract]│
  │  # preProcess(msg)              [hook]    │
  │  # postReply(event)             [hook]    │
  └───────────────────┬──────────────────────┘
                      │
       ┌──────────────┼──────────────┐
       ▼              ▼              ▼
  ┌──────────┐  ┌──────────┐  ┌──────────┐
  │FeishuBot │  │DingTalkBot│  │QQBot     │
  │Registry  │  │Registry   │  │Registry  │
  └──────────┘  └──────────┘  └──────────┘

三、ChannelServiceSupport 共享流程

@Service
public class ChannelServiceSupport {

    private final AgentRegistry agentRegistry;
    private final ConversationService conversationService;
    private final DistributedRateLimiter rateLimiter;

    /**
     * 路由消息到Agent并执行
     * 所有通道共享此核心流程
     */
    public AgentResponse routeAndExecute(ChannelSession session, InboundMessage msg) {
        // 1. 限流检查
        if (!rateLimiter.tryAcquire("channel:" + session.getUserId(), 30, 60)) {
            return AgentResponse.rateLimited("请求过于频繁,请稍后再试");
        }

        // 2. 确定目标Agent (会话绑定 or 默认)
        String agentId = session.getBoundAgentId() != null
            ? session.getBoundAgentId()
            : resolveAgent(msg);
        Agent agent = agentRegistry.getAgent(agentId);

        // 3. 加载对话历史
        List<Message> history = conversationService.getHistory(
            session.getSessionId(), 20);  // 最近20条

        // 4. 构建请求
        ChatRequest request = ChatRequest.builder()
            .messages(history)
            .userMessage(msg.getContent())
            .sessionId(session.getSessionId())
            .build();

        // 5. 调用Agent
        String reply = agent.chat(request)
            .map(ChatResponse::getContent)
            .collectList()
            .map(chunks -> String.join("", chunks))
            .block(Duration.ofSeconds(agent.getTimeout().getSeconds()));

        // 6. 持久化对话
        conversationService.saveExchange(session.getSessionId(), msg.getContent(), reply);

        return AgentResponse.success(reply);
    }
}

四、RedisInboundDedupeStore 消息去重

IM平台经常重复推送消息(网络超时重试、Webhook重发),必须做幂等去重:

@Component
public class RedisInboundDedupeStore implements DistributedDedupeStore {

    private final StringRedisTemplate redis;

    /**
     * 检查消息是否重复
     * 使用 SET key NX EX ttl 原子操作
     *
     * NX: 只在key不存在时设置 (Not eXists)
     * EX: 设置过期时间
     *
     * @return true=重复消息(应丢弃), false=新消息(应处理)
     */
    @Override
    public boolean isDuplicate(String messageId) {
        String key = "dedup:msg:" + messageId;
        // SET NX EX: 原子操作,无竞态条件
        Boolean result = redis.opsForValue()
            .setIfAbsent(key, "1", Duration.ofMinutes(5));
        // result=true → 首次设置成功 → 非重复
        // result=false → key已存在 → 重复
        return !Boolean.TRUE.equals(result);
    }

    /**
     * 批量去重检查 (Pipeline优化)
     */
    @Override
    public Map<String, Boolean> batchCheck(List<String> messageIds) {
        List<Object> results = redis.executePipelined((RedisCallback<Object>) conn -> {
            for (String msgId : messageIds) {
                conn.stringCommands().set(
                    ("dedup:msg:" + msgId).getBytes(),
                    "1".getBytes(),
                    Expiration.seconds(300),
                    SetOption.ifAbsent()
                );
            }
            return null;
        });
        // 组装结果...
    }
}
去重时序图:

  飞书服务器            我们的服务              Redis
  │                     │                      │
  │── POST /webhook ──→│                      │
  │   msgId=abc123      │── SET dedup:msg:abc123 NX EX 300 ──→│
  │                     │                      │── OK (首次) ─→│
  │                     │← 处理消息            │
  │←── 200 OK ────────│                      │
  │                     │                      │
  │── POST /webhook ──→│  (网络超时重发)       │
  │   msgId=abc123      │── SET dedup:msg:abc123 NX EX 300 ──→│
  │                     │                      │── nil (已存在)│
  │                     │← 丢弃! 不重复处理   │
  │←── 200 OK ────────│                      │
  │                     │                      │

  关键设计:
  · TTL = 5分钟: 平衡去重窗口和内存占用
  · NX原子性: 即使并发请求也只有一个能处理
  · Pipeline批量: 高吞吐场景减少网络往返

五、RedisChannelSessionStore 会话管理

@Component
public class RedisChannelSessionStore implements DistributedSessionStore {

    /**
     * 会话数据结构 (Redis Hash):
     * Key: session:{channelType}:{userId}
     * Fields:
     *   sessionId    → UUID
     *   agentId      → 当前绑定的Agent
     *   createdAt    → 创建时间
     *   lastActiveAt → 最后活跃时间
     *   context      → 会话上下文JSON
     */
    @Override
    public ChannelSession getOrCreate(String channelType, String userId) {
        String key = "session:" + channelType + ":" + userId;

        // 尝试获取已有会话
        Map<Object, Object> data = redis.opsForHash().entries(key);

        if (data.isEmpty()) {
            // 创建新会话
            ChannelSession session = ChannelSession.builder()
                .sessionId(UUID.randomUUID().toString())
                .channelType(channelType)
                .userId(userId)
                .createdAt(Instant.now())
                .build();
            saveSession(key, session);
            return session;
        }

        // 更新最后活跃时间
        redis.opsForHash().put(key, "lastActiveAt", Instant.now().toString());
        redis.expire(key, Duration.ofHours(2));  // 2小时无活动自动过期
        return deserialize(data);
    }
}

六、各平台特有实现

飞书:WebSocket 事件与卡片回复

飞书卡片化回复:

  普通回复: 文本直接发送
  当前实现: 先发"正在思考"文本 → 收集 Agent SSE → 发送结果卡片

  实现原理:
  ┌────────────────────────────────────────────────────┐
  │ 1. FeishuBotRegistry 启动 Lark WebSocket Client    │
  │ 2. FeishuChannelService 处理消息 / 菜单 / P2P事件  │
  │ 3. ChannelServiceSupport 复用会话、历史、安全扫描  │
  │ 4. streamCollect 聚合 Agent 响应                   │
  │ 5. 完成后发送结果卡片、来源和推荐追问              │
  │                                                    │
  │ API调用:                                           │
  │ Lark WebSocket → 接收事件                          │
  │ POST /messages 或 reply → 发送文本/卡片            │
  │                                                    │
  │ 当前边界:                                          │
  │ · 主链路不是逐 Token 卡片更新                      │
  │ · 没有每 500ms PATCH 同一张卡片                    │
  │ · 面试时讲卡片交互和通道复用,不讲流式卡片         │
  └────────────────────────────────────────────────────┘

钉钉:Stream模式

钉钉Stream (长连接模式):

  传统Webhook:
  钉钉 ──HTTP POST──→ 我们的服务器 (需要公网IP)

  Stream模式:
  我们的服务器 ──WebSocket──→ 钉钉开放平台
  (主动连接钉钉,无需公网IP和域名!)

  ┌─────────────────────────────────────────────┐
  │ DingTalkStreamClient                        │
  │                                             │
  │ 1. 启动时注册Stream → 获得endpoint          │
  │ 2. WebSocket连接到endpoint                  │
  │ 3. 接收推送消息 (JSON格式)                  │
  │ 4. 处理后通过HTTP API回复                   │
  │                                             │
  │ 优势:                                       │
  │ · 开发环境无需ngrok/内网穿透               │
  │ · 无需SSL证书和域名                        │
  │ · 连接稳定,断线自动重连                   │
  └─────────────────────────────────────────────┘

QQ:WebSocket官方Bot

QQ Bot WebSocket:

  ┌─────────────────────────────────────────────┐
  │ QQBotWebSocketClient                        │
  │                                             │
  │ 1. 获取WebSocket网关地址                    │
  │    GET /gateway → wss://...                 │
  │                                             │
  │ 2. 建立WS连接 + 鉴权                       │
  │    IDENTIFY { token, intents, shard }       │
  │                                             │
  │ 3. 维持心跳 (每41.25秒)                    │
  │    HEARTBEAT { last_sequence }              │
  │                                             │
  │ 4. 接收消息事件                             │
  │    DISPATCH { type: "MESSAGE_CREATE", ... } │
  │                                             │
  │ 5. 通过REST API回复                         │
  │    POST /channels/{id}/messages             │
  │                                             │
  │ 断线重连:                                   │
  │ · 6/RECONNECT → 重新IDENTIFY               │
  │ · 网络异常 → 指数退避重连(1s→2s→4s...)     │
  └─────────────────────────────────────────────┘

企业微信:出站 Webhook

企业微信发送处理:

  当前实现以管理端主动发送能力为主(outbound-only)

  ┌────────────────────────────────────────────┐
  │ 1. 管理端配置 webhookKey                  │
  │ 2. 选择 text / markdown 消息类型          │
  │ 3. 调用企业微信群机器人 Webhook           │
  │ 4. 记录审计日志与运行态状态               │
  │                                            │
  │ 说明:                                     │
  │ · 不提供入站会话                          │
  │ · 不进入 Agent 对话路由                   │
  │ · 主要用于通知/主动推送类场景             │
  └────────────────────────────────────────────┘

七、Voice 通道 (OpenAI 兼容 ASR/TTS)

Voice通道处理流程:

  ┌────────┐      ┌─────────────┐      ┌───────────┐      ┌────────┐
  │ 用户   │      │ VoiceChannel│      │ Agent     │      │ 用户   │
  │ 语音   │─────→│ Controller  │─────→│ (文本处理)│─────→│ 语音   │
  │ 输入   │      │             │      │           │      │ 输出   │
  └────────┘      └──────┬──────┘      └───────────┘      └────────┘
                         │
              ┌──────────┼──────────┐
              ▼                     ▼
  ┌───────────────────┐  ┌───────────────────┐
  │ ASR (Speech→Text) │  │ TTS (Text→Speech) │
  │ OpenAI兼容接口    │  │ OpenAI兼容接口    │
  │                   │  │                   │
  │ 支持格式:         │  │ 支持音色:         │
  │ · multipart上传  │  │ · 同步合成音频    │
  │ · wav/mp3等格式  │  │ · 音色可配置      │
  │ · 同步转写       │  │ · 格式可配置      │
  └───────────────────┘  └───────────────────┘

  完整链路:
  用户语音 → ASR → 文本 → Agent处理 → 回复文本 → TTS → 语音播放

  当前边界:
  · HTTP multipart 上传音频,非流式 ASR
  · Agent 回复聚合为文本后再同步 TTS
  · 流式 ASR/TTS、VAD、WebSocket 双工会话属于后续优化
POST /api/voice/chat
  audio      = MultipartFile
  agentId    = chat
  outputMode = text | audio

VoiceChannelController
  1. SpeechToTextService.recognize(...)
  2. ChannelServiceSupport.executeAgent(...)
  3. streamCollect(...) 聚合 Agent 回复
  4. TextToSpeechService.synthesize(...) 可选返回音频

八、通道架构总览

多通道统一架构全景:

  ┌─────────────────────────────────────────────────────────────────┐
  │                         外部接入平台                            │
  │  飞书      钉钉      QQ       企业微信出站  语音设备             │
  └──┬─────────┬─────────┬─────────┬───────────┬───────────────────┘
     │         │         │         │           │
     ▼         ▼         ▼         ▼           ▼
  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌────────────┐
  │Feishu│ │Ding  │ │QQ Bot│ │WeCom │ │Voice       │
  │Bot   │ │Talk  │ │WS    │ │Webhook│ │Channel    │
  │Ctrl  │ │Stream│ │Client│ │Send   │ │Controller │
  └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ └─────┬──────┘
     │         │         │         │           │
     └─────────┴─────────┴─────────┴───────────┘
                         │
                         ▼
  ┌─────────────────────────────────────────────────────────────────┐
  │        AbstractBotRegistry (机器人生命周期,不含 Voice)          │
  │  多 Bot 配置加载 → start/stop → runtime status → 审计辅助       │
  └─────────────────────────────┬───────────────────────────────────┘
                                │
                                ▼
  ┌─────────────────────────────────────────────────────────────────┐
  │              ChannelServiceSupport (机器人共享流程)              │
  │  去重 → 会话管理 → 历史加载 → 安全扫描 → Agent调用 → 持久化    │
  └─────────────────────────────┬───────────────────────────────────┘
                                │
          ┌─────────────────────┼─────────────────────┐
          ▼                     ▼                     ▼
  ┌──────────────┐    ┌──────────────┐    ┌──────────────────┐
  │ DedupeStore  │    │ SessionStore │    │ AgentRegistry    │
  │ (Redis NX)   │    │ (Redis Hash) │    │ (100+ Agents)   │
  └──────────────┘    └──────────────┘    └──────────────────┘

  VoiceChannelController 单独提供 REST 语音入口:
  HTTP multipart → ASR → Agent 单轮调用 → 可选同步 TTS

九、各平台接入对比

维度飞书钉钉QQ企业微信Voice
连接方式WebSocket长连接Stream长连接WebSocketWebhook主动发送REST API
验签方式SHA256+EncryptHMAC-SHA256Ed25519Webhook KeyBearer Token
消息格式JSONJSONJSONJSON(text/markdown)Binary+JSON
流式支持思考态+完成卡片Markdown更新不支持不支持同步音频
回复时限无限制无限制5分钟出站-only无限制
需公网IP否(Stream)否(WS)
特有代码量~200行~150行~180行~200行~250行

十、面试话术

面试话术:"我们设计了统一的多通道适配框架,核心是AbstractBotRegistry模板方法模式。它定义了机器人消息处理的完整骨架:验签→去重→提取消息→路由Agent→发送回复。各平台只需实现差异化的部分(验签算法、消息格式解析、回复方式),共享层负责去重、会话管理、限流、Agent调用等通用逻辑。目前支持飞书、钉钉、QQ、企业微信出站推送,以及Voice语音通道;新增一个平台只需补齐平台特有适配。"
面试话术:"消息去重用 Redis 的 SET NX EX 原子命令实现——收到消息时用 messageId 作为 key 做 SETNX,设 TTL。首次设置成功说明是新消息则处理,设置失败说明重复则丢弃。会话管理用 Redis Hash 存储当前 Agent 绑定。飞书通道通过 Lark WebSocket 接收事件,先回复正在思考,再收集 Agent SSE 输出,完成后发送结果卡片和推荐追问。这里我不会夸成逐 Token 流式卡片,而是强调通道复用、幂等和卡片交互体验。"