25 - 图时间旅行与调试
Graph 执行引擎的状态快照与回溯,支持任意节点重放
一、什么是时间旅行调试?
时间旅行调试就像 Git 对于代码 一样——在 Graph 执行过程中,每经过一个节点就保存一份完整的状态快照(snapshot)。当需要调试时,可以回溯到任意历史节点,查看当时的状态,甚至从该节点重新执行。
核心价值:Graph 执行涉及多个节点、条件分支、状态变换,一旦出错很难定位。时间旅行让你可以"倒带"到任意时刻,查看每一步的输入输出,实现精确的问题定位和可重放调试。
二、核心组件
组件清单
| 组件 | 类型 | 职责 |
| TimeTravelService | @Component | 快照管理核心服务(保存/查询/恢复) |
| GraphSnapshot | Record | 不可变快照数据结构 |
GraphSnapshot 结构
public record GraphSnapshot(
String snapshotId, // UUID,快照唯一标识
String nodeId, // 当前执行到的节点 ID
int stepIndex, // 第几步(从 0 开始)
Map<String, Object> stateData, // 完整状态数据的深拷贝
Instant timestamp // 快照创建时间戳
) {}
文件结构
time-travel/
├── TimeTravelService.java ← 核心服务(@Component)
├── GraphSnapshot.java ← 快照 Record
└── TimeTravelController.java ← REST API(ADMIN 权限)
三、快照保存流程
快照保存 — 在每个节点执行前自动触发:
GraphRuntime.execute(graph, initialState)
│
▼
┌─────────────────────────────────────────────────────────┐
│ for each node in execution order: │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ ① timeTravelService.saveSnapshot( │ │
│ │ executionId, nodeId, stepIndex, state │ │
│ │ ) │ │
│ │ │ │
│ │ 内部流程: │ │
│ │ ├── deep copy stateData (防止后续修改影响快照) │ │
│ │ ├── 生成 UUID 作为 snapshotId │ │
│ │ ├── computeIfAbsent(executionId, new List) │ │
│ │ │ ← 原子操作,线程安全 │ │
│ │ └── list.add(new GraphSnapshot(...)) │ │
│ │ ← CopyOnWriteArrayList │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ② node.execute(state) ← 正常执行节点逻辑 │
│ │
│ ③ stepIndex++ │
└─────────────────────────────────────────────────────────┘
Deep Copy:快照中的 stateData 是深拷贝,与运行时状态完全独立。即使后续节点修改了状态,历史快照中保存的仍是当时的原始数据。
四、快照恢复流程
快照恢复 — restoreSnapshot(executionId, snapshotId):
┌───────────────────────────────────────────────────────────────┐
│ ① 查找快照 │
│ snapshots = store.get(executionId) │
│ target = snapshots.stream() │
│ .filter(s -> s.snapshotId().equals(snapshotId)) │
│ .findFirst() │
│ │
│ ② 未找到 → 抛出 SnapshotNotFoundException │
│ │
│ ③ 找到 → 恢复状态: │
│ ├── 创建新的 GraphState 实例 │
│ ├── 将 snapshot.stateData() 的所有键值对写入新状态 │
│ ├── 设置 currentNodeId = snapshot.nodeId() │
│ └── 返回恢复后的 GraphState │
│ │
│ 调用方可以用恢复的状态重新从该节点开始执行 Graph │
└───────────────────────────────────────────────────────────────┘
五、存储限制与淘汰
| 配置 | 值 | 说明 |
| MAX_EXECUTIONS | 100 | 最多保留 100 个执行的快照集合 |
| MAX_SNAPSHOTS_PER_EXECUTION | 200 | 每个执行最多 200 个快照 |
| 淘汰策略 | 最旧优先 | 超限时按 timestamp 删除最旧的执行/快照 |
| 并发安全 | computeIfAbsent | 原子性保证,避免 TOCTOU 竞态 |
存储结构:
ConcurrentHashMap<executionId, CopyOnWriteArrayList<GraphSnapshot>>
│
├── exec_001 → [snap_0, snap_1, snap_2, ..., snap_N] (≤200)
├── exec_002 → [snap_0, snap_1, ...]
├── ...
└── exec_100 → [snap_0, snap_1, ...] (≤100 个 execution)
超限淘汰:
├── execution 数 > 100 → 删除 timestamp 最旧的 execution 整组
└── snapshot 数 > 200 → 删除该 execution 下 timestamp 最旧的 snapshot
六、调试 API
所有时间旅行 API 均需要 ADMIN 权限,仅供调试使用。
| 方法 | 路径 | 描述 |
| GET | /api/debug/timetravel/{executionId} | 获取指定执行的所有快照列表 |
| POST | /api/debug/timetravel/{executionId}/restore/{snapshotId} | 恢复到指定快照,返回恢复后的状态 |
| GET | /api/debug/timetravel-status | 获取时间旅行系统状态(执行数、快照总数等) |
// 查询执行快照列表
GET /api/debug/timetravel/exec_001
{
"executionId": "exec_001",
"snapshotCount": 5,
"snapshots": [
{"snapshotId": "snap_a1b2", "nodeId": "start", "stepIndex": 0, "timestamp": "..."},
{"snapshotId": "snap_c3d4", "nodeId": "analyze", "stepIndex": 1, "timestamp": "..."},
{"snapshotId": "snap_e5f6", "nodeId": "decide", "stepIndex": 2, "timestamp": "..."},
{"snapshotId": "snap_g7h8", "nodeId": "generate", "stepIndex": 3, "timestamp": "..."},
{"snapshotId": "snap_i9j0", "nodeId": "end", "stepIndex": 4, "timestamp": "..."}
]
}
// 恢复到 "decide" 节点
POST /api/debug/timetravel/exec_001/restore/snap_e5f6
{
"restored": true,
"nodeId": "decide",
"stepIndex": 2,
"stateData": { "input": "...", "analysisResult": "..." }
}
七、面试高频问题
Q: 为什么用 CopyOnWriteArrayList?
A: 快照的访问模式是典型的读多写少:快照一旦创建很少修改,但调试时会频繁查询遍历。CopyOnWriteArrayList 在写入时复制整个数组(成本高但不频繁),在读取时无需加锁(零成本且高频)。如果用普通 ArrayList + synchronized,每次调试查询都需要加锁,严重影响读取性能。
Q: computeIfAbsent 解决了什么问题?
A: 解决了 TOCTOU(Time-of-check to time-of-use)竞态条件。如果不用 computeIfAbsent,代码会写成:if (!map.containsKey(id)) map.put(id, new List()); map.get(id).add(snap);——在高并发下,两个线程可能同时判断 key 不存在,各自创建一个新 List,后者覆盖前者导致快照丢失。computeIfAbsent 将"检查 + 创建"合为一个原子操作,彻底消除竞态。
Q: 生产环境如何使用?
A: 当前实现使用进程内内存存储快照,适合开发和调试场景。生产环境可以通过实现 SnapshotStore 接口扩展为持久化存储(Redis 或数据库):Redis 适合快速读写但有容量限制,数据库适合长期保存和审计。同时需要增加快照的自动过期清理(TTL)和按需采样(不是每个节点都保存快照)来控制存储成本。
八、Graph Observation 指标体系
Micrometer 集成架构
GraphRuntime ──▶ ObservationRegistry ──▶ MeterRegistry ──▶ Prometheus ──▶ Grafana
┌──────────────┐ ┌─────────────────────┐ ┌───────────────┐ ┌────────────┐ ┌─────────┐
│ GraphRuntime │───▶│ ObservationRegistry │───▶│ MeterRegistry │───▶│ Prometheus │───▶│ Grafana │
│ │ │ │ │ │ │ │ │ │
│ 节点执行 │ │ Observation API │ │ Timer │ │ /actuator │ │ 可视化 │
│ 状态变更 │ │ start/stop/error │ │ Counter │ │ /prometheus│ │ 告警 │
│ Checkpoint │ │ ObservationHandler │ │ Gauge │ │ pull模式 │ │ 面板 │
└──────────────┘ └─────────────────────┘ └───────────────┘ └────────────┘ └─────────┘
核心指标表
| Metric Name | Type | Tags | Description |
graph.node.execution.time | Timer | agentId, nodeId, status | 节点执行耗时 |
graph.total.execution.time | Timer | agentId, graphId | 图总执行时间 |
graph.node.error.count | Counter | agentId, nodeId, errorType | 节点错误计数 |
graph.iteration.count | Counter | agentId | ReAct迭代次数 |
graph.checkpoint.save.time | Timer | agentId, saverType | Checkpoint保存耗时 |
graph.hitl.pending.count | Gauge | agentId | 当前等待审批数 |
Prometheus 查询示例
# P95 节点执行耗时
histogram_quantile(0.95, rate(graph_node_execution_time_seconds_bucket[5m]))
# 每分钟错误率
rate(graph_node_error_count_total[1m])
# 活跃HITL等待数
graph_hitl_pending_count
Grafana Dashboard 面板建议
| 面板名称 | 图表类型 | 数据来源 |
| 节点耗时热力图 | Heatmap | graph.node.execution.time |
| 错误率趋势线 | Time Series | graph.node.error.count |
| 迭代次数分布直方图 | Histogram | graph.iteration.count |
| Checkpoint保存延迟 | Time Series | graph.checkpoint.save.time |
九、Redis/MySQL Checkpoint 对比
双存储架构
GraphRuntime
│
┌─────┴─────┐
▼ ▼
┌───────────┐ ┌───────────┐
│ Redis │ │ MySQL │
│ Saver │ │ Saver │
├───────────┤ ├───────────┤
│ 高性能 │ │ 持久化 │
│ TTL自清理 │ │ 可查询 │
│ 活跃会话 │ │ 审计回放 │
└───────────┘ └───────────┘
Redis vs MySQL 对比
| Dimension | Redis | MySQL |
| 读写速度 | <1ms | 5-20ms |
| 持久性 | 内存+AOF | 磁盘持久 |
| 容量 | 有限(内存) | 无限(磁盘) |
| 自动清理 | TTL过期 | 手动/定时 |
| 适用场景 | 活跃会话 | 历史审计 |
| 故障恢复 | 重启丢失风险 | 完全恢复 |
| 查询能力 | Key-Value | SQL查询 |
GraphInfraConfiguration 条件装配
@Configuration
@ConditionalOnProperty(prefix = "agent.defaults.graph",
name = "checkpoint-enabled", havingValue = "true")
public class GraphInfraConfiguration {
@Bean
@ConditionalOnProperty(name = "agent.defaults.graph.checkpoint-type",
havingValue = "redis")
public BaseSaver redisSaver(...) { ... }
@Bean
@ConditionalOnProperty(name = "agent.defaults.graph.checkpoint-type",
havingValue = "mysql")
public BaseSaver mysqlSaver(...) { ... }
}
条件装配:通过 @ConditionalOnProperty 实现 Redis/MySQL Saver 的互斥加载。配置 checkpoint-type=redis 时只创建 RedisSaver Bean,反之只创建 MySQLSaver Bean,避免不必要的连接池初始化。
十、更多面试高频问题
Q: 分布式场景下如何保证 Checkpoint 一致性?多节点同时写入同一个 threadId 会冲突吗?
A: 单线程模型——同一 threadId 的图执行是串行的(通过分布式锁保证)。Checkpoint 写入使用乐观锁(version 字段)。Redis 用 WATCH+MULTI 事务。实际场景很少并发,因为一个对话 = 一个 thread。
Q: ObservationRegistry 的设计模式是什么?为什么不直接用 MeterRegistry?
A: ObservationRegistry 是观察者模式 + 装饰器模式的组合。优于直接 MeterRegistry:① 统一 API(metrics + tracing + logging);② 可插拔 handler;③ 低开销(未注册时空操作)。这是 Spring 官方推荐方式。
Q: Time-Travel 的深拷贝成本高吗?有什么优化手段?
A: 成本:GraphState 包含 messages 列表(可能很大)。优化手段:① 增量快照(只存 diff);② 写时复制(COW);③ 限制 MAX_SNAPSHOTS_PER_EXECUTION=200 上限;④ 可选关闭(生产环境 debug=false)。
Q: Redis Checkpoint 的 TTL 设多长合适?过期后用户还能恢复会话吗?
A: 活跃会话 TTL=30min(可配置)。过期后不能恢复。方案:① 双写(Redis 快速访问 + MySQL 持久备份);② TTL 延长 + touch 机制(每次访问刷新 TTL)。
Q: 如果 Graph 执行中途 OOM,Checkpoint 能恢复吗?
A: 取决于最后一次成功保存的 Checkpoint。每个节点执行完后保存一次。OOM 在节点执行中途 = 丢失该节点进度,从上一个节点 Checkpoint 恢复重试。
Q: Prometheus 拉取频率和 Graph 指标精度的权衡?
A: 默认 15s 拉取间隔。图执行通常 2-30s 完成。长执行能被捕获;短执行可能在两次拉取间完成。解决:使用 Summary/Histogram 聚合,不依赖瞬时采样。