三种并行级别:工具级并行、子Agent并行、任务分解并行
在AI Agent执行复杂任务时,很多操作天然可以并行:
并行执行可以将响应时间从 T1+T2+T3 压缩到 max(T1,T2,T3),大幅提升效率。
三级并行架构
════════════
Level 1: 工具级并行 (ParallelToolExecutor)
─────────────────────────────────────────
一个LLM响应中包含多个工具调用 → 同时执行
LLM响应
┌───┼───┐
│ │ │
Tool1 Tool2 Tool3 ← CompletableFuture并行
│ │ │
└───┼───┘
│
汇总结果 → 返回LLM继续推理
Level 2: 子Agent并行 (SaaSubAgentGraphBuilder)
──────────────────────────────────────────────
LLM决定需要多个子Agent协作 → 并行派发
LLM响应: dispatch_agents
┌───┼───┐
│ │ │
Agent_A Agent_B Agent_C ← SAA ParallelNode
│ │ │
└───┼───┘
│
Merger汇总 → 返回LLM最终综合
Level 3: 任务分解并行 (TaskDecompExecutor)
──────────────────────────────────────────
将大任务拆分为N个独立子任务 → 并行执行
用户: "分析10份简历"
│
LLM拆分为5个子任务
┌───┼───┐
│ │ │
Task1 Task2 Task3... ← 独立ReAct循环
│ │ │
└───┼───┘
│
汇总所有结果 → 最终输出
ParallelToolExecutor.executeParallel(toolCalls, executor, approvalHandler):
if (toolCalls.size() == 1):
直接执行,无并行开销
return executor.apply(toolCall)
// 分区: 需要审批 vs 可直接执行
┌──────────────────────────────────────────┐
│ 分区阶段 (Partition) │
│ │
│ toolCalls.forEach: │
│ if (tool.requiresApproval): │
│ → serialQueue (串行,需人工审批) │
│ else: │
│ → parallelQueue (可并行执行) │
└──────────────────────────────────────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────────────┐
│ 串行执行区 │ │ 并行执行区 │
│ │ │ │
│ for each: │ │ List<CompletableFuture>│
│ 审批等待 │ │ │
│ ↓ │ │ tool1 → CF.supplyAsync│
│ 用户批准? │ │ tool2 → CF.supplyAsync│
│ ↓ 是 │ │ tool3 → CF.supplyAsync│
│ 执行工具 │ │ │
│ │ │ CF.allOf().join() │
└──────────────┘ └──────────────────────┘
│ │
└────────┬───────────┘
▼
按原始顺序合并结果
// 核心线程: max(CPU*2, 4)
// 最大线程: min(CPU*4, 200)
// 队列容量: 500
// 拒绝策略: CallerRunsPolicy (调用者线程执行)
static ExecutorService POOL = new ThreadPoolExecutor(
max(CPU*2, 4), min(CPU*4, 200),
60, SECONDS,
new LinkedBlockingQueue<>(500),
new CallerRunsPolicy()
);
这是最复杂的并行模式,将ReAct图从4节点扩展到7+节点:
子Agent并行图拓扑 (扩展ReAct)
══════════════════════════════
START
│
▼
┌──────────────┐
│ LLM 节点 │
└───────┬──────┘
│
┌──────────────┐
│ BRANCH 节点 │
└──┬───┬───┬───┘
│ │ │
┌────────┘ │ └────────┐
│ │ │
branch="tool" branch="end" branch="sub_agents"
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ TOOL 节点 │ │ END 节点 │ │ ROUTER │
└────┬─────┘ └──────────┘ │(路由器) │
│ └────┬─────┘
│ │
│ ┌──────────┼──────────┐
│ │ │ │
│ ┌────▼───┐ ┌───▼────┐ ┌───▼────┐
│ │Agent_A │ │Agent_B │ │Agent_C │
│ │(并行) │ │(并行) │ │(并行) │
│ └────┬───┘ └───┬────┘ └───┬────┘
│ │ │ │
│ └─────────┼──────────┘
│ │
│ ┌──────▼──────┐
│ │ MERGER │
│ │ (汇总节点) │
│ └──────┬──────┘
│ │
└────────────────────────┘
│
▼
返回LLM节点 (综合各子Agent结果)
| 模式 | 说明 | 实现方式 |
|---|---|---|
| 静态并行 | 编译时确定要并行哪些子Agent | stateGraph.addEdge(ROUTER, [agent1, agent2, agent3]),SAA自动合成ParallelNode |
| 动态并行 | 运行时由LLM决定调用哪些子Agent | addParallelConditionalEdges,读取state中的sub_agent_dispatch_agents列表动态fan-out |
SubAgentNodeAction.apply(overAllState, config):
1. 从AgentRegistry查找子Agent
2. 从state提取dispatch提示词 (或使用原始用户消息)
3. 构建独立AgentRequest:
- 继承父请求的 userId, modelId, conversationId
- forceReactLegacy = true ← 避免嵌套图复杂性
4. 通过orchestrator.execute(subAgent, subRequest)
5. 收集完整SSE流文本响应
6. 写入state: sub_agent_result_{agentId}
⚠️ 错误隔离: 失败时写入错误描述而非抛异常
确保其他并行子Agent不受影响
SubAgentMergerAction.apply(overAllState, config):
1. 扫描所有 state key 以 "sub_agent_result_" 开头
2. 格式化为结构化文本:
"子Agent {agentId} 的执行结果:
{result}
---"
3. 写入 state: sub_agent_merged_result
4. 清理派发状态 (防止重复触发):
- 删除 sub_agent_dispatch_agents
- 删除 sub_agent_dispatch_prompt
5. 边: MERGER → LLM (回到LLM节点综合输出)
用于将一个大的同质任务(如批量处理)拆分为多个独立子任务并行执行:
TaskDecompExecutor.executeParallel(subTasks, agent, ...):
用户: "帮我分析这10份简历"
│
▼
┌────────────────────────────────────────────┐
│ LLM拆分为N个子任务: │
│ SubTask(id=1, "分析简历A") │
│ SubTask(id=2, "分析简历B") │
│ SubTask(id=3, "分析简历C") │
│ ... │
└────────────────────────────────────────────┘
│
▼
Semaphore(maxConcurrency) 控制并发上限
│
┌────┼────┬────┐
│ │ │ │
▼ ▼ ▼ ▼
Task1 Task2 Task3 Task4 ← 每个都是独立ReAct循环
│ │ │ │
▼ ▼ ▼ ▼
结果1 结果2 结果3 结果4 ← 错误隔离: 失败不影响其他
│ │ │ │
└────┴────┴────┘
│
▼
汇总所有结果 → 最终输出
forceReactLegacy = true — 强制使用传统模式,避免嵌套SAA图taskDecompDisabled = true — 禁止子任务再次分解
// 与ParallelToolExecutor隔离的线程池
// 原因: 子任务运行完整ReAct循环,耗时长得多
// 核心: max(CPU, 4), 最大: min(CPU*2, 64)
// 队列: 100, Keep-alive: 120s
static ExecutorService TASK_POOL = new ThreadPoolExecutor(
max(CPU, 4), min(CPU*2, 64),
120, SECONDS,
new LinkedBlockingQueue<>(100)
);
| 场景 | 串行耗时 | 并行耗时 | 加速比 |
|---|---|---|---|
| 3个工具调用 (各2秒) | 6秒 | 2秒 | 3x |
| 3个子Agent (各5秒) | 15秒 | 5秒 | 3x |
| 10个子任务 (各8秒) | 80秒 | ~16秒(并发5) | 5x |
graph-parallel 模式下,SaaReactGraphBuilder 和 SaaSubAgentGraphBuilder 均支持接受外部 CompileConfig,由 GraphInfraConfiguration 统一创建并注入。
GraphInfraConfiguration.graphCompileConfig() ← hub-api 配置类
│ 聚合: MysqlSaver / ObservationRegistry / Store
▼
AbstractLlmAgent.graphCompileConfig ← @Autowired(required=false)
│ 作为参数传递
▼
SaaReactGraphBuilder.build(..., compileConfig) ← 5 参数重载
或
SaaSubAgentGraphBuilder.build(..., compileConfig) ← 8 参数重载
│
▼
StateGraph.compile(compileConfig) ← SAA 编译时注入
// 标准 4 节点 ReAct 图 — 5 参数重载
SaaReactGraphBuilder.build(
llmExec, toolExec, branchExec, endExec,
graphCompileConfig // CompileConfig,null 时自动回退 MemorySaver
);
// 扩展 7+ 节点图(含子 Agent 并行)— 8 参数重载
SaaSubAgentGraphBuilder.build(
llmExec, toolExec, branchExec, endExec,
subNodes, merger, subConfig,
graphCompileConfig // CompileConfig,null 时自动回退 MemorySaver
);
| 组件 | 说明 | 配置 |
|---|---|---|
| MemorySaver | 内存 Checkpoint(默认) | checkpoint-saver: memory |
| MysqlSaver | MySQL 持久化 Checkpoint | checkpoint-saver: mysql(自动建表) |
| ObservationRegistry | Micrometer 图执行可观测性 | observation-enabled: true |
| Store | 长期记忆存储 | 由 StoreConfiguration 提供 |
agent:
defaults:
graph-infra:
checkpoint-saver: memory # memory | mysql
observation-enabled: true # 图执行 Micrometer 可观测性
diagram-enabled: true # 图拓扑 Mermaid 可视化 API
@Autowired(required = false),任何组件缺失不会导致启动失败。4 参数 build() 方法保留不变(委托调用 5/8 参数重载传 null),确保向后兼容。