25 - 图时间旅行与调试

Graph 执行引擎的状态快照与回溯,支持任意节点重放

一、什么是时间旅行调试?

时间旅行调试就像 Git 对于代码 一样——在 Graph 执行过程中,每经过一个节点就保存一份完整的状态快照(snapshot)。当需要调试时,可以回溯到任意历史节点,查看当时的状态,甚至从该节点重新执行。

核心价值:Graph 执行涉及多个节点、条件分支、状态变换,一旦出错很难定位。时间旅行让你可以"倒带"到任意时刻,查看每一步的输入输出,实现精确的问题定位和可重放调试。

二、核心组件

组件清单

组件类型职责
TimeTravelService@Component快照管理核心服务(保存/查询/恢复)
GraphSnapshotRecord不可变快照数据结构

GraphSnapshot 结构

public record GraphSnapshot(
    String snapshotId,              // UUID,快照唯一标识
    String nodeId,                  // 当前执行到的节点 ID
    int stepIndex,                  // 第几步(从 0 开始)
    Map<String, Object> stateData, // 完整状态数据的深拷贝
    Instant timestamp               // 快照创建时间戳
) {}

文件结构

  time-travel/
  ├── TimeTravelService.java        ← 核心服务(@Component)
  ├── GraphSnapshot.java            ← 快照 Record
  └── TimeTravelController.java     ← REST API(ADMIN 权限)

三、快照保存流程

  快照保存 — 在每个节点执行前自动触发:

  GraphRuntime.execute(graph, initialState)
       │
       ▼
  ┌─────────────────────────────────────────────────────────┐
  │  for each node in execution order:                      │
  │                                                         │
  │    ┌──────────────────────────────────────────────────┐  │
  │    │  ① timeTravelService.saveSnapshot(               │  │
  │    │       executionId, nodeId, stepIndex, state       │  │
  │    │     )                                            │  │
  │    │                                                  │  │
  │    │  内部流程:                                       │  │
  │    │  ├── deep copy stateData (防止后续修改影响快照)   │  │
  │    │  ├── 生成 UUID 作为 snapshotId                   │  │
  │    │  ├── computeIfAbsent(executionId, new List)       │  │
  │    │  │   ← 原子操作,线程安全                        │  │
  │    │  └── list.add(new GraphSnapshot(...))             │  │
  │    │       ← CopyOnWriteArrayList                     │  │
  │    └──────────────────────────────────────────────────┘  │
  │                                                         │
  │    ② node.execute(state)  ← 正常执行节点逻辑           │
  │                                                         │
  │    ③ stepIndex++                                        │
  └─────────────────────────────────────────────────────────┘
Deep Copy:快照中的 stateData 是深拷贝,与运行时状态完全独立。即使后续节点修改了状态,历史快照中保存的仍是当时的原始数据。

四、快照恢复流程

  快照恢复 — restoreSnapshot(executionId, snapshotId):

  ┌───────────────────────────────────────────────────────────────┐
  │  ① 查找快照                                                  │
  │     snapshots = store.get(executionId)                        │
  │     target = snapshots.stream()                               │
  │         .filter(s -> s.snapshotId().equals(snapshotId))       │
  │         .findFirst()                                          │
  │                                                               │
  │  ② 未找到 → 抛出 SnapshotNotFoundException                   │
  │                                                               │
  │  ③ 找到 → 恢复状态:                                          │
  │     ├── 创建新的 GraphState 实例                              │
  │     ├── 将 snapshot.stateData() 的所有键值对写入新状态         │
  │     ├── 设置 currentNodeId = snapshot.nodeId()                │
  │     └── 返回恢复后的 GraphState                               │
  │                                                               │
  │  调用方可以用恢复的状态重新从该节点开始执行 Graph              │
  └───────────────────────────────────────────────────────────────┘

五、存储限制与淘汰

配置说明
MAX_EXECUTIONS100最多保留 100 个执行的快照集合
MAX_SNAPSHOTS_PER_EXECUTION200每个执行最多 200 个快照
淘汰策略最旧优先超限时按 timestamp 删除最旧的执行/快照
并发安全computeIfAbsent原子性保证,避免 TOCTOU 竞态
  存储结构:

  ConcurrentHashMap<executionId, CopyOnWriteArrayList<GraphSnapshot>>
  │
  ├── exec_001 → [snap_0, snap_1, snap_2, ..., snap_N]  (≤200)
  ├── exec_002 → [snap_0, snap_1, ...]
  ├── ...
  └── exec_100 → [snap_0, snap_1, ...]                   (≤100 个 execution)

  超限淘汰:
  ├── execution 数 > 100 → 删除 timestamp 最旧的 execution 整组
  └── snapshot 数 > 200  → 删除该 execution 下 timestamp 最旧的 snapshot

六、调试 API

所有时间旅行 API 均需要 ADMIN 权限,仅供调试使用。

方法路径描述
GET/api/debug/timetravel/{executionId}获取指定执行的所有快照列表
POST/api/debug/timetravel/{executionId}/restore/{snapshotId}恢复到指定快照,返回恢复后的状态
GET/api/debug/timetravel-status获取时间旅行系统状态(执行数、快照总数等)
// 查询执行快照列表
GET /api/debug/timetravel/exec_001

{
  "executionId": "exec_001",
  "snapshotCount": 5,
  "snapshots": [
    {"snapshotId": "snap_a1b2", "nodeId": "start",   "stepIndex": 0, "timestamp": "..."},
    {"snapshotId": "snap_c3d4", "nodeId": "analyze",  "stepIndex": 1, "timestamp": "..."},
    {"snapshotId": "snap_e5f6", "nodeId": "decide",   "stepIndex": 2, "timestamp": "..."},
    {"snapshotId": "snap_g7h8", "nodeId": "generate", "stepIndex": 3, "timestamp": "..."},
    {"snapshotId": "snap_i9j0", "nodeId": "end",      "stepIndex": 4, "timestamp": "..."}
  ]
}

// 恢复到 "decide" 节点
POST /api/debug/timetravel/exec_001/restore/snap_e5f6

{
  "restored": true,
  "nodeId": "decide",
  "stepIndex": 2,
  "stateData": { "input": "...", "analysisResult": "..." }
}

七、面试高频问题

Q: 为什么用 CopyOnWriteArrayList?
A: 快照的访问模式是典型的读多写少:快照一旦创建很少修改,但调试时会频繁查询遍历。CopyOnWriteArrayList 在写入时复制整个数组(成本高但不频繁),在读取时无需加锁(零成本且高频)。如果用普通 ArrayList + synchronized,每次调试查询都需要加锁,严重影响读取性能。
Q: computeIfAbsent 解决了什么问题?
A: 解决了 TOCTOU(Time-of-check to time-of-use)竞态条件。如果不用 computeIfAbsent,代码会写成:if (!map.containsKey(id)) map.put(id, new List()); map.get(id).add(snap);——在高并发下,两个线程可能同时判断 key 不存在,各自创建一个新 List,后者覆盖前者导致快照丢失。computeIfAbsent 将"检查 + 创建"合为一个原子操作,彻底消除竞态。
Q: 生产环境如何使用?
A: 当前实现使用进程内内存存储快照,适合开发和调试场景。生产环境可以通过实现 SnapshotStore 接口扩展为持久化存储(Redis 或数据库):Redis 适合快速读写但有容量限制,数据库适合长期保存和审计。同时需要增加快照的自动过期清理(TTL)和按需采样(不是每个节点都保存快照)来控制存储成本。

八、Graph Observation 指标体系

Micrometer 集成架构

  GraphRuntime ──▶ ObservationRegistry ──▶ MeterRegistry ──▶ Prometheus ──▶ Grafana

  ┌──────────────┐    ┌─────────────────────┐    ┌───────────────┐    ┌────────────┐    ┌─────────┐
  │ GraphRuntime │───▶│ ObservationRegistry │───▶│ MeterRegistry │───▶│ Prometheus │───▶│ Grafana │
  │              │    │                     │    │               │    │            │    │         │
  │ 节点执行     │    │ Observation API     │    │ Timer         │    │ /actuator  │    │ 可视化  │
  │ 状态变更     │    │ start/stop/error    │    │ Counter       │    │ /prometheus│    │ 告警    │
  │ Checkpoint   │    │ ObservationHandler  │    │ Gauge         │    │ pull模式   │    │ 面板    │
  └──────────────┘    └─────────────────────┘    └───────────────┘    └────────────┘    └─────────┘

核心指标表

Metric NameTypeTagsDescription
graph.node.execution.timeTimeragentId, nodeId, status节点执行耗时
graph.total.execution.timeTimeragentId, graphId图总执行时间
graph.node.error.countCounteragentId, nodeId, errorType节点错误计数
graph.iteration.countCounteragentIdReAct迭代次数
graph.checkpoint.save.timeTimeragentId, saverTypeCheckpoint保存耗时
graph.hitl.pending.countGaugeagentId当前等待审批数

Prometheus 查询示例

# P95 节点执行耗时
histogram_quantile(0.95, rate(graph_node_execution_time_seconds_bucket[5m]))

# 每分钟错误率
rate(graph_node_error_count_total[1m])

# 活跃HITL等待数
graph_hitl_pending_count

Grafana Dashboard 面板建议

面板名称图表类型数据来源
节点耗时热力图Heatmapgraph.node.execution.time
错误率趋势线Time Seriesgraph.node.error.count
迭代次数分布直方图Histogramgraph.iteration.count
Checkpoint保存延迟Time Seriesgraph.checkpoint.save.time

九、Redis/MySQL Checkpoint 对比

双存储架构

                    GraphRuntime
                        │
                  ┌─────┴─────┐
                  ▼           ▼
          ┌───────────┐  ┌───────────┐
          │   Redis   │  │   MySQL   │
          │ Saver     │  │ Saver     │
          ├───────────┤  ├───────────┤
          │ 高性能    │  │ 持久化    │
          │ TTL自清理 │  │ 可查询    │
          │ 活跃会话  │  │ 审计回放  │
          └───────────┘  └───────────┘

Redis vs MySQL 对比

DimensionRedisMySQL
读写速度<1ms5-20ms
持久性内存+AOF磁盘持久
容量有限(内存)无限(磁盘)
自动清理TTL过期手动/定时
适用场景活跃会话历史审计
故障恢复重启丢失风险完全恢复
查询能力Key-ValueSQL查询

GraphInfraConfiguration 条件装配

@Configuration
@ConditionalOnProperty(prefix = "agent.defaults.graph",
                       name = "checkpoint-enabled", havingValue = "true")
public class GraphInfraConfiguration {

    @Bean
    @ConditionalOnProperty(name = "agent.defaults.graph.checkpoint-type",
                           havingValue = "redis")
    public BaseSaver redisSaver(...) { ... }

    @Bean
    @ConditionalOnProperty(name = "agent.defaults.graph.checkpoint-type",
                           havingValue = "mysql")
    public BaseSaver mysqlSaver(...) { ... }
}
条件装配:通过 @ConditionalOnProperty 实现 Redis/MySQL Saver 的互斥加载。配置 checkpoint-type=redis 时只创建 RedisSaver Bean,反之只创建 MySQLSaver Bean,避免不必要的连接池初始化。

十、更多面试高频问题

Q: 分布式场景下如何保证 Checkpoint 一致性?多节点同时写入同一个 threadId 会冲突吗?
A: 单线程模型——同一 threadId 的图执行是串行的(通过分布式锁保证)。Checkpoint 写入使用乐观锁(version 字段)。Redis 用 WATCH+MULTI 事务。实际场景很少并发,因为一个对话 = 一个 thread。
Q: ObservationRegistry 的设计模式是什么?为什么不直接用 MeterRegistry?
A: ObservationRegistry 是观察者模式 + 装饰器模式的组合。优于直接 MeterRegistry:① 统一 API(metrics + tracing + logging);② 可插拔 handler;③ 低开销(未注册时空操作)。这是 Spring 官方推荐方式。
Q: Time-Travel 的深拷贝成本高吗?有什么优化手段?
A: 成本:GraphState 包含 messages 列表(可能很大)。优化手段:① 增量快照(只存 diff);② 写时复制(COW);③ 限制 MAX_SNAPSHOTS_PER_EXECUTION=200 上限;④ 可选关闭(生产环境 debug=false)。
Q: Redis Checkpoint 的 TTL 设多长合适?过期后用户还能恢复会话吗?
A: 活跃会话 TTL=30min(可配置)。过期后不能恢复。方案:① 双写(Redis 快速访问 + MySQL 持久备份);② TTL 延长 + touch 机制(每次访问刷新 TTL)。
Q: 如果 Graph 执行中途 OOM,Checkpoint 能恢复吗?
A: 取决于最后一次成功保存的 Checkpoint。每个节点执行完后保存一次。OOM 在节点执行中途 = 丢失该节点进度,从上一个节点 Checkpoint 恢复重试
Q: Prometheus 拉取频率和 Graph 指标精度的权衡?
A: 默认 15s 拉取间隔。图执行通常 2-30s 完成。长执行能被捕获;短执行可能在两次拉取间完成。解决:使用 Summary/Histogram 聚合,不依赖瞬时采样。