HealthCheckable接口、心跳机制、僵尸检测、Graph Checkpoint、指数退避重试
Agent任务可能运行数分钟甚至数小时,必须有机制检测任务是否还在正常执行,而不是已经"假死"。
健康监控整体架构:
┌───────────────────────────────────────────────────────────┐
│ AgentHealthMonitor │
│ (@Scheduled,默认每60秒执行一次) │
└─────────────────────────┬─────────────────────────────────┘
│
┌─────────────┼─────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Node A │ │ Node B │ │ Node C │
│ Tasks: 3 │ │ Tasks: 5 │ │ Tasks: 2 │
│ Heartbeat ♥ │ │ Heartbeat ♥ │ │ Heartbeat ♥ │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
└─────────────────┼─────────────────┘
▼
┌───────────────────────────────────────────────────────────┐
│ AgentHealthStore (local / redis) │
│ AgentTaskRecord (DB): status / lastHeartbeat / retry │
│ DistributedJobGuard: 集群下避免重复扫描 │
└───────────────────────────────────────────────────────────┘
Agent可以选择性实现此接口,提供自定义健康检查逻辑:
public interface HealthCheckable {
/**
* 健康检查 - 返回当前Agent的健康状态
* @return HealthStatus 包含状态码和诊断信息
*/
HealthStatus checkHealth();
/**
* 健康检查间隔。默认 60 秒。
*/
default Duration getHealthCheckInterval() {
return Duration.ofSeconds(60);
}
}
public class HealthStatus {
private final String agentId;
private final HealthState state; // UP / DOWN / DEGRADED / UNKNOWN
private final String detail;
private final LocalDateTime checkedAt;
private final Map<String, Object> metrics;
}
HealthCheckable 使用场景: Agent实现了HealthCheckable? ┌──────────────────────────────────────────────────┐ │ YES → 调用 agent.checkHealth() │ │ · UP → 正常,继续监控 │ │ · DEGRADED → 告警,标记为亚健康 │ │ · DOWN → 触发任务转移/重试 │ │ │ │ NO → 默认只通过心跳超时判断存活 │ └──────────────────────────────────────────────────┘
所有任务生命周期操作通过一个门面统一管理,屏蔽底层复杂性:
AgentTaskLifecycleService (Facade):
┌─────────────────────────────────────────────────────┐
│ submitTask(request) │
│ → 创建任务 → 启动心跳 → 执行Agent → 完成/失败 │
│ │
│ cancelTask(taskId) │
│ → 发送中断信号 → 等待Agent优雅停止 → 更新状态 │
│ │
│ retryTask(taskId) │
│ → 加载检查点 → 从断点恢复 → 继续执行 │
│ │
│ getTaskStatus(taskId) │
│ → 查询实时状态 + 进度百分比 + 最后心跳时间 │
└─────────────────────────────────────────────────────┘
任务状态机:
┌────────┐ submit ┌─────────┐ agent执行 ┌──────────┐
│CREATED │──────────→│ RUNNING │───────────→│COMPLETED │
└────────┘ └────┬────┘ └──────────┘
│
┌────────┼────────┐
▼ ▼ ▼
┌──────────┐┌───────┐┌────────┐
│CANCELLED ││FAILED ││RETRYING│
└──────────┘└───┬───┘└───┬────┘
│ │
└───→────┘
重试成功→RUNNING
核心设计:使用独立线程池 + REQUIRES_NEW 事务保证心跳不受业务事务影响。
@Component
public class TaskHeartbeatService {
private final ScheduledExecutorService heartbeatExecutor =
Executors.newScheduledThreadPool(4, r -> {
Thread t = new Thread(r, "heartbeat-worker");
t.setDaemon(true);
return t;
});
private final ConcurrentMap<String, ScheduledFuture<?>> activeHeartbeats
= new ConcurrentHashMap<>();
/**
* 开始心跳 - 每15秒向Redis写入时间戳
*/
public void startHeartbeat(String taskId) {
ScheduledFuture<?> future = heartbeatExecutor.scheduleAtFixedRate(
() -> sendHeartbeat(taskId),
0, 15, TimeUnit.SECONDS // 初始0延迟,每15秒一次
);
activeHeartbeats.put(taskId, future);
}
/**
* REQUIRES_NEW事务 - 心跳写入独立于业务事务
* 即使业务事务回滚,心跳依然能写入
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void sendHeartbeat(String taskId) {
String key = "task:" + taskId + ":heartbeat";
redisTemplate.opsForValue().set(key,
String.valueOf(System.currentTimeMillis()),
90, TimeUnit.SECONDS // TTL=90s, 3倍心跳间隔
);
}
public void stopHeartbeat(String taskId) {
ScheduledFuture<?> future = activeHeartbeats.remove(taskId);
if (future != null) future.cancel(false);
}
}
心跳时序:
时间轴 ────────────────────────────────────────────→
0s 10s 20s 30s 40s 50s 60s
│ │ │ │ │ │ │
♥ ♥ ♥ ♥ ♥ ♥ ♥ ← 正常心跳
│ │ │ │
♥ ♥ ♥ ✗ ← 30s后不再更新
│
超过 heartbeatTimeout:
⚠️ 扫描为疑似僵尸
可重试 → RETRY
不可重试 → TIMEOUT/FAILED
关键参数:
· 默认心跳间隔: 10秒
· 默认心跳超时: 30秒
· 存储位置: AgentTaskRecord.lastHeartbeat (数据库)
· 扫描间隔: 默认30秒
僵尸检测流程 (每30秒扫描一次):
┌────────────────────────────────────────────────────────┐
│ ZombieTaskDetector.scan() │
│ │
│ 1. 查询所有状态=RUNNING的任务 │
│ 2. 检查每个任务的最后心跳时间 │
│ 3. 分类处理: │
└────────────────────────┬───────────────────────────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
lastHB < 60s 60s ≤ lastHB < 90s lastHB ≥ 90s
┌──────────┐ ┌──────────────┐ ┌─────────────┐
│ 正常 │ │ 疑似僵尸 │ │ 确认僵尸 │
│ 继续监控 │ │ · 记录告警 │ │ · 回收处理 │
└──────────┘ │ · 尝试ping │ └──────┬──────┘
└──────────────┘ │
▼
┌────── retries < maxRetries? ──────┐
│ YES NO │
▼ ▼ │
┌─────────────┐ ┌─────────────┐ │
│ 重新调度 │ │ 标记FAILED │ │
│ status→ │ │ 发送告警 │ │
│ RETRYING │ │ 人工介入 │ │
└─────────────┘ └─────────────┘ │
│
└────────────────────────────────────────────┘
重试间隔采用指数退避策略,避免"重试风暴"压垮系统:
public class ExponentialBackoffCalculator {
private static final Duration BASE_DELAY = Duration.ofMinutes(1); // 基础1分钟
private static final Duration MAX_DELAY = Duration.ofHours(1); // 上限1小时
private static final double JITTER_FACTOR = 0.2; // ±20%抖动
/**
* 计算第N次重试的等待时间
* 公式: min(BASE * 2^(attempt-1), MAX) * (1 ± JITTER)
*
* attempt=1: 1min * (0.8~1.2) = 48s ~ 72s
* attempt=2: 2min * (0.8~1.2) = 96s ~ 144s
* attempt=3: 4min * (0.8~1.2) = 192s ~ 288s
* attempt=4: 8min * (0.8~1.2) = 384s ~ 576s
* attempt=5: 16min * (0.8~1.2) = 768s ~ 1152s
* attempt=6: 32min * (0.8~1.2) = 1536s ~ 2304s
* attempt=7: 60min (cap) = 2880s ~ 4320s
*/
public Duration calculateDelay(int attempt) {
long baseMs = BASE_DELAY.toMillis();
long exponentialMs = baseMs * (1L << (attempt - 1));
long cappedMs = Math.min(exponentialMs, MAX_DELAY.toMillis());
// 添加±20%随机抖动,防止多任务同时重试
double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1)
* JITTER_FACTOR;
return Duration.ofMillis((long)(cappedMs * jitter));
}
}
指数退避可视化:
等待时间
60min ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬─── CAP上限
│ │
32min ─ ╱──────╱
│ ╱──────╱
16min ─ ╱──────╱
│ ╱──────╱
8min ─ ╱──╱
4min ─ ╱──╱ ←── ±20% jitter范围
2min ─ ╱──╱
1min ─╱─╱
└──────┬──────┬──────┬──────┬──────┬──────┬──────→ 重试次数
1 2 3 4 5 6 7
为什么需要抖动?
┌────────────────────────────────────────────────┐
│ 无抖动: 100个任务在同一时刻重试 → 瞬间高峰 │
│ 有抖动: 100个任务分散在±20%时间窗内重试 │
│ → 负载均匀分布,避免"惊群效应" │
└────────────────────────────────────────────────┘
@Component
public class TaskRetryProducer {
private final RocketMQTemplate rocketMQ;
private final ExponentialBackoffCalculator backoff;
/**
* 发送延迟重试消息到RocketMQ
*/
public void scheduleRetry(String taskId, int attempt) {
Duration delay = backoff.calculateDelay(attempt);
TaskRetryMessage message = new TaskRetryMessage(taskId, attempt, Instant.now());
rocketMQ.syncSendDelayTimeMills(
"TASK_RETRY_TOPIC",
MessageBuilder.withPayload(message).build(),
delay.toMillis()
);
log.info("任务[{}]第{}次重试已调度,将在{}后执行",
taskId, attempt, formatDuration(delay));
}
}
@Component
@RocketMQMessageListener(topic = "TASK_RETRY_TOPIC", consumerGroup = "task-retry-group")
public class TaskRetryConsumer implements RocketMQListener<TaskRetryMessage> {
@Override
public void onMessage(TaskRetryMessage msg) {
log.info("执行任务[{}]第{}次重试", msg.taskId(), msg.attempt());
agentTaskLifecycleService.retryTask(msg.taskId());
}
}
长时间运行的 Graph Agent 可以在执行过程中保存检查点,用于 HITL 恢复、重试恢复和问题排查。当前代码保存的是任务记录中的 Graph checkpoint 和独立 checkpoint 表,不是“所有任务自动从 Redis 断点续跑”。
Graph Checkpoint 恢复流程:
正常执行:
┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐
│步骤1│→ │步骤2│→ │步骤3│→ │步骤4│→ │步骤5│→ 完成
└──┬─┘ └──┬─┘ └──┬─┘ └────┘ └────┘
│ │ │
▼ CP1 ▼ CP2 ▼ CP3 ← 每步保存检查点(Checkpoint)
异常中断:
┌────┐ ┌────┐ ┌────┐
│步骤1│→ │步骤2│→ │步骤3│→ ✗ 崩溃!
└────┘ └────┘ └──┬─┘
│
▼ CP3已保存
断点恢复:
┌────┐ ┌────┐
从CP3恢复 ──────────→│步骤4│→ │步骤5│→ 完成 ✅
└────┘ └────┘
可从最近 checkpoint 恢复 Graph 状态;
是否自动继续执行取决于后续请求/调度链路。
@Service
public class AgentTaskCheckpointService {
private final AgentTaskCheckpointRepository checkpointRepository;
/**
* 追加检查点 - Graph 执行过程中调用
*/
public void appendCheckpoint(String taskId,
String nodeId,
String stateSnapshot,
Integer expectedVersion,
String contextHash,
LocalDateTime expiresAt) {
AgentTaskCheckpoint checkpoint = new AgentTaskCheckpoint();
checkpoint.setTaskId(taskId);
checkpoint.setNodeId(nodeId);
checkpoint.setStateSnapshot(stateSnapshot != null ? stateSnapshot : "{}");
checkpoint.setContextHash(contextHash);
checkpoint.setExpiresAt(expiresAt);
checkpointRepository.save(checkpoint);
}
/**
* 查询最新检查点
*/
public Optional<AgentTaskCheckpoint> findLatest(String taskId) {
return checkpointRepository.findTopByTaskIdOrderBySequenceDesc(taskId);
}
}
任务完整生命周期: t=0s 用户提交任务 │ ├── AgentTaskOrchestrator.execute() │ ├── AgentTaskLifecycleService.onTaskCreated() │ ├── 创建 AgentTaskRecord (status=PENDING) │ └── onTaskStarted() → RUNNING │ t=0s 启动心跳 │ └── AgentTaskHeartbeatManager.startHeartbeat(taskId) │ 定期更新数据库 lastHeartbeat │ t=Ns Agent执行中... │ ├── Graph 模式保存 Checkpoint │ ├── 通过SSE推送进度 │ └── 心跳持续发送 ♥♥♥ │ ════════ 正常完成路径 ════════ t=Xs Agent执行完成 │ ├── status → COMPLETED │ ├── 停止心跳 │ └── 清理Checkpoint │ ════════ 异常路径 ════════ t=Ys Agent异常/节点宕机 │ ├── 心跳停止 │ ├── 定时扫描 RUNNING 且 lastHeartbeat 超时的任务 │ ├── 检查retryCount < maxRetries? │ │ ├── YES: status → RETRY │ │ │ 计算 nextRetryAt │ │ │ 可发布 RocketMQ retry schedule 事件 │ │ │ 到期后标记回 PENDING │ │ └── NO: status → FAILED │ └── 更新TaskRecord