5大IM通道统一框架、AbstractBotRegistry模板方法、Redis去重、流式卡片
没有统一框架时的痛点:
┌─────────────────────────────────────────────────────────┐
│ 每接入一个新平台,都要从头写一遍: │
│ │
│ 飞书Bot: 消息接收 → 验签 → 解析 → 调Agent → 回复 │
│ 钉钉Bot: 消息接收 → 验签 → 解析 → 调Agent → 回复 │
│ 企微Bot: 消息接收 → 验签 → 解析 → 调Agent → 回复 │
│ QQ Bot: 消息接收 → 验签 → 解析 → 调Agent → 回复 │
│ 微信Bot: 消息接收 → 验签 → 解析 → 调Agent → 回复 │
│ │
│ 问题: │
│ · 80%的逻辑重复 (去重、会话管理、Agent调用、错误处理) │
│ · 每个平台各自维护,Bug要修5次 │
│ · 新人接手困难,代码风格不统一 │
│ · 新增平台成本高(2000+行代码) │
└─────────────────────────────────────────────────────────┘
统一框架后:
┌─────────────────────────────────────────────────────────┐
│ 共享层 (AbstractBotRegistry + ChannelServiceSupport): │
│ · 消息去重 (Redis SETNX) │
│ · 会话管理 (Redis Session) │
│ · Agent路由与调用 │
│ · 错误处理与降级 │
│ · 限流与熔断 │
│ · 对话历史持久化 │
└─────────────────────────┬───────────────────────────────┘
│ (只需实现差异部分)
┌───────────┬───────┼───────┬───────────┐
▼ ▼ ▼ ▼ ▼
┌──────────┐┌──────────┐┌────┐┌──────────┐┌───────┐
│飞书: ││钉钉: ││QQ: ││企微: ││Voice: │
│流式卡片 ││Stream ││WS ││XML加密 ││STT/TTS│
│~200行 ││~150行 ││~180││~200行 ││~250行 │
└──────────┘└──────────┘└────┘└──────────┘└───────┘
新增平台: 只需 ~200行平台特有代码!
└─────────────────────────────────────────────────────────┘
使用模板方法模式定义Bot注册和消息处理的骨架,子类只需实现平台差异化的部分:
public abstract class AbstractBotRegistry<TBot, TEvent> {
// ===== 模板方法:定义处理骨架 =====
/** 处理入站消息的完整流程 (final=不可覆盖) */
public final void handleInboundMessage(TEvent event) {
// 1. 验证签名 (每个平台验签方式不同)
if (!verifySignature(event)) {
log.warn("签名验证失败: {}", extractEventId(event));
return;
}
// 2. 提取标准化消息
InboundMessage message = extractMessage(event);
// 3. 去重检查 (共享实现)
if (dedupeStore.isDuplicate(message.getMessageId())) {
log.debug("重复消息已忽略: {}", message.getMessageId());
return;
}
// 4. 获取/创建会话 (共享实现)
ChannelSession session = sessionStore.getOrCreate(
message.getChannelType(), message.getUserId());
// 5. 路由到Agent并执行
AgentResponse response = channelService.routeAndExecute(session, message);
// 6. 发送回复 (每个平台发送方式不同)
sendReply(event, response);
}
// ===== 抽象方法:子类必须实现 =====
/** 验证平台签名 */
protected abstract boolean verifySignature(TEvent event);
/** 从平台事件中提取标准消息 */
protected abstract InboundMessage extractMessage(TEvent event);
/** 发送回复到平台 */
protected abstract void sendReply(TEvent event, AgentResponse response);
/** 提取事件ID (用于日志) */
protected abstract String extractEventId(TEvent event);
// ===== 可选钩子:子类可覆盖 =====
/** 消息预处理 (如@机器人提取、指令解析) */
protected InboundMessage preProcess(InboundMessage msg) { return msg; }
/** 回复后处理 (如消息已读回执) */
protected void postReply(TEvent event) { /* no-op */ }
}
模板方法模式类图:
┌──────────────────────────────────────────┐
│ AbstractBotRegistry<TBot, TEvent> │
│ ─────────────────────────────────────── │
│ + handleInboundMessage(event) [final] │
│ # verifySignature(event) [abstract]│
│ # extractMessage(event) [abstract]│
│ # sendReply(event, response) [abstract]│
│ # preProcess(msg) [hook] │
│ # postReply(event) [hook] │
└───────────────────┬──────────────────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│FeishuBot │ │DingTalkBot│ │QQBot │
│Registry │ │Registry │ │Registry │
└──────────┘ └──────────┘ └──────────┘
@Service
public class ChannelServiceSupport {
private final AgentRegistry agentRegistry;
private final ConversationService conversationService;
private final DistributedRateLimiter rateLimiter;
/**
* 路由消息到Agent并执行
* 所有通道共享此核心流程
*/
public AgentResponse routeAndExecute(ChannelSession session, InboundMessage msg) {
// 1. 限流检查
if (!rateLimiter.tryAcquire("channel:" + session.getUserId(), 30, 60)) {
return AgentResponse.rateLimited("请求过于频繁,请稍后再试");
}
// 2. 确定目标Agent (会话绑定 or 默认)
String agentId = session.getBoundAgentId() != null
? session.getBoundAgentId()
: resolveAgent(msg);
Agent agent = agentRegistry.getAgent(agentId);
// 3. 加载对话历史
List<Message> history = conversationService.getHistory(
session.getSessionId(), 20); // 最近20条
// 4. 构建请求
ChatRequest request = ChatRequest.builder()
.messages(history)
.userMessage(msg.getContent())
.sessionId(session.getSessionId())
.build();
// 5. 调用Agent
String reply = agent.chat(request)
.map(ChatResponse::getContent)
.collectList()
.map(chunks -> String.join("", chunks))
.block(Duration.ofSeconds(agent.getTimeout().getSeconds()));
// 6. 持久化对话
conversationService.saveExchange(session.getSessionId(), msg.getContent(), reply);
return AgentResponse.success(reply);
}
}
IM平台经常重复推送消息(网络超时重试、Webhook重发),必须做幂等去重:
@Component
public class RedisInboundDedupeStore implements DistributedDedupeStore {
private final StringRedisTemplate redis;
/**
* 检查消息是否重复
* 使用 SET key NX EX ttl 原子操作
*
* NX: 只在key不存在时设置 (Not eXists)
* EX: 设置过期时间
*
* @return true=重复消息(应丢弃), false=新消息(应处理)
*/
@Override
public boolean isDuplicate(String messageId) {
String key = "dedup:msg:" + messageId;
// SET NX EX: 原子操作,无竞态条件
Boolean result = redis.opsForValue()
.setIfAbsent(key, "1", Duration.ofMinutes(5));
// result=true → 首次设置成功 → 非重复
// result=false → key已存在 → 重复
return !Boolean.TRUE.equals(result);
}
/**
* 批量去重检查 (Pipeline优化)
*/
@Override
public Map<String, Boolean> batchCheck(List<String> messageIds) {
List<Object> results = redis.executePipelined((RedisCallback<Object>) conn -> {
for (String msgId : messageIds) {
conn.stringCommands().set(
("dedup:msg:" + msgId).getBytes(),
"1".getBytes(),
Expiration.seconds(300),
SetOption.ifAbsent()
);
}
return null;
});
// 组装结果...
}
}
去重时序图: 飞书服务器 我们的服务 Redis │ │ │ │── POST /webhook ──→│ │ │ msgId=abc123 │── SET dedup:msg:abc123 NX EX 300 ──→│ │ │ │── OK (首次) ─→│ │ │← 处理消息 │ │←── 200 OK ────────│ │ │ │ │ │── POST /webhook ──→│ (网络超时重发) │ │ msgId=abc123 │── SET dedup:msg:abc123 NX EX 300 ──→│ │ │ │── nil (已存在)│ │ │← 丢弃! 不重复处理 │ │←── 200 OK ────────│ │ │ │ │ 关键设计: · TTL = 5分钟: 平衡去重窗口和内存占用 · NX原子性: 即使并发请求也只有一个能处理 · Pipeline批量: 高吞吐场景减少网络往返
@Component
public class RedisChannelSessionStore implements DistributedSessionStore {
/**
* 会话数据结构 (Redis Hash):
* Key: session:{channelType}:{userId}
* Fields:
* sessionId → UUID
* agentId → 当前绑定的Agent
* createdAt → 创建时间
* lastActiveAt → 最后活跃时间
* context → 会话上下文JSON
*/
@Override
public ChannelSession getOrCreate(String channelType, String userId) {
String key = "session:" + channelType + ":" + userId;
// 尝试获取已有会话
Map<Object, Object> data = redis.opsForHash().entries(key);
if (data.isEmpty()) {
// 创建新会话
ChannelSession session = ChannelSession.builder()
.sessionId(UUID.randomUUID().toString())
.channelType(channelType)
.userId(userId)
.createdAt(Instant.now())
.build();
saveSession(key, session);
return session;
}
// 更新最后活跃时间
redis.opsForHash().put(key, "lastActiveAt", Instant.now().toString());
redis.expire(key, Duration.ofHours(2)); // 2小时无活动自动过期
return deserialize(data);
}
}
飞书流式卡片 (Streaming Card):
普通回复: 等Agent全部生成完 → 一次性发送(用户等很久)
流式卡片: Agent边生成边更新卡片内容(实时感)
实现原理:
┌────────────────────────────────────────────────────┐
│ 1. 先发送一张"思考中..."的卡片 → 拿到 messageId │
│ 2. Agent生成过程中,每500ms PATCH更新卡片内容 │
│ 3. Agent完成后,最终PATCH更新为完整内容 │
│ │
│ API调用: │
│ POST /messages → 创建卡片 (返回message_id) │
│ PATCH /messages/{id} → 更新卡片内容 (流式) │
│ │
│ 防抖优化: │
│ · 不是每个token都更新(太频繁会被限流) │
│ · 攒够50字符 或 超过500ms 才触发一次更新 │
│ · 最终完成时强制更新一次确保内容完整 │
└────────────────────────────────────────────────────┘
钉钉Stream (长连接模式): 传统Webhook: 钉钉 ──HTTP POST──→ 我们的服务器 (需要公网IP) Stream模式: 我们的服务器 ──WebSocket──→ 钉钉开放平台 (主动连接钉钉,无需公网IP和域名!) ┌─────────────────────────────────────────────┐ │ DingTalkStreamClient │ │ │ │ 1. 启动时注册Stream → 获得endpoint │ │ 2. WebSocket连接到endpoint │ │ 3. 接收推送消息 (JSON格式) │ │ 4. 处理后通过HTTP API回复 │ │ │ │ 优势: │ │ · 开发环境无需ngrok/内网穿透 │ │ · 无需SSL证书和域名 │ │ · 连接稳定,断线自动重连 │ └─────────────────────────────────────────────┘
QQ Bot WebSocket:
┌─────────────────────────────────────────────┐
│ QQBotWebSocketClient │
│ │
│ 1. 获取WebSocket网关地址 │
│ GET /gateway → wss://... │
│ │
│ 2. 建立WS连接 + 鉴权 │
│ IDENTIFY { token, intents, shard } │
│ │
│ 3. 维持心跳 (每41.25秒) │
│ HEARTBEAT { last_sequence } │
│ │
│ 4. 接收消息事件 │
│ DISPATCH { type: "MESSAGE_CREATE", ... } │
│ │
│ 5. 通过REST API回复 │
│ POST /channels/{id}/messages │
│ │
│ 断线重连: │
│ · 6/RECONNECT → 重新IDENTIFY │
│ · 网络异常 → 指数退避重连(1s→2s→4s...) │
└─────────────────────────────────────────────┘
企业微信消息处理: 特殊之处: 消息体是XML + AES加密 ┌────────────────────────────────────────────┐ │ 接收消息: │ │ 1. 验证URL签名 (SHA1(token+timestamp+nonce+encrypt)) │ │ 2. 解密消息体 (AES-256-CBC) │ │ 3. 解析XML → 提取MsgType/Content/FromUser │ │ │ │ 回复消息: │ │ 1. 构建回复XML │ │ 2. AES加密 │ │ 3. 生成签名 │ │ 4. 拼装加密XML响应体 │ │ │ │ 被动回复 vs 主动推送: │ │ · 被动回复: 5秒内必须响应(否则超时) │ │ · 超时处理: 先返回空串,再主动推送结果 │ └────────────────────────────────────────────┘
Voice通道处理流程:
┌────────┐ ┌─────────────┐ ┌───────────┐ ┌────────┐
│ 用户 │ │ VoiceChannel│ │ Agent │ │ 用户 │
│ 语音 │─────→│ Service │─────→│ (文本处理)│─────→│ 语音 │
│ 输入 │ │ │ │ │ │ 输出 │
└────────┘ └──────┬──────┘ └───────────┘ └────────┘
│
┌──────────┼──────────┐
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ STT (Speech→Text) │ │ TTS (Text→Speech) │
│ OpenAI Whisper │ │ OpenAI TTS-1 │
│ │ │ │
│ 支持格式: │ │ 支持音色: │
│ · WAV/MP3/OGG │ │ · alloy/echo/fable│
│ · 实时流式输入 │ │ · onyx/nova/shimmer│
│ · 多语言自动识别 │ │ · 流式音频输出 │
└───────────────────┘ └───────────────────┘
完整链路:
用户语音 → Whisper STT → 文本 → Agent处理 → 回复文本 → TTS-1 → 语音播放
优化:
· STT流式: 边录边转,减少等待时间
· TTS流式: 边生成边播放,首字节延迟<500ms
· 语音活动检测(VAD): 自动判断用户说完了
· 静音超时: 3秒无声自动结束录制
@Service
public class VoiceChannelService {
private final OpenAiAudioClient audioClient;
private final ChannelServiceSupport channelService;
/**
* 处理语音消息
*/
public Mono<byte[]> handleVoiceMessage(byte[] audioData, String userId) {
// 1. STT: 语音 → 文本
String transcript = audioClient.transcribe(
TranscriptionRequest.builder()
.audio(audioData)
.model("whisper-1")
.language("zh")
.build()
).getText();
// 2. 调用Agent处理文本
ChannelSession session = sessionStore.getOrCreate("voice", userId);
InboundMessage msg = InboundMessage.of(transcript, "voice", userId);
AgentResponse response = channelService.routeAndExecute(session, msg);
// 3. TTS: 文本 → 语音
byte[] speechAudio = audioClient.speech(
SpeechRequest.builder()
.model("tts-1")
.input(response.getContent())
.voice("nova") // 中文女声效果最好
.responseFormat("opus") // 压缩格式,带宽友好
.speed(1.0)
.build()
).getAudioData();
return Mono.just(speechAudio);
}
}
多通道统一架构全景:
┌─────────────────────────────────────────────────────────────────┐
│ 外部IM平台 │
│ 飞书 钉钉 QQ 企业微信 语音设备 │
└──┬─────────┬─────────┬─────────┬───────────┬───────────────────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌────────────┐
│Feishu│ │Ding │ │QQ Bot│ │WeCom │ │Voice │
│Bot │ │Talk │ │WS │ │Bot │ │Channel │
│Ctrl │ │Stream│ │Client│ │Ctrl │ │Service │
└──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ └─────┬──────┘
│ │ │ │ │
└─────────┴─────────┴─────────┴───────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ AbstractBotRegistry (模板方法) │
│ verifySignature → extractMessage → dedupe → route → reply │
└─────────────────────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ ChannelServiceSupport (共享流程) │
│ 限流 → 会话管理 → 历史加载 → Agent调用 → 持久化 │
└─────────────────────────────┬───────────────────────────────────┘
│
┌─────────────────────┼─────────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────────┐
│ DedupeStore │ │ SessionStore │ │ AgentRegistry │
│ (Redis NX) │ │ (Redis Hash) │ │ (100+ Agents) │
└──────────────┘ └──────────────┘ └──────────────────┘
| 维度 | 飞书 | 钉钉 | 企业微信 | Voice | |
|---|---|---|---|---|---|
| 连接方式 | Webhook回调 | Stream长连接 | WebSocket | Webhook回调 | REST API |
| 验签方式 | SHA256+Encrypt | HMAC-SHA256 | Ed25519 | SHA1+AES | Bearer Token |
| 消息格式 | JSON | JSON | JSON | XML(加密) | Binary+JSON |
| 流式支持 | 卡片更新 | Markdown更新 | 不支持 | 不支持 | 流式音频 |
| 回复时限 | 无限制 | 无限制 | 5分钟 | 5秒(被动) | 无限制 |
| 需公网IP | 是 | 否(Stream) | 否(WS) | 是 | 是 |
| 特有代码量 | ~200行 | ~150行 | ~180行 | ~200行 | ~250行 |