理解熔断、限流、降级、消息队列等分布式核心概念
单机 vs 分布式: 单机 (一台服务器): ┌──────────────────────────┐ │ · 简单、开发快 │ │ · 一台机器就够了 │ │ · 但: │ │ - 挂了就全挂了 │ │ - 用户多了扛不住 │ │ - 无法水平扩展 │ └──────────────────────────┘ 分布式 (多台服务器): ┌──────────────────────────┐ │ · 多台机器协同工作 │ │ · 一台挂了其他还能服务 │ │ · 可以加机器扩容 │ │ · 但: │ │ - 状态需要共享 │ │ - 消息可能重复 │ │ - 网络可能分区 │ │ - 设计复杂度高 │ └──────────────────────────┘ 本项目的策略: 先单机后分布式 ┌─────────────────────────────────────────┐ │ 开发/小流量: LOCAL模式 (单机) │ │ · 所有状态在内存中 │ │ · 不需要Redis/RocketMQ │ │ │ │ 生产/大流量: REDIS模式 (分布式) │ │ · 状态通过Redis共享 │ │ · 消息通过RocketMQ传递 │ │ · 逐步迁移,不需要改业务代码 │ └─────────────────────────────────────────┘
类比:家里的保险丝——电流过大时自动断开,保护电器不被烧坏。
熔断器三状态:
┌────────────┐ 失败率>50% ┌────────────┐
│ CLOSED │ ──────────────→ │ OPEN │
│ (正常通行) │ │ (拒绝请求) │
└─────┬──────┘ └─────┬──────┘
↑ │
│ 30秒后半开 │
│ 尝试放行1个请求 │
│ ┌────────────┐ │
└─────────│ HALF_OPEN │←─────────┘
│ (试探放行) │
└────────────┘
成功→CLOSED 失败→OPEN
本项目实现 (AgentCircuitBreaker):
· 窗口: 1分钟
· 最低请求数: 5次 (太少统计不准)
· 失败率阈值: 50%
· 恢复超时: 30秒
· 支持LOCAL(内存)和REDIS(分布式)两种模式
· Redis使用Lua脚本保证原子性
本项目的多层限流: Layer 1: 用户级限流 ┌──────────────────────────────────────┐ │ UserRateLimiter │ │ · 每分钟30次请求 │ │ · 最大5个并发 │ │ · Semaphore + 计数器 │ └──────────────────────────────────────┘ Layer 2: 注入攻击限流 ┌──────────────────────────────────────┐ │ InjectionRateLimiter │ │ · 每分钟20次注入尝试 │ │ · 超限封禁5分钟 │ │ · Redis Sorted Set实现滑动窗口 │ └──────────────────────────────────────┘ Layer 3: 登录限流 ┌──────────────────────────────────────┐ │ LoginRateLimiter │ │ · 防止暴力破解密码 │ │ · 每分钟最多5次登录失败 │ └──────────────────────────────────────┘ Layer 4: LLM API限流 ┌──────────────────────────────────────┐ │ ApiKeyPool + Cooling机制 │ │ · API Key被429限流 → 冷却60秒 │ │ · 所有Key耗尽 → 三态熔断降级链路由 │ └──────────────────────────────────────┘
降级策略: 正常流程: 用户 → zhipu (智谱AI) → 回复 zhipu挂了怎么办? ┌──────────────────────────────────────────────┐ │ DegradationManager 检测到 zhipu 不可用 │ │ │ │ 自动降级: │ │ 用户 → zhipu(不可用) → deepseek(备用) → 回复 │ │ │ │ 对用户完全透明! 用户不知道后端换了供应商 │ │ │ │ 触发条件: │ │ · pool_exhausted: 所有Key都用完 │ │ · error_rate: 滑动窗口错误率>50% │ │ │ │ 恢复条件: │ │ · 等待300秒 │ │ · 至少1个Key恢复ACTIVE │ │ · 备用通道没有也降级 │ └──────────────────────────────────────────────┘
为什么需要消息队列? 同步处理 (无MQ): ┌────────┐ ┌────────┐ ┌────────┐ │ 用户请求│───→│ 主服务 │───→│ 发邮件 │ (慢! 3秒) │ 等待... │ └────────┘ └────────┘ └────────┘ 用户等了3秒才收到响应 异步处理 (有MQ): ┌────────┐ ┌────────┐ ┌─────────┐ ┌────────┐ │ 用户请求│───→│ 主服务 │───→│ RocketMQ│───→│ 消费者 │ │ 立即返回│ └────────┘ └─────────┘ │ 发邮件 │ └────────┘ └────────┘ 用户立即得到响应,邮件后台慢慢发 本项目的MQ应用: ┌──────────────────────────────────────────┐ │ Topic: TASK_RETRY │ │ 任务失败 → 发送重试消息 → 消费者重试 │ │ │ │ Topic: KEY_POOL_REFRESH │ │ Key配置变更 → 广播所有实例 → 热更新 │ │ │ │ Topic: CHANNEL_INGEST │ │ 通道消息 → 异步处理 → 调用Agent │ └──────────────────────────────────────────┘
幂等性: 同一个操作执行1次和执行N次,效果一样 非幂等问题 (分布式常见): ┌──────────────────────────────────────┐ │ 用户点了"支付" │ │ → 请求超时,用户又点了一次 │ │ → 扣了两次钱! 💰💰 │ └──────────────────────────────────────┘ 幂等性解决方案: ┌──────────────────────────────────────┐ │ 本项目的入站消息去重: │ │ │ │ RedisInboundDedupeStore: │ │ · 收到消息 → Redis SETNX(messageId) │ │ · 首次: SETNX成功 → 处理消息 │ │ · 重复: SETNX失败 → 丢弃消息 │ │ · TTL: 5分钟后自动过期 │ │ │ │ 保证: 同一条消息只处理一次 │ └──────────────────────────────────────┘
| 设计模式 | 是什么 | 本项目在哪里用 |
|---|---|---|
| 策略模式 | 定义算法族,运行时选择 | 文档解析、LLM Provider |
| 装饰器模式 | 动态添加功能而不改原类 | TrackedChatModel、PooledChatModel |
| 模板方法 | 父类定义骨架,子类实现细节 | AbstractLlmAgent |
| 观察者模式 | 一对多的事件通知 | Hook系统、SSE事件流 |
| 工厂方法 | 由子类决定创建哪个对象 | ChatModelFactory各实现 |
| 状态机 | 对象在不同状态间切换 | GraphState、ApiKeyEntry、熔断器 |
| 责任链 | 多个处理器串联处理请求 | InputScanner检查链 |
| 建造者 | 分步构建复杂对象 | CompiledGraph.Builder |
| 单例 | 全局只有一个实例 | AgentRegistry、ModelRegistry |
| 代理模式 | 控制对对象的访问 | PooledChatModel代理真实ChatModel |