04 - LLM核心引擎

多模型适配、流式响应、ReAct推理循环的完整实现

一、LLM引擎架构

                    LLM核心引擎架构
                    ════════════════

  Agent层            ← AbstractLlmAgent.chat()
    │
    ├── SystemPromptWrapper ──→ PromptIsolationService (安全包装)
    ├── Hook System ──→ ON_SYSTEM_PROMPT / BEFORE_LLM_CALL / AFTER_LLM_CALL
    ├── Tool Registry ──→ MCP / Skill / Shell / NOS 工具描述注入
    │
    ▼
  ModelRegistry      ← 中央模型注册表
    │
    ├── 降级感知路由 ──→ 如果channel已熔断(OPEN/HALF_OPEN), 透明切换到降级链
    ├── TrackedChatModel装饰 ──→ 记录Token/延迟
    │
    ▼
  Provider Factory层
    ┌──────────┬──────────┬──────────┬──────────┬──────────┐
    │ ZhiPu    │ DashScope│ MiniMax  │ Anthropic│ GPT4Free │
    │ (原生)   │(OpenAI兼容)│(Anthropic兼容)│(原生)  │(OpenAI兼容)│
    └──────────┴──────────┴──────────┴──────────┴──────────┘
    │
    ▼
  PooledChatModel    ← 密钥池轮转 + 降级
    │
    ▼
  实际LLM API调用    ← 流式SSE响应

二、多Provider适配(策略模式)

项目支持5个LLM供应商,每个供应商有独立的工厂类:

Provider工厂类协议模型
智谱AI (ZhiPu)ZhiPuAiChatModelFactorySpring AI ZhiPu原生glm-5.1, glm-4-flash
通义千问 (DashScope)DashScopeChatModelFactoryOpenAI兼容协议qwen-max, qwen-plus
MiniMaxMiniMaxChatModelFactoryAnthropic兼容协议M2.7, M2.7-highspeed
AnthropicAnthropicChatModelFactoryAnthropic原生claude-sonnet-4-6
GPT4FreeOpenAI兼容OpenAI兼容协议gpt-4o, deepseek-chat
设计亮点: 所有工厂实现 ChatModelFactory 接口,通过Spring的依赖注入自动发现。新增Provider只需实现工厂接口+@Component,无需修改任何现有代码。这就是开闭原则(OCP)的典型应用。

三、ModelRegistry — 模型注册与路由

ModelRegistry 是LLM调用的核心路由:

ModelRegistry.getModel(modelId) 执行流程:

  modelId = "glm-5.1"
      │
      ├── 1. 直接查找 providers["glm-5.1"]
      │
      ├── 2. 别名查找 modelAliasToChannel["glm-5.1"] → channel → provider
      │
      ├── 3. 数据库查找 model-name → channel → provider
      │
      ├── 4. 降级检查: DegradationManager.isDegraded(channel)?
      │   ├── 是 → 获取fallback channel的provider
      │   └── 否 → 使用当前provider
      │
      └── 5. TrackedChatModel装饰 → 返回带遥测的ChatModel

每Agent默认模型

可以为每个Agent配置默认模型,通过 AgentModelConfig

四、TrackedChatModel — 调用追踪(装饰器模式)

每次LLM调用都被 TrackedChatModel 包装,自动记录:

TrackedChatModel 工作原理:

  原始调用:                    TrackedChatModel装饰后:
  ┌──────────────┐            ┌──────────────────────────────┐
  │ chatModel.call()│         │ 1. 记录开始时间               │
  │              │            │ 2. 调用 delegate.call()       │
  │              │            │ 3. 记录结束时间               │
  └──────────────┘            │ 4. 提取Token使用量:           │
                               │    - inputTokens              │
                               │    - outputTokens             │
                               │    - cacheTokens (缓存命中)    │
                               │    - reasoningTokens (推理)    │
                               │ 5. 记录成功/失败              │
                               │ 6. 写入 LlmCallTracker        │
                               └──────────────────────────────┘

  LlmCallTracker (内存统计):
  Key = hourKey|modelId|agentId|userId
  每个"小时桶"统计: callCount, successCount, failureCount,
                    totalLatencyMs, maxLatencyMs,
                    totalInputTokens, totalOutputTokens,
                    totalCacheTokens, totalReasoningTokens
  定期drain到数据库持久化

五、AbstractLlmAgent — Agent基类(模板方法模式)

AbstractLlmAgent 是2343行的核心基类,所有LLM Agent都继承它。它用模板方法模式定义了执行骨架:

AbstractLlmAgent.chat(request) 模板方法:

  ┌─────────────────────────────────────────────────────┐
  │ 1. 构建系统提示词                                    │
  │    ├── getSystemPrompt()  ← 子类实现                │
  │    ├── SystemPromptWrapper.wrap()  ← 安全包装       │
  │    ├── Hook: ON_SYSTEM_PROMPT                      │
  │    └── 注入工具描述 (MCP/Skill/Shell/NOS)           │
  ├─────────────────────────────────────────────────────┤
  │ 2. 构建消息列表                                      │
  │    ├── SystemMessage (系统提示词)                    │
  │    ├── 上下文消息 (RAG检索结果)                      │
  │    ├── 历史消息 (Token预算截断)                      │
  │    └── UserMessage + 附件                            │
  ├─────────────────────────────────────────────────────┤
  │ 3. 选择模型                                          │
  │    └── ModelRegistry.getModel() → 降级感知          │
  ├─────────────────────────────────────────────────────┤
  │ 4. 选择执行模式 (子类可覆盖)                         │
  │    ├── LEGACY: 传统for循环                           │
  │    ├── GRAPH: 自研Graph引擎                          │
  │    └── GRAPH_PARALLEL: SAA并行引擎                   │
  ├─────────────────────────────────────────────────────┤
  │ 5. 执行ReAct循环                                     │
  │    └── 三种模式的统一接口                            │
  ├─────────────────────────────────────────────────────┤
  │ 6. 返回 AgentResponse (含SSE流)                     │
  └─────────────────────────────────────────────────────┘

六、工具调用检测 — 双模式

不同LLM对工具调用的支持不同,项目采用双模式检测

工具调用检测流程:

  LLM响应
    │
    ├── 优先: 原生ToolCall检测
    │   response.getResult().getOutput().getToolCalls()
    │   → 返回结构化的 ToolCall(id, name, arguments)
    │   ✅ 支持: ZhiPu, Anthropic, DashScope (OpenAI兼容)
    │
    └── 兜底: 正则解析 (ToolCallParser)
        从LLM文本输出中提取 JSON:
        {"tool": "search", "arguments": {"query": "..."}}
        ✅ 支持: 不支持function calling的老模型

  检测到工具调用后:
    ├── skill_* → SkillToolProvider
    ├── shell_* → ShellToolProvider
    ├── nos_*   → NosUploadToolProvider
    └── 其他     → McpClientManager (MCP协议)

七、流式响应合并

LLM的流式输出中,工具调用的参数是分多个chunk发送的。需要将这些fragment合并为完整的工具调用:

// 流式合并逻辑 (MergedToolCall)
// chunk1: {"tool_calls": [{"id":"tc1", "function":{"name":"search","arguments":""}}]}
// chunk2: {"tool_calls": [{"function":{"arguments":"{\"qu"}}]}
// chunk3: {"tool_calls": [{"function":{"arguments":"ery\":\"AI\"}"}}]}
//
// 合并后: ToolCall(id="tc1", name="search", arguments={"query":"AI"})

// 每个chunk到来时:
MergedToolCall accumulator:
  - 按 toolCallId 分组
  - 累积 argument fragments
  - 最终合并为完整的 AssistantMessage + ToolCall列表

八、Anthropic流式修复

Spring AI 1.0.0的Anthropic集成不支持thinking类型的ContentBlock,项目通过反射注入修复:

AnthropicStreamingFixConfig 做了三件事:
1. Jackson Mixin:注册 thinking 为合法的 ContentBlockBody 子类型
2. ThinkingAwareStreamHelper:覆盖 eventToChatCompletionResponse(),处理 CONTENT_BLOCK_START(type=thinking) 和 CONTENT_BLOCK_DELTA(type=thinking_delta/signature_delta)
3. 反射注入:通过反射替换 AnthropicApi.streamHelper 字段
这个修复让项目能正确处理支持"扩展思考"的模型(如MiniMax M2.7)。

九、面试高频问题

Q: 为什么需要多Provider适配?直接用OpenAI的SDK不行吗?
A: 因为不同供应商的API协议不同:智谱有自己的ZhiPu协议,MiniMax用Anthropic兼容协议,通义千问用OpenAI兼容协议。而且不同模型的能力差异很大(有的支持function calling,有的不支持)。项目通过工厂模式抽象了这些差异,上层代码只需调用统一的ChatModel接口,不需要关心底层是哪个Provider。此外,多Provider还提供了供应商级别的容灾能力——一个Provider挂了可以自动切换到另一个。
Q: TrackedChatModel是怎么获取Token使用量的?
A: 不同Provider返回Token信息的方式不同。TrackedChatModel通过反射提取:OpenAI格式从usage对象获取,ZhiPu格式从promptTokensDetails.getCachedTokens()获取缓存Token,Anthropic格式从getCompletionTokensDetails().getReasoningTokens()获取推理Token。这种方式虽然用了反射,但避免了硬编码依赖具体Provider的实现类。
Q: 为什么需要双模式工具调用检测?
A: 不是所有LLM都支持原生的function calling。比如GPT4Free提供的一些免费模型就不支持。对于这些模型,我们在系统提示词中描述工具格式,让LLM在文本中以JSON形式输出工具调用,然后通过正则解析提取。两种模式自动切换,上层代码无感知。

十、长期记忆注入

AbstractLlmAgent 在每次 LLM 调用前,自动检索与当前用户消息语义相关的长期记忆,并注入到 System Prompt 中。长期记忆来源于 LongTermMemoryStore(基于向量数据库),通过 Embedding 模型将用户消息转为向量,检索 Top-K 条最相关的历史记忆片段。

注入发生在 buildSystemPrompt() 阶段,位于基础系统提示词之后、工具描述之前。如果 Store Bean 不存在则安全跳过,不影响正常对话流程。

长期记忆注入流程:

  User Message
      │
      ▼
  EmbeddingModel.embed(userMessage)
      │
      ▼
  VectorStore.similaritySearch(embedding, topK=5, threshold=0.75)
      │
      ▼
  Top-K Memories (按相似度排序)
      │
      ▼
  Inject to System Prompt
      │
      ▼
  LLM Call (含记忆上下文)
// 长期记忆注入格式 (追加到 System Prompt 末尾)
<long_term_memory>
以下是与当前对话相关的历史记忆,可作为回答参考:

[Memory 1] 2026-04-15: 用户偏好使用中文回答技术问题
[Memory 2] 2026-04-20: 用户正在开发一个基于Spring Boot的微服务项目
[Memory 3] 2026-04-22: 用户对RAG检索增强生成技术非常感兴趣
</long_term_memory>

十一、AsyncToolCallback

AsyncToolCallback 基于 CompletableFuture 接口,为工具执行提供异步能力。每个工具调用返回一个 CompletableFuture<ToolResult>,支持协作式取消(cancel(true))和独立超时控制(orTimeout(60, SECONDS))。

GRAPH_PARALLEL 模式下,多个工具调用被提交到 AsyncToolExecutor 的有界线程池,通过 CompletableFuture.allOf() 等待全部完成后合并结果。单个工具超时不影响其他工具执行——超时工具返回错误 result,图引擎继续处理。

AsyncToolCallback 并行执行流程:

  Agent (ReAct 迭代)
      │
      ▼
  AsyncToolExecutor
      │
      ├──→ ThreadPool ──→ Tool1 ──→ CompletableFuture<Result1>
      ├──→ ThreadPool ──→ Tool2 ──→ CompletableFuture<Result2>
      └──→ ThreadPool ──→ Tool3 ──→ CompletableFuture<Result3>
                                          │
                                          ▼
                              CompletableFuture.allOf()
                                          │
                                          ▼
                                    Merge Results
                                          │
                                          ▼
                              返回合并后的 ToolResponse 列表
两级超时保护:工具级超时(async-tool.default-timeout-ms=60000)保护单个工具不会无限阻塞;图级超时(总图执行超时)保护整体任务。工具超时 → 单工具失败(错误 result)→ 图继续执行;图超时 → 全部中止。

十二、多模态支持

工具执行结果不仅限于文本,还可以返回图片、音频、视频等多模态内容。MultimodalToolResultConverter 负责自动转换工具返回的多模态数据,使其适配不同模型的输入格式。

对于不支持多模态输入的模型,Converter 自动执行降级:图片转为文本描述、音频转为转写文本、视频转为关键帧描述。降级后的原始多模态内容存入 metadata,供前端展示。

MIME Type描述降级策略
image/pngPNG 图片→ "[Image: description]" 文本描述
image/jpegJPEG 图片→ "[Image: description]" 文本描述
audio/mp3MP3 音频→ ASR 转写文本
video/mp4MP4 视频→ 关键帧描述文本

工具结果处理链集成:MultimodalToolResultConverter 已接入 AbstractLlmAgent 的工具结果处理链(processToolResultContent() 方法)。在工具执行完成后、结果注入 LLM 上下文之前,系统自动检测工具返回内容是否包含多模态标记(---附件---),并记录检测日志。convertForLlm() / convertForFrontend() / convertForAudit() 三个转换方法已就绪,待 MCP/Skill 工具扩展支持原生 MultimodalToolResult 返回类型后启用。集成覆盖三条执行路径:串行工具执行、并行工具执行(ParallelToolExecutor)、审批工具执行。

十二.5、异步工具与状态感知工具框架

异步工具框架(AsyncToolCallback)

支持长时间运行的工具(数据导出、批量 API 调用)以异步方式执行,带超时和协作式取消。

组件职责
AsyncToolCallback@FunctionalInterface,定义 callAsync(name, args, token) → CompletableFuture<AsyncToolResult>
NamedAsyncToolCallback扩展接口,添加 getName() / getDescription() / getInputSchema() / getTimeoutMs()
AsyncToolCollector@Component 收集器,自动注册 Bean,提供 isAsync() / executeAsync()
CancellationToken协作式取消令牌(AtomicBoolean),工具在循环点检查 isCancelled()
AsyncToolResult执行结果 record,含 success/failure/cancelled/timeout 工厂方法

执行流程:executeToolCallInternal() 检测到 asyncToolCollector.isAsync(toolName) → 创建 CancellationToken → 调用 tool.callAsync()future.get(timeoutMs) 阻塞等待 → 超时自动 token.cancel()。ReAct 循环是同步的,异步框架的价值在于超时+取消控制,而非真正异步返回。

示例实现:AsyncDataExportToolhub-agent-core/.../react/async/example/),模拟分批数据导出,每批检查 token.throwIfCancelled(),超时 60 秒。

状态感知工具框架(StateAwareToolCallback)

支持工具在多轮对话间维持状态,通过 3 层回退机制保证状态不丢失。

组件职责
StateAwareToolCallback接口,call(name, args, state) 签名带 ToolStateAccessor 参数
ToolStateAccessor3 层状态访问器:pendingWrites → graphState → localStore
ToolStateCollector@Component 收集器,提供 isStateAware() / executeWithState();内置 fallbackState 保证非 Graph 模式下跨轮次状态持久化

3 层状态回退:读取时优先从 pendingWrites(本次执行写入) → graphState(跨工具共享) → localStore(本地内存后备)。写入时通过 state.put() 同时更新 pendingWriteslocalStoregraphState(如存在)。当 executeToolCallInternal() 传入的 graphState 为 null 时(非 Graph 模式),ToolStateCollector 使用内置的 fallbackState(单例 GraphState)确保跨轮次状态不丢失。

示例实现:StatefulFormFillerToolhub-agent-core/.../react/state/example/),跨轮次渐进收集表单字段(姓名→电话→地址),通过 state.put("form_name", value) 保存,state.getString("form_name") 读取。

工具路由优先级

executeToolCallInternal() 按以下顺序路由:状态感知工具 → 异步工具 → Skill → Shell → NOS → 记忆 → MCP。状态感知和异步工具按名称精确匹配(不依赖前缀),其他工具按前缀匹配。

十三、面试高频问题(续)

Q: AbstractLlmAgent的executeReactIteration()方法流程是什么?
A: 完整流程共10步:1.加载长期记忆 → 2.构建6层system prompt → 3.Hook: BEFORE_LLM_CALL → 4.Interceptor: beforeModel → 5.调用ChatModel → 6.Interceptor: afterModel → 7.Hook: AFTER_LLM_CALL → 8.解析工具调用 → 9.并行执行工具 → 10.提取新记忆。其中步骤3-7形成了"Hook包裹Interceptor包裹Model调用"的三层嵌套结构,步骤9在GRAPH_PARALLEL模式下通过AsyncToolCallback实现真正并行。
Q: 长期记忆的语义检索为什么选择Cosine相似度而非欧氏距离?
A: Cosine度量的是向量方向的一致性(语义相关性),对向量长度不敏感。欧氏距离受向量模长影响,在高维空间中差异不明显(维度灾难——高维空间中所有点的欧氏距离趋于相同)。当embedding归一化后,Cosine和欧氏距离在数学上等价(||a-b||² = 2(1-cos(a,b))),但Cosine更直观且计算更高效(只需点积)。
Q: AsyncToolCallback的超时和graph-parallel的超时是什么关系?
A: 两级超时机制互补:工具级超时(async-tool.default-timeout-ms=60s)保护单个工具调用不会无限阻塞;图级超时(总图执行超时)保护整体任务执行时间。工具超时触发时,仅该工具返回错误result,图引擎继续处理其他正常工具的结果;图超时触发时,所有进行中的工具调用被取消(future.cancel(true)),整个图执行终止。
Q: 多模态tool result如何传递给不支持多模态的模型?
A: MultimodalToolResultConverter 自动降级处理:图片 → "[Image: description]" 文本描述;音频 → ASR转写文本;视频 → 关键帧描述文本。降级信息存入message的metadata字段(originalMediaTypeoriginalMediaUrl),供前端展示原始多媒体内容。模型只看到文本,但用户在UI上能看到原始图片/音频/视频。
Q: 3种Runtime模式如何切换?切换时正在执行的任务怎么办?
A: 通过 agent-config.yml 配置 runtime-mode(支持全局默认 + per-agent覆盖)。切换生效时机:新对话使用新模式,进行中的对话保持原模式直到结束(对话级绑定)。这意味着模式切换不会中断任何正在进行的任务,无中断风险。三种模式:LEGACY(传统for循环)、GRAPH(图引擎串行)、GRAPH_PARALLEL(图引擎并行)。
Q: TrackedChatModel的token统计如何处理流式(streaming)场景?
A: 流式场景下token统计延迟到最后一个chunk完成后汇总。usage信息通常在最后一个chunk的metadata中返回(OpenAI和ZhiPu均在stream的最后一个delta中包含usage字段)。如果模型不返回usage(如部分GPT4Free代理),则使用 TokenEstimator 基于文本长度估算(中文约1.5 token/字,英文约0.75 token/word)。估算值标记 estimated=true 以区分精确统计。

🔌 MiniMax 集成 — 项目实战

集成概述

MiniMax 作为一级 LLM 提供商集成到项目中,核心设计亮点是利用 API 协议兼容性——MiniMax 在 /anthropic 路径暴露了 Anthropic 兼容的 Messages API 端点,因此通过 Spring AI 的 AnthropicChatModel 作为适配器接入,无需编写私有客户端。

设计哲学:协议兼容 > 私有集成。复用已有的 Anthropic 协议实现,大幅减少开发和维护成本。

架构层次

  MiniMax 集成架构:

  配置层                  启动初始化                    运行时
    ↓                       ↓                           ↓
  application.yml  ──→  KeyPoolAutoConfiguration  ──→  Agent 请求聊天
  (minimax 配置)         (注册工厂与通道)
                              ↓                           ↓
                        MiniMaxChatModelFactory      PooledChatModel
                        (创建 AnthropicChatModel      (轮询 API Key)
                         实例, baseUrl → MiniMax)
                              ↓                           ↓
                        Spring AI AnthropicApi       处理限速与降级
                              ↓
                  https://api.minimaxi.com/anthropic/...

工厂模式实现

@Component("minimaxChatModelFactory")
public class MiniMaxChatModelFactory implements ChatModelFactory {

    @Override
    public ChatModel create(String apiKey, ChannelConfig config) {
        AnthropicApi api = AnthropicApi.builder()
                .apiKey(apiKey)
                .baseUrl(config.getBaseUrl())  // https://api.minimaxi.com/anthropic
                .build();

        AnthropicChatOptions options = AnthropicChatOptions.builder()
                .model(config.getModel())       // MiniMax-M2.7
                .temperature(config.getTemperature())
                .maxTokens(config.getMaxTokens())
                .build();

        return AnthropicChatModel.builder()
                .anthropicApi(api)
                .defaultOptions(options)
                .build();
    }
}

工厂发现机制provider = "minimax" → 查找 "minimaxChatModelFactory" Bean → 自动匹配。

多协议工厂对比

提供商协议Spring AI 类
ZhiPu GLM原生 ZhiPuZhiPuAiChatModel
DashScope/QwenOpenAI 兼容OpenAiChatModel
DeepSeekOpenAI 兼容OpenAiChatModel
MiniMaxAnthropic 兼容AnthropicChatModel
Ollama (本地)OpenAI 兼容OpenAiChatModel
项目中只有 3 种工厂模式:原生提供商(ZhiPu)、OpenAI 兼容协议、Anthropic 兼容协议。新增同协议提供商只需配置 + 预设,无需新工厂代码。

降级与韧性

  MiniMax 降级流程:

  所有 Key 用尽 / 429 限速
      ↓
  DegradationManager 检查 fallback 配置
      ↓
  channel="minimax" → fallback="deepseek"
      ↓
  切换到 deepseek 通道重试
      ↓
  成功 → 返回 | 失败 → 抛出异常
      ↓
  恢复超时 300 秒后,重新尝试 MiniMax

多层韧性保障:密钥轮转(round-robin)→ 冷却跳过(60s cooldown)→ 降级切换(fallback channel)→ 自动恢复(recovery timeout)。

支持的模型

模型上下文速度说明
MiniMax-M2.7204K标准旗舰通用模型
MiniMax-M2.7-highspeed204K~100 tps旗舰高速版
MiniMax-M2.5204K标准高性价比版本
Q: 为什么 MiniMax 使用 Anthropic 协议而非 OpenAI 兼容协议?
A: 这取决于 MiniMax 服务端暴露的 API 端点。MiniMax 在 /anthropic 路径提供了 Anthropic Messages API 兼容端点,因此项目复用 Spring AI 已有的 AnthropicChatModel 实现。工厂类 MiniMaxChatModelFactory 只需将 baseUrl 指向 MiniMax 的端点,其余逻辑(请求构建、流式解析、工具调用)全部由 Spring AI 的 Anthropic 客户端处理。这体现了"协议兼容 > 私有集成"的设计哲学——零协议适配成本,只需一个工厂类 + YAML 配置。