15个分布式契约接口 + 19个运行态开关、Local/Redis双模式切换、启动校验与零改动扩展
面试核心考点:如何让一个项目从「单机模式」平滑过渡到「分布式部署」而不需要改动业务代码?答案是抽象契约层。
传统做法 vs 契约层做法:
传统做法 (硬编码):
┌─────────────────────────────────────────────┐
│ Service层直接调用 ConcurrentHashMap │
│ → 想换Redis? 改几十个文件! │
│ → 想加分布式锁? 又改几十个文件! │
│ → 每次扩展都是大工程 │
└─────────────────────────────────────────────┘
契约层做法 (本项目):
┌─────────────────────────────────────────────┐
│ Service层 → 契约接口 → 具体实现 │
│ │
│ 业务代码只依赖接口: │
│ @Autowired DistributedJobGuard jobGuard; │
│ jobGuard.tryAcquire("cleanup", lease) │
│ │
│ 运行时自动选择实现: │
│ LOCAL → ReentrantLock (单机内存) │
│ REDIS → RedisTemplate/Lua (分布式) │
│ │
│ 切换只需改一行配置: │
│ hub.distributed.runtime.xxx-mode=local │
│ → redis │
└─────────────────────────────────────────────┘
本项目当前在 hub-common/.../distributed/contract 定义了15个契约接口,并在配置层保留19个运行态模式开关:
| # | 契约接口 / 运行态 | 职责 | 配置键 |
|---|---|---|---|
| 1 | LoginRateLimiterRuntime | 登录失败次数、IP封禁 | login-rate-limiter-mode |
| 2 | UserRateLimiterRuntime | 用户级RPM与并发控制 | user-rate-limiter-mode |
| 3 | CircuitBreakerStateStore | Agent熔断器状态 | circuit-breaker-mode |
| 4 | KeyPoolRuntimeStateStore | Key池轮询、冷却、失败计数 | key-pool-runtime-mode |
| 5 | InboundDedupeStore | 入站消息幂等去重 | inbound-dedupe-mode |
| 6 | DistributedJobGuard | 定时任务单实例执行 | scheduled-job-mode |
| 7 | TaskRuntimeStore | 任务中心运行态快照 | task-center-mode |
| 8 | ChannelSessionStore | 通道会话绑定 | channel-session-mode |
| 9 | ActiveTaskStore | 活跃任务追踪与取消 | active-task-mode |
| 10 | CanaryTokenStore | 金丝雀令牌存储 | canary-token-mode |
| 11 | InjectionRateLimiterStore | 注入检测频次限流 | injection-rate-limiter-mode |
| 12 | DegradationStateStore | 模型降级状态 | degradation-mode |
| 13 | BackgroundTaskRuntimeStore | 后台任务进度与结果 | background-task-mode |
| 14 | ToolCacheStore | 工具调用缓存 | tool-cache-mode |
| 15 | AgentHealthStore | Agent健康快照 | agent-health-mode |
| 16~19 | MCP Health / LLM Tracker / Retrieval Cache / Orchestration Stats | 监控与缓存运行态开关 | mcp-health-mode 等 |
双模式实现架构:
┌─────────────────────────────────────────────────────────┐
│ 业务层 (Service) │
│ 只依赖接口,完全不感知底层实现 │
└───────────────────────┬─────────────────────────────────┘
│ @Autowired
▼
┌─────────────────────────────────────────────────────────┐
│ 契约接口 (XxxRuntime / XxxStore) │
│ check() / markIfFirst() / tryAcquire() / ... │
└──────────┬──────────────────────────────┬───────────────┘
│ │
mode=local/fallback mode=redis
│ │
▼ ▼
┌─────────────────────┐ ┌───────────────────────┐
│ LocalXxxImpl │ │ RedisXxxImpl │
│ · ConcurrentMap │ │ · RedisTemplate │
│ · ReentrantLock │ │ · Lua Script │
│ · AtomicLong │ │ · StringRedisTemplate│
│ · 单JVM有效 │ │ · 跨节点共享 │
└─────────────────────┘ └───────────────────────┘
关键设计原则:
┌──────────────────────────────────────────────┐
│ 1. 接口语义一致:两种实现的行为完全等价 │
│ 2. 零侵入切换:application.yml 一行改配置 │
│ 3. 单元测试友好:LOCAL模式无需外部依赖 │
│ 4. 渐进式迁移:可逐个模块从LOCAL切到REDIS │
└──────────────────────────────────────────────┘
@Component
public class RedisDistributedJobGuard implements DistributedJobGuard {
@Override
public boolean tryAcquire(String jobName, Duration lease) {
if (!redisModeEnabled()) {
return tryAcquireLocal(jobName, lease, "local");
}
if (redisTemplate == null) {
return tryAcquireLocal(jobName, lease, "local_fallback");
}
String key = redisKey(jobName);
String token = UUID.randomUUID().toString();
Boolean ok = redisTemplate.opsForValue()
.setIfAbsent(key, token, lease);
if (Boolean.TRUE.equals(ok)) {
lockTokens.put(key, token);
return true;
}
return false;
}
@Override
public void release(String jobName) {
// Redis模式使用token校验删除;local/fallback模式释放本地锁
}
}
@ConfigurationProperties(prefix = "hub.distributed")
public class DistributedRuntimeProperties {
private Runtime runtime = new Runtime();
private Redis redis = new Redis();
private RocketMq rocketmq = new RocketMq();
public static class Runtime {
private String loginRateLimiterMode = "redis";
private String userRateLimiterMode = "redis";
private String circuitBreakerMode = "redis";
private String keyPoolRuntimeMode = "redis";
private String inboundDedupeMode = "redis";
private String scheduledJobMode = "redis";
private String taskCenterMode = "redis";
private String channelSessionMode = "redis";
private String activeTaskMode = "redis";
private String canaryTokenMode = "redis";
private String injectionRateLimiterMode = "redis";
// ... 共19个运行态开关
}
}
application.yml 最简配置:
# 开发环境 - 单机模式,无需Redis
hub:
distributed:
runtime:
login-rate-limiter-mode: local
user-rate-limiter-mode: local
circuit-breaker-mode: local
scheduled-job-mode: local
---
# 生产环境 - 分布式模式
spring.config.activate.on-profile: prod
hub:
distributed:
runtime:
login-rate-limiter-mode: redis
user-rate-limiter-mode: redis
circuit-breaker-mode: redis
scheduled-job-mode: redis
redis:
key-prefix: "agenthub:dist:"
防止配置错误导致运行时崩溃——应用启动时就做全面校验:
启动校验流程 (ApplicationReadyEvent):
┌────────────────────────────────────────────────────┐
│ @EventListener(ApplicationReadyEvent.class) │
│ DistributedModeStartupValidator.validate() │
└────────────────────────┬───────────────────────────┘
│
▼
┌──────────── cluster profile ? ─────────────────────┐
│ │
│ YES: │
│ ├── 1. 扫描核心 runtime mode 配置 │
│ ├── 2. 收集仍为 local 的模式 │
│ └── 3. 如存在 local 模式则输出 WARN │
│ │
│ NO: │
│ └── 直接通过,允许本地开发使用 local/fallback │
│ │
│ 注意:当前实现是启动巡检/告警,不会强制失败启动 │
└─────────────────────────────────────────────────────┘
@Component
@RequiredArgsConstructor
public class DistributedModeStartupValidator {
private final DistributedRuntimeProperties props;
private final ApplicationContext context;
@EventListener(ApplicationReadyEvent.class)
public void validateDistributedModes() {
boolean clusterProfile = activeProfiles.contains("cluster");
Runtime runtime = distributedRuntimeProperties.getRuntime();
List<String> localModes = new ArrayList<>();
collectIfLocal(localModes, "login-rate-limiter-mode",
runtime.getLoginRateLimiterMode());
collectIfLocal(localModes, "user-rate-limiter-mode",
runtime.getUserRateLimiterMode());
collectIfLocal(localModes, "circuit-breaker-mode",
runtime.getCircuitBreakerMode());
// ... 继续检查 key-pool / inbound-dedupe / task 等模式
if (clusterProfile && !localModes.isEmpty()) {
log.warn("cluster profile 下仍有 local runtime mode: {}", localModes);
}
}
}
扩展新的分布式契约通常只需3步:
Step 1: 定义接口
┌────────────────────────────────────────────┐
│ public interface XxxRuntimeStore { │
│ void doSomething(String key); │
│ } │
└────────────────────────────────────────────┘
Step 2: 提供 local/fallback 与 redis 实现
┌────────────────────────────────────────────┐
│ @Component │
│ class RedisXxxRuntimeStore │
│ implements XxxRuntimeStore │
│ │
│ // 内部按 runtime.xxx-mode 选择 │
│ // redis 不可用时回退本地实现 │
└────────────────────────────────────────────┘
Step 3: 注入使用(已有代码零改动)
┌────────────────────────────────────────────┐
│ @Autowired XxxRuntimeStore store; │
│ store.doSomething("myKey"); │
└────────────────────────────────────────────┘
✅ 不需要改任何已有代码
✅ 只需补充对应 runtime mode 配置
✅ 通常不需要改启动类
✅ 业务侧不感知 local/redis 选择