03 - 并行执行引擎

三种并行级别:工具级并行、子Agent并行、任务分解并行

一、为什么需要并行执行?

在AI Agent执行复杂任务时,很多操作天然可以并行:

并行执行可以将响应时间从 T1+T2+T3 压缩到 max(T1,T2,T3),大幅提升效率。

二、三级并行架构总览

                三级并行架构
                ════════════

  Level 1: 工具级并行 (ParallelToolExecutor)
  ─────────────────────────────────────────
  一个LLM响应中包含多个工具调用 → 同时执行

         LLM响应
        ┌───┼───┐
        │   │   │
     Tool1 Tool2 Tool3     ← CompletableFuture并行
        │   │   │
        └───┼───┘
            │
        汇总结果 → 返回LLM继续推理

  Level 2: 子Agent并行 (SaaSubAgentGraphBuilder)
  ──────────────────────────────────────────────
  LLM决定需要多个子Agent协作 → 并行派发

         LLM响应: dispatch_agents
        ┌───┼───┐
        │   │   │
    Agent_A Agent_B Agent_C  ← SAA ParallelNode
        │   │   │
        └───┼───┘
            │
        Merger汇总 → 返回LLM最终综合

  Level 3: 任务分解并行 (TaskDecompExecutor)
  ──────────────────────────────────────────
  将大任务拆分为N个独立子任务 → 并行执行

         用户: "分析10份简历"
            │
         LLM拆分为5个子任务
        ┌───┼───┐
        │   │   │
    Task1 Task2 Task3...     ← 独立ReAct循环
        │   │   │
        └───┼───┘
            │
        汇总所有结果 → 最终输出

三、Level 1 — 工具级并行 (ParallelToolExecutor)

算法流程

ParallelToolExecutor.executeParallel(toolCalls, executor, approvalHandler):

  if (toolCalls.size() == 1):
      直接执行,无并行开销
      return executor.apply(toolCall)

  // 分区: 需要审批 vs 可直接执行
  ┌──────────────────────────────────────────┐
  │  分区阶段 (Partition)                    │
  │                                          │
  │  toolCalls.forEach:                      │
  │    if (tool.requiresApproval):           │
  │      → serialQueue (串行,需人工审批)    │
  │    else:                                 │
  │      → parallelQueue (可并行执行)        │
  └──────────────────────────────────────────┘
         │                    │
         ▼                    ▼
  ┌──────────────┐   ┌──────────────────────┐
  │ 串行执行区    │   │ 并行执行区            │
  │              │   │                      │
  │ for each:    │   │ List<CompletableFuture>│
  │  审批等待     │   │                      │
  │  ↓           │   │ tool1 → CF.supplyAsync│
  │  用户批准?    │   │ tool2 → CF.supplyAsync│
  │  ↓ 是        │   │ tool3 → CF.supplyAsync│
  │  执行工具     │   │                      │
  │              │   │ CF.allOf().join()     │
  └──────────────┘   └──────────────────────┘
         │                    │
         └────────┬───────────┘
                  ▼
         按原始顺序合并结果

线程池配置

// 核心线程: max(CPU*2, 4)
// 最大线程: min(CPU*4, 200)
// 队列容量: 500
// 拒绝策略: CallerRunsPolicy (调用者线程执行)
static ExecutorService POOL = new ThreadPoolExecutor(
    max(CPU*2, 4), min(CPU*4, 200),
    60, SECONDS,
    new LinkedBlockingQueue<>(500),
    new CallerRunsPolicy()
);

四、Level 2 — 子Agent并行 (SaaSubAgentGraphBuilder)

这是最复杂的并行模式,将ReAct图从4节点扩展到7+节点:

            子Agent并行图拓扑 (扩展ReAct)
            ══════════════════════════════

                    START
                      │
                      ▼
              ┌──────────────┐
              │   LLM 节点    │
              └───────┬──────┘
                      │
              ┌──────────────┐
              │  BRANCH 节点  │
              └──┬───┬───┬───┘
                 │   │   │
        ┌────────┘   │   └────────┐
        │            │            │
   branch="tool" branch="end" branch="sub_agents"
        │            │            │
        ▼            ▼            ▼
  ┌──────────┐ ┌──────────┐ ┌──────────┐
  │ TOOL 节点 │ │ END 节点  │ │ ROUTER   │
  └────┬─────┘ └──────────┘ │(路由器)   │
       │                    └────┬─────┘
       │                         │
       │              ┌──────────┼──────────┐
       │              │          │          │
       │         ┌────▼───┐ ┌───▼────┐ ┌───▼────┐
       │         │Agent_A │ │Agent_B │ │Agent_C │
       │         │(并行)  │ │(并行)  │ │(并行)  │
       │         └────┬───┘ └───┬────┘ └───┬────┘
       │              │         │          │
       │              └─────────┼──────────┘
       │                        │
       │                 ┌──────▼──────┐
       │                 │   MERGER    │
       │                 │ (汇总节点)  │
       │                 └──────┬──────┘
       │                        │
       └────────────────────────┘
                      │
                      ▼
              返回LLM节点 (综合各子Agent结果)

静态 vs 动态并行

模式说明实现方式
静态并行 编译时确定要并行哪些子Agent stateGraph.addEdge(ROUTER, [agent1, agent2, agent3]),SAA自动合成ParallelNode
动态并行 运行时由LLM决定调用哪些子Agent addParallelConditionalEdges,读取state中的sub_agent_dispatch_agents列表动态fan-out

SubAgentNodeAction — 单个子Agent执行

SubAgentNodeAction.apply(overAllState, config):

  1. 从AgentRegistry查找子Agent
  2. 从state提取dispatch提示词 (或使用原始用户消息)
  3. 构建独立AgentRequest:
     - 继承父请求的 userId, modelId, conversationId
     - forceReactLegacy = true  ← 避免嵌套图复杂性
  4. 通过orchestrator.execute(subAgent, subRequest)
  5. 收集完整SSE流文本响应
  6. 写入state: sub_agent_result_{agentId}

  ⚠️ 错误隔离: 失败时写入错误描述而非抛异常
     确保其他并行子Agent不受影响

SubAgentMergerAction — 结果汇总

SubAgentMergerAction.apply(overAllState, config):

  1. 扫描所有 state key 以 "sub_agent_result_" 开头
  2. 格式化为结构化文本:
     "子Agent {agentId} 的执行结果:
      {result}
      ---"
  3. 写入 state: sub_agent_merged_result
  4. 清理派发状态 (防止重复触发):
     - 删除 sub_agent_dispatch_agents
     - 删除 sub_agent_dispatch_prompt
  5. 边: MERGER → LLM (回到LLM节点综合输出)

五、Level 3 — 任务分解并行 (TaskDecompExecutor)

用于将一个大的同质任务(如批量处理)拆分为多个独立子任务并行执行:

TaskDecompExecutor.executeParallel(subTasks, agent, ...):

  用户: "帮我分析这10份简历"
         │
         ▼
  ┌────────────────────────────────────────────┐
  │ LLM拆分为N个子任务:                         │
  │ SubTask(id=1, "分析简历A")                  │
  │ SubTask(id=2, "分析简历B")                  │
  │ SubTask(id=3, "分析简历C")                  │
  │ ...                                         │
  └────────────────────────────────────────────┘
         │
         ▼
  Semaphore(maxConcurrency) 控制并发上限
         │
    ┌────┼────┬────┐
    │    │    │    │
    ▼    ▼    ▼    ▼
  Task1 Task2 Task3 Task4  ← 每个都是独立ReAct循环
    │    │    │    │
    ▼    ▼    ▼    ▼
  结果1 结果2 结果3 结果4  ← 错误隔离: 失败不影响其他
    │    │    │    │
    └────┴────┴────┘
         │
         ▼
    汇总所有结果 → 最终输出

三重防递归机制

重要设计: 每个子任务的AgentRequest设置了三重防递归标志:
1. forceReactLegacy = true — 强制使用传统模式,避免嵌套SAA图
2. taskDecompDisabled = true — 禁止子任务再次分解
3. 清空历史消息 — 避免上下文膨胀
这确保了任务分解不会产生无限递归。

独立线程池隔离

// 与ParallelToolExecutor隔离的线程池
// 原因: 子任务运行完整ReAct循环,耗时长得多
// 核心: max(CPU, 4),  最大: min(CPU*2, 64)
// 队列: 100, Keep-alive: 120s
static ExecutorService TASK_POOL = new ThreadPoolExecutor(
    max(CPU, 4), min(CPU*2, 64),
    120, SECONDS,
    new LinkedBlockingQueue<>(100)
);

六、性能对比

场景串行耗时并行耗时加速比
3个工具调用 (各2秒)6秒2秒3x
3个子Agent (各5秒)15秒5秒3x
10个子任务 (各8秒)80秒~16秒(并发5)5x

七、面试高频问题

Q: 三级并行各适用于什么场景?
A: 工具级并行适用于LLM一次返回多个工具调用(如同时搜索3个关键词);子Agent并行适用于需要不同专业Agent协作的场景(如翻译+总结+分析);任务分解并行适用于批量处理同质任务(如筛选100份简历)。三者可以嵌套组合使用。
Q: 并行执行时如何保证错误隔离?
A: 三个层面:1) 每个并行任务在独立的CompletableFuture中执行,异常不会传播到其他任务;2) SubAgentNodeAction捕获所有异常并写入错误描述到state,而非抛出;3) TaskDecompExecutor的每个子任务设置taskDecompDisabled=true,防止递归分解导致雪崩。
Q: 为什么工具级和任务分解使用不同的线程池?
A: 工具调用通常很快(毫秒到秒级),但任务分解的每个子任务是完整的ReAct推理循环(可能数十秒到分钟级)。如果共用线程池,长时间运行的任务会占满线程,导致工具调用排队等待。隔离后各自独立伸缩,互不影响。

八、图基础设施与 CompileConfig 外部注入

graph-parallel 模式下,SaaReactGraphBuilderSaaSubAgentGraphBuilder 均支持接受外部 CompileConfig,由 GraphInfraConfiguration 统一创建并注入。

CompileConfig 注入链路

GraphInfraConfiguration.graphCompileConfig()       ← hub-api 配置类
    │  聚合: MysqlSaver / ObservationRegistry / Store
    ▼
AbstractLlmAgent.graphCompileConfig                ← @Autowired(required=false)
    │  作为参数传递
    ▼
SaaReactGraphBuilder.build(..., compileConfig)     ← 5 参数重载
    或
SaaSubAgentGraphBuilder.build(..., compileConfig)  ← 8 参数重载
    │
    ▼
StateGraph.compile(compileConfig)                  ← SAA 编译时注入

Builder 重载方法

// 标准 4 节点 ReAct 图 — 5 参数重载
SaaReactGraphBuilder.build(
    llmExec, toolExec, branchExec, endExec,
    graphCompileConfig    // CompileConfig,null 时自动回退 MemorySaver
);

// 扩展 7+ 节点图(含子 Agent 并行)— 8 参数重载
SaaSubAgentGraphBuilder.build(
    llmExec, toolExec, branchExec, endExec,
    subNodes, merger, subConfig,
    graphCompileConfig    // CompileConfig,null 时自动回退 MemorySaver
);

支持的基础设施组件

组件说明配置
MemorySaver内存 Checkpoint(默认)checkpoint-saver: memory
MysqlSaverMySQL 持久化 Checkpointcheckpoint-saver: mysql(自动建表)
ObservationRegistryMicrometer 图执行可观测性observation-enabled: true
Store长期记忆存储StoreConfiguration 提供

配置示例

agent:
  defaults:
    graph-infra:
      checkpoint-saver: memory    # memory | mysql
      observation-enabled: true   # 图执行 Micrometer 可观测性
      diagram-enabled: true       # 图拓扑 Mermaid 可视化 API
安全设计:CompileConfig Bean 所有依赖均为 @Autowired(required = false),任何组件缺失不会导致启动失败。4 参数 build() 方法保留不变(委托调用 5/8 参数重载传 null),确保向后兼容。