23 - 多Agent编排引擎

4 种编排模式 — Sequential / Parallel / Loop / LLM Routing

一、编排引擎概述

复杂任务往往需要多个专业化 Agent 协同完成。例如:一个"市场分析报告"任务可能需要数据采集 Agent数据分析 Agent报告撰写 Agent 依次工作。编排引擎就是协调这些 Agent 执行顺序、并发方式和路由决策的核心组件。

核心设计:编排引擎将多个子 Agent 组合成一个"编排 Agent",对外表现为单个 Agent。调用方无需关心内部有多少子 Agent、如何调度——编排模式封装了所有复杂性。

二、4 种编排模式

模式执行方式Agent 数量典型场景
SequentialSequentialOrchestrationA → B → C 串行管线≥ 2多步骤流水线(采集→分析→报告)
ParallelParallelOrchestrationFan-out 并行 → 合并结果≥ 2多维度并行分析(情感+关键词+摘要)
LoopLoopOrchestration循环执行直到满足退出条件1迭代优化(代码审查→修改→再审查)
LLM RoutingLlmRoutingOrchestrationLLM 智能选择目标 Agent≥ 2意图路由(客服→技术/销售/投诉)

三、Sequential — 串行编排

最经典的管线模式,每个 Agent 的输出作为下一个 Agent 的输入,依次执行。

  Sequential 执行流程:

  用户消息
    │
    ▼
  ┌─────────┐    output    ┌─────────┐    output    ┌─────────┐
  │ Agent A  │ ──────────→ │ Agent B  │ ──────────→ │ Agent C  │
  │ (采集)   │             │ (分析)   │             │ (报告)   │
  └─────────┘             └─────────┘             └─────────┘
                                                       │
                                                       ▼
                                                   最终结果

  SSE 事件流:
  ┌─────────────────────────────────────────────────────────┐
  │ event: orchestration_step                               │
  │ data: {"step":1,"agent":"Agent A","status":"running"}   │
  │                                                         │
  │ event: orchestration_step                               │
  │ data: {"step":1,"agent":"Agent A","status":"completed"} │
  │                                                         │
  │ event: orchestration_step                               │
  │ data: {"step":2,"agent":"Agent B","status":"running"}   │
  │ ...                                                     │
  └─────────────────────────────────────────────────────────┘
数据传递:每个 Agent 执行完毕后,其输出文本会被包装为新的用户消息传给下一个 Agent。这意味着每个 Agent 只"看到"上一步的结果,而非完整历史。

四、Parallel — 并行编排

Fan-out / Fan-in 模式:将同一个消息发给所有子 Agent 并行处理,最后通过 ResultMerger 合并结果。

  Parallel 执行流程:

                         用户消息
                            │
              ┌─────────────┼─────────────┐
              ▼             ▼             ▼
        ┌──────────┐  ┌──────────┐  ┌──────────┐
        │ Agent A   │  │ Agent B   │  │ Agent C   │
        │ (情感分析) │  │ (关键词)  │  │ (摘要)    │
        └─────┬────┘  └─────┬────┘  └─────┬────┘
              │             │             │
              └─────────────┼─────────────┘
                            ▼
                     ┌─────────────┐
                     │ ResultMerger │  ← 合并所有结果
                     └──────┬──────┘
                            ▼
                        最终结果
配置说明
线程池PARALLEL_EXECUTORmax(4, CPU核心数) 线程
超时300 秒单个 Agent 最大执行时间
错误隔离try-catch 包装单个 Agent 失败不影响其他 Agent
结果合并ResultMerger默认换行拼接,可自定义

五、Loop — 循环编排

单 Agent 循环执行,每轮将上一轮输出作为新输入,直到满足退出条件或达到最大迭代次数。

  Loop 执行流程:

  用户消息 ──→ ┌──────────────────────────────────┐
               │         Loop Controller           │
               │                                    │
               │   iteration = 0                    │
               │   ┌────────────────────────────┐   │
               │   │        Agent 执行           │   │
               │   │   input → Agent → output    │   │
               │   └──────────┬─────────────────┘   │
               │              │                      │
               │              ▼                      │
               │   exitCondition.test(output)?       │
               │   ├── true  → 退出循环,返回结果    │
               │   └── false → iteration++           │
               │              │                      │
               │              ▼                      │
               │   iteration >= maxIterations(10)?   │
               │   ├── true  → 强制退出              │
               │   └── false → 继续循环 ↑            │
               └──────────────────────────────────┘

  安全边界: maxIterations = 10(防止无限循环)
exitCondition:是一个 Predicate<String>,接收 Agent 输出文本,返回 true 表示任务完成可以退出。例如:output -> output.contains("APPROVED")

六、LLM Routing — 智能路由

由 LLM 根据用户消息内容,智能选择最合适的子 Agent 来处理请求。

  LLM Routing 执行流程:

  用户消息: "我的订单发错了,要求退款"
       │
       ▼
  ┌──────────────────────────────────────────────┐
  │              LLM 路由决策                     │
  │                                              │
  │  Routing Prompt:                             │
  │  "根据用户消息选择最合适的 Agent:             │
  │   - agent_tech: 技术问题处理                  │
  │   - agent_sales: 销售咨询                    │
  │   - agent_complaint: 投诉与退款              │
  │   请只返回 agent ID"                         │
  │                                              │
  │  LLM 输出: "agent_complaint"                 │
  └──────────────┬───────────────────────────────┘
                 │
                 ▼
  ┌──────────────────────────┐
  │    agent_complaint       │
  │   (投诉与退款 Agent)      │
  └──────────┬───────────────┘
             │
             ▼
          最终结果
// 路由 Prompt 模板(关键片段)
String routingPrompt = String.format(
    "根据用户消息选择最合适的Agent处理:\n%s\n" +
    "只返回Agent ID,不要其他内容。如果都不合适返回 none",
    agentDescriptions  // 动态注入各 Agent 的 ID + 描述
);

// 安全处理: 防止 Agent 描述中的特殊字符注入
String safeDesc = Matcher.quoteReplacement(description);

// 降级策略: LLM 返回无法识别的 ID 或 "none"
// → fallback 到列表中第一个 Agent

七、OrchestrationService

OrchestrationService 是编排功能的服务入口,负责根据编排类型和子 Agent ID 列表构建编排 Agent。

@Service
public class OrchestrationService {

    // 根据编排类型创建编排 Agent
    public OrchestrationAgent createOrchestration(
        OrchestrationTypeEnum type,   // SEQUENTIAL / PARALLEL / LOOP / LLM_ROUTING
        List<String> agentIds,        // 子 Agent ID 列表
        OrchestrationConfig config    // 可选配置(exitCondition, merger 等)
    ) {
        List<Agent> agents = agentIds.stream()
            .map(agentService::getAgent)
            .collect(toList());

        return switch (type) {
            case SEQUENTIAL -> new SequentialOrchestration(agents);
            case PARALLEL   -> new ParallelOrchestration(agents, config.getMerger());
            case LOOP       -> new LoopOrchestration(agents.get(0), config);
            case LLM_ROUTING-> new LlmRoutingOrchestration(agents, llmService);
        };
    }
}

八、线程池管理

编排引擎使用 3 个独立线程池,隔离不同层次的并发需求:

线程池线程名前缀大小用途
ORCHESTRATION_EXECUTORorch-固定 4编排任务调度(顶层)
PARALLEL_EXECUTORparallel-max(4, CPU)Parallel 模式并行执行子 Agent
SUB_AGENT_EXECUTORsub-agent-固定 8子 Agent 内部异步操作
为什么 3 个独立线程池?防止级联阻塞。如果共享线程池,Parallel 模式启动 8 个子 Agent 可能耗尽所有线程,导致其他编排任务无法调度。分层隔离确保每层有独立的线程资源。

九、面试高频问题

Q: 为什么不用 ForkJoinPool.commonPool()?
A: 编排引擎中的子 Agent 执行涉及阻塞 I/O(LLM API 调用、网络请求),而 ForkJoinPool.commonPool() 是为CPU 密集型计算设计的,线程数等于 CPU 核心数。大量阻塞 I/O 会耗尽 common pool 线程,影响整个 JVM 中依赖 common pool 的其他组件(CompletableFuture、Stream.parallel 等)。独立线程池实现了故障隔离。
Q: Parallel 模式如何做错误隔离?
A: 每个子 Agent 的执行都被 try-catch 包装。如果某个 Agent 抛出异常,catch 块会将异常信息转换为错误描述字符串(如 "Agent X failed: timeout"),而不是抛出异常。这样 ResultMerger 收到的是"N 个成功结果 + M 个错误描述",调用方可以看到哪些 Agent 成功、哪些失败,而不是因为一个失败导致整体失败。
Q: LLM Routing 的降级策略?
A: 当 LLM 返回的 Agent ID 无法匹配任何已注册 Agent(可能是幻觉输出),或者返回 "none"(表示没有合适的 Agent),路由引擎会 fallback 到列表中的第一个 Agent。这确保了即使路由决策失败,用户请求仍然能得到处理,而不是返回错误。同时使用 Matcher.quoteReplacement() 防止 Agent 描述中的 $\ 等特殊字符导致正则替换注入。

十、🌐 A2A 远程 Agent 协议 — 跨服务 Agent 通信

当 Agent 部署在不同服务实例上时,如何让它们像本地 Agent 一样透明地互相调用?A2A(Agent-to-Agent)远程协议解决了这个问题。它基于 Nacos 注册中心实现自动注册与发现,通过 HTTP/SSE 代理远程 Agent 的流式响应。

核心价值:远程 Agent 实现了 Agent 接口(A2ARemoteAgent implements Agent),调用方完全无感知本地/远程差异。编排引擎可以混合调度本地和远程 Agent,零代码改动。

10.1 架构总览

  应用实例 A (port:8080)                    应用实例 B (port:8081)
  ┌────────────────────┐                   ┌────────────────────┐
  │  Local Agent 1     │                   │  Local Agent 3     │
  │  Local Agent 2     │                   │  Local Agent 4     │
  │                    │                   │                    │
  │  A2AAutoRegistration                   │  A2AAutoRegistration
  │    ↓ 启动时自动注册                     │    ↓ 启动时自动注册
  └────────┬───────────┘                   └────────┬───────────┘
           │                                        │
           ▼                                        ▼
  ┌──────────────────────────────────────────────────────────┐
  │                    Nacos 注册中心                         │
  │  service: enterprise-agent-hub  /  group: AGENT_HUB      │
  │                                                          │
  │  Instance 1: Agent 1 (8080), Agent 2 (8080)              │
  │  Instance 2: Agent 3 (8081), Agent 4 (8081)              │
  └──────────────────────────────────────────────────────────┘
           │                                        │
           ▼                                        ▼
  ┌────────────────────┐                   ┌────────────────────┐
  │  discoverAll()     │                   │  discoverAll()     │
  │  ↓                 │                   │  ↓                 │
  │  A2ARemoteAgent 3  │  ← HTTP/SSE →    │  A2ARemoteAgent 1  │
  │  A2ARemoteAgent 4  │                   │  A2ARemoteAgent 2  │
  └────────────────────┘                   └────────────────────┘

工作流程:① 启动时 A2AAutoRegistration 将本地 Agent 注册到 Nacos → ② NacosA2ARegistry.discoverAll() 发现远程 Agent → ③ 为每个远程 Agent 创建 A2ARemoteAgent 本地代理 → ④ 通过 A2AClient 发起 HTTP/SSE 远程调用 → ⑤ 关闭时 @PreDestroy 自动注销

10.2 核心类

类名职责关键特性
A2AAgentDescriptorAgent 描述符(Java record)不可变,包含 agentId / name / description / capabilities / serviceUrl
A2ARegistry注册发现接口register / deregister / discoverAll / discover / refresh
NacosA2ARegistryNacos 注册中心实现元数据映射、ConcurrentHashMap 缓存、降级返回缓存
A2AClientHTTP/SSE 远程调用客户端WebClient、超时控制、错误降级为 SSE 事件
A2ARemoteAgent远程 Agent 本地代理implements Agent,chat() 委托给 A2AClient
A2AAutoRegistration自动注册/注销生命周期@EventListener(ApplicationReadyEvent)、@PreDestroy、跳过远程代理
A2AConfigurationSpring Boot 自动配置@ConditionalOnProperty(enabled=true)、@ConditionalOnClass(NacosSDK)
A2AProperties配置属性前缀 agent-hub.a2a,默认关闭

10.3 Agent 描述符(A2AAgentDescriptor)

public record A2AAgentDescriptor(
    String agentId,                    // "rag", "translate"
    String name,                       // "知识库助手"
    String description,                // "基于 RAG 的问答 Agent"
    Set<AgentCapability> capabilities, // {DOCUMENT_UPLOAD, SOURCE_CITATION}
    String serviceUrl                  // "http://192.168.1.10:8080"
) {}

描述符在注册时序列化为 Nacos 实例的 metadata 字段,发现时从 metadata 反序列化恢复。capabilities 存储为逗号分隔的枚举名(如 "DOCUMENT_UPLOAD,SOURCE_CITATION")。

10.4 Nacos 注册与发现

注册流程:每个本地 Agent 被封装为一个 Nacos Instance,元数据存储 Agent 的完整描述信息。

Nacos 元数据 Key来源示例值
agentIdAgent.getId()rag
nameAgent.getName()知识库助手
descriptionAgent.getDescription()基于 RAG 检索增强的问答
capabilities逗号分隔枚举名DOCUMENT_UPLOAD,SOURCE_CITATION
serviceUrlA2AProperties.baseUrlhttp://192.168.1.10:8080
降级策略discoverAll() 内部维护 cachedDescriptors 缓存。如果 Nacos 查询失败(网络抖动),返回上一次成功缓存的结果,避免服务中断。

10.5 远程调用流程(HTTP/SSE 代理)

  实例 A                                      实例 B
  ┌──────────────┐                           ┌──────────────┐
  │ 业务代码      │                           │ A2A 端点      │
  │ agent.chat() │                           │ /api/a2a/chat │
  └──────┬───────┘                           └──────┬───────┘
         │                                          │
         ▼                                          │
  A2ARemoteAgent.chat(request)                      │
         │                                          │
         ▼                                          │
  A2AClient.call(target, request)                   │
         │                                          │
         │  POST {serviceUrl}/api/a2a/chat/{agentId}│
         │  Content-Type: application/json          │
         │  Body: AgentRequest JSON                 │
         │ ────────────────────────────────────→     │
         │                                          │
         │         SSE Stream Response              │
         │ ←────────────────────────────────────     │
         │  data: 你好                              │
         │  data: ,这是回答                         │
         │  event: done                             │
         │  data: [DONE]                            │
  ┌──────┴───────┐                                  │
  │ AgentResponse │                                  │
  │ Flux<SSE>    │                                  │
  └──────────────┘                                  │
// A2AClient — 远程调用核心逻辑
public AgentResponse call(A2AAgentDescriptor target, AgentRequest request) {
    String requestBody = objectMapper.writeValueAsString(request);

    Flux<ServerSentEvent<String>> sseStream = webClient.post()
        .uri(target.serviceUrl() + "/api/a2a/chat/" + target.agentId())
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue(requestBody)
        .retrieve()
        .bodyToFlux(String.class)
        .timeout(Duration.ofSeconds(properties.getTimeoutSeconds()))
        .map(chunk -> ServerSentEvent.<String>builder().data(chunk).build())
        .onErrorResume(e -> Flux.just(
            ServerSentEvent.<String>builder()
                .event("error").data("远程调用失败: " + e.getMessage()).build()
        ))
        .concatWith(Flux.just(
            ServerSentEvent.<String>builder().event("done").data("[DONE]").build()
        ));

    return AgentResponse.of(sseStream);
}
错误场景处理方式
请求序列化失败返回包含错误消息的文本 Flux
网络超时timeout(Duration.ofSeconds(60)) → 降级为 error SSE 事件
远程不可达onErrorResume → 返回 event:error 的 SSE 事件
Nacos 发现失败降级返回缓存的 cachedDescriptors

10.6 自动注册机制

// 启动时注册 — @EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
    List<Agent> agents = agentRegistry.listAgents();
    for (Agent agent : agents) {
        if (agent instanceof A2ARemoteAgent) continue;  // ← 防循环注册
        A2AAgentDescriptor descriptor = new A2AAgentDescriptor(
            agent.getId(), agent.getName(), agent.getDescription(),
            agent.getCapabilities(), buildServiceUrl(agent)
        );
        a2aRegistry.register(descriptor);
        registeredAgentIds.add(agent.getId());
    }
}

// 关闭时注销 — @PreDestroy
public void onShutdown() {
    for (String agentId : registeredAgentIds) {
        a2aRegistry.deregister(agentId);
    }
    registeredAgentIds.clear();
}
Q: 为什么要跳过 A2ARemoteAgent
A: 防止循环注册。假设实例 A 发现了实例 B 的 Agent,创建了 A2ARemoteAgent(B-agent)。如果自动注册不过滤它,就会把 B 的 Agent 以 A 的 serviceUrl 再注册回 Nacos,导致其他实例发现到的是错误的地址。instanceof A2ARemoteAgent 检查确保只注册真正的本地 Agent。

10.7 配置参考

agent-hub:
  a2a:
    enabled: true                            # 启用 A2A 协议(默认 false)
    service-name: enterprise-agent-hub       # Nacos 注册服务名
    group: AGENT_HUB                         # Nacos 服务分组
    base-url: http://192.168.1.10:8080       # 当前实例的基础 URL
    timeout-seconds: 60                      # 远程调用超时(秒)
属性类型默认值说明
enabledbooleanfalse是否启用 A2A 协议,需显式设为 true
service-nameStringenterprise-agent-hubNacos 注册服务名
groupStringAGENT_HUBNacos 服务分组
base-urlString""当前实例基础 URL(必须配置)
timeout-secondsint60远程调用超时时间(秒)
条件装配:整个 A2A 模块通过 @ConditionalOnProperty(prefix="agent-hub.a2a", name="enabled", havingValue="true") 控制。Nacos 注册中心还额外依赖 @ConditionalOnClass(NamingService),确保 Nacos SDK 在 classpath 中。

10.8 与编排引擎的协作

A2ARemoteAgent 实现了 Agent 接口,因此可以无缝参与所有编排模式。编排引擎在调度 Agent 时无需区分本地/远程。

  OrchestrationService.execute("sequential", agentIds, message)
          │
          ▼
  agentRegistry.getAgent("local-translate")   → TranslateAgent (本地)
  agentRegistry.getAgent("remote-polish")     → A2ARemoteAgent (远程)
  agentRegistry.getAgent("local-format")      → FormatAgent (本地)
          │
          ▼
  Sequential 编排:
  TranslateAgent.chat() → A2ARemoteAgent.chat() → FormatAgent.chat()
       (本地执行)            (HTTP/SSE 远程)          (本地执行)
          │                      │                      │
          └──────── 透明串联,调用方无感知 ────────────┘
场景扩展:远程 Agent 同样可以参与 Parallel(并行)、Loop(循环)、LLM Routing(智能路由)编排。只要注册到 AgentRegistry,编排引擎一视同仁。

10.9 文件清单

hub-agent-core/.../a2a/
├── A2AAgentDescriptor.java    — Agent 描述符(record,不可变)
├── A2ARegistry.java           — 注册发现接口
├── NacosA2ARegistry.java      — Nacos 注册中心实现(缓存 + 降级)
├── A2AClient.java             — HTTP/SSE 远程调用客户端(WebClient)
├── A2ARemoteAgent.java        — 远程 Agent 本地代理(implements Agent)
├── A2AAutoRegistration.java   — 自动注册/注销(ApplicationReadyEvent + PreDestroy)
├── A2AConfiguration.java      — Spring Boot 自动配置(条件装配)
└── A2AProperties.java         — 配置属性(agent-hub.a2a.*)