从"等半天"到"实时打字"——AI流式响应背后的技术原理
传统请求 vs 流式响应: 传统HTTP请求: ┌──────────────────────────────────────────────┐ │ 用户: "写一篇关于AI的文章" │ │ │ │ [等待... 等待... 等待... 30秒过去了] │ │ │ │ AI: "人工智能(Artificial Intelligence)是..." │ │ ← 用户一次性收到全部内容 │ │ │ │ 问题: │ │ · 30秒白屏,用户以为卡死了 │ │ · 长回答可能要等1-2分钟 │ │ · 用户体验极差 │ └──────────────────────────────────────────────┘ 流式响应 (SSE): ┌──────────────────────────────────────────────┐ │ 用户: "写一篇关于AI的文章" │ │ │ │ [0.5s] AI: "人工" │ │ [0.6s] AI: "智能" │ │ [0.7s] AI: "(Art" │ │ [0.8s] AI: "ificial" │ │ [0.9s] AI: " Intelligence" │ │ [1.0s] AI: ")是" │ │ ...逐字输出,像打字一样... │ │ │ │ 优势: │ │ · 用户立即看到AI在"思考" │ │ · 感觉更快(即使总时间一样) │ │ · 可以随时停止生成 │ └──────────────────────────────────────────────┘
| 技术 | 方向 | 协议 | 适用场景 | 本项目使用 |
|---|---|---|---|---|
| SSE | 服务端→客户端 | HTTP | AI流式响应、通知推送 | 主要技术 |
| WebSocket | 双向 | WS | 聊天室、实时游戏 | 未使用 |
| 长轮询 | 客户端→服务端 | HTTP | 兼容性要求高的场景 | 未使用 |
| gRPC Streaming | 双向 | HTTP/2 | 微服务间通信 | 未使用 |
SSE (Server-Sent Events) 协议格式:
HTTP Response:
┌──────────────────────────────────────────┐
│ HTTP/1.1 200 OK │
│ Content-Type: text/event-stream ← 关键│
│ Cache-Control: no-cache │
│ Connection: keep-alive │
│ │
│ ↓ 以下是流式数据 ↓ │
│ │
│ event: react ← 事件类型│
│ data: {"content":"我来帮你分析"} ← 数据 │
│ │
│ event: tool │
│ data: {"name":"search","status":"running"}│
│ │
│ event: final_answer │
│ data: {"content":"分析结果如下..."} │
│ │
│ event: done │
│ data: {} │
└──────────────────────────────────────────┘
规则:
· 每个事件以空行(\n\n)分隔
· event: 指定事件类型
· data: 携带数据(JSON格式)
· 连接保持直到服务端关闭
后端SSE流式实现:
Controller层:
@PostMapping("/conversations/{id}/messages")
public Flux<ServerSentEvent<String>> sendMessage(...) {
return orchestrator.execute(agent, request);
// 返回Flux流,Spring自动转为SSE格式
}
编排器包装:
┌──────────────────────────────────────────┐
│ AgentTaskOrchestrator.executeWithRetry() │
│ │
│ // 创建客户端SSE管道 │
│ Sinks.Many<ServerSentEvent> clientSink │
│ │
│ // 原始Agent返回的流 │
│ agent.chat(request) → Flux<SSE> source │
│ │
│ // 包装: 添加超时、取消、重试逻辑 │
│ source.subscribeWithProtection( │
│ · 活动超时: 每个事件重置计时器 │
│ · 硬超时: 3x活动超时强制终止 │
│ · 取消检测: 500ms轮询分布式取消标记 │
│ ) │
└──────────────────────────────────────────┘
前端SSE接收实现:
const response = await fetch(url, options);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 按SSE协议解析
const parts = buffer.split('\n\n');
buffer = parts.pop(); // 保留不完整的部分
for (const part of parts) {
let eventType = 'message';
let data = '';
for (const line of part.split('\n')) {
if (line.startsWith('event: ')) eventType = line.slice(7);
if (line.startsWith('data: ')) data = line.slice(6);
}
// 根据事件类型渲染
switch (eventType) {
case 'react': appendText(data); break;
case 'tool': showToolPanel(data); break;
case 'thinking': showThinkingBlock(data); break;
case 'final_answer': appendFinalAnswer(data); break;
case 'done': finishChat(); break;
}
}
}
飞书流式卡片 (2秒缓冲刷新): 挑战: 飞书不支持SSE,只支持卡片消息 方案: 用Flux.buffer()批量更新卡片 ┌──────────────────────────────────────────────┐ │ Flux流 → buffer(Duration.ofSeconds(2)) │ │ │ │ 0s: 创建卡片 "思考中..." │ │ 2s: 更新卡片 "我来帮你分析..." │ │ 4s: 更新卡片 "首先看你的教育背景..." │ │ 6s: 更新卡片 "其次,你的项目经验..." │ │ 完成: 最终卡片 + 推荐追问气泡 │ │ │ │ 技术细节: │ │ · 使用飞书卡片API的PATCH方法更新 │ │ · 2秒窗口批量聚合,减少API调用90% │ │ · 比逐Token更新更省资源 │ │ · 仍然给用户"实时生成"的感觉 │ └──────────────────────────────────────────────┘