32 - Agent健康监控与任务生命周期

HealthCheckable接口、心跳机制、僵尸检测、Graph Checkpoint、指数退避重试

一、AgentHealthMonitor 分布式健康检查

Agent任务可能运行数分钟甚至数小时,必须有机制检测任务是否还在正常执行,而不是已经"假死"。

健康监控整体架构:

  ┌───────────────────────────────────────────────────────────┐
  │                  AgentHealthMonitor                        │
  │  (@Scheduled,默认每60秒执行一次)                         │
  └─────────────────────────┬─────────────────────────────────┘
                            │
              ┌─────────────┼─────────────────┐
              ▼             ▼                 ▼
  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
  │  Node A      │  │  Node B      │  │  Node C      │
  │  Tasks: 3    │  │  Tasks: 5    │  │  Tasks: 2    │
  │  Heartbeat ♥ │  │  Heartbeat ♥ │  │  Heartbeat ♥ │
  └──────────────┘  └──────────────┘  └──────────────┘
         │                 │                 │
         └─────────────────┼─────────────────┘
                           ▼
  ┌───────────────────────────────────────────────────────────┐
  │  AgentHealthStore (local / redis)                         │
  │  AgentTaskRecord (DB): status / lastHeartbeat / retry     │
  │  DistributedJobGuard: 集群下避免重复扫描                   │
  └───────────────────────────────────────────────────────────┘

二、HealthCheckable 接口设计

Agent可以选择性实现此接口,提供自定义健康检查逻辑:

public interface HealthCheckable {

    /**
     * 健康检查 - 返回当前Agent的健康状态
     * @return HealthStatus 包含状态码和诊断信息
     */
    HealthStatus checkHealth();

    /**
     * 健康检查间隔。默认 60 秒。
     */
    default Duration getHealthCheckInterval() {
        return Duration.ofSeconds(60);
    }
}

public class HealthStatus {
    private final String agentId;
    private final HealthState state; // UP / DOWN / DEGRADED / UNKNOWN
    private final String detail;
    private final LocalDateTime checkedAt;
    private final Map<String, Object> metrics;
}
HealthCheckable 使用场景:

  Agent实现了HealthCheckable?
  ┌──────────────────────────────────────────────────┐
  │ YES → 调用 agent.checkHealth()                   │
  │   · UP       → 正常,继续监控                    │
  │   · DEGRADED → 告警,标记为亚健康               │
  │   · DOWN     → 触发任务转移/重试                │
  │                                                  │
  │ NO → 默认只通过心跳超时判断存活                 │
  └──────────────────────────────────────────────────┘

三、AgentTaskLifecycleService 统一门面

所有任务生命周期操作通过一个门面统一管理,屏蔽底层复杂性:

AgentTaskLifecycleService (Facade):

  ┌─────────────────────────────────────────────────────┐
  │  submitTask(request)                                │
  │    → 创建任务 → 启动心跳 → 执行Agent → 完成/失败 │
  │                                                     │
  │  cancelTask(taskId)                                 │
  │    → 发送中断信号 → 等待Agent优雅停止 → 更新状态  │
  │                                                     │
  │  retryTask(taskId)                                  │
  │    → 加载检查点 → 从断点恢复 → 继续执行           │
  │                                                     │
  │  getTaskStatus(taskId)                              │
  │    → 查询实时状态 + 进度百分比 + 最后心跳时间     │
  └─────────────────────────────────────────────────────┘

  任务状态机:
  ┌────────┐   submit   ┌─────────┐  agent执行  ┌──────────┐
  │CREATED │──────────→│ RUNNING │───────────→│COMPLETED │
  └────────┘            └────┬────┘            └──────────┘
                             │
                    ┌────────┼────────┐
                    ▼        ▼        ▼
             ┌──────────┐┌───────┐┌────────┐
             │CANCELLED ││FAILED ││RETRYING│
             └──────────┘└───┬───┘└───┬────┘
                             │        │
                             └───→────┘
                           重试成功→RUNNING

四、心跳机制

核心设计:使用独立线程池 + REQUIRES_NEW 事务保证心跳不受业务事务影响。

@Component
public class TaskHeartbeatService {

    private final ScheduledExecutorService heartbeatExecutor =
        Executors.newScheduledThreadPool(4, r -> {
            Thread t = new Thread(r, "heartbeat-worker");
            t.setDaemon(true);
            return t;
        });

    private final ConcurrentMap<String, ScheduledFuture<?>> activeHeartbeats
        = new ConcurrentHashMap<>();

    /**
     * 开始心跳 - 每15秒向Redis写入时间戳
     */
    public void startHeartbeat(String taskId) {
        ScheduledFuture<?> future = heartbeatExecutor.scheduleAtFixedRate(
            () -> sendHeartbeat(taskId),
            0, 15, TimeUnit.SECONDS  // 初始0延迟,每15秒一次
        );
        activeHeartbeats.put(taskId, future);
    }

    /**
     * REQUIRES_NEW事务 - 心跳写入独立于业务事务
     * 即使业务事务回滚,心跳依然能写入
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void sendHeartbeat(String taskId) {
        String key = "task:" + taskId + ":heartbeat";
        redisTemplate.opsForValue().set(key,
            String.valueOf(System.currentTimeMillis()),
            90, TimeUnit.SECONDS  // TTL=90s, 3倍心跳间隔
        );
    }

    public void stopHeartbeat(String taskId) {
        ScheduledFuture<?> future = activeHeartbeats.remove(taskId);
        if (future != null) future.cancel(false);
    }
}
心跳时序:

  时间轴 ────────────────────────────────────────────→
  0s     10s     20s     30s     40s     50s     60s
  │       │       │       │       │       │       │
  ♥       ♥       ♥       ♥       ♥       ♥       ♥  ← 正常心跳
  │       │       │       │
  ♥       ♥       ♥       ✗  ← 30s后不再更新
                          │
                    超过 heartbeatTimeout:
                    ⚠️ 扫描为疑似僵尸
                    可重试 → RETRY
                    不可重试 → TIMEOUT/FAILED

  关键参数:
  · 默认心跳间隔: 10秒
  · 默认心跳超时: 30秒
  · 存储位置: AgentTaskRecord.lastHeartbeat (数据库)
  · 扫描间隔: 默认30秒

五、僵尸任务检测与回收

僵尸检测流程 (每30秒扫描一次):

  ┌────────────────────────────────────────────────────────┐
  │ ZombieTaskDetector.scan()                              │
  │                                                        │
  │ 1. 查询所有状态=RUNNING的任务                          │
  │ 2. 检查每个任务的最后心跳时间                          │
  │ 3. 分类处理:                                           │
  └────────────────────────┬───────────────────────────────┘
                           │
            ┌──────────────┼──────────────┐
            ▼              ▼              ▼
    lastHB < 60s     60s ≤ lastHB < 90s   lastHB ≥ 90s
    ┌──────────┐     ┌──────────────┐     ┌─────────────┐
    │ 正常     │     │ 疑似僵尸     │     │ 确认僵尸    │
    │ 继续监控 │     │ · 记录告警   │     │ · 回收处理  │
    └──────────┘     │ · 尝试ping  │     └──────┬──────┘
                     └──────────────┘            │
                                                 ▼
                                    ┌────── retries < maxRetries? ──────┐
                                    │ YES                    NO          │
                                    ▼                        ▼          │
                           ┌─────────────┐          ┌─────────────┐    │
                           │ 重新调度    │          │ 标记FAILED  │    │
                           │ status→     │          │ 发送告警    │    │
                           │ RETRYING    │          │ 人工介入    │    │
                           └─────────────┘          └─────────────┘    │
                                                                        │
                           └────────────────────────────────────────────┘

六、指数退避 + 随机抖动

重试间隔采用指数退避策略,避免"重试风暴"压垮系统:

public class ExponentialBackoffCalculator {

    private static final Duration BASE_DELAY = Duration.ofMinutes(1);  // 基础1分钟
    private static final Duration MAX_DELAY = Duration.ofHours(1);     // 上限1小时
    private static final double JITTER_FACTOR = 0.2;                   // ±20%抖动

    /**
     * 计算第N次重试的等待时间
     * 公式: min(BASE * 2^(attempt-1), MAX) * (1 ± JITTER)
     *
     * attempt=1: 1min  * (0.8~1.2) = 48s ~ 72s
     * attempt=2: 2min  * (0.8~1.2) = 96s ~ 144s
     * attempt=3: 4min  * (0.8~1.2) = 192s ~ 288s
     * attempt=4: 8min  * (0.8~1.2) = 384s ~ 576s
     * attempt=5: 16min * (0.8~1.2) = 768s ~ 1152s
     * attempt=6: 32min * (0.8~1.2) = 1536s ~ 2304s
     * attempt=7: 60min (cap)       = 2880s ~ 4320s
     */
    public Duration calculateDelay(int attempt) {
        long baseMs = BASE_DELAY.toMillis();
        long exponentialMs = baseMs * (1L << (attempt - 1));
        long cappedMs = Math.min(exponentialMs, MAX_DELAY.toMillis());

        // 添加±20%随机抖动,防止多任务同时重试
        double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1)
                        * JITTER_FACTOR;
        return Duration.ofMillis((long)(cappedMs * jitter));
    }
}
指数退避可视化:

  等待时间
  60min ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬─── CAP上限
  │                                                   │
  32min ─                                      ╱──────╱
  │                                    ╱──────╱
  16min ─                      ╱──────╱
  │                    ╱──────╱
  8min  ─          ╱──╱
  4min  ─      ╱──╱    ←── ±20% jitter范围
  2min  ─  ╱──╱
  1min  ─╱─╱
  └──────┬──────┬──────┬──────┬──────┬──────┬──────→ 重试次数
         1      2      3      4      5      6      7

  为什么需要抖动?
  ┌────────────────────────────────────────────────┐
  │ 无抖动: 100个任务在同一时刻重试 → 瞬间高峰    │
  │ 有抖动: 100个任务分散在±20%时间窗内重试       │
  │         → 负载均匀分布,避免"惊群效应"        │
  └────────────────────────────────────────────────┘

七、RocketMQ 重试调度

@Component
public class TaskRetryProducer {

    private final RocketMQTemplate rocketMQ;
    private final ExponentialBackoffCalculator backoff;

    /**
     * 发送延迟重试消息到RocketMQ
     */
    public void scheduleRetry(String taskId, int attempt) {
        Duration delay = backoff.calculateDelay(attempt);

        TaskRetryMessage message = new TaskRetryMessage(taskId, attempt, Instant.now());

        rocketMQ.syncSendDelayTimeMills(
            "TASK_RETRY_TOPIC",
            MessageBuilder.withPayload(message).build(),
            delay.toMillis()
        );

        log.info("任务[{}]第{}次重试已调度,将在{}后执行",
            taskId, attempt, formatDuration(delay));
    }
}

@Component
@RocketMQMessageListener(topic = "TASK_RETRY_TOPIC", consumerGroup = "task-retry-group")
public class TaskRetryConsumer implements RocketMQListener<TaskRetryMessage> {

    @Override
    public void onMessage(TaskRetryMessage msg) {
        log.info("执行任务[{}]第{}次重试", msg.taskId(), msg.attempt());
        agentTaskLifecycleService.retryTask(msg.taskId());
    }
}

八、Graph Checkpoint (AgentTaskCheckpointService)

长时间运行的 Graph Agent 可以在执行过程中保存检查点,用于 HITL 恢复、重试恢复和问题排查。当前代码保存的是任务记录中的 Graph checkpoint 和独立 checkpoint 表,不是“所有任务自动从 Redis 断点续跑”。

Graph Checkpoint 恢复流程:

  正常执行:
  ┌────┐   ┌────┐   ┌────┐   ┌────┐   ┌────┐
  │步骤1│→ │步骤2│→ │步骤3│→ │步骤4│→ │步骤5│→ 完成
  └──┬─┘   └──┬─┘   └──┬─┘   └────┘   └────┘
     │        │        │
     ▼ CP1    ▼ CP2    ▼ CP3    ← 每步保存检查点(Checkpoint)

  异常中断:
  ┌────┐   ┌────┐   ┌────┐
  │步骤1│→ │步骤2│→ │步骤3│→ ✗ 崩溃!
  └────┘   └────┘   └──┬─┘
                        │
                        ▼ CP3已保存

  断点恢复:
                        ┌────┐   ┌────┐
  从CP3恢复 ──────────→│步骤4│→ │步骤5│→ 完成 ✅
                        └────┘   └────┘
  可从最近 checkpoint 恢复 Graph 状态;
  是否自动继续执行取决于后续请求/调度链路。
@Service
public class AgentTaskCheckpointService {

    private final AgentTaskCheckpointRepository checkpointRepository;

    /**
     * 追加检查点 - Graph 执行过程中调用
     */
    public void appendCheckpoint(String taskId,
                                 String nodeId,
                                 String stateSnapshot,
                                 Integer expectedVersion,
                                 String contextHash,
                                 LocalDateTime expiresAt) {
        AgentTaskCheckpoint checkpoint = new AgentTaskCheckpoint();
        checkpoint.setTaskId(taskId);
        checkpoint.setNodeId(nodeId);
        checkpoint.setStateSnapshot(stateSnapshot != null ? stateSnapshot : "{}");
        checkpoint.setContextHash(contextHash);
        checkpoint.setExpiresAt(expiresAt);
        checkpointRepository.save(checkpoint);
    }

    /**
     * 查询最新检查点
     */
    public Optional<AgentTaskCheckpoint> findLatest(String taskId) {
        return checkpointRepository.findTopByTaskIdOrderBySequenceDesc(taskId);
    }
}

九、完整生命周期时序

任务完整生命周期:

  t=0s    用户提交任务
  │       ├── AgentTaskOrchestrator.execute()
  │       ├── AgentTaskLifecycleService.onTaskCreated()
  │       ├── 创建 AgentTaskRecord (status=PENDING)
  │       └── onTaskStarted() → RUNNING
  │
  t=0s    启动心跳
  │       └── AgentTaskHeartbeatManager.startHeartbeat(taskId)
  │           定期更新数据库 lastHeartbeat
  │
  t=Ns    Agent执行中...
  │       ├── Graph 模式保存 Checkpoint
  │       ├── 通过SSE推送进度
  │       └── 心跳持续发送 ♥♥♥
  │
  ════════ 正常完成路径 ════════
  t=Xs    Agent执行完成
  │       ├── status → COMPLETED
  │       ├── 停止心跳
  │       └── 清理Checkpoint
  │
  ════════ 异常路径 ════════
  t=Ys    Agent异常/节点宕机
  │       ├── 心跳停止
  │       ├── 定时扫描 RUNNING 且 lastHeartbeat 超时的任务
  │       ├── 检查retryCount < maxRetries?
  │       │   ├── YES: status → RETRY
  │       │   │       计算 nextRetryAt
  │       │   │       可发布 RocketMQ retry schedule 事件
  │       │   │       到期后标记回 PENDING
  │       │   └── NO:  status → FAILED
  │       └── 更新TaskRecord

十、面试话术

面试话术:"Agent 任务生命周期的核心是把运行态落到数据库:创建、运行、完成、失败、超时、等待审批和重试都有明确状态。运行中任务由 AgentTaskHeartbeatManager 定期更新 lastHeartbeat,后台扫描 RUNNING 且心跳过期的记录,把可重试任务标记为 RETRY,不可重试任务转为失败/超时。这样进程崩溃后不会留下永远 RUNNING 的脏状态。"
面试话术:"重试策略采用指数退避加随机抖动:1分钟、2分钟、4分钟,最大 1 小时封顶,并设置 nextRetryAt。RocketMQ retry schedule 事件用于跨节点感知和调度,到期后任务会回到 PENDING;当前不能夸成所有任务自动重跑。Graph 模式会保存 checkpoint,HITL 或恢复链路可以读取最近状态继续处理,这是长任务可恢复设计的基础。"