34 - 多通道统一适配架构

5大IM通道统一框架、AbstractBotRegistry模板方法、Redis去重、流式卡片

一、为什么需要统一通道框架

没有统一框架时的痛点:

  ┌─────────────────────────────────────────────────────────┐
  │ 每接入一个新平台,都要从头写一遍:                       │
  │                                                         │
  │ 飞书Bot:  消息接收 → 验签 → 解析 → 调Agent → 回复     │
  │ 钉钉Bot:  消息接收 → 验签 → 解析 → 调Agent → 回复     │
  │ 企微Bot:  消息接收 → 验签 → 解析 → 调Agent → 回复     │
  │ QQ Bot:   消息接收 → 验签 → 解析 → 调Agent → 回复     │
  │ 微信Bot:  消息接收 → 验签 → 解析 → 调Agent → 回复     │
  │                                                         │
  │ 问题:                                                   │
  │ · 80%的逻辑重复 (去重、会话管理、Agent调用、错误处理)  │
  │ · 每个平台各自维护,Bug要修5次                         │
  │ · 新人接手困难,代码风格不统一                         │
  │ · 新增平台成本高(2000+行代码)                        │
  └─────────────────────────────────────────────────────────┘

  统一框架后:

  ┌─────────────────────────────────────────────────────────┐
  │ 共享层 (AbstractBotRegistry + ChannelServiceSupport):   │
  │ · 消息去重 (Redis SETNX)                               │
  │ · 会话管理 (Redis Session)                             │
  │ · Agent路由与调用                                      │
  │ · 错误处理与降级                                       │
  │ · 限流与熔断                                           │
  │ · 对话历史持久化                                       │
  └─────────────────────────┬───────────────────────────────┘
                            │ (只需实现差异部分)
        ┌───────────┬───────┼───────┬───────────┐
        ▼           ▼       ▼       ▼           ▼
  ┌──────────┐┌──────────┐┌────┐┌──────────┐┌───────┐
  │飞书:     ││钉钉:     ││QQ: ││企微:     ││Voice: │
  │流式卡片  ││Stream    ││WS  ││XML加密   ││STT/TTS│
  │~200行    ││~150行    ││~180││~200行    ││~250行 │
  └──────────┘└──────────┘└────┘└──────────┘└───────┘

  新增平台: 只需 ~200行平台特有代码!
  └─────────────────────────────────────────────────────────┘

二、AbstractBotRegistry 模板方法模式

使用模板方法模式定义Bot注册和消息处理的骨架,子类只需实现平台差异化的部分:

public abstract class AbstractBotRegistry<TBot, TEvent> {

    // ===== 模板方法:定义处理骨架 =====

    /** 处理入站消息的完整流程 (final=不可覆盖) */
    public final void handleInboundMessage(TEvent event) {
        // 1. 验证签名 (每个平台验签方式不同)
        if (!verifySignature(event)) {
            log.warn("签名验证失败: {}", extractEventId(event));
            return;
        }

        // 2. 提取标准化消息
        InboundMessage message = extractMessage(event);

        // 3. 去重检查 (共享实现)
        if (dedupeStore.isDuplicate(message.getMessageId())) {
            log.debug("重复消息已忽略: {}", message.getMessageId());
            return;
        }

        // 4. 获取/创建会话 (共享实现)
        ChannelSession session = sessionStore.getOrCreate(
            message.getChannelType(), message.getUserId());

        // 5. 路由到Agent并执行
        AgentResponse response = channelService.routeAndExecute(session, message);

        // 6. 发送回复 (每个平台发送方式不同)
        sendReply(event, response);
    }

    // ===== 抽象方法:子类必须实现 =====

    /** 验证平台签名 */
    protected abstract boolean verifySignature(TEvent event);

    /** 从平台事件中提取标准消息 */
    protected abstract InboundMessage extractMessage(TEvent event);

    /** 发送回复到平台 */
    protected abstract void sendReply(TEvent event, AgentResponse response);

    /** 提取事件ID (用于日志) */
    protected abstract String extractEventId(TEvent event);

    // ===== 可选钩子:子类可覆盖 =====

    /** 消息预处理 (如@机器人提取、指令解析) */
    protected InboundMessage preProcess(InboundMessage msg) { return msg; }

    /** 回复后处理 (如消息已读回执) */
    protected void postReply(TEvent event) { /* no-op */ }
}
模板方法模式类图:

  ┌──────────────────────────────────────────┐
  │  AbstractBotRegistry<TBot, TEvent>       │
  │  ─────────────────────────────────────── │
  │  + handleInboundMessage(event)  [final]  │
  │  # verifySignature(event)       [abstract]│
  │  # extractMessage(event)        [abstract]│
  │  # sendReply(event, response)   [abstract]│
  │  # preProcess(msg)              [hook]    │
  │  # postReply(event)             [hook]    │
  └───────────────────┬──────────────────────┘
                      │
       ┌──────────────┼──────────────┐
       ▼              ▼              ▼
  ┌──────────┐  ┌──────────┐  ┌──────────┐
  │FeishuBot │  │DingTalkBot│  │QQBot     │
  │Registry  │  │Registry   │  │Registry  │
  └──────────┘  └──────────┘  └──────────┘

三、ChannelServiceSupport 共享流程

@Service
public class ChannelServiceSupport {

    private final AgentRegistry agentRegistry;
    private final ConversationService conversationService;
    private final DistributedRateLimiter rateLimiter;

    /**
     * 路由消息到Agent并执行
     * 所有通道共享此核心流程
     */
    public AgentResponse routeAndExecute(ChannelSession session, InboundMessage msg) {
        // 1. 限流检查
        if (!rateLimiter.tryAcquire("channel:" + session.getUserId(), 30, 60)) {
            return AgentResponse.rateLimited("请求过于频繁,请稍后再试");
        }

        // 2. 确定目标Agent (会话绑定 or 默认)
        String agentId = session.getBoundAgentId() != null
            ? session.getBoundAgentId()
            : resolveAgent(msg);
        Agent agent = agentRegistry.getAgent(agentId);

        // 3. 加载对话历史
        List<Message> history = conversationService.getHistory(
            session.getSessionId(), 20);  // 最近20条

        // 4. 构建请求
        ChatRequest request = ChatRequest.builder()
            .messages(history)
            .userMessage(msg.getContent())
            .sessionId(session.getSessionId())
            .build();

        // 5. 调用Agent
        String reply = agent.chat(request)
            .map(ChatResponse::getContent)
            .collectList()
            .map(chunks -> String.join("", chunks))
            .block(Duration.ofSeconds(agent.getTimeout().getSeconds()));

        // 6. 持久化对话
        conversationService.saveExchange(session.getSessionId(), msg.getContent(), reply);

        return AgentResponse.success(reply);
    }
}

四、RedisInboundDedupeStore 消息去重

IM平台经常重复推送消息(网络超时重试、Webhook重发),必须做幂等去重:

@Component
public class RedisInboundDedupeStore implements DistributedDedupeStore {

    private final StringRedisTemplate redis;

    /**
     * 检查消息是否重复
     * 使用 SET key NX EX ttl 原子操作
     *
     * NX: 只在key不存在时设置 (Not eXists)
     * EX: 设置过期时间
     *
     * @return true=重复消息(应丢弃), false=新消息(应处理)
     */
    @Override
    public boolean isDuplicate(String messageId) {
        String key = "dedup:msg:" + messageId;
        // SET NX EX: 原子操作,无竞态条件
        Boolean result = redis.opsForValue()
            .setIfAbsent(key, "1", Duration.ofMinutes(5));
        // result=true → 首次设置成功 → 非重复
        // result=false → key已存在 → 重复
        return !Boolean.TRUE.equals(result);
    }

    /**
     * 批量去重检查 (Pipeline优化)
     */
    @Override
    public Map<String, Boolean> batchCheck(List<String> messageIds) {
        List<Object> results = redis.executePipelined((RedisCallback<Object>) conn -> {
            for (String msgId : messageIds) {
                conn.stringCommands().set(
                    ("dedup:msg:" + msgId).getBytes(),
                    "1".getBytes(),
                    Expiration.seconds(300),
                    SetOption.ifAbsent()
                );
            }
            return null;
        });
        // 组装结果...
    }
}
去重时序图:

  飞书服务器            我们的服务              Redis
  │                     │                      │
  │── POST /webhook ──→│                      │
  │   msgId=abc123      │── SET dedup:msg:abc123 NX EX 300 ──→│
  │                     │                      │── OK (首次) ─→│
  │                     │← 处理消息            │
  │←── 200 OK ────────│                      │
  │                     │                      │
  │── POST /webhook ──→│  (网络超时重发)       │
  │   msgId=abc123      │── SET dedup:msg:abc123 NX EX 300 ──→│
  │                     │                      │── nil (已存在)│
  │                     │← 丢弃! 不重复处理   │
  │←── 200 OK ────────│                      │
  │                     │                      │

  关键设计:
  · TTL = 5分钟: 平衡去重窗口和内存占用
  · NX原子性: 即使并发请求也只有一个能处理
  · Pipeline批量: 高吞吐场景减少网络往返

五、RedisChannelSessionStore 会话管理

@Component
public class RedisChannelSessionStore implements DistributedSessionStore {

    /**
     * 会话数据结构 (Redis Hash):
     * Key: session:{channelType}:{userId}
     * Fields:
     *   sessionId    → UUID
     *   agentId      → 当前绑定的Agent
     *   createdAt    → 创建时间
     *   lastActiveAt → 最后活跃时间
     *   context      → 会话上下文JSON
     */
    @Override
    public ChannelSession getOrCreate(String channelType, String userId) {
        String key = "session:" + channelType + ":" + userId;

        // 尝试获取已有会话
        Map<Object, Object> data = redis.opsForHash().entries(key);

        if (data.isEmpty()) {
            // 创建新会话
            ChannelSession session = ChannelSession.builder()
                .sessionId(UUID.randomUUID().toString())
                .channelType(channelType)
                .userId(userId)
                .createdAt(Instant.now())
                .build();
            saveSession(key, session);
            return session;
        }

        // 更新最后活跃时间
        redis.opsForHash().put(key, "lastActiveAt", Instant.now().toString());
        redis.expire(key, Duration.ofHours(2));  // 2小时无活动自动过期
        return deserialize(data);
    }
}

六、各平台特有实现

飞书:流式卡片更新

飞书流式卡片 (Streaming Card):

  普通回复: 等Agent全部生成完 → 一次性发送(用户等很久)
  流式卡片: Agent边生成边更新卡片内容(实时感)

  实现原理:
  ┌────────────────────────────────────────────────────┐
  │ 1. 先发送一张"思考中..."的卡片 → 拿到 messageId   │
  │ 2. Agent生成过程中,每500ms PATCH更新卡片内容      │
  │ 3. Agent完成后,最终PATCH更新为完整内容            │
  │                                                    │
  │ API调用:                                           │
  │ POST /messages → 创建卡片 (返回message_id)         │
  │ PATCH /messages/{id} → 更新卡片内容 (流式)        │
  │                                                    │
  │ 防抖优化:                                          │
  │ · 不是每个token都更新(太频繁会被限流)             │
  │ · 攒够50字符 或 超过500ms 才触发一次更新          │
  │ · 最终完成时强制更新一次确保内容完整              │
  └────────────────────────────────────────────────────┘

钉钉:Stream模式

钉钉Stream (长连接模式):

  传统Webhook:
  钉钉 ──HTTP POST──→ 我们的服务器 (需要公网IP)

  Stream模式:
  我们的服务器 ──WebSocket──→ 钉钉开放平台
  (主动连接钉钉,无需公网IP和域名!)

  ┌─────────────────────────────────────────────┐
  │ DingTalkStreamClient                        │
  │                                             │
  │ 1. 启动时注册Stream → 获得endpoint          │
  │ 2. WebSocket连接到endpoint                  │
  │ 3. 接收推送消息 (JSON格式)                  │
  │ 4. 处理后通过HTTP API回复                   │
  │                                             │
  │ 优势:                                       │
  │ · 开发环境无需ngrok/内网穿透               │
  │ · 无需SSL证书和域名                        │
  │ · 连接稳定,断线自动重连                   │
  └─────────────────────────────────────────────┘

QQ:WebSocket官方Bot

QQ Bot WebSocket:

  ┌─────────────────────────────────────────────┐
  │ QQBotWebSocketClient                        │
  │                                             │
  │ 1. 获取WebSocket网关地址                    │
  │    GET /gateway → wss://...                 │
  │                                             │
  │ 2. 建立WS连接 + 鉴权                       │
  │    IDENTIFY { token, intents, shard }       │
  │                                             │
  │ 3. 维持心跳 (每41.25秒)                    │
  │    HEARTBEAT { last_sequence }              │
  │                                             │
  │ 4. 接收消息事件                             │
  │    DISPATCH { type: "MESSAGE_CREATE", ... } │
  │                                             │
  │ 5. 通过REST API回复                         │
  │    POST /channels/{id}/messages             │
  │                                             │
  │ 断线重连:                                   │
  │ · 6/RECONNECT → 重新IDENTIFY               │
  │ · 网络异常 → 指数退避重连(1s→2s→4s...)     │
  └─────────────────────────────────────────────┘

企业微信:XML加密

企业微信消息处理:

  特殊之处: 消息体是XML + AES加密

  ┌────────────────────────────────────────────┐
  │ 接收消息:                                  │
  │ 1. 验证URL签名 (SHA1(token+timestamp+nonce+encrypt)) │
  │ 2. 解密消息体 (AES-256-CBC)               │
  │ 3. 解析XML → 提取MsgType/Content/FromUser │
  │                                            │
  │ 回复消息:                                  │
  │ 1. 构建回复XML                            │
  │ 2. AES加密                                │
  │ 3. 生成签名                               │
  │ 4. 拼装加密XML响应体                      │
  │                                            │
  │ 被动回复 vs 主动推送:                      │
  │ · 被动回复: 5秒内必须响应(否则超时)       │
  │ · 超时处理: 先返回空串,再主动推送结果    │
  └────────────────────────────────────────────┘

七、Voice 通道 (OpenAI STT/TTS)

Voice通道处理流程:

  ┌────────┐      ┌─────────────┐      ┌───────────┐      ┌────────┐
  │ 用户   │      │ VoiceChannel│      │ Agent     │      │ 用户   │
  │ 语音   │─────→│ Service     │─────→│ (文本处理)│─────→│ 语音   │
  │ 输入   │      │             │      │           │      │ 输出   │
  └────────┘      └──────┬──────┘      └───────────┘      └────────┘
                         │
              ┌──────────┼──────────┐
              ▼                     ▼
  ┌───────────────────┐  ┌───────────────────┐
  │ STT (Speech→Text) │  │ TTS (Text→Speech) │
  │ OpenAI Whisper    │  │ OpenAI TTS-1      │
  │                   │  │                   │
  │ 支持格式:         │  │ 支持音色:         │
  │ · WAV/MP3/OGG    │  │ · alloy/echo/fable│
  │ · 实时流式输入   │  │ · onyx/nova/shimmer│
  │ · 多语言自动识别 │  │ · 流式音频输出    │
  └───────────────────┘  └───────────────────┘

  完整链路:
  用户语音 → Whisper STT → 文本 → Agent处理 → 回复文本 → TTS-1 → 语音播放

  优化:
  · STT流式: 边录边转,减少等待时间
  · TTS流式: 边生成边播放,首字节延迟<500ms
  · 语音活动检测(VAD): 自动判断用户说完了
  · 静音超时: 3秒无声自动结束录制
@Service
public class VoiceChannelService {

    private final OpenAiAudioClient audioClient;
    private final ChannelServiceSupport channelService;

    /**
     * 处理语音消息
     */
    public Mono<byte[]> handleVoiceMessage(byte[] audioData, String userId) {
        // 1. STT: 语音 → 文本
        String transcript = audioClient.transcribe(
            TranscriptionRequest.builder()
                .audio(audioData)
                .model("whisper-1")
                .language("zh")
                .build()
        ).getText();

        // 2. 调用Agent处理文本
        ChannelSession session = sessionStore.getOrCreate("voice", userId);
        InboundMessage msg = InboundMessage.of(transcript, "voice", userId);
        AgentResponse response = channelService.routeAndExecute(session, msg);

        // 3. TTS: 文本 → 语音
        byte[] speechAudio = audioClient.speech(
            SpeechRequest.builder()
                .model("tts-1")
                .input(response.getContent())
                .voice("nova")          // 中文女声效果最好
                .responseFormat("opus") // 压缩格式,带宽友好
                .speed(1.0)
                .build()
        ).getAudioData();

        return Mono.just(speechAudio);
    }
}

八、通道架构总览

多通道统一架构全景:

  ┌─────────────────────────────────────────────────────────────────┐
  │                         外部IM平台                              │
  │  飞书      钉钉      QQ       企业微信     语音设备             │
  └──┬─────────┬─────────┬─────────┬───────────┬───────────────────┘
     │         │         │         │           │
     ▼         ▼         ▼         ▼           ▼
  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌────────────┐
  │Feishu│ │Ding  │ │QQ Bot│ │WeCom │ │Voice       │
  │Bot   │ │Talk  │ │WS    │ │Bot   │ │Channel     │
  │Ctrl  │ │Stream│ │Client│ │Ctrl  │ │Service     │
  └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ └─────┬──────┘
     │         │         │         │           │
     └─────────┴─────────┴─────────┴───────────┘
                         │
                         ▼
  ┌─────────────────────────────────────────────────────────────────┐
  │                 AbstractBotRegistry (模板方法)                   │
  │  verifySignature → extractMessage → dedupe → route → reply     │
  └─────────────────────────────┬───────────────────────────────────┘
                                │
                                ▼
  ┌─────────────────────────────────────────────────────────────────┐
  │              ChannelServiceSupport (共享流程)                    │
  │  限流 → 会话管理 → 历史加载 → Agent调用 → 持久化              │
  └─────────────────────────────┬───────────────────────────────────┘
                                │
          ┌─────────────────────┼─────────────────────┐
          ▼                     ▼                     ▼
  ┌──────────────┐    ┌──────────────┐    ┌──────────────────┐
  │ DedupeStore  │    │ SessionStore │    │ AgentRegistry    │
  │ (Redis NX)   │    │ (Redis Hash) │    │ (100+ Agents)   │
  └──────────────┘    └──────────────┘    └──────────────────┘

九、各平台接入对比

维度飞书钉钉QQ企业微信Voice
连接方式Webhook回调Stream长连接WebSocketWebhook回调REST API
验签方式SHA256+EncryptHMAC-SHA256Ed25519SHA1+AESBearer Token
消息格式JSONJSONJSONXML(加密)Binary+JSON
流式支持卡片更新Markdown更新不支持不支持流式音频
回复时限无限制无限制5分钟5秒(被动)无限制
需公网IP否(Stream)否(WS)
特有代码量~200行~150行~180行~200行~250行

十、面试话术

面试话术:"我们设计了统一的多通道适配框架,核心是AbstractBotRegistry模板方法模式。它定义了消息处理的完整骨架:验签→去重→提取消息→路由Agent→发送回复。各平台只需实现差异化的部分(验签算法、消息格式解析、回复方式),共享层负责去重、会话管理、限流、Agent调用等80%的通用逻辑。目前支持飞书、钉钉、QQ、企业微信、语音5大通道,新增一个平台只需约200行代码。"
面试话术:"消息去重用Redis的SET NX EX原子命令实现——收到消息时用messageId作为key做SETNX,设5分钟TTL。首次设置成功说明是新消息则处理,设置失败说明是重复则丢弃。这保证了幂等性,即使IM平台因网络超时重复推送webhook也不会重复处理。会话管理用Redis Hash存储,Key按channelType:userId分区,2小时无活动自动过期。飞书通道还实现了流式卡片更新——先创建一张'思考中'的卡片拿到messageId,然后Agent边生成边PATCH更新卡片内容,用户能看到实时打字效果。"