30 - 流式输出与结构化响应

SSE 流式推送机制、StreamOutputType 分类、结构化输出 Schema 注入与前端渲染适配

一、流式输出(SSE)机制

Server-Sent Events 架构

Agent Hub 使用 SSE(Server-Sent Events) 实现服务端到客户端的实时流式推送。SSE 基于标准 HTTP 协议,天然兼容负载均衡、CDN 和 Nginx 反向代理。

  SSE 流式输出架构:

  Client (Browser)                          Server (Agent Hub)
       │                                         │
       │  POST /api/chat (发起对话)               │
       │ ────────────────────────────────────────▶│
       │                                         │
       │  HTTP 200 (Content-Type: text/event-stream)
       │ ◀────────────────────────────────────────│
       │                                         │
       │  event: text                            │
       │  data: {"content": "根据"}              │
       │ ◀────────────────────────────────────────│
       │                                         │
       │  event: text                            │
       │  data: {"content": "分析"}              │
       │ ◀────────────────────────────────────────│
       │                                         │
       │  event: tool_call                       │
       │  data: {"name": "web_search", ...}      │
       │ ◀────────────────────────────────────────│
       │                                         │
       │  event: progress                        │
       │  data: {"step": 3, "total": 5}          │
       │ ◀────────────────────────────────────────│
       │                                         │
       │  event: done                            │
       │  data: {"token_usage": 1234}            │
       │ ◀────────────────────────────────────────│

事件类型(Event Types)

Event Type说明触发时机
text文本内容片段LLM 每生成一个 Token
tool_call工具调用信息Agent 决定调用工具时
task_decomp任务分解多 Agent 编排时的子任务拆分
sub_agent子 Agent 执行子 Agent 开始/完成执行
progress进度更新长时间任务的步骤进度
hitl_pending人机协同等待需要人类审批时
done完成标记整个响应结束时

SseFlushFilter

SseFlushFilter 是一个 Servlet Filter,确保 SSE 事件立即推送到客户端而非被缓冲区积压:

// SseFlushFilter — 自动 flush SSE 响应缓冲区
@Component
public class SseFlushFilter implements Filter {
    @Override
    public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
            throws IOException, ServletException {
        HttpServletResponse response = (HttpServletResponse) res;
        String accept = ((HttpServletRequest) req).getHeader("Accept");

        if ("text/event-stream".equals(accept)) {
            // 包装 response,每次 write 后自动 flush
            chain.doFilter(req, new AutoFlushResponseWrapper(response));
        } else {
            chain.doFilter(req, res);
        }
    }
}

超时配置

SSE 连接超时:600 秒(10 分钟)。覆盖 99% 场景(大部分 Agent 10-60 秒完成)。超长任务切换到 BackgroundTask 异步模式,SSE 仅推送 progress 事件。Keep-alive:每 30 秒发送空 comment(: keepalive)防止代理/CDN 超时断开。

二、StreamOutputType 分类

输出类型与前端渲染映射

系统根据 LLM 输出内容自动分类 StreamOutputType,前端据此选择对应的渲染组件:

TypeDescriptionFrontend Rendering
TEXT普通文本内容Markdown 渲染
CODE代码块语法高亮 + 复制按钮
TABLE结构化表格数据表格组件
CHART可视化数据ECharts 图表
IMAGE图片 URL / base64图片展示
FILE文件引用下载链接

类型识别机制

  StreamOutputType 识别流程:

  LLM 输出片段
       │
       ▼
  ┌─────────────────────────────────────────────────────┐
  │ OutputTypeClassifier                                 │
  │                                                     │
  │   ├── 检测 ```language → CODE                       │
  │   ├── 检测 | col | col | 格式 → TABLE              │
  │   ├── 检测 {"chart": ...} JSON → CHART              │
  │   ├── 检测 ![alt](url) 或 data:image → IMAGE       │
  │   ├── 检测 [file](url) 下载链接 → FILE              │
  │   └── 默认 → TEXT                                   │
  │                                                     │
  │   每个 SSE event 携带 outputType 字段               │
  │   data: {"content": "...", "outputType": "CODE"}    │
  └─────────────────────────────────────────────────────┘

TypedStreamEvent — 事件源分类(已接入)

除了按内容类型分类,系统还支持按事件源分类 SSE 事件。TypedStreamEvent 将每个 SSE 事件包装为带输出类型的事件,方便前端按来源差异化渲染:

StreamOutputTypeSSE event name来源前端推荐渲染
MODEL_OUTPUTmodel_outputLLM 生成文本Markdown 渲染 + 打字机效果
TOOL_OUTPUTtool_output工具调用过程和结果折叠面板 + 工具状态指示器
HOOK_OUTPUThook_outputHook 通知/提醒灰色系统消息,可折叠
GRAPH_OUTPUTgraph_outputGraph 节点状态变更流程图/状态面板
PLAN_OUTPUTplan_outputPlan Mode 方案/步骤Checklist + 进度条
CONTROLcontroldone/sources 等控制信号不展示,触发前端逻辑

启用方式:前端在发送消息时传 typedSse: true,SSE 事件格式变为:

event: tool_output
data: {"outputType":"TOOL_OUTPUT","originalEvent":"tool","payload":{...}}

不传 typedSse 或传 false 时保持原有 SSE 格式(完全向后兼容)。

三、结构化输出(Structured Output)

核心组件

结构化输出确保 LLM 返回符合预定义 Schema 的 JSON 数据,而非自由文本。

组件职责
OutputSchemaInjector将 JSON Schema 注入到 Prompt 中,引导 LLM 输出格式
StructuredOutputConfig定义期望的输出格式(Java Record / JSON Schema)
StructuredOutputParser解析 LLM 返回的 JSON,校验是否符合 Schema

执行流程

  结构化输出完整流程:

  ┌─────────────────────────────────────────────────────┐
  │ ① Define Schema                                     │
  │   @StructuredOutput 注解 或 JSON Schema 配置        │
  │                                                     │
  │   @StructuredOutput(schema = "research_report")     │
  │   public record ResearchReport(                     │
  │       String title,                                 │
  │       List<Finding> findings,                       │
  │       String conclusion                             │
  │   ) {}                                              │
  └──────────────────┬──────────────────────────────────┘
                     ▼
  ┌─────────────────────────────────────────────────────┐
  │ ② Inject into Prompt                                │
  │   OutputSchemaInjector 将 Schema 注入 system prompt │
  │                                                     │
  │   "请严格按以下 JSON 格式返回结果:                  │
  │   {                                                 │
  │     "title": "string - 报告标题",                   │
  │     "findings": [{"topic": "...", "detail": "..."}],│
  │     "conclusion": "string - 总结结论"               │
  │   }"                                                │
  └──────────────────┬──────────────────────────────────┘
                     ▼
  ┌─────────────────────────────────────────────────────┐
  │ ③ LLM Generates JSON                                │
  │   模型根据 Schema 提示生成结构化 JSON               │
  │   支持 native JSON mode (GPT-4o response_format)    │
  └──────────────────┬──────────────────────────────────┘
                     ▼
  ┌─────────────────────────────────────────────────────┐
  │ ④ Parse & Validate                                  │
  │   StructuredOutputParser 解析 JSON                   │
  │   ├── 格式校验(是否合法 JSON)                     │
  │   ├── Schema 校验(字段类型、必填项)               │
  │   └── 反序列化为 Java Record                        │
  └─────────────────────────────────────────────────────┘

代码示例

// 定义结构化输出格式
@StructuredOutput(schema = "research_report")
public record ResearchReport(
    String title,
    List<Finding> findings,
    String conclusion
) {}

public record Finding(
    String topic,
    String detail,
    double confidence
) {}

// Agent 使用结构化输出
@Component
public class ResearchAgent extends AbstractLlmAgent {

    @Override
    protected StructuredOutputConfig getOutputConfig() {
        return StructuredOutputConfig.builder()
            .outputClass(ResearchReport.class)
            .injectionStrategy(InjectionStrategy.SYSTEM_PROMPT)
            .retryOnParseFailure(true)
            .maxRetries(2)
            .build();
    }
}

API 接入(前端请求体传递)

前端可通过 /api/conversations/{id}/messages 请求体中的 structuredOutput 字段动态控制输出格式:

POST /api/conversations/{id}/messages
{
  "message": "分析上个月的销售数据",
  "structuredOutput": {
    "outputFormat": "JSON",
    "outputDescription": "返回包含 sales 和 trend 的 JSON",
    "exampleOutput": "{\"sales\": 12345, \"trend\": \"上升\"}",
    "requiredFields": ["sales", "trend"],
    "outputSchema": "{...JSON Schema...}"
  }
}
字段类型说明
outputFormatStringJSON / YAML / TEXT
outputDescriptionString输出描述,注入 prompt 帮助 LLM 理解格式要求
exampleOutputString示例输出,作为 few-shot 示例
requiredFieldsList必填字段,解析后校验
outputSchemaStringJSON Schema 定义

OutputSchemaInjector.inject() 自动将这些配置注入 system prompt(description → "说明:"、requiredFields → "必填字段:"、exampleOutput → "示例输出:"),LLM 据此输出格式化内容。

四、Schema 注入策略

注入位置选择

策略注入位置优点缺点
SYSTEM_PROMPT系统提示词末尾不占用用户消息位,Schema 持久生效较长 Schema 可能稀释系统提示词效果
USER_MESSAGE用户消息中追加每次请求可动态调整 Schema每轮都需注入,Token 开销更高
NATIVE_JSON_MODEAPI 参数 response_format最可靠,模型原生支持仅部分模型支持(GPT-4o、Claude 3.5+)

复杂 Schema 处理

解析失败重试

  JSON 解析失败重试策略(3 层):

  LLM 原始输出
       │
       ▼
  ┌─────────────────────────────────────────────────────┐
  │ 第 1 层:宽松解析                                    │
  │   允许 trailing comma、单引号、注释                  │
  │   成功 → 返回结果                                    │
  │   失败 ↓                                            │
  ├─────────────────────────────────────────────────────┤
  │ 第 2 层:提取修复                                    │
  │   正则提取 ```json ... ``` 块                       │
  │   去除 Markdown 包裹                                │
  │   成功 → 返回结果                                    │
  │   失败 ↓                                            │
  ├─────────────────────────────────────────────────────┤
  │ 第 3 层:重试(最多 2 次)                           │
  │   附加错误信息让 LLM 修正:                         │
  │   "上次输出的 JSON 格式错误:{error},请修正"       │
  │   成功 → 返回结果                                    │
  │   失败 ↓                                            │
  ├─────────────────────────────────────────────────────┤
  │ 降级:纯文本返回                                     │
  │   记录 WARN 日志,原文作为 TEXT 类型返回             │
  └─────────────────────────────────────────────────────┘

五、前端渲染适配

能力驱动 UI(Capability-Driven UI)

前端根据 SSE 事件中的 outputType 字段动态选择渲染组件,无需硬编码展示逻辑:

  前端渲染组件映射:

  SSE Event (outputType)
       │
       ▼
  ┌─────────────────────────────────────────────────────┐
  │ RenderRouter                                         │
  │                                                     │
  │   TEXT    → <MarkdownRenderer />                    │
  │             流式增量渲染,状态机跟踪代码块/列表      │
  │                                                     │
  │   CODE    → <CodeBlock language={lang} />           │
  │             Prism.js 语法高亮 + 一键复制             │
  │                                                     │
  │   TABLE   → <DataTable columns={...} rows={...} /> │
  │             可排序、可筛选的表格组件                  │
  │                                                     │
  │   CHART   → <EChartsRenderer option={...} />       │
  │             自动适配图表类型(柱状/折线/饼图)       │
  │                                                     │
  │   IMAGE   → <ImageViewer src={url} />              │
  │             懒加载 + 点击放大                        │
  │                                                     │
  │   FILE    → <FileDownload href={url} name={...} /> │
  │             文件名 + 大小 + 下载按钮                 │
  │                                                     │
  │   tool_call → <ToolCallCard name={...} args={...}/>│
  │               工具名 + 参数折叠展示                  │
  └─────────────────────────────────────────────────────┘

流式 Markdown 渲染

流式渲染的核心挑战:Token 逐个到达,Markdown 结构可能被截断(如代码块 ``` 只到达了前半部分)。

解决方案:状态机渲染器。维护当前渲染状态(是否在代码块内、列表内、表格内),buffer 不完整的块直到结束标记到达。每次只 append 新内容,不 re-render 全部,保证性能。

六、配置参考

# application.yml — 流式输出与结构化响应配置

# SSE 流式输出配置
agent:
  sse:
    timeout: 600s                 # SSE 连接超时(默认 600 秒)
    keepalive-interval: 30s       # Keep-alive 心跳间隔
    buffer-size: 8192             # 响应缓冲区大小
    retry-interval: 3000          # 客户端重连间隔(毫秒)

# 结构化输出配置
agent:
  structured-output:
    enabled: true
    default-injection: SYSTEM_PROMPT   # 默认 Schema 注入位置
    max-retries: 2                      # 解析失败最大重试次数
    lenient-parsing: true               # 允许宽松 JSON 解析
    native-json-mode:
      enabled: true                     # 支持的模型使用原生 JSON mode
      supported-models:
        - gpt-4o
        - gpt-4o-mini
        - claude-3.5-sonnet

# StreamOutputType 配置
agent:
  output-type:
    auto-detect: true              # 自动检测输出类型
    default-type: TEXT             # 默认输出类型
    chart:
      engine: echarts              # 图表引擎
      max-data-points: 1000       # 最大数据点数

七、面试高频问题

Q: SSE 和 WebSocket 的区别?为什么 Agent Hub 选择 SSE 而非 WebSocket?
A: SSE 单向(服务端 → 客户端),基于 HTTP,天然兼容负载均衡 / CDN / Nginx 反向代理,无需 sticky session。WebSocket 双向但运维复杂(需要 sticky session、特殊的负载均衡配置)。Agent 场景:用户发送请求后等待流式响应 = 单向推送足够。用户不需要在等待过程中实时推送数据给服务端。SSE 还支持自动重连(EventSource API 内建 retry),而 WebSocket 断线需手动实现重连逻辑。
Q: 流式输出时如何保证 Markdown 渲染的正确性?代码块可能被截断?
A: 状态机渲染器:跟踪当前是否在代码块内(``` 计数奇偶判定)、列表内、表格内。buffer 当前不完整的块直到结束标记到达。增量渲染:只 append 新内容,不 re-render 全部 DOM。具体策略:① 代码块:遇到 ``` 开始 buffer,遇到闭合 ``` 才整体渲染;② 表格:buffer 至少一行完整行才渲染;③ 内联样式(加粗、斜体):检测 ** 配对状态。
Q: StructuredOutput 的 JSON Schema 注入会占用多少 Token?如何优化?
A: 典型 Schema 200-500 tokens。优化策略:1. 简化 description(仅保留必要说明);2. 移除 optional 字段的详细说明;3. 使用 $ref 引用避免重复子结构定义;4. 对 GPT-4o 等支持原生 JSON mode 的模型直接使用 response_format 参数而非 Prompt 注入,节省 Schema Token 且格式更可靠。
Q: 如果 LLM 生成的 JSON 格式错误,重试策略是什么?
A: 3 层策略:1. 宽松解析(允许 trailing comma、单引号、注释等非标准 JSON);2. 提取修复(正则提取 ```json 代码块,去除 Markdown 包裹);3. 重试(附加错误信息让 LLM 修正:"上次输出 JSON 格式错误:{具体错误},请修正",最多重试 2 次)。超过重试次数:降级为纯文本返回,记录 WARN 日志,不阻断用户体验。
Q: SSE 连接断开后(网络波动),如何恢复?
A: EventSource API 自带重连机制(retry 字段控制重连间隔,默认 3 秒)。服务端:每个 event 附带 id(递增序号),客户端重连时自动发送 Last-Event-ID header。服务端从该位置续传,避免重复推送。Agent Hub 实现:checkpoint 最后发送的事件 ID,支持从断点续传。极端情况(服务端重启):客户端检测到重连失败,提示用户重新发送。
Q: 600 秒超时适合所有场景吗?复杂研究任务可能更长?
A: 600s 覆盖 99% 场景(大部分 Agent 10-60s 完成,复杂任务通常 2-5 分钟)。超长任务处理:切换到 BackgroundTask 异步执行模式,SSE 只推送 progress 事件(如"步骤 3/10:正在分析数据..."),最终结果通过 API 轮询或推送通知获取。防超时措施:SSE keep-alive 心跳(每 30s 发送空 comment : keepalive)防止中间代理/CDN 因空闲超时断开连接。