SSE 流式推送机制、StreamOutputType 分类、结构化输出 Schema 注入与前端渲染适配
Agent Hub 使用 SSE(Server-Sent Events) 实现服务端到客户端的实时流式推送。SSE 基于标准 HTTP 协议,天然兼容负载均衡、CDN 和 Nginx 反向代理。
SSE 流式输出架构:
Client (Browser) Server (Agent Hub)
│ │
│ POST /api/chat (发起对话) │
│ ────────────────────────────────────────▶│
│ │
│ HTTP 200 (Content-Type: text/event-stream)
│ ◀────────────────────────────────────────│
│ │
│ event: text │
│ data: {"content": "根据"} │
│ ◀────────────────────────────────────────│
│ │
│ event: text │
│ data: {"content": "分析"} │
│ ◀────────────────────────────────────────│
│ │
│ event: tool_call │
│ data: {"name": "web_search", ...} │
│ ◀────────────────────────────────────────│
│ │
│ event: progress │
│ data: {"step": 3, "total": 5} │
│ ◀────────────────────────────────────────│
│ │
│ event: done │
│ data: {"token_usage": 1234} │
│ ◀────────────────────────────────────────│
| Event Type | 说明 | 触发时机 |
|---|---|---|
text | 文本内容片段 | LLM 每生成一个 Token |
tool_call | 工具调用信息 | Agent 决定调用工具时 |
task_decomp | 任务分解 | 多 Agent 编排时的子任务拆分 |
sub_agent | 子 Agent 执行 | 子 Agent 开始/完成执行 |
progress | 进度更新 | 长时间任务的步骤进度 |
hitl_pending | 人机协同等待 | 需要人类审批时 |
done | 完成标记 | 整个响应结束时 |
SseFlushFilter 是一个 Servlet Filter,确保 SSE 事件立即推送到客户端而非被缓冲区积压:
// SseFlushFilter — 自动 flush SSE 响应缓冲区
@Component
public class SseFlushFilter implements Filter {
@Override
public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
throws IOException, ServletException {
HttpServletResponse response = (HttpServletResponse) res;
String accept = ((HttpServletRequest) req).getHeader("Accept");
if ("text/event-stream".equals(accept)) {
// 包装 response,每次 write 后自动 flush
chain.doFilter(req, new AutoFlushResponseWrapper(response));
} else {
chain.doFilter(req, res);
}
}
}
: keepalive)防止代理/CDN 超时断开。
系统根据 LLM 输出内容自动分类 StreamOutputType,前端据此选择对应的渲染组件:
| Type | Description | Frontend Rendering |
|---|---|---|
TEXT | 普通文本内容 | Markdown 渲染 |
CODE | 代码块 | 语法高亮 + 复制按钮 |
TABLE | 结构化表格数据 | 表格组件 |
CHART | 可视化数据 | ECharts 图表 |
IMAGE | 图片 URL / base64 | 图片展示 |
FILE | 文件引用 | 下载链接 |
StreamOutputType 识别流程:
LLM 输出片段
│
▼
┌─────────────────────────────────────────────────────┐
│ OutputTypeClassifier │
│ │
│ ├── 检测 ```language → CODE │
│ ├── 检测 | col | col | 格式 → TABLE │
│ ├── 检测 {"chart": ...} JSON → CHART │
│ ├── 检测  或 data:image → IMAGE │
│ ├── 检测 [file](url) 下载链接 → FILE │
│ └── 默认 → TEXT │
│ │
│ 每个 SSE event 携带 outputType 字段 │
│ data: {"content": "...", "outputType": "CODE"} │
└─────────────────────────────────────────────────────┘
除了按内容类型分类,系统还支持按事件源分类 SSE 事件。TypedStreamEvent 将每个 SSE 事件包装为带输出类型的事件,方便前端按来源差异化渲染:
| StreamOutputType | SSE event name | 来源 | 前端推荐渲染 |
|---|---|---|---|
MODEL_OUTPUT | model_output | LLM 生成文本 | Markdown 渲染 + 打字机效果 |
TOOL_OUTPUT | tool_output | 工具调用过程和结果 | 折叠面板 + 工具状态指示器 |
HOOK_OUTPUT | hook_output | Hook 通知/提醒 | 灰色系统消息,可折叠 |
GRAPH_OUTPUT | graph_output | Graph 节点状态变更 | 流程图/状态面板 |
PLAN_OUTPUT | plan_output | Plan Mode 方案/步骤 | Checklist + 进度条 |
CONTROL | control | done/sources 等控制信号 | 不展示,触发前端逻辑 |
启用方式:前端在发送消息时传 typedSse: true,SSE 事件格式变为:
event: tool_output
data: {"outputType":"TOOL_OUTPUT","originalEvent":"tool","payload":{...}}
不传 typedSse 或传 false 时保持原有 SSE 格式(完全向后兼容)。
结构化输出确保 LLM 返回符合预定义 Schema 的 JSON 数据,而非自由文本。
| 组件 | 职责 |
|---|---|
| OutputSchemaInjector | 将 JSON Schema 注入到 Prompt 中,引导 LLM 输出格式 |
| StructuredOutputConfig | 定义期望的输出格式(Java Record / JSON Schema) |
| StructuredOutputParser | 解析 LLM 返回的 JSON,校验是否符合 Schema |
结构化输出完整流程:
┌─────────────────────────────────────────────────────┐
│ ① Define Schema │
│ @StructuredOutput 注解 或 JSON Schema 配置 │
│ │
│ @StructuredOutput(schema = "research_report") │
│ public record ResearchReport( │
│ String title, │
│ List<Finding> findings, │
│ String conclusion │
│ ) {} │
└──────────────────┬──────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────┐
│ ② Inject into Prompt │
│ OutputSchemaInjector 将 Schema 注入 system prompt │
│ │
│ "请严格按以下 JSON 格式返回结果: │
│ { │
│ "title": "string - 报告标题", │
│ "findings": [{"topic": "...", "detail": "..."}],│
│ "conclusion": "string - 总结结论" │
│ }" │
└──────────────────┬──────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────┐
│ ③ LLM Generates JSON │
│ 模型根据 Schema 提示生成结构化 JSON │
│ 支持 native JSON mode (GPT-4o response_format) │
└──────────────────┬──────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────┐
│ ④ Parse & Validate │
│ StructuredOutputParser 解析 JSON │
│ ├── 格式校验(是否合法 JSON) │
│ ├── Schema 校验(字段类型、必填项) │
│ └── 反序列化为 Java Record │
└─────────────────────────────────────────────────────┘
// 定义结构化输出格式
@StructuredOutput(schema = "research_report")
public record ResearchReport(
String title,
List<Finding> findings,
String conclusion
) {}
public record Finding(
String topic,
String detail,
double confidence
) {}
// Agent 使用结构化输出
@Component
public class ResearchAgent extends AbstractLlmAgent {
@Override
protected StructuredOutputConfig getOutputConfig() {
return StructuredOutputConfig.builder()
.outputClass(ResearchReport.class)
.injectionStrategy(InjectionStrategy.SYSTEM_PROMPT)
.retryOnParseFailure(true)
.maxRetries(2)
.build();
}
}
前端可通过 /api/conversations/{id}/messages 请求体中的 structuredOutput 字段动态控制输出格式:
POST /api/conversations/{id}/messages
{
"message": "分析上个月的销售数据",
"structuredOutput": {
"outputFormat": "JSON",
"outputDescription": "返回包含 sales 和 trend 的 JSON",
"exampleOutput": "{\"sales\": 12345, \"trend\": \"上升\"}",
"requiredFields": ["sales", "trend"],
"outputSchema": "{...JSON Schema...}"
}
}
| 字段 | 类型 | 说明 |
|---|---|---|
outputFormat | String | JSON / YAML / TEXT |
outputDescription | String | 输出描述,注入 prompt 帮助 LLM 理解格式要求 |
exampleOutput | String | 示例输出,作为 few-shot 示例 |
requiredFields | List | 必填字段,解析后校验 |
outputSchema | String | JSON Schema 定义 |
OutputSchemaInjector.inject() 自动将这些配置注入 system prompt(description → "说明:"、requiredFields → "必填字段:"、exampleOutput → "示例输出:"),LLM 据此输出格式化内容。
| 策略 | 注入位置 | 优点 | 缺点 |
|---|---|---|---|
| SYSTEM_PROMPT | 系统提示词末尾 | 不占用用户消息位,Schema 持久生效 | 较长 Schema 可能稀释系统提示词效果 |
| USER_MESSAGE | 用户消息中追加 | 每次请求可动态调整 Schema | 每轮都需注入,Token 开销更高 |
| NATIVE_JSON_MODE | API 参数 response_format | 最可靠,模型原生支持 | 仅部分模型支持(GPT-4o、Claude 3.5+) |
items 的元素类型和最大长度
JSON 解析失败重试策略(3 层):
LLM 原始输出
│
▼
┌─────────────────────────────────────────────────────┐
│ 第 1 层:宽松解析 │
│ 允许 trailing comma、单引号、注释 │
│ 成功 → 返回结果 │
│ 失败 ↓ │
├─────────────────────────────────────────────────────┤
│ 第 2 层:提取修复 │
│ 正则提取 ```json ... ``` 块 │
│ 去除 Markdown 包裹 │
│ 成功 → 返回结果 │
│ 失败 ↓ │
├─────────────────────────────────────────────────────┤
│ 第 3 层:重试(最多 2 次) │
│ 附加错误信息让 LLM 修正: │
│ "上次输出的 JSON 格式错误:{error},请修正" │
│ 成功 → 返回结果 │
│ 失败 ↓ │
├─────────────────────────────────────────────────────┤
│ 降级:纯文本返回 │
│ 记录 WARN 日志,原文作为 TEXT 类型返回 │
└─────────────────────────────────────────────────────┘
前端根据 SSE 事件中的 outputType 字段动态选择渲染组件,无需硬编码展示逻辑:
前端渲染组件映射:
SSE Event (outputType)
│
▼
┌─────────────────────────────────────────────────────┐
│ RenderRouter │
│ │
│ TEXT → <MarkdownRenderer /> │
│ 流式增量渲染,状态机跟踪代码块/列表 │
│ │
│ CODE → <CodeBlock language={lang} /> │
│ Prism.js 语法高亮 + 一键复制 │
│ │
│ TABLE → <DataTable columns={...} rows={...} /> │
│ 可排序、可筛选的表格组件 │
│ │
│ CHART → <EChartsRenderer option={...} /> │
│ 自动适配图表类型(柱状/折线/饼图) │
│ │
│ IMAGE → <ImageViewer src={url} /> │
│ 懒加载 + 点击放大 │
│ │
│ FILE → <FileDownload href={url} name={...} /> │
│ 文件名 + 大小 + 下载按钮 │
│ │
│ tool_call → <ToolCallCard name={...} args={...}/>│
│ 工具名 + 参数折叠展示 │
└─────────────────────────────────────────────────────┘
流式渲染的核心挑战:Token 逐个到达,Markdown 结构可能被截断(如代码块 ``` 只到达了前半部分)。
# application.yml — 流式输出与结构化响应配置
# SSE 流式输出配置
agent:
sse:
timeout: 600s # SSE 连接超时(默认 600 秒)
keepalive-interval: 30s # Keep-alive 心跳间隔
buffer-size: 8192 # 响应缓冲区大小
retry-interval: 3000 # 客户端重连间隔(毫秒)
# 结构化输出配置
agent:
structured-output:
enabled: true
default-injection: SYSTEM_PROMPT # 默认 Schema 注入位置
max-retries: 2 # 解析失败最大重试次数
lenient-parsing: true # 允许宽松 JSON 解析
native-json-mode:
enabled: true # 支持的模型使用原生 JSON mode
supported-models:
- gpt-4o
- gpt-4o-mini
- claude-3.5-sonnet
# StreamOutputType 配置
agent:
output-type:
auto-detect: true # 自动检测输出类型
default-type: TEXT # 默认输出类型
chart:
engine: echarts # 图表引擎
max-data-points: 1000 # 最大数据点数
``` 计数奇偶判定)、列表内、表格内。buffer 当前不完整的块直到结束标记到达。增量渲染:只 append 新内容,不 re-render 全部 DOM。具体策略:① 代码块:遇到 ``` 开始 buffer,遇到闭合 ``` 才整体渲染;② 表格:buffer 至少一行完整行才渲染;③ 内联样式(加粗、斜体):检测 ** 配对状态。
$ref 引用避免重复子结构定义;4. 对 GPT-4o 等支持原生 JSON mode 的模型直接使用 response_format 参数而非 Prompt 注入,节省 Schema Token 且格式更可靠。
```json 代码块,去除 Markdown 包裹);3. 重试(附加错误信息让 LLM 修正:"上次输出 JSON 格式错误:{具体错误},请修正",最多重试 2 次)。超过重试次数:降级为纯文本返回,记录 WARN 日志,不阻断用户体验。
id(递增序号),客户端重连时自动发送 Last-Event-ID header。服务端从该位置续传,避免重复推送。Agent Hub 实现:checkpoint 最后发送的事件 ID,支持从断点续传。极端情况(服务端重启):客户端检测到重连失败,提示用户重新发送。
: keepalive)防止中间代理/CDN 因空闲超时断开连接。