多模型适配、流式响应、ReAct推理循环的完整实现
LLM核心引擎架构
════════════════
Agent层 ← AbstractLlmAgent.chat()
│
├── SystemPromptWrapper ──→ PromptIsolationService (安全包装)
├── Hook System ──→ ON_SYSTEM_PROMPT / BEFORE_LLM_CALL / AFTER_LLM_CALL
├── Tool Registry ──→ MCP / Skill / Shell / NOS 工具描述注入
│
▼
ModelRegistry ← 中央模型注册表
│
├── 降级感知路由 ──→ 如果channel已熔断(OPEN/HALF_OPEN), 透明切换到降级链
├── TrackedChatModel装饰 ──→ 记录Token/延迟
│
▼
Provider Factory层
┌──────────┬──────────┬──────────┬──────────┬──────────┐
│ ZhiPu │ DashScope│ MiniMax │ Anthropic│ GPT4Free │
│ (原生) │(OpenAI兼容)│(Anthropic兼容)│(原生) │(OpenAI兼容)│
└──────────┴──────────┴──────────┴──────────┴──────────┘
│
▼
PooledChatModel ← 密钥池轮转 + 降级
│
▼
实际LLM API调用 ← 流式SSE响应
项目支持5个LLM供应商,每个供应商有独立的工厂类:
| Provider | 工厂类 | 协议 | 模型 |
|---|---|---|---|
| 智谱AI (ZhiPu) | ZhiPuAiChatModelFactory | Spring AI ZhiPu原生 | glm-5.1, glm-4-flash |
| 通义千问 (DashScope) | DashScopeChatModelFactory | OpenAI兼容协议 | qwen-max, qwen-plus |
| MiniMax | MiniMaxChatModelFactory | Anthropic兼容协议 | M2.7, M2.7-highspeed |
| Anthropic | AnthropicChatModelFactory | Anthropic原生 | claude-sonnet-4-6 |
| GPT4Free | OpenAI兼容 | OpenAI兼容协议 | gpt-4o, deepseek-chat |
ChatModelFactory 接口,通过Spring的依赖注入自动发现。新增Provider只需实现工厂接口+@Component,无需修改任何现有代码。这就是开闭原则(OCP)的典型应用。
ModelRegistry 是LLM调用的核心路由:
ModelRegistry.getModel(modelId) 执行流程:
modelId = "glm-5.1"
│
├── 1. 直接查找 providers["glm-5.1"]
│
├── 2. 别名查找 modelAliasToChannel["glm-5.1"] → channel → provider
│
├── 3. 数据库查找 model-name → channel → provider
│
├── 4. 降级检查: DegradationManager.isDegraded(channel)?
│ ├── 是 → 获取fallback channel的provider
│ └── 否 → 使用当前provider
│
└── 5. TrackedChatModel装饰 → 返回带遥测的ChatModel
可以为每个Agent配置默认模型,通过 AgentModelConfig:
每次LLM调用都被 TrackedChatModel 包装,自动记录:
TrackedChatModel 工作原理:
原始调用: TrackedChatModel装饰后:
┌──────────────┐ ┌──────────────────────────────┐
│ chatModel.call()│ │ 1. 记录开始时间 │
│ │ │ 2. 调用 delegate.call() │
│ │ │ 3. 记录结束时间 │
└──────────────┘ │ 4. 提取Token使用量: │
│ - inputTokens │
│ - outputTokens │
│ - cacheTokens (缓存命中) │
│ - reasoningTokens (推理) │
│ 5. 记录成功/失败 │
│ 6. 写入 LlmCallTracker │
└──────────────────────────────┘
LlmCallTracker (内存统计):
Key = hourKey|modelId|agentId|userId
每个"小时桶"统计: callCount, successCount, failureCount,
totalLatencyMs, maxLatencyMs,
totalInputTokens, totalOutputTokens,
totalCacheTokens, totalReasoningTokens
定期drain到数据库持久化
AbstractLlmAgent 是2343行的核心基类,所有LLM Agent都继承它。它用模板方法模式定义了执行骨架:
AbstractLlmAgent.chat(request) 模板方法: ┌─────────────────────────────────────────────────────┐ │ 1. 构建系统提示词 │ │ ├── getSystemPrompt() ← 子类实现 │ │ ├── SystemPromptWrapper.wrap() ← 安全包装 │ │ ├── Hook: ON_SYSTEM_PROMPT │ │ └── 注入工具描述 (MCP/Skill/Shell/NOS) │ ├─────────────────────────────────────────────────────┤ │ 2. 构建消息列表 │ │ ├── SystemMessage (系统提示词) │ │ ├── 上下文消息 (RAG检索结果) │ │ ├── 历史消息 (Token预算截断) │ │ └── UserMessage + 附件 │ ├─────────────────────────────────────────────────────┤ │ 3. 选择模型 │ │ └── ModelRegistry.getModel() → 降级感知 │ ├─────────────────────────────────────────────────────┤ │ 4. 选择执行模式 (子类可覆盖) │ │ ├── LEGACY: 传统for循环 │ │ ├── GRAPH: 自研Graph引擎 │ │ └── GRAPH_PARALLEL: SAA并行引擎 │ ├─────────────────────────────────────────────────────┤ │ 5. 执行ReAct循环 │ │ └── 三种模式的统一接口 │ ├─────────────────────────────────────────────────────┤ │ 6. 返回 AgentResponse (含SSE流) │ └─────────────────────────────────────────────────────┘
不同LLM对工具调用的支持不同,项目采用双模式检测:
工具调用检测流程:
LLM响应
│
├── 优先: 原生ToolCall检测
│ response.getResult().getOutput().getToolCalls()
│ → 返回结构化的 ToolCall(id, name, arguments)
│ ✅ 支持: ZhiPu, Anthropic, DashScope (OpenAI兼容)
│
└── 兜底: 正则解析 (ToolCallParser)
从LLM文本输出中提取 JSON:
{"tool": "search", "arguments": {"query": "..."}}
✅ 支持: 不支持function calling的老模型
检测到工具调用后:
├── skill_* → SkillToolProvider
├── shell_* → ShellToolProvider
├── nos_* → NosUploadToolProvider
└── 其他 → McpClientManager (MCP协议)
LLM的流式输出中,工具调用的参数是分多个chunk发送的。需要将这些fragment合并为完整的工具调用:
// 流式合并逻辑 (MergedToolCall)
// chunk1: {"tool_calls": [{"id":"tc1", "function":{"name":"search","arguments":""}}]}
// chunk2: {"tool_calls": [{"function":{"arguments":"{\"qu"}}]}
// chunk3: {"tool_calls": [{"function":{"arguments":"ery\":\"AI\"}"}}]}
//
// 合并后: ToolCall(id="tc1", name="search", arguments={"query":"AI"})
// 每个chunk到来时:
MergedToolCall accumulator:
- 按 toolCallId 分组
- 累积 argument fragments
- 最终合并为完整的 AssistantMessage + ToolCall列表
Spring AI 1.0.0的Anthropic集成不支持thinking类型的ContentBlock,项目通过反射注入修复:
thinking 为合法的 ContentBlockBody 子类型eventToChatCompletionResponse(),处理 CONTENT_BLOCK_START(type=thinking) 和 CONTENT_BLOCK_DELTA(type=thinking_delta/signature_delta)AnthropicApi.streamHelper 字段AbstractLlmAgent 在每次 LLM 调用前,自动检索与当前用户消息语义相关的长期记忆,并注入到 System Prompt 中。长期记忆来源于 LongTermMemoryStore(基于向量数据库),通过 Embedding 模型将用户消息转为向量,检索 Top-K 条最相关的历史记忆片段。
注入发生在 buildSystemPrompt() 阶段,位于基础系统提示词之后、工具描述之前。如果 Store Bean 不存在则安全跳过,不影响正常对话流程。
长期记忆注入流程:
User Message
│
▼
EmbeddingModel.embed(userMessage)
│
▼
VectorStore.similaritySearch(embedding, topK=5, threshold=0.75)
│
▼
Top-K Memories (按相似度排序)
│
▼
Inject to System Prompt
│
▼
LLM Call (含记忆上下文)
// 长期记忆注入格式 (追加到 System Prompt 末尾) <long_term_memory> 以下是与当前对话相关的历史记忆,可作为回答参考: [Memory 1] 2026-04-15: 用户偏好使用中文回答技术问题 [Memory 2] 2026-04-20: 用户正在开发一个基于Spring Boot的微服务项目 [Memory 3] 2026-04-22: 用户对RAG检索增强生成技术非常感兴趣 </long_term_memory>
AsyncToolCallback 基于 CompletableFuture 接口,为工具执行提供异步能力。每个工具调用返回一个 CompletableFuture<ToolResult>,支持协作式取消(cancel(true))和独立超时控制(orTimeout(60, SECONDS))。
在 GRAPH_PARALLEL 模式下,多个工具调用被提交到 AsyncToolExecutor 的有界线程池,通过 CompletableFuture.allOf() 等待全部完成后合并结果。单个工具超时不影响其他工具执行——超时工具返回错误 result,图引擎继续处理。
AsyncToolCallback 并行执行流程:
Agent (ReAct 迭代)
│
▼
AsyncToolExecutor
│
├──→ ThreadPool ──→ Tool1 ──→ CompletableFuture<Result1>
├──→ ThreadPool ──→ Tool2 ──→ CompletableFuture<Result2>
└──→ ThreadPool ──→ Tool3 ──→ CompletableFuture<Result3>
│
▼
CompletableFuture.allOf()
│
▼
Merge Results
│
▼
返回合并后的 ToolResponse 列表
async-tool.default-timeout-ms=60000)保护单个工具不会无限阻塞;图级超时(总图执行超时)保护整体任务。工具超时 → 单工具失败(错误 result)→ 图继续执行;图超时 → 全部中止。
工具执行结果不仅限于文本,还可以返回图片、音频、视频等多模态内容。MultimodalToolResultConverter 负责自动转换工具返回的多模态数据,使其适配不同模型的输入格式。
对于不支持多模态输入的模型,Converter 自动执行降级:图片转为文本描述、音频转为转写文本、视频转为关键帧描述。降级后的原始多模态内容存入 metadata,供前端展示。
| MIME Type | 描述 | 降级策略 |
|---|---|---|
image/png | PNG 图片 | → "[Image: description]" 文本描述 |
image/jpeg | JPEG 图片 | → "[Image: description]" 文本描述 |
audio/mp3 | MP3 音频 | → ASR 转写文本 |
video/mp4 | MP4 视频 | → 关键帧描述文本 |
工具结果处理链集成:MultimodalToolResultConverter 已接入 AbstractLlmAgent 的工具结果处理链(processToolResultContent() 方法)。在工具执行完成后、结果注入 LLM 上下文之前,系统自动检测工具返回内容是否包含多模态标记(---附件---),并记录检测日志。convertForLlm() / convertForFrontend() / convertForAudit() 三个转换方法已就绪,待 MCP/Skill 工具扩展支持原生 MultimodalToolResult 返回类型后启用。集成覆盖三条执行路径:串行工具执行、并行工具执行(ParallelToolExecutor)、审批工具执行。
支持长时间运行的工具(数据导出、批量 API 调用)以异步方式执行,带超时和协作式取消。
| 组件 | 职责 |
|---|---|
AsyncToolCallback | @FunctionalInterface,定义 callAsync(name, args, token) → CompletableFuture<AsyncToolResult> |
NamedAsyncToolCallback | 扩展接口,添加 getName() / getDescription() / getInputSchema() / getTimeoutMs() |
AsyncToolCollector | @Component 收集器,自动注册 Bean,提供 isAsync() / executeAsync() |
CancellationToken | 协作式取消令牌(AtomicBoolean),工具在循环点检查 isCancelled() |
AsyncToolResult | 执行结果 record,含 success/failure/cancelled/timeout 工厂方法 |
执行流程:executeToolCallInternal() 检测到 asyncToolCollector.isAsync(toolName) → 创建 CancellationToken → 调用 tool.callAsync() → future.get(timeoutMs) 阻塞等待 → 超时自动 token.cancel()。ReAct 循环是同步的,异步框架的价值在于超时+取消控制,而非真正异步返回。
示例实现:AsyncDataExportTool(hub-agent-core/.../react/async/example/),模拟分批数据导出,每批检查 token.throwIfCancelled(),超时 60 秒。
支持工具在多轮对话间维持状态,通过 3 层回退机制保证状态不丢失。
| 组件 | 职责 |
|---|---|
StateAwareToolCallback | 接口,call(name, args, state) 签名带 ToolStateAccessor 参数 |
ToolStateAccessor | 3 层状态访问器:pendingWrites → graphState → localStore |
ToolStateCollector | @Component 收集器,提供 isStateAware() / executeWithState();内置 fallbackState 保证非 Graph 模式下跨轮次状态持久化 |
3 层状态回退:读取时优先从 pendingWrites(本次执行写入) → graphState(跨工具共享) → localStore(本地内存后备)。写入时通过 state.put() 同时更新 pendingWrites、localStore 和 graphState(如存在)。当 executeToolCallInternal() 传入的 graphState 为 null 时(非 Graph 模式),ToolStateCollector 使用内置的 fallbackState(单例 GraphState)确保跨轮次状态不丢失。
示例实现:StatefulFormFillerTool(hub-agent-core/.../react/state/example/),跨轮次渐进收集表单字段(姓名→电话→地址),通过 state.put("form_name", value) 保存,state.getString("form_name") 读取。
executeToolCallInternal() 按以下顺序路由:状态感知工具 → 异步工具 → Skill → Shell → NOS → 记忆 → MCP。状态感知和异步工具按名称精确匹配(不依赖前缀),其他工具按前缀匹配。
||a-b||² = 2(1-cos(a,b))),但Cosine更直观且计算更高效(只需点积)。
async-tool.default-timeout-ms=60s)保护单个工具调用不会无限阻塞;图级超时(总图执行超时)保护整体任务执行时间。工具超时触发时,仅该工具返回错误result,图引擎继续处理其他正常工具的结果;图超时触发时,所有进行中的工具调用被取消(future.cancel(true)),整个图执行终止。
MultimodalToolResultConverter 自动降级处理:图片 → "[Image: description]" 文本描述;音频 → ASR转写文本;视频 → 关键帧描述文本。降级信息存入message的metadata字段(originalMediaType、originalMediaUrl),供前端展示原始多媒体内容。模型只看到文本,但用户在UI上能看到原始图片/音频/视频。
agent-config.yml 配置 runtime-mode(支持全局默认 + per-agent覆盖)。切换生效时机:新对话使用新模式,进行中的对话保持原模式直到结束(对话级绑定)。这意味着模式切换不会中断任何正在进行的任务,无中断风险。三种模式:LEGACY(传统for循环)、GRAPH(图引擎串行)、GRAPH_PARALLEL(图引擎并行)。
TokenEstimator 基于文本长度估算(中文约1.5 token/字,英文约0.75 token/word)。估算值标记 estimated=true 以区分精确统计。
MiniMax 作为一级 LLM 提供商集成到项目中,核心设计亮点是利用 API 协议兼容性——MiniMax 在 /anthropic 路径暴露了 Anthropic 兼容的 Messages API 端点,因此通过 Spring AI 的 AnthropicChatModel 作为适配器接入,无需编写私有客户端。
MiniMax 集成架构:
配置层 启动初始化 运行时
↓ ↓ ↓
application.yml ──→ KeyPoolAutoConfiguration ──→ Agent 请求聊天
(minimax 配置) (注册工厂与通道)
↓ ↓
MiniMaxChatModelFactory PooledChatModel
(创建 AnthropicChatModel (轮询 API Key)
实例, baseUrl → MiniMax)
↓ ↓
Spring AI AnthropicApi 处理限速与降级
↓
https://api.minimaxi.com/anthropic/...
@Component("minimaxChatModelFactory")
public class MiniMaxChatModelFactory implements ChatModelFactory {
@Override
public ChatModel create(String apiKey, ChannelConfig config) {
AnthropicApi api = AnthropicApi.builder()
.apiKey(apiKey)
.baseUrl(config.getBaseUrl()) // https://api.minimaxi.com/anthropic
.build();
AnthropicChatOptions options = AnthropicChatOptions.builder()
.model(config.getModel()) // MiniMax-M2.7
.temperature(config.getTemperature())
.maxTokens(config.getMaxTokens())
.build();
return AnthropicChatModel.builder()
.anthropicApi(api)
.defaultOptions(options)
.build();
}
}
工厂发现机制:provider = "minimax" → 查找 "minimaxChatModelFactory" Bean → 自动匹配。
| 提供商 | 协议 | Spring AI 类 |
|---|---|---|
| ZhiPu GLM | 原生 ZhiPu | ZhiPuAiChatModel |
| DashScope/Qwen | OpenAI 兼容 | OpenAiChatModel |
| DeepSeek | OpenAI 兼容 | OpenAiChatModel |
| MiniMax | Anthropic 兼容 | AnthropicChatModel |
| Ollama (本地) | OpenAI 兼容 | OpenAiChatModel |
MiniMax 降级流程:
所有 Key 用尽 / 429 限速
↓
DegradationManager 检查 fallback 配置
↓
channel="minimax" → fallback="deepseek"
↓
切换到 deepseek 通道重试
↓
成功 → 返回 | 失败 → 抛出异常
↓
恢复超时 300 秒后,重新尝试 MiniMax
多层韧性保障:密钥轮转(round-robin)→ 冷却跳过(60s cooldown)→ 降级切换(fallback channel)→ 自动恢复(recovery timeout)。
| 模型 | 上下文 | 速度 | 说明 |
|---|---|---|---|
| MiniMax-M2.7 | 204K | 标准 | 旗舰通用模型 |
| MiniMax-M2.7-highspeed | 204K | ~100 tps | 旗舰高速版 |
| MiniMax-M2.5 | 204K | 标准 | 高性价比版本 |
/anthropic 路径提供了 Anthropic Messages API 兼容端点,因此项目复用 Spring AI 已有的 AnthropicChatModel 实现。工厂类 MiniMaxChatModelFactory 只需将 baseUrl 指向 MiniMax 的端点,其余逻辑(请求构建、流式解析、工具调用)全部由 Spring AI 的 Anthropic 客户端处理。这体现了"协议兼容 > 私有集成"的设计哲学——零协议适配成本,只需一个工厂类 + YAML 配置。