飞书/钉钉/QQ 机器人 + 企微出站推送 + Voice 语音通道、AbstractBotRegistry模板方法、Redis去重、卡片化回复
没有统一框架时的痛点:
┌─────────────────────────────────────────────────────────┐
│ 每接入一个新平台,都要从头写一遍: │
│ │
│ 飞书Bot: 消息接收 → 验签 → 解析 → 调Agent → 回复 │
│ 钉钉Bot: 消息接收 → 验签 → 解析 → 调Agent → 回复 │
│ 企微Webhook: 主动发送 → Markdown/Text 消息推送 │
│ QQ Bot: 消息接收 → 验签 → 解析 → 调Agent → 回复 │
│ │
│ 问题: │
│ · 多数逻辑重复 (去重、会话管理、Agent调用、错误处理) │
│ · 每个平台各自维护,Bug要修多次 │
│ · 新人接手困难,代码风格不统一 │
│ · 新增平台成本高(2000+行代码) │
└─────────────────────────────────────────────────────────┘
统一框架后:
┌─────────────────────────────────────────────────────────┐
│ 共享层 (AbstractBotRegistry + ChannelServiceSupport): │
│ · 消息去重 (Redis SETNX) │
│ · 会话管理 (Redis Session) │
│ · Agent路由与调用 │
│ · 错误处理与降级 │
│ · 限流与熔断 │
│ · 对话历史持久化 │
└─────────────────────────┬───────────────────────────────┘
│ (只需实现差异部分)
┌───────────┬───────┼───────┬───────────┐
▼ ▼ ▼ ▼ ▼
┌──────────┐┌──────────┐┌────┐┌──────────┐┌───────┐
│飞书: ││钉钉: ││QQ: ││企微: ││Voice: │
│完成卡片 ││Stream ││WS ││出站推送 ││STT/TTS│
│~200行 ││~150行 ││~180││~200行 ││~250行 │
└──────────┘└──────────┘└────┘└──────────┘└───────┘
新增平台: 只需 ~200行平台特有代码!
└─────────────────────────────────────────────────────────┘
使用模板方法模式定义Bot注册和消息处理的骨架,子类只需实现平台差异化的部分:
public abstract class AbstractBotRegistry<TBot, TEvent> {
// ===== 模板方法:定义处理骨架 =====
/** 处理入站消息的完整流程 (final=不可覆盖) */
public final void handleInboundMessage(TEvent event) {
// 1. 验证签名 (每个平台验签方式不同)
if (!verifySignature(event)) {
log.warn("签名验证失败: {}", extractEventId(event));
return;
}
// 2. 提取标准化消息
InboundMessage message = extractMessage(event);
// 3. 去重检查 (共享实现)
if (dedupeStore.isDuplicate(message.getMessageId())) {
log.debug("重复消息已忽略: {}", message.getMessageId());
return;
}
// 4. 获取/创建会话 (共享实现)
ChannelSession session = sessionStore.getOrCreate(
message.getChannelType(), message.getUserId());
// 5. 路由到Agent并执行
AgentResponse response = channelService.routeAndExecute(session, message);
// 6. 发送回复 (每个平台发送方式不同)
sendReply(event, response);
}
// ===== 抽象方法:子类必须实现 =====
/** 验证平台签名 */
protected abstract boolean verifySignature(TEvent event);
/** 从平台事件中提取标准消息 */
protected abstract InboundMessage extractMessage(TEvent event);
/** 发送回复到平台 */
protected abstract void sendReply(TEvent event, AgentResponse response);
/** 提取事件ID (用于日志) */
protected abstract String extractEventId(TEvent event);
// ===== 可选钩子:子类可覆盖 =====
/** 消息预处理 (如@机器人提取、指令解析) */
protected InboundMessage preProcess(InboundMessage msg) { return msg; }
/** 回复后处理 (如消息已读回执) */
protected void postReply(TEvent event) { /* no-op */ }
}
模板方法模式类图:
┌──────────────────────────────────────────┐
│ AbstractBotRegistry<TBot, TEvent> │
│ ─────────────────────────────────────── │
│ + handleInboundMessage(event) [final] │
│ # verifySignature(event) [abstract]│
│ # extractMessage(event) [abstract]│
│ # sendReply(event, response) [abstract]│
│ # preProcess(msg) [hook] │
│ # postReply(event) [hook] │
└───────────────────┬──────────────────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│FeishuBot │ │DingTalkBot│ │QQBot │
│Registry │ │Registry │ │Registry │
└──────────┘ └──────────┘ └──────────┘
@Service
public class ChannelServiceSupport {
private final AgentRegistry agentRegistry;
private final ConversationService conversationService;
private final DistributedRateLimiter rateLimiter;
/**
* 路由消息到Agent并执行
* 所有通道共享此核心流程
*/
public AgentResponse routeAndExecute(ChannelSession session, InboundMessage msg) {
// 1. 限流检查
if (!rateLimiter.tryAcquire("channel:" + session.getUserId(), 30, 60)) {
return AgentResponse.rateLimited("请求过于频繁,请稍后再试");
}
// 2. 确定目标Agent (会话绑定 or 默认)
String agentId = session.getBoundAgentId() != null
? session.getBoundAgentId()
: resolveAgent(msg);
Agent agent = agentRegistry.getAgent(agentId);
// 3. 加载对话历史
List<Message> history = conversationService.getHistory(
session.getSessionId(), 20); // 最近20条
// 4. 构建请求
ChatRequest request = ChatRequest.builder()
.messages(history)
.userMessage(msg.getContent())
.sessionId(session.getSessionId())
.build();
// 5. 调用Agent
String reply = agent.chat(request)
.map(ChatResponse::getContent)
.collectList()
.map(chunks -> String.join("", chunks))
.block(Duration.ofSeconds(agent.getTimeout().getSeconds()));
// 6. 持久化对话
conversationService.saveExchange(session.getSessionId(), msg.getContent(), reply);
return AgentResponse.success(reply);
}
}
IM平台经常重复推送消息(网络超时重试、Webhook重发),必须做幂等去重:
@Component
public class RedisInboundDedupeStore implements DistributedDedupeStore {
private final StringRedisTemplate redis;
/**
* 检查消息是否重复
* 使用 SET key NX EX ttl 原子操作
*
* NX: 只在key不存在时设置 (Not eXists)
* EX: 设置过期时间
*
* @return true=重复消息(应丢弃), false=新消息(应处理)
*/
@Override
public boolean isDuplicate(String messageId) {
String key = "dedup:msg:" + messageId;
// SET NX EX: 原子操作,无竞态条件
Boolean result = redis.opsForValue()
.setIfAbsent(key, "1", Duration.ofMinutes(5));
// result=true → 首次设置成功 → 非重复
// result=false → key已存在 → 重复
return !Boolean.TRUE.equals(result);
}
/**
* 批量去重检查 (Pipeline优化)
*/
@Override
public Map<String, Boolean> batchCheck(List<String> messageIds) {
List<Object> results = redis.executePipelined((RedisCallback<Object>) conn -> {
for (String msgId : messageIds) {
conn.stringCommands().set(
("dedup:msg:" + msgId).getBytes(),
"1".getBytes(),
Expiration.seconds(300),
SetOption.ifAbsent()
);
}
return null;
});
// 组装结果...
}
}
去重时序图: 飞书服务器 我们的服务 Redis │ │ │ │── POST /webhook ──→│ │ │ msgId=abc123 │── SET dedup:msg:abc123 NX EX 300 ──→│ │ │ │── OK (首次) ─→│ │ │← 处理消息 │ │←── 200 OK ────────│ │ │ │ │ │── POST /webhook ──→│ (网络超时重发) │ │ msgId=abc123 │── SET dedup:msg:abc123 NX EX 300 ──→│ │ │ │── nil (已存在)│ │ │← 丢弃! 不重复处理 │ │←── 200 OK ────────│ │ │ │ │ 关键设计: · TTL = 5分钟: 平衡去重窗口和内存占用 · NX原子性: 即使并发请求也只有一个能处理 · Pipeline批量: 高吞吐场景减少网络往返
@Component
public class RedisChannelSessionStore implements DistributedSessionStore {
/**
* 会话数据结构 (Redis Hash):
* Key: session:{channelType}:{userId}
* Fields:
* sessionId → UUID
* agentId → 当前绑定的Agent
* createdAt → 创建时间
* lastActiveAt → 最后活跃时间
* context → 会话上下文JSON
*/
@Override
public ChannelSession getOrCreate(String channelType, String userId) {
String key = "session:" + channelType + ":" + userId;
// 尝试获取已有会话
Map<Object, Object> data = redis.opsForHash().entries(key);
if (data.isEmpty()) {
// 创建新会话
ChannelSession session = ChannelSession.builder()
.sessionId(UUID.randomUUID().toString())
.channelType(channelType)
.userId(userId)
.createdAt(Instant.now())
.build();
saveSession(key, session);
return session;
}
// 更新最后活跃时间
redis.opsForHash().put(key, "lastActiveAt", Instant.now().toString());
redis.expire(key, Duration.ofHours(2)); // 2小时无活动自动过期
return deserialize(data);
}
}
飞书卡片化回复: 普通回复: 文本直接发送 当前实现: 先发"正在思考"文本 → 收集 Agent SSE → 发送结果卡片 实现原理: ┌────────────────────────────────────────────────────┐ │ 1. FeishuBotRegistry 启动 Lark WebSocket Client │ │ 2. FeishuChannelService 处理消息 / 菜单 / P2P事件 │ │ 3. ChannelServiceSupport 复用会话、历史、安全扫描 │ │ 4. streamCollect 聚合 Agent 响应 │ │ 5. 完成后发送结果卡片、来源和推荐追问 │ │ │ │ API调用: │ │ Lark WebSocket → 接收事件 │ │ POST /messages 或 reply → 发送文本/卡片 │ │ │ │ 当前边界: │ │ · 主链路不是逐 Token 卡片更新 │ │ · 没有每 500ms PATCH 同一张卡片 │ │ · 面试时讲卡片交互和通道复用,不讲流式卡片 │ └────────────────────────────────────────────────────┘
钉钉Stream (长连接模式): 传统Webhook: 钉钉 ──HTTP POST──→ 我们的服务器 (需要公网IP) Stream模式: 我们的服务器 ──WebSocket──→ 钉钉开放平台 (主动连接钉钉,无需公网IP和域名!) ┌─────────────────────────────────────────────┐ │ DingTalkStreamClient │ │ │ │ 1. 启动时注册Stream → 获得endpoint │ │ 2. WebSocket连接到endpoint │ │ 3. 接收推送消息 (JSON格式) │ │ 4. 处理后通过HTTP API回复 │ │ │ │ 优势: │ │ · 开发环境无需ngrok/内网穿透 │ │ · 无需SSL证书和域名 │ │ · 连接稳定,断线自动重连 │ └─────────────────────────────────────────────┘
QQ Bot WebSocket:
┌─────────────────────────────────────────────┐
│ QQBotWebSocketClient │
│ │
│ 1. 获取WebSocket网关地址 │
│ GET /gateway → wss://... │
│ │
│ 2. 建立WS连接 + 鉴权 │
│ IDENTIFY { token, intents, shard } │
│ │
│ 3. 维持心跳 (每41.25秒) │
│ HEARTBEAT { last_sequence } │
│ │
│ 4. 接收消息事件 │
│ DISPATCH { type: "MESSAGE_CREATE", ... } │
│ │
│ 5. 通过REST API回复 │
│ POST /channels/{id}/messages │
│ │
│ 断线重连: │
│ · 6/RECONNECT → 重新IDENTIFY │
│ · 网络异常 → 指数退避重连(1s→2s→4s...) │
└─────────────────────────────────────────────┘
企业微信发送处理: 当前实现以管理端主动发送能力为主(outbound-only) ┌────────────────────────────────────────────┐ │ 1. 管理端配置 webhookKey │ │ 2. 选择 text / markdown 消息类型 │ │ 3. 调用企业微信群机器人 Webhook │ │ 4. 记录审计日志与运行态状态 │ │ │ │ 说明: │ │ · 不提供入站会话 │ │ · 不进入 Agent 对话路由 │ │ · 主要用于通知/主动推送类场景 │ └────────────────────────────────────────────┘
Voice通道处理流程:
┌────────┐ ┌─────────────┐ ┌───────────┐ ┌────────┐
│ 用户 │ │ VoiceChannel│ │ Agent │ │ 用户 │
│ 语音 │─────→│ Controller │─────→│ (文本处理)│─────→│ 语音 │
│ 输入 │ │ │ │ │ │ 输出 │
└────────┘ └──────┬──────┘ └───────────┘ └────────┘
│
┌──────────┼──────────┐
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ ASR (Speech→Text) │ │ TTS (Text→Speech) │
│ OpenAI兼容接口 │ │ OpenAI兼容接口 │
│ │ │ │
│ 支持格式: │ │ 支持音色: │
│ · multipart上传 │ │ · 同步合成音频 │
│ · wav/mp3等格式 │ │ · 音色可配置 │
│ · 同步转写 │ │ · 格式可配置 │
└───────────────────┘ └───────────────────┘
完整链路:
用户语音 → ASR → 文本 → Agent处理 → 回复文本 → TTS → 语音播放
当前边界:
· HTTP multipart 上传音频,非流式 ASR
· Agent 回复聚合为文本后再同步 TTS
· 流式 ASR/TTS、VAD、WebSocket 双工会话属于后续优化
POST /api/voice/chat audio = MultipartFile agentId = chat outputMode = text | audio VoiceChannelController 1. SpeechToTextService.recognize(...) 2. ChannelServiceSupport.executeAgent(...) 3. streamCollect(...) 聚合 Agent 回复 4. TextToSpeechService.synthesize(...) 可选返回音频
多通道统一架构全景:
┌─────────────────────────────────────────────────────────────────┐
│ 外部接入平台 │
│ 飞书 钉钉 QQ 企业微信出站 语音设备 │
└──┬─────────┬─────────┬─────────┬───────────┬───────────────────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌────────────┐
│Feishu│ │Ding │ │QQ Bot│ │WeCom │ │Voice │
│Bot │ │Talk │ │WS │ │Webhook│ │Channel │
│Ctrl │ │Stream│ │Client│ │Send │ │Controller │
└──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ └─────┬──────┘
│ │ │ │ │
└─────────┴─────────┴─────────┴───────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ AbstractBotRegistry (机器人生命周期,不含 Voice) │
│ 多 Bot 配置加载 → start/stop → runtime status → 审计辅助 │
└─────────────────────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ ChannelServiceSupport (机器人共享流程) │
│ 去重 → 会话管理 → 历史加载 → 安全扫描 → Agent调用 → 持久化 │
└─────────────────────────────┬───────────────────────────────────┘
│
┌─────────────────────┼─────────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────────┐
│ DedupeStore │ │ SessionStore │ │ AgentRegistry │
│ (Redis NX) │ │ (Redis Hash) │ │ (100+ Agents) │
└──────────────┘ └──────────────┘ └──────────────────┘
VoiceChannelController 单独提供 REST 语音入口:
HTTP multipart → ASR → Agent 单轮调用 → 可选同步 TTS
| 维度 | 飞书 | 钉钉 | 企业微信 | Voice | |
|---|---|---|---|---|---|
| 连接方式 | WebSocket长连接 | Stream长连接 | WebSocket | Webhook主动发送 | REST API |
| 验签方式 | SHA256+Encrypt | HMAC-SHA256 | Ed25519 | Webhook Key | Bearer Token |
| 消息格式 | JSON | JSON | JSON | JSON(text/markdown) | Binary+JSON |
| 流式支持 | 思考态+完成卡片 | Markdown更新 | 不支持 | 不支持 | 同步音频 |
| 回复时限 | 无限制 | 无限制 | 5分钟 | 出站-only | 无限制 |
| 需公网IP | 是 | 否(Stream) | 否(WS) | 是 | 是 |
| 特有代码量 | ~200行 | ~150行 | ~180行 | ~200行 | ~250行 |