如何将AI推理过程建模为有向图,实现灵活的任务编排?
传统的AI对话是"一问一答"模式:用户发消息 → LLM回复。但复杂的任务需要多步骤推理和工具调用,例如:
这就需要ReAct(Reasoning + Acting)模式:LLM先"思考"要做什么,然后"执行"工具,再"观察"结果,循环往复直到得出最终答案。
Graph执行引擎将这个过程建模为有向图(Directed Graph),每个节点是一个执行步骤,边定义了流转规则。
Graph执行引擎的核心数据结构: ┌────────────────────────────────────────────────────────┐ │ CompiledGraph (编译后的图) │ │ │ │ nodes: Map<String, GraphNode> ← 节点注册表 │ │ edges: List<GraphEdge> ← 边列表 │ │ startNodeId: String ← 起始节点 │ │ endNodeId: String ← 终止节点 │ │ │ │ ┌──────────┐ conditionKey ┌──────────┐ │ │ │ Node "A" │ ───────────────→ │ Node "B" │ │ │ │ executor │ conditionValue │ executor │ │ │ └──────────┘ = "tool" └──────────┘ │ │ │ │ GraphNode: nodeId + NodeExecutor (函数式接口) │ │ GraphEdge: fromNodeId → toNodeId + 可选条件 │ │ GraphState: ConcurrentHashMap + currentNodeId │ └────────────────────────────────────────────────────────┘
| 组件 | 类名 | 职责 |
|---|---|---|
| 图定义 | CompiledGraph | 不可变的图结构,包含所有节点和边 |
| 节点 | GraphNode | 执行单元,包装一个 NodeExecutor |
| 边 | GraphEdge | 有向连接,支持条件路由(conditionKey + conditionValue) |
| 状态 | GraphState | ConcurrentHashMap,线程安全的状态容器,支持字段级更新策略(REPLACE/APPEND/MERGE_MAP) |
| 执行器 | GraphRuntime | 驱动图执行的主循环 |
| 节点执行器 | NodeExecutor | 函数式接口,每个节点的实际逻辑 |
| 执行结果 | NodeExecutionResult | 包含 nextNodeId/terminal/message |
| 事件 | GraphEvent | 可观测性事件(GRAPH_START/END, NODE_START/END/ERROR) |
本项目使用固定的4节点ReAct拓扑,这是整个Agent系统的核心骨架:
ReAct Graph 4节点拓扑
═══════════════════
┌─────────────────────────────────────┐
│ START │
└──────────────┬──────────────────────┘
│
▼
┌──────────────────┐
│ │
│ LLM 节点 │ ← 调用大语言模型
│ (推理/生成) │ 获取响应文本或工具调用
│ │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ │
│ BRANCH 节点 │ ← 分析LLM输出
│ (分支判断) │ 判断: 有工具调用? 结束?
│ │
└───┬──────────┬───┘
│ │
react.branch react.branch
= "tool" = "end"
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ │ │ │
│ TOOL 节点 │ │ END 节点 │
│(工具执行) │ │(输出答案) │
│ │ │ │
└────┬─────┘ └──────────┘
│ │
│ ▼
│ 最终答案 → SSE推送给用户
│
└──→ 返回LLM节点 (循环!)
┌──────────────────┐
│ LLM 节点 │ ←── 工具结果注入消息列表
│ (继续推理) │ 继续推理或生成最终答案
└──────────────────┘
react.branch 字段实现。BRANCH节点执行后设置 state.put("react.branch", "tool") 或 "end",Graph引擎根据条件选择下一条边。
GraphRuntime.execute() 是图执行的核心驱动,其算法如下:
GraphRuntime.execute(graph, state, eventConsumer):
1. state.currentNodeId ??= graph.startNodeId // 初始化到起始节点
2. emit(GRAPH_START) // 发射开始事件
3. steps = 0
4. while (!state.isTerminal() && steps < maxSteps): // maxSteps默认64
│
├── a. node = graph.nodes[state.currentNodeId] // 查找当前节点
│ if (node == null) → markFailed → break
│
├── b. emit(NODE_START, nodeId) // 可观测性
│
├── c. result = node.executor.execute(state) // ★ 执行节点逻辑
│ duration = 计时
│
├── d. if (result.isTerminal()) → state.markTerminal → break
│
├── e. nextNodeId = result.nextNodeId // 确定下一节点
│ ?? graph.resolveNextNode(currentNodeId, state) // 边匹配
│
├── f. if (nextNodeId == null) → state.markTerminal → break
│
└── g. state.setCurrentNodeId(nextNodeId) // 前进到下一节点
steps++
5. emit(GRAPH_END) // 发射结束事件
6. return state
CompiledGraph.resolveNextNode() 的匹配逻辑:
resolveNextNode(currentNodeId, state):
for each edge in edges: // 按添加顺序遍历
if edge.fromNodeId == currentNodeId: // 匹配来源节点
if edge.conditionKey == null: // 无条件边 → 直接匹配
return edge.toNodeId
if state.get(edge.conditionKey) // 有条件边 → 检查状态值
== edge.conditionValue (忽略大小写):
return edge.toNodeId // 首个匹配即返回
return null // 无匹配 → 终止
这个"首个匹配"策略意味着边的添加顺序很重要,先添加的边优先匹配。
除了自研的GraphRuntime,项目还桥接了Spring AI Alibaba (SAA)的图引擎,以获得并行执行能力:
桥接层架构:
┌─────────────────────────────────────────────────────┐
│ 自研Graph层 │
│ GraphRuntime / CompiledGraph / GraphState │
│ NodeExecutor / GraphEdge / GraphEvent │
└──────────────────────┬──────────────────────────────┘
│ SaaGraphAdapter (双向状态适配)
│
┌──────────────────────▼──────────────────────────────┐
│ SAA Graph层 (Spring AI Alibaba) │
│ StateGraph / OverAllState / AsyncNodeAction │
│ ParallelNode / MemorySaver │
│ │
│ 优势: 原生支持并行节点、Checkpoint持久化 │
└──────────────────────────────────────────────────────┘
桥接的核心挑战是两种状态模型的互转:
自研 GraphState SAA OverAllState
┌──────────────────┐ ┌──────────────────┐
│ ConcurrentHashMap │──桥接键───→│ __bridge_graph_state__
│ currentNodeId │ │ __bridge_next_node__
│ terminal │ │ __bridge_terminal__
│ react.branch │ │ __bridge_terminal_msg__
│ 自定义数据 │ │ + SAA原生字段
└──────────────────┘ ←────────── └──────────────────┘
双向同步
AgentTaskOrchestrator 是整个Agent系统的调度中枢:
AgentTaskOrchestrator 核心职责:
┌──────────────┐
│ execute() │─── 主入口
└──────┬───────┘
│
┌────▼────────────────────────────────────┐
│ 1. 熔断检查 (AgentCircuitBreaker) │
│ 窗口:1分钟, 最低5次请求, 失败率>50% │
│ → 熔断打开 → 直接拒绝 │
│ → 30秒后半开 → 试探性放行 │
│ │
│ 2. 用户限流 (UserRateLimiter) │
│ 每分钟30次 / 最大5并发 │
│ │
│ 3. BEFORE Hook │
│ 插件可拦截/修改请求 │
│ │
│ 4. 创建 AgentTask │
│ PENDING → RUNNING │
│ 持久化到DB │
│ │
│ 5. 提交到 AgentExecutor │
│ 优先级线程池 + 双层信号量 │
└────┬────────────────────────────────────┘
│
┌────▼────────────────────────────────────┐
│ executeWithRetry() │
│ │
│ · 创建 SSE Sink (客户端推送管道) │
│ · 设置双层超时: │
│ - 活动超时: 每个SSE事件重置 │
│ - 硬超时: 3x活动超时, 不可重置 │
│ · 提交到线程池执行 │
│ · 成功: subscribeWithProtection包装 │
│ · 失败: 指数退避重试 (500ms~5s) │
└──────────────────────────────────────────┘
| 模式 | 枚举值 | 执行方式 | 特点 | 适用场景 |
|---|---|---|---|---|
| 传统模式 | LEGACY |
简单for循环 | 实现简单,稳定可靠 | 简单对话、不需要并行 |
| Graph模式 | GRAPH |
自研Graph状态机 | 结构化、可观测、支持条件分支 | 需要复杂流程控制 |
| 并行Graph模式 | GRAPH_PARALLEL |
SAA Graph + 并行节点 | 支持工具并行、子Agent并行、任务分解 | 需要高性能并行处理 |
GraphState 支持为特定字段注册更新策略(UpdateStrategy),控制数据的合并行为:
| 策略 | 行为 | 适用场景 |
|---|---|---|
REPLACE | 新值直接替换旧值(默认) | 普通字段 |
APPEND | 新值追加到列表末尾 | 消息历史、工具结果 |
MERGE_MAP | 新 Map 浅合并到旧 Map | 元数据、配置信息 |
AbstractLlmAgent 在创建 GraphState 时自动注册:tool_results → APPEND、metadata → MERGE_MAP。通过 merge() 方法写入的数据按策略合并,put() 方法仍直接覆盖(向后兼容)。mergeAll(Map) 支持批量合并。
graph-parallel 模式下,StateGraph 的编译配置(CompileConfig)支持外部注入多种基础设施组件,通过 GraphInfraConfiguration 统一管理。
GraphInfraConfiguration.graphCompileConfig() ← hub-api 配置类
│ 聚合: MysqlSaver / ObservationRegistry / Store
▼
AbstractLlmAgent.graphCompileConfig ← @Autowired(required=false)
│ 作为参数传递
▼
SaaReactGraphBuilder.build(..., compileConfig) ← 5 参数重载
或
SaaSubAgentGraphBuilder.build(..., compileConfig) ← 8 参数重载
│
▼
StateGraph.compile(compileConfig) ← SAA 编译时注入
| 组件 | 说明 | 配置 |
|---|---|---|
| MemorySaver | 内存 Checkpoint(默认) | checkpoint-saver: memory |
| MysqlSaver | MySQL 持久化 Checkpoint | checkpoint-saver: mysql(自动建表) |
| RedisCheckpointSaver | Redis 持久化 Checkpoint | checkpoint-saver: redis(需配置 spring.data.redis) |
| ObservationRegistry | Micrometer 可观测性 | observation-enabled: true |
| Store | 长期记忆存储 | 由 StoreConfiguration 提供 DatabaseStore |
agent:
defaults:
graph-infra:
checkpoint-saver: memory # memory | mysql | redis
observation-enabled: true # 图执行 Micrometer 可观测性
diagram-enabled: true # 图拓扑 Mermaid 可视化 API
| 端点 | 说明 |
|---|---|
GET /api/admin/agents/{agentId}/diagram | 单个 Agent 图拓扑(Mermaid) |
GET /api/admin/agents/diagrams | 所有 Agent 图拓扑 |
GET /api/admin/agents/graph-infra-status | 图基础设施配置总览 |
@Autowired(required = false),任何组件缺失不会导致启动失败。checkpoint-saver 默认 memory,与无配置时行为完全一致。