02 - Graph执行引擎与Agent任务编排

如何将AI推理过程建模为有向图,实现灵活的任务编排?

一、为什么需要Graph执行引擎?

传统的AI对话是"一问一答"模式:用户发消息 → LLM回复。但复杂的任务需要多步骤推理和工具调用,例如:

这就需要ReAct(Reasoning + Acting)模式:LLM先"思考"要做什么,然后"执行"工具,再"观察"结果,循环往复直到得出最终答案。

Graph执行引擎将这个过程建模为有向图(Directed Graph),每个节点是一个执行步骤,边定义了流转规则。

二、核心概念

Graph执行引擎的核心数据结构:

┌────────────────────────────────────────────────────────┐
│                    CompiledGraph (编译后的图)            │
│                                                        │
│   nodes: Map<String, GraphNode>     ← 节点注册表       │
│   edges: List<GraphEdge>            ← 边列表           │
│   startNodeId: String               ← 起始节点         │
│   endNodeId: String                 ← 终止节点         │
│                                                        │
│   ┌──────────┐  conditionKey    ┌──────────┐           │
│   │ Node "A" │ ───────────────→ │ Node "B" │           │
│   │ executor │  conditionValue  │ executor │           │
│   └──────────┘  = "tool"        └──────────┘           │
│                                                        │
│   GraphNode: nodeId + NodeExecutor (函数式接口)         │
│   GraphEdge: fromNodeId → toNodeId + 可选条件          │
│   GraphState: ConcurrentHashMap + currentNodeId        │
└────────────────────────────────────────────────────────┘
组件类名职责
图定义CompiledGraph不可变的图结构,包含所有节点和边
节点GraphNode执行单元,包装一个 NodeExecutor
GraphEdge有向连接,支持条件路由(conditionKey + conditionValue)
状态GraphStateConcurrentHashMap,线程安全的状态容器,支持字段级更新策略(REPLACE/APPEND/MERGE_MAP)
执行器GraphRuntime驱动图执行的主循环
节点执行器NodeExecutor函数式接口,每个节点的实际逻辑
执行结果NodeExecutionResult包含 nextNodeId/terminal/message
事件GraphEvent可观测性事件(GRAPH_START/END, NODE_START/END/ERROR)

三、ReAct Graph 拓扑结构

本项目使用固定的4节点ReAct拓扑,这是整个Agent系统的核心骨架:

                    ReAct Graph 4节点拓扑
                    ═══════════════════

         ┌─────────────────────────────────────┐
         │              START                   │
         └──────────────┬──────────────────────┘
                        │
                        ▼
              ┌──────────────────┐
              │                  │
              │   LLM 节点       │  ← 调用大语言模型
              │   (推理/生成)     │     获取响应文本或工具调用
              │                  │
              └────────┬─────────┘
                       │
                       ▼
              ┌──────────────────┐
              │                  │
              │  BRANCH 节点     │  ← 分析LLM输出
              │  (分支判断)       │     判断: 有工具调用? 结束?
              │                  │
              └───┬──────────┬───┘
                  │          │
       react.branch      react.branch
         = "tool"          = "end"
                  │          │
                  ▼          ▼
         ┌──────────┐  ┌──────────┐
         │          │  │          │
         │ TOOL 节点 │  │ END 节点 │
         │(工具执行) │  │(输出答案) │
         │          │  │          │
         └────┬─────┘  └──────────┘
              │              │
              │              ▼
              │           最终答案 → SSE推送给用户
              │
              └──→ 返回LLM节点 (循环!)
                   ┌──────────────────┐
                   │   LLM 节点       │ ←── 工具结果注入消息列表
                   │   (继续推理)     │     继续推理或生成最终答案
                   └──────────────────┘
关键设计: 边的条件路由通过 GraphState 中的 react.branch 字段实现。BRANCH节点执行后设置 state.put("react.branch", "tool") 或 "end",Graph引擎根据条件选择下一条边。

四、GraphRuntime 执行算法

GraphRuntime.execute() 是图执行的核心驱动,其算法如下:

GraphRuntime.execute(graph, state, eventConsumer):

  1. state.currentNodeId ??= graph.startNodeId    // 初始化到起始节点
  2. emit(GRAPH_START)                             // 发射开始事件
  3. steps = 0
  4. while (!state.isTerminal() && steps < maxSteps):  // maxSteps默认64
     │
     ├── a. node = graph.nodes[state.currentNodeId]     // 查找当前节点
     │      if (node == null) → markFailed → break
     │
     ├── b. emit(NODE_START, nodeId)                    // 可观测性
     │
     ├── c. result = node.executor.execute(state)       // ★ 执行节点逻辑
     │      duration = 计时
     │
     ├── d. if (result.isTerminal()) → state.markTerminal → break
     │
     ├── e. nextNodeId = result.nextNodeId              // 确定下一节点
     │      ?? graph.resolveNextNode(currentNodeId, state)  // 边匹配
     │
     ├── f. if (nextNodeId == null) → state.markTerminal → break
     │
     └── g. state.setCurrentNodeId(nextNodeId)          // 前进到下一节点
            steps++

  5. emit(GRAPH_END)                               // 发射结束事件
  6. return state

五、边匹配算法(条件路由)

CompiledGraph.resolveNextNode() 的匹配逻辑:

resolveNextNode(currentNodeId, state):
  for each edge in edges:                        // 按添加顺序遍历
    if edge.fromNodeId == currentNodeId:         // 匹配来源节点
      if edge.conditionKey == null:              // 无条件边 → 直接匹配
        return edge.toNodeId
      if state.get(edge.conditionKey)            // 有条件边 → 检查状态值
         == edge.conditionValue (忽略大小写):
        return edge.toNodeId                     // 首个匹配即返回
  return null                                    // 无匹配 → 终止

这个"首个匹配"策略意味着边的添加顺序很重要,先添加的边优先匹配。

六、SAA桥接层 — 对接Spring AI Alibaba

除了自研的GraphRuntime,项目还桥接了Spring AI Alibaba (SAA)的图引擎,以获得并行执行能力:

桥接层架构:

┌─────────────────────────────────────────────────────┐
│                   自研Graph层                        │
│   GraphRuntime / CompiledGraph / GraphState          │
│   NodeExecutor / GraphEdge / GraphEvent              │
└──────────────────────┬──────────────────────────────┘
                       │ SaaGraphAdapter (双向状态适配)
                       │
┌──────────────────────▼──────────────────────────────┐
│               SAA Graph层 (Spring AI Alibaba)        │
│   StateGraph / OverAllState / AsyncNodeAction        │
│   ParallelNode / MemorySaver                         │
│                                                      │
│   优势: 原生支持并行节点、Checkpoint持久化           │
└──────────────────────────────────────────────────────┘

SaaGraphAdapter 状态桥接

桥接的核心挑战是两种状态模型的互转:

自研 GraphState                    SAA OverAllState
┌──────────────────┐              ┌──────────────────┐
│ ConcurrentHashMap │──桥接键───→│ __bridge_graph_state__
│ currentNodeId     │              │ __bridge_next_node__
│ terminal          │              │ __bridge_terminal__
│ react.branch      │              │ __bridge_terminal_msg__
│ 自定义数据        │              │ + SAA原生字段
└──────────────────┘ ←────────── └──────────────────┘
                    双向同步

七、Agent编排器 — 任务调度中心

AgentTaskOrchestrator 是整个Agent系统的调度中枢:

AgentTaskOrchestrator 核心职责:

  ┌──────────────┐
  │  execute()   │─── 主入口
  └──────┬───────┘
         │
    ┌────▼────────────────────────────────────┐
    │  1. 熔断检查 (AgentCircuitBreaker)       │
    │     窗口:1分钟, 最低5次请求, 失败率>50%   │
    │     → 熔断打开 → 直接拒绝                │
    │     → 30秒后半开 → 试探性放行             │
    │                                          │
    │  2. 用户限流 (UserRateLimiter)            │
    │     每分钟30次 / 最大5并发                │
    │                                          │
    │  3. BEFORE Hook                          │
    │     插件可拦截/修改请求                   │
    │                                          │
    │  4. 创建 AgentTask                       │
    │     PENDING → RUNNING                    │
    │     持久化到DB                           │
    │                                          │
    │  5. 提交到 AgentExecutor                 │
    │     优先级线程池 + 双层信号量             │
    └────┬────────────────────────────────────┘
         │
    ┌────▼────────────────────────────────────┐
    │  executeWithRetry()                      │
    │                                          │
    │  · 创建 SSE Sink (客户端推送管道)        │
    │  · 设置双层超时:                         │
    │    - 活动超时: 每个SSE事件重置            │
    │    - 硬超时: 3x活动超时, 不可重置         │
    │  · 提交到线程池执行                      │
    │  · 成功: subscribeWithProtection包装     │
    │  · 失败: 指数退避重试 (500ms~5s)         │
    └──────────────────────────────────────────┘

八、三种执行模式对比

模式枚举值执行方式特点适用场景
传统模式 LEGACY 简单for循环 实现简单,稳定可靠 简单对话、不需要并行
Graph模式 GRAPH 自研Graph状态机 结构化、可观测、支持条件分支 需要复杂流程控制
并行Graph模式 GRAPH_PARALLEL SAA Graph + 并行节点 支持工具并行、子Agent并行、任务分解 需要高性能并行处理

九、GraphState 字段级更新策略

GraphState 支持为特定字段注册更新策略(UpdateStrategy),控制数据的合并行为:

策略行为适用场景
REPLACE新值直接替换旧值(默认)普通字段
APPEND新值追加到列表末尾消息历史、工具结果
MERGE_MAP新 Map 浅合并到旧 Map元数据、配置信息

AbstractLlmAgent 在创建 GraphState 时自动注册:tool_results → APPENDmetadata → MERGE_MAP。通过 merge() 方法写入的数据按策略合并,put() 方法仍直接覆盖(向后兼容)。mergeAll(Map) 支持批量合并。

十、面试高频问题

Q: 为什么要把ReAct建模为Graph而不是简单的for循环?
A: Graph模式相比for循环有三个核心优势:1) 可视化可追踪:每个节点的执行都可以发射事件(NODE_START/END),方便监控和调试;2) 灵活路由:通过条件边可以轻松扩展新的分支(如sub_agents、task_decomp),不需要修改主循环逻辑;3) 可持久化:GraphState可以序列化到DB,支持暂停/恢复(HITL审批场景),for循环做不到这一点。
Q: Graph引擎的maxSteps为什么设为64?
A: 防止ReAct死循环。LLM可能在工具调用和推理之间无限循环。64步已经远超正常需求(大多数Agent在3-5步内完成),但能在异常情况下及时终止。这是一个经验值,可以通过配置调整。
Q: 桥接SAA的Graph引擎解决了什么问题?
A: 自研GraphRuntime是串行执行节点的,无法并行。而SAA的StateGraph原生支持ParallelNode(并行节点),可以在图级别实现多个子节点同时执行。这对于"同时调用多个工具"或"同时分发到多个子Agent"的场景至关重要,可以大幅减少响应时间。桥接层通过SaaGraphAdapter实现了自研NodeExecutor和SAA AsyncNodeAction的双向转换,复用了已有的节点逻辑。

图基础设施配置(Graph Infrastructure)

graph-parallel 模式下,StateGraph 的编译配置(CompileConfig)支持外部注入多种基础设施组件,通过 GraphInfraConfiguration 统一管理。

CompileConfig 注入链路

GraphInfraConfiguration.graphCompileConfig()       ← hub-api 配置类
    │  聚合: MysqlSaver / ObservationRegistry / Store
    ▼
AbstractLlmAgent.graphCompileConfig                ← @Autowired(required=false)
    │  作为参数传递
    ▼
SaaReactGraphBuilder.build(..., compileConfig)     ← 5 参数重载
    或
SaaSubAgentGraphBuilder.build(..., compileConfig)  ← 8 参数重载
    │
    ▼
StateGraph.compile(compileConfig)                  ← SAA 编译时注入

支持的基础设施组件

组件说明配置
MemorySaver内存 Checkpoint(默认)checkpoint-saver: memory
MysqlSaverMySQL 持久化 Checkpointcheckpoint-saver: mysql(自动建表)
RedisCheckpointSaverRedis 持久化 Checkpointcheckpoint-saver: redis(需配置 spring.data.redis
ObservationRegistryMicrometer 可观测性observation-enabled: true
Store长期记忆存储由 StoreConfiguration 提供 DatabaseStore

配置示例

agent:
  defaults:
    graph-infra:
      checkpoint-saver: memory    # memory | mysql | redis
      observation-enabled: true   # 图执行 Micrometer 可观测性
      diagram-enabled: true       # 图拓扑 Mermaid 可视化 API

图拓扑可视化 API

端点说明
GET /api/admin/agents/{agentId}/diagram单个 Agent 图拓扑(Mermaid)
GET /api/admin/agents/diagrams所有 Agent 图拓扑
GET /api/admin/agents/graph-infra-status图基础设施配置总览
安全设计:CompileConfig Bean 所有依赖均为 @Autowired(required = false),任何组件缺失不会导致启动失败。checkpoint-saver 默认 memory,与无配置时行为完全一致。