4 种编排模式 — Sequential / Parallel / Loop / LLM Routing
复杂任务往往需要多个专业化 Agent 协同完成。例如:一个"市场分析报告"任务可能需要数据采集 Agent、数据分析 Agent、报告撰写 Agent 依次工作。编排引擎就是协调这些 Agent 执行顺序、并发方式和路由决策的核心组件。
| 模式 | 类 | 执行方式 | Agent 数量 | 典型场景 |
|---|---|---|---|---|
| Sequential | SequentialOrchestration | A → B → C 串行管线 | ≥ 2 | 多步骤流水线(采集→分析→报告) |
| Parallel | ParallelOrchestration | Fan-out 并行 → 合并结果 | ≥ 2 | 多维度并行分析(情感+关键词+摘要) |
| Loop | LoopOrchestration | 循环执行直到满足退出条件 | 1 | 迭代优化(代码审查→修改→再审查) |
| LLM Routing | LlmRoutingOrchestration | LLM 智能选择目标 Agent | ≥ 2 | 意图路由(客服→技术/销售/投诉) |
最经典的管线模式,每个 Agent 的输出作为下一个 Agent 的输入,依次执行。
Sequential 执行流程:
用户消息
│
▼
┌─────────┐ output ┌─────────┐ output ┌─────────┐
│ Agent A │ ──────────→ │ Agent B │ ──────────→ │ Agent C │
│ (采集) │ │ (分析) │ │ (报告) │
└─────────┘ └─────────┘ └─────────┘
│
▼
最终结果
SSE 事件流:
┌─────────────────────────────────────────────────────────┐
│ event: orchestration_step │
│ data: {"step":1,"agent":"Agent A","status":"running"} │
│ │
│ event: orchestration_step │
│ data: {"step":1,"agent":"Agent A","status":"completed"} │
│ │
│ event: orchestration_step │
│ data: {"step":2,"agent":"Agent B","status":"running"} │
│ ... │
└─────────────────────────────────────────────────────────┘
Fan-out / Fan-in 模式:将同一个消息发给所有子 Agent 并行处理,最后通过 ResultMerger 合并结果。
Parallel 执行流程:
用户消息
│
┌─────────────┼─────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Agent A │ │ Agent B │ │ Agent C │
│ (情感分析) │ │ (关键词) │ │ (摘要) │
└─────┬────┘ └─────┬────┘ └─────┬────┘
│ │ │
└─────────────┼─────────────┘
▼
┌─────────────┐
│ ResultMerger │ ← 合并所有结果
└──────┬──────┘
▼
最终结果
| 配置 | 值 | 说明 |
|---|---|---|
| 线程池 | PARALLEL_EXECUTOR | max(4, CPU核心数) 线程 |
| 超时 | 300 秒 | 单个 Agent 最大执行时间 |
| 错误隔离 | try-catch 包装 | 单个 Agent 失败不影响其他 Agent |
| 结果合并 | ResultMerger | 默认换行拼接,可自定义 |
单 Agent 循环执行,每轮将上一轮输出作为新输入,直到满足退出条件或达到最大迭代次数。
Loop 执行流程:
用户消息 ──→ ┌──────────────────────────────────┐
│ Loop Controller │
│ │
│ iteration = 0 │
│ ┌────────────────────────────┐ │
│ │ Agent 执行 │ │
│ │ input → Agent → output │ │
│ └──────────┬─────────────────┘ │
│ │ │
│ ▼ │
│ exitCondition.test(output)? │
│ ├── true → 退出循环,返回结果 │
│ └── false → iteration++ │
│ │ │
│ ▼ │
│ iteration >= maxIterations(10)? │
│ ├── true → 强制退出 │
│ └── false → 继续循环 ↑ │
└──────────────────────────────────┘
安全边界: maxIterations = 10(防止无限循环)
Predicate<String>,接收 Agent 输出文本,返回 true 表示任务完成可以退出。例如:output -> output.contains("APPROVED")。
由 LLM 根据用户消息内容,智能选择最合适的子 Agent 来处理请求。
LLM Routing 执行流程:
用户消息: "我的订单发错了,要求退款"
│
▼
┌──────────────────────────────────────────────┐
│ LLM 路由决策 │
│ │
│ Routing Prompt: │
│ "根据用户消息选择最合适的 Agent: │
│ - agent_tech: 技术问题处理 │
│ - agent_sales: 销售咨询 │
│ - agent_complaint: 投诉与退款 │
│ 请只返回 agent ID" │
│ │
│ LLM 输出: "agent_complaint" │
└──────────────┬───────────────────────────────┘
│
▼
┌──────────────────────────┐
│ agent_complaint │
│ (投诉与退款 Agent) │
└──────────┬───────────────┘
│
▼
最终结果
// 路由 Prompt 模板(关键片段)
String routingPrompt = String.format(
"根据用户消息选择最合适的Agent处理:\n%s\n" +
"只返回Agent ID,不要其他内容。如果都不合适返回 none",
agentDescriptions // 动态注入各 Agent 的 ID + 描述
);
// 安全处理: 防止 Agent 描述中的特殊字符注入
String safeDesc = Matcher.quoteReplacement(description);
// 降级策略: LLM 返回无法识别的 ID 或 "none"
// → fallback 到列表中第一个 Agent
OrchestrationService 是编排功能的服务入口,负责根据编排类型和子 Agent ID 列表构建编排 Agent。
@Service
public class OrchestrationService {
// 根据编排类型创建编排 Agent
public OrchestrationAgent createOrchestration(
OrchestrationTypeEnum type, // SEQUENTIAL / PARALLEL / LOOP / LLM_ROUTING
List<String> agentIds, // 子 Agent ID 列表
OrchestrationConfig config // 可选配置(exitCondition, merger 等)
) {
List<Agent> agents = agentIds.stream()
.map(agentService::getAgent)
.collect(toList());
return switch (type) {
case SEQUENTIAL -> new SequentialOrchestration(agents);
case PARALLEL -> new ParallelOrchestration(agents, config.getMerger());
case LOOP -> new LoopOrchestration(agents.get(0), config);
case LLM_ROUTING-> new LlmRoutingOrchestration(agents, llmService);
};
}
}
编排引擎使用 3 个独立线程池,隔离不同层次的并发需求:
| 线程池 | 线程名前缀 | 大小 | 用途 |
|---|---|---|---|
| ORCHESTRATION_EXECUTOR | orch- | 固定 4 | 编排任务调度(顶层) |
| PARALLEL_EXECUTOR | parallel- | max(4, CPU) | Parallel 模式并行执行子 Agent |
| SUB_AGENT_EXECUTOR | sub-agent- | 固定 8 | 子 Agent 内部异步操作 |
"Agent X failed: timeout"),而不是抛出异常。这样 ResultMerger 收到的是"N 个成功结果 + M 个错误描述",调用方可以看到哪些 Agent 成功、哪些失败,而不是因为一个失败导致整体失败。
"none"(表示没有合适的 Agent),路由引擎会 fallback 到列表中的第一个 Agent。这确保了即使路由决策失败,用户请求仍然能得到处理,而不是返回错误。同时使用 Matcher.quoteReplacement() 防止 Agent 描述中的 $、\ 等特殊字符导致正则替换注入。
当 Agent 部署在不同服务实例上时,如何让它们像本地 Agent 一样透明地互相调用?A2A(Agent-to-Agent)远程协议解决了这个问题。它基于 Nacos 注册中心实现自动注册与发现,通过 HTTP/SSE 代理远程 Agent 的流式响应。
Agent 接口(A2ARemoteAgent implements Agent),调用方完全无感知本地/远程差异。编排引擎可以混合调度本地和远程 Agent,零代码改动。
应用实例 A (port:8080) 应用实例 B (port:8081)
┌────────────────────┐ ┌────────────────────┐
│ Local Agent 1 │ │ Local Agent 3 │
│ Local Agent 2 │ │ Local Agent 4 │
│ │ │ │
│ A2AAutoRegistration │ A2AAutoRegistration
│ ↓ 启动时自动注册 │ ↓ 启动时自动注册
└────────┬───────────┘ └────────┬───────────┘
│ │
▼ ▼
┌──────────────────────────────────────────────────────────┐
│ Nacos 注册中心 │
│ service: enterprise-agent-hub / group: AGENT_HUB │
│ │
│ Instance 1: Agent 1 (8080), Agent 2 (8080) │
│ Instance 2: Agent 3 (8081), Agent 4 (8081) │
└──────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌────────────────────┐ ┌────────────────────┐
│ discoverAll() │ │ discoverAll() │
│ ↓ │ │ ↓ │
│ A2ARemoteAgent 3 │ ← HTTP/SSE → │ A2ARemoteAgent 1 │
│ A2ARemoteAgent 4 │ │ A2ARemoteAgent 2 │
└────────────────────┘ └────────────────────┘
工作流程:① 启动时 A2AAutoRegistration 将本地 Agent 注册到 Nacos → ② NacosA2ARegistry.discoverAll() 发现远程 Agent → ③ 为每个远程 Agent 创建 A2ARemoteAgent 本地代理 → ④ 通过 A2AClient 发起 HTTP/SSE 远程调用 → ⑤ 关闭时 @PreDestroy 自动注销
| 类名 | 职责 | 关键特性 |
|---|---|---|
A2AAgentDescriptor | Agent 描述符(Java record) | 不可变,包含 agentId / name / description / capabilities / serviceUrl |
A2ARegistry | 注册发现接口 | register / deregister / discoverAll / discover / refresh |
NacosA2ARegistry | Nacos 注册中心实现 | 元数据映射、ConcurrentHashMap 缓存、降级返回缓存 |
A2AClient | HTTP/SSE 远程调用客户端 | WebClient、超时控制、错误降级为 SSE 事件 |
A2ARemoteAgent | 远程 Agent 本地代理 | implements Agent,chat() 委托给 A2AClient |
A2AAutoRegistration | 自动注册/注销生命周期 | @EventListener(ApplicationReadyEvent)、@PreDestroy、跳过远程代理 |
A2AConfiguration | Spring Boot 自动配置 | @ConditionalOnProperty(enabled=true)、@ConditionalOnClass(NacosSDK) |
A2AProperties | 配置属性 | 前缀 agent-hub.a2a,默认关闭 |
public record A2AAgentDescriptor(
String agentId, // "rag", "translate"
String name, // "知识库助手"
String description, // "基于 RAG 的问答 Agent"
Set<AgentCapability> capabilities, // {DOCUMENT_UPLOAD, SOURCE_CITATION}
String serviceUrl // "http://192.168.1.10:8080"
) {}
描述符在注册时序列化为 Nacos 实例的 metadata 字段,发现时从 metadata 反序列化恢复。capabilities 存储为逗号分隔的枚举名(如 "DOCUMENT_UPLOAD,SOURCE_CITATION")。
注册流程:每个本地 Agent 被封装为一个 Nacos Instance,元数据存储 Agent 的完整描述信息。
| Nacos 元数据 Key | 来源 | 示例值 |
|---|---|---|
agentId | Agent.getId() | rag |
name | Agent.getName() | 知识库助手 |
description | Agent.getDescription() | 基于 RAG 检索增强的问答 |
capabilities | 逗号分隔枚举名 | DOCUMENT_UPLOAD,SOURCE_CITATION |
serviceUrl | A2AProperties.baseUrl | http://192.168.1.10:8080 |
discoverAll() 内部维护 cachedDescriptors 缓存。如果 Nacos 查询失败(网络抖动),返回上一次成功缓存的结果,避免服务中断。
实例 A 实例 B
┌──────────────┐ ┌──────────────┐
│ 业务代码 │ │ A2A 端点 │
│ agent.chat() │ │ /api/a2a/chat │
└──────┬───────┘ └──────┬───────┘
│ │
▼ │
A2ARemoteAgent.chat(request) │
│ │
▼ │
A2AClient.call(target, request) │
│ │
│ POST {serviceUrl}/api/a2a/chat/{agentId}│
│ Content-Type: application/json │
│ Body: AgentRequest JSON │
│ ────────────────────────────────────→ │
│ │
│ SSE Stream Response │
│ ←──────────────────────────────────── │
│ data: 你好 │
│ data: ,这是回答 │
│ event: done │
│ data: [DONE] │
┌──────┴───────┐ │
│ AgentResponse │ │
│ Flux<SSE> │ │
└──────────────┘ │
// A2AClient — 远程调用核心逻辑
public AgentResponse call(A2AAgentDescriptor target, AgentRequest request) {
String requestBody = objectMapper.writeValueAsString(request);
Flux<ServerSentEvent<String>> sseStream = webClient.post()
.uri(target.serviceUrl() + "/api/a2a/chat/" + target.agentId())
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(requestBody)
.retrieve()
.bodyToFlux(String.class)
.timeout(Duration.ofSeconds(properties.getTimeoutSeconds()))
.map(chunk -> ServerSentEvent.<String>builder().data(chunk).build())
.onErrorResume(e -> Flux.just(
ServerSentEvent.<String>builder()
.event("error").data("远程调用失败: " + e.getMessage()).build()
))
.concatWith(Flux.just(
ServerSentEvent.<String>builder().event("done").data("[DONE]").build()
));
return AgentResponse.of(sseStream);
}
| 错误场景 | 处理方式 |
|---|---|
| 请求序列化失败 | 返回包含错误消息的文本 Flux |
| 网络超时 | timeout(Duration.ofSeconds(60)) → 降级为 error SSE 事件 |
| 远程不可达 | onErrorResume → 返回 event:error 的 SSE 事件 |
| Nacos 发现失败 | 降级返回缓存的 cachedDescriptors |
// 启动时注册 — @EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
List<Agent> agents = agentRegistry.listAgents();
for (Agent agent : agents) {
if (agent instanceof A2ARemoteAgent) continue; // ← 防循环注册
A2AAgentDescriptor descriptor = new A2AAgentDescriptor(
agent.getId(), agent.getName(), agent.getDescription(),
agent.getCapabilities(), buildServiceUrl(agent)
);
a2aRegistry.register(descriptor);
registeredAgentIds.add(agent.getId());
}
}
// 关闭时注销 — @PreDestroy
public void onShutdown() {
for (String agentId : registeredAgentIds) {
a2aRegistry.deregister(agentId);
}
registeredAgentIds.clear();
}
A2ARemoteAgent?A2ARemoteAgent(B-agent)。如果自动注册不过滤它,就会把 B 的 Agent 以 A 的 serviceUrl 再注册回 Nacos,导致其他实例发现到的是错误的地址。instanceof A2ARemoteAgent 检查确保只注册真正的本地 Agent。
agent-hub:
a2a:
enabled: true # 启用 A2A 协议(默认 false)
service-name: enterprise-agent-hub # Nacos 注册服务名
group: AGENT_HUB # Nacos 服务分组
base-url: http://192.168.1.10:8080 # 当前实例的基础 URL
timeout-seconds: 60 # 远程调用超时(秒)
| 属性 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enabled | boolean | false | 是否启用 A2A 协议,需显式设为 true |
service-name | String | enterprise-agent-hub | Nacos 注册服务名 |
group | String | AGENT_HUB | Nacos 服务分组 |
base-url | String | "" | 当前实例基础 URL(必须配置) |
timeout-seconds | int | 60 | 远程调用超时时间(秒) |
@ConditionalOnProperty(prefix="agent-hub.a2a", name="enabled", havingValue="true") 控制。Nacos 注册中心还额外依赖 @ConditionalOnClass(NamingService),确保 Nacos SDK 在 classpath 中。
A2ARemoteAgent 实现了 Agent 接口,因此可以无缝参与所有编排模式。编排引擎在调度 Agent 时无需区分本地/远程。
OrchestrationService.execute("sequential", agentIds, message)
│
▼
agentRegistry.getAgent("local-translate") → TranslateAgent (本地)
agentRegistry.getAgent("remote-polish") → A2ARemoteAgent (远程)
agentRegistry.getAgent("local-format") → FormatAgent (本地)
│
▼
Sequential 编排:
TranslateAgent.chat() → A2ARemoteAgent.chat() → FormatAgent.chat()
(本地执行) (HTTP/SSE 远程) (本地执行)
│ │ │
└──────── 透明串联,调用方无感知 ────────────┘
AgentRegistry,编排引擎一视同仁。
hub-agent-core/.../a2a/ ├── A2AAgentDescriptor.java — Agent 描述符(record,不可变) ├── A2ARegistry.java — 注册发现接口 ├── NacosA2ARegistry.java — Nacos 注册中心实现(缓存 + 降级) ├── A2AClient.java — HTTP/SSE 远程调用客户端(WebClient) ├── A2ARemoteAgent.java — 远程 Agent 本地代理(implements Agent) ├── A2AAutoRegistration.java — 自动注册/注销(ApplicationReadyEvent + PreDestroy) ├── A2AConfiguration.java — Spring Boot 自动配置(条件装配) └── A2AProperties.java — 配置属性(agent-hub.a2a.*)