18 - 流式通信与SSE技术

从"等半天"到"实时打字"——AI流式响应背后的技术原理

一、为什么AI回复需要"流式"?

  传统请求 vs 流式响应:

  传统HTTP请求:
  ┌──────────────────────────────────────────────┐
  │ 用户: "写一篇关于AI的文章"                     │
  │                                              │
  │ [等待... 等待... 等待... 30秒过去了]          │
  │                                              │
  │ AI: "人工智能(Artificial Intelligence)是..."  │
  │     ← 用户一次性收到全部内容                  │
  │                                              │
  │ 问题:                                        │
  │ · 30秒白屏,用户以为卡死了                   │
  │ · 长回答可能要等1-2分钟                      │
  │ · 用户体验极差                               │
  └──────────────────────────────────────────────┘

  流式响应 (SSE):
  ┌──────────────────────────────────────────────┐
  │ 用户: "写一篇关于AI的文章"                     │
  │                                              │
  │ [0.5s] AI: "人工"                             │
  │ [0.6s] AI: "智能"                             │
  │ [0.7s] AI: "(Art"                             │
  │ [0.8s] AI: "ificial"                          │
  │ [0.9s] AI: " Intelligence"                    │
  │ [1.0s] AI: ")是"                              │
  │ ...逐字输出,像打字一样...                    │
  │                                              │
  │ 优势:                                        │
  │ · 用户立即看到AI在"思考"                      │
  │ · 感觉更快(即使总时间一样)                  │
  │ · 可以随时停止生成                            │
  └──────────────────────────────────────────────┘

二、实时通信技术对比

技术方向协议适用场景本项目使用
SSE服务端→客户端HTTPAI流式响应、通知推送主要技术
WebSocket双向WS聊天室、实时游戏未使用
长轮询客户端→服务端HTTP兼容性要求高的场景未使用
gRPC Streaming双向HTTP/2微服务间通信未使用

为什么本项目选择SSE而不是WebSocket?

三、SSE协议详解

SSE (Server-Sent Events) 协议格式:

  HTTP Response:
  ┌──────────────────────────────────────────┐
  │ HTTP/1.1 200 OK                          │
  │ Content-Type: text/event-stream    ← 关键│
  │ Cache-Control: no-cache                  │
  │ Connection: keep-alive                    │
  │                                          │
  │ ↓ 以下是流式数据 ↓                       │
  │                                          │
  │ event: react                        ← 事件类型│
  │ data: {"content":"我来帮你分析"}     ← 数据 │
  │                                          │
  │ event: tool                              │
  │ data: {"name":"search","status":"running"}│
  │                                          │
  │ event: final_answer                      │
  │ data: {"content":"分析结果如下..."}       │
  │                                          │
  │ event: done                               │
  │ data: {}                                  │
  └──────────────────────────────────────────┘

  规则:
  · 每个事件以空行(\n\n)分隔
  · event: 指定事件类型
  · data: 携带数据(JSON格式)
  · 连接保持直到服务端关闭

四、本项目的SSE实现

后端:Spring WebFlux + Flux

后端SSE流式实现:

  Controller层:
  @PostMapping("/conversations/{id}/messages")
  public Flux<ServerSentEvent<String>> sendMessage(...) {
      return orchestrator.execute(agent, request);
      // 返回Flux流,Spring自动转为SSE格式
  }

  编排器包装:
  ┌──────────────────────────────────────────┐
  │ AgentTaskOrchestrator.executeWithRetry()  │
  │                                          │
  │  // 创建客户端SSE管道                    │
  │  Sinks.Many<ServerSentEvent> clientSink  │
  │                                          │
  │  // 原始Agent返回的流                    │
  │  agent.chat(request) → Flux<SSE> source  │
  │                                          │
  │  // 包装: 添加超时、取消、重试逻辑       │
  │  source.subscribeWithProtection(         │
  │    · 活动超时: 每个事件重置计时器        │
  │    · 硬超时: 3x活动超时强制终止          │
  │    · 取消检测: 500ms轮询分布式取消标记   │
  │  )                                       │
  └──────────────────────────────────────────┘

前端:fetch + ReadableStream

前端SSE接收实现:

  const response = await fetch(url, options);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // 按SSE协议解析
    const parts = buffer.split('\n\n');
    buffer = parts.pop(); // 保留不完整的部分

    for (const part of parts) {
      let eventType = 'message';
      let data = '';

      for (const line of part.split('\n')) {
        if (line.startsWith('event: ')) eventType = line.slice(7);
        if (line.startsWith('data: ')) data = line.slice(6);
      }

      // 根据事件类型渲染
      switch (eventType) {
        case 'react':          appendText(data); break;
        case 'tool':           showToolPanel(data); break;
        case 'thinking':       showThinkingBlock(data); break;
        case 'final_answer':   appendFinalAnswer(data); break;
        case 'done':           finishChat(); break;
      }
    }
  }

五、飞书的特殊流式实现

飞书流式卡片 (2秒缓冲刷新):

  挑战: 飞书不支持SSE,只支持卡片消息
  方案: 用Flux.buffer()批量更新卡片

  ┌──────────────────────────────────────────────┐
  │ Flux流 → buffer(Duration.ofSeconds(2))       │
  │                                              │
  │ 0s: 创建卡片 "思考中..."                     │
  │ 2s: 更新卡片 "我来帮你分析..."               │
  │ 4s: 更新卡片 "首先看你的教育背景..."         │
  │ 6s: 更新卡片 "其次,你的项目经验..."         │
  │ 完成: 最终卡片 + 推荐追问气泡               │
  │                                              │
  │ 技术细节:                                    │
  │ · 使用飞书卡片API的PATCH方法更新             │
  │ · 2秒窗口批量聚合,减少API调用90%           │
  │ · 比逐Token更新更省资源                      │
  │ · 仍然给用户"实时生成"的感觉                │
  └──────────────────────────────────────────────┘

六、面试高频问题

Q: SSE和WebSocket有什么区别?
A: SSE是单向(服务端→客户端)、基于HTTP、文本协议;WebSocket是双向、独立协议(WSS)、支持二进制。AI对话场景只需要服务端向客户端推送数据,SSE更简单:1) 不需要额外的握手和状态管理;2) 天然兼容HTTP基础设施(代理、CDN、LB);3) 浏览器原生支持自动重连。WebSocket更适合聊天室、游戏等双向实时场景。
Q: 流式响应的超时怎么处理?
A: 本项目采用双层超时机制:1) 活动超时(默认120s):每收到一个SSE事件就重置计时器。ReAct循环中工具调用可能很慢,但只要LLM还在返回事件,就说明"活着";2) 硬超时(3倍活动超时):无论如何超过这个时间就强制终止,防止真正的死循环。此外还有maxSteps=64防止Graph死循环。