HealthCheckable接口、心跳机制、僵尸检测、断点续传、指数退避重试
Agent任务可能运行数分钟甚至数小时,必须有机制检测任务是否还在正常执行,而不是已经"假死"。
健康监控整体架构:
┌───────────────────────────────────────────────────────────┐
│ AgentHealthMonitor │
│ (ScheduledExecutorService, 每30秒执行一次) │
└─────────────────────────┬─────────────────────────────────┘
│
┌─────────────┼─────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Node A │ │ Node B │ │ Node C │
│ Tasks: 3 │ │ Tasks: 5 │ │ Tasks: 2 │
│ Heartbeat ♥ │ │ Heartbeat ♥ │ │ Heartbeat ♥ │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
└─────────────────┼─────────────────┘
▼
┌───────────────────────────────────────────────────────────┐
│ Redis (分布式状态) │
│ health:{nodeId} → { lastHeartbeat, taskCount, status } │
│ task:{taskId}:heartbeat → timestamp (TTL=90s) │
└───────────────────────────────────────────────────────────┘
Agent可以选择性实现此接口,提供自定义健康检查逻辑:
public interface HealthCheckable {
/**
* 健康检查 - 返回当前Agent的健康状态
* @return HealthStatus 包含状态码和诊断信息
*/
HealthStatus checkHealth();
/**
* 优雅停止 - 收到停止信号时的清理逻辑
* @param timeout 最大等待时间
*/
void gracefulShutdown(Duration timeout);
/**
* 是否支持断点续传
*/
default boolean supportsCheckpoint() { return false; }
}
public record HealthStatus(
Status status, // UP / DOWN / DEGRADED
String message, // 人类可读的诊断信息
Map<String, Object> details // 扩展指标
) {
public enum Status { UP, DOWN, DEGRADED }
}
HealthCheckable 使用场景: Agent实现了HealthCheckable? ┌──────────────────────────────────────────────────┐ │ YES → 调用 agent.checkHealth() │ │ · UP → 正常,继续监控 │ │ · DEGRADED → 告警,标记为亚健康 │ │ · DOWN → 触发任务转移/重试 │ │ │ │ NO → 默认只通过心跳超时判断存活 │ └──────────────────────────────────────────────────┘
所有任务生命周期操作通过一个门面统一管理,屏蔽底层复杂性:
AgentTaskLifecycleService (Facade):
┌─────────────────────────────────────────────────────┐
│ submitTask(request) │
│ → 创建任务 → 启动心跳 → 执行Agent → 完成/失败 │
│ │
│ cancelTask(taskId) │
│ → 发送中断信号 → 等待Agent优雅停止 → 更新状态 │
│ │
│ retryTask(taskId) │
│ → 加载检查点 → 从断点恢复 → 继续执行 │
│ │
│ getTaskStatus(taskId) │
│ → 查询实时状态 + 进度百分比 + 最后心跳时间 │
└─────────────────────────────────────────────────────┘
任务状态机:
┌────────┐ submit ┌─────────┐ agent执行 ┌──────────┐
│CREATED │──────────→│ RUNNING │───────────→│COMPLETED │
└────────┘ └────┬────┘ └──────────┘
│
┌────────┼────────┐
▼ ▼ ▼
┌──────────┐┌───────┐┌────────┐
│CANCELLED ││FAILED ││RETRYING│
└──────────┘└───┬───┘└───┬────┘
│ │
└───→────┘
重试成功→RUNNING
核心设计:使用独立线程池 + REQUIRES_NEW 事务保证心跳不受业务事务影响。
@Component
public class TaskHeartbeatService {
private final ScheduledExecutorService heartbeatExecutor =
Executors.newScheduledThreadPool(4, r -> {
Thread t = new Thread(r, "heartbeat-worker");
t.setDaemon(true);
return t;
});
private final ConcurrentMap<String, ScheduledFuture<?>> activeHeartbeats
= new ConcurrentHashMap<>();
/**
* 开始心跳 - 每15秒向Redis写入时间戳
*/
public void startHeartbeat(String taskId) {
ScheduledFuture<?> future = heartbeatExecutor.scheduleAtFixedRate(
() -> sendHeartbeat(taskId),
0, 15, TimeUnit.SECONDS // 初始0延迟,每15秒一次
);
activeHeartbeats.put(taskId, future);
}
/**
* REQUIRES_NEW事务 - 心跳写入独立于业务事务
* 即使业务事务回滚,心跳依然能写入
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void sendHeartbeat(String taskId) {
String key = "task:" + taskId + ":heartbeat";
redisTemplate.opsForValue().set(key,
String.valueOf(System.currentTimeMillis()),
90, TimeUnit.SECONDS // TTL=90s, 3倍心跳间隔
);
}
public void stopHeartbeat(String taskId) {
ScheduledFuture<?> future = activeHeartbeats.remove(taskId);
if (future != null) future.cancel(false);
}
}
心跳时序:
时间轴 ────────────────────────────────────────────→
0s 15s 30s 45s 60s 75s 90s
│ │ │ │ │ │ │
♥ ♥ ♥ ♥ ♥ ♥ ♥ ← 正常心跳
│ │ │ │
♥ ♥ ♥ ✗ ← 45s心跳丢失
│
60s后(距最后心跳):
⚠️ 标记为疑似僵尸
90s后(TTL过期):
☠️ 确认僵尸,触发回收
关键参数:
· 心跳间隔: 15秒 (足够频繁又不浪费资源)
· Redis TTL: 90秒 (3倍间隔,容忍2次心跳丢失)
· 僵尸阈值: 60秒无心跳 → 疑似僵尸
· 确认阈值: 90秒无心跳 → 确认僵尸
僵尸检测流程 (每30秒扫描一次):
┌────────────────────────────────────────────────────────┐
│ ZombieTaskDetector.scan() │
│ │
│ 1. 查询所有状态=RUNNING的任务 │
│ 2. 检查每个任务的最后心跳时间 │
│ 3. 分类处理: │
└────────────────────────┬───────────────────────────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
lastHB < 60s 60s ≤ lastHB < 90s lastHB ≥ 90s
┌──────────┐ ┌──────────────┐ ┌─────────────┐
│ 正常 │ │ 疑似僵尸 │ │ 确认僵尸 │
│ 继续监控 │ │ · 记录告警 │ │ · 回收处理 │
└──────────┘ │ · 尝试ping │ └──────┬──────┘
└──────────────┘ │
▼
┌────── retries < maxRetries? ──────┐
│ YES NO │
▼ ▼ │
┌─────────────┐ ┌─────────────┐ │
│ 重新调度 │ │ 标记FAILED │ │
│ status→ │ │ 发送告警 │ │
│ RETRYING │ │ 人工介入 │ │
└─────────────┘ └─────────────┘ │
│
└────────────────────────────────────────────┘
重试间隔采用指数退避策略,避免"重试风暴"压垮系统:
public class ExponentialBackoffCalculator {
private static final Duration BASE_DELAY = Duration.ofMinutes(1); // 基础1分钟
private static final Duration MAX_DELAY = Duration.ofHours(1); // 上限1小时
private static final double JITTER_FACTOR = 0.2; // ±20%抖动
/**
* 计算第N次重试的等待时间
* 公式: min(BASE * 2^(attempt-1), MAX) * (1 ± JITTER)
*
* attempt=1: 1min * (0.8~1.2) = 48s ~ 72s
* attempt=2: 2min * (0.8~1.2) = 96s ~ 144s
* attempt=3: 4min * (0.8~1.2) = 192s ~ 288s
* attempt=4: 8min * (0.8~1.2) = 384s ~ 576s
* attempt=5: 16min * (0.8~1.2) = 768s ~ 1152s
* attempt=6: 32min * (0.8~1.2) = 1536s ~ 2304s
* attempt=7: 60min (cap) = 2880s ~ 4320s
*/
public Duration calculateDelay(int attempt) {
long baseMs = BASE_DELAY.toMillis();
long exponentialMs = baseMs * (1L << (attempt - 1));
long cappedMs = Math.min(exponentialMs, MAX_DELAY.toMillis());
// 添加±20%随机抖动,防止多任务同时重试
double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1)
* JITTER_FACTOR;
return Duration.ofMillis((long)(cappedMs * jitter));
}
}
指数退避可视化:
等待时间
60min ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬─── CAP上限
│ │
32min ─ ╱──────╱
│ ╱──────╱
16min ─ ╱──────╱
│ ╱──────╱
8min ─ ╱──╱
4min ─ ╱──╱ ←── ±20% jitter范围
2min ─ ╱──╱
1min ─╱─╱
└──────┬──────┬──────┬──────┬──────┬──────┬──────→ 重试次数
1 2 3 4 5 6 7
为什么需要抖动?
┌────────────────────────────────────────────────┐
│ 无抖动: 100个任务在同一时刻重试 → 瞬间高峰 │
│ 有抖动: 100个任务分散在±20%时间窗内重试 │
│ → 负载均匀分布,避免"惊群效应" │
└────────────────────────────────────────────────┘
@Component
public class TaskRetryProducer {
private final RocketMQTemplate rocketMQ;
private final ExponentialBackoffCalculator backoff;
/**
* 发送延迟重试消息到RocketMQ
*/
public void scheduleRetry(String taskId, int attempt) {
Duration delay = backoff.calculateDelay(attempt);
TaskRetryMessage message = new TaskRetryMessage(taskId, attempt, Instant.now());
rocketMQ.syncSendDelayTimeMills(
"TASK_RETRY_TOPIC",
MessageBuilder.withPayload(message).build(),
delay.toMillis()
);
log.info("任务[{}]第{}次重试已调度,将在{}后执行",
taskId, attempt, formatDuration(delay));
}
}
@Component
@RocketMQMessageListener(topic = "TASK_RETRY_TOPIC", consumerGroup = "task-retry-group")
public class TaskRetryConsumer implements RocketMQListener<TaskRetryMessage> {
@Override
public void onMessage(TaskRetryMessage msg) {
log.info("执行任务[{}]第{}次重试", msg.taskId(), msg.attempt());
agentTaskLifecycleService.retryTask(msg.taskId());
}
}
长时间运行的Agent任务可以在执行过程中保存检查点,失败后从最近的检查点恢复而不是从头开始。
断点续传流程:
正常执行:
┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐
│步骤1│→ │步骤2│→ │步骤3│→ │步骤4│→ │步骤5│→ 完成
└──┬─┘ └──┬─┘ └──┬─┘ └────┘ └────┘
│ │ │
▼ CP1 ▼ CP2 ▼ CP3 ← 每步保存检查点(Checkpoint)
异常中断:
┌────┐ ┌────┐ ┌────┐
│步骤1│→ │步骤2│→ │步骤3│→ ✗ 崩溃!
└────┘ └────┘ └──┬─┘
│
▼ CP3已保存
断点恢复:
┌────┐ ┌────┐
从CP3恢复 ──────────→│步骤4│→ │步骤5│→ 完成 ✅
└────┘ └────┘
跳过了步骤1~3,节省大量时间和LLM调用成本!
@Service
public class AgentTaskCheckpointService {
private final DistributedCheckpointStore checkpointStore;
/**
* 保存检查点 - Agent执行过程中定期调用
*/
public void saveCheckpoint(String taskId, CheckpointData data) {
checkpointStore.save(
"checkpoint:" + taskId + ":" + data.stepId(),
serialize(data),
Duration.ofHours(24) // 检查点保留24小时
);
// 更新任务进度
checkpointStore.save(
"checkpoint:" + taskId + ":latest",
data.stepId(),
Duration.ofHours(24)
);
}
/**
* 加载最新检查点 - 重试时调用
*/
public Optional<CheckpointData> loadLatestCheckpoint(String taskId) {
String latestStep = checkpointStore.get("checkpoint:" + taskId + ":latest");
if (latestStep == null) return Optional.empty();
String data = checkpointStore.get("checkpoint:" + taskId + ":" + latestStep);
return Optional.ofNullable(data).map(this::deserialize);
}
/**
* 检查点数据结构
*/
public record CheckpointData(
String stepId, // 步骤标识
int stepIndex, // 第几步
Map<String, Object> state, // 中间状态
List<Message> messages, // 已积累的对话历史
Instant savedAt // 保存时间
) {}
}
任务完整生命周期:
t=0s 用户提交任务
│ ├── AgentTaskLifecycleService.submitTask()
│ ├── 创建TaskRecord (status=CREATED)
│ ├── 分配到可用节点
│ └── status → RUNNING
│
t=0s 启动心跳
│ └── TaskHeartbeatService.startHeartbeat(taskId)
│ 每15秒写入Redis: task:{id}:heartbeat → timestamp
│
t=Ns Agent执行中...
│ ├── 定期保存Checkpoint
│ ├── 通过SSE推送进度
│ └── 心跳持续发送 ♥♥♥
│
════════ 正常完成路径 ════════
t=Xs Agent执行完成
│ ├── status → COMPLETED
│ ├── 停止心跳
│ └── 清理Checkpoint
│
════════ 异常路径 ════════
t=Ys Agent异常/节点宕机
│ ├── 心跳停止 (TTL倒计时)
│ ├── 60s后: ZombieDetector标记疑似僵尸
│ ├── 90s后: TTL过期,确认僵尸
│ ├── 检查retryCount < maxRetries?
│ │ ├── YES: status → RETRYING
│ │ │ 发送RocketMQ延迟消息
│ │ │ 等待指数退避时间后重试
│ │ │ 从Checkpoint恢复执行
│ │ └── NO: status → FAILED
│ │ 发送告警通知
│ └── 更新TaskRecord