32 - Agent健康监控与任务生命周期

HealthCheckable接口、心跳机制、僵尸检测、断点续传、指数退避重试

一、AgentHealthMonitor 分布式健康检查

Agent任务可能运行数分钟甚至数小时,必须有机制检测任务是否还在正常执行,而不是已经"假死"。

健康监控整体架构:

  ┌───────────────────────────────────────────────────────────┐
  │                  AgentHealthMonitor                        │
  │  (ScheduledExecutorService, 每30秒执行一次)               │
  └─────────────────────────┬─────────────────────────────────┘
                            │
              ┌─────────────┼─────────────────┐
              ▼             ▼                 ▼
  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
  │  Node A      │  │  Node B      │  │  Node C      │
  │  Tasks: 3    │  │  Tasks: 5    │  │  Tasks: 2    │
  │  Heartbeat ♥ │  │  Heartbeat ♥ │  │  Heartbeat ♥ │
  └──────────────┘  └──────────────┘  └──────────────┘
         │                 │                 │
         └─────────────────┼─────────────────┘
                           ▼
  ┌───────────────────────────────────────────────────────────┐
  │  Redis (分布式状态)                                        │
  │  health:{nodeId} → { lastHeartbeat, taskCount, status }   │
  │  task:{taskId}:heartbeat → timestamp (TTL=90s)            │
  └───────────────────────────────────────────────────────────┘

二、HealthCheckable 接口设计

Agent可以选择性实现此接口,提供自定义健康检查逻辑:

public interface HealthCheckable {

    /**
     * 健康检查 - 返回当前Agent的健康状态
     * @return HealthStatus 包含状态码和诊断信息
     */
    HealthStatus checkHealth();

    /**
     * 优雅停止 - 收到停止信号时的清理逻辑
     * @param timeout 最大等待时间
     */
    void gracefulShutdown(Duration timeout);

    /**
     * 是否支持断点续传
     */
    default boolean supportsCheckpoint() { return false; }
}

public record HealthStatus(
    Status status,       // UP / DOWN / DEGRADED
    String message,      // 人类可读的诊断信息
    Map<String, Object> details  // 扩展指标
) {
    public enum Status { UP, DOWN, DEGRADED }
}
HealthCheckable 使用场景:

  Agent实现了HealthCheckable?
  ┌──────────────────────────────────────────────────┐
  │ YES → 调用 agent.checkHealth()                   │
  │   · UP       → 正常,继续监控                    │
  │   · DEGRADED → 告警,标记为亚健康               │
  │   · DOWN     → 触发任务转移/重试                │
  │                                                  │
  │ NO → 默认只通过心跳超时判断存活                 │
  └──────────────────────────────────────────────────┘

三、AgentTaskLifecycleService 统一门面

所有任务生命周期操作通过一个门面统一管理,屏蔽底层复杂性:

AgentTaskLifecycleService (Facade):

  ┌─────────────────────────────────────────────────────┐
  │  submitTask(request)                                │
  │    → 创建任务 → 启动心跳 → 执行Agent → 完成/失败 │
  │                                                     │
  │  cancelTask(taskId)                                 │
  │    → 发送中断信号 → 等待Agent优雅停止 → 更新状态  │
  │                                                     │
  │  retryTask(taskId)                                  │
  │    → 加载检查点 → 从断点恢复 → 继续执行           │
  │                                                     │
  │  getTaskStatus(taskId)                              │
  │    → 查询实时状态 + 进度百分比 + 最后心跳时间     │
  └─────────────────────────────────────────────────────┘

  任务状态机:
  ┌────────┐   submit   ┌─────────┐  agent执行  ┌──────────┐
  │CREATED │──────────→│ RUNNING │───────────→│COMPLETED │
  └────────┘            └────┬────┘            └──────────┘
                             │
                    ┌────────┼────────┐
                    ▼        ▼        ▼
             ┌──────────┐┌───────┐┌────────┐
             │CANCELLED ││FAILED ││RETRYING│
             └──────────┘└───┬───┘└───┬────┘
                             │        │
                             └───→────┘
                           重试成功→RUNNING

四、心跳机制

核心设计:使用独立线程池 + REQUIRES_NEW 事务保证心跳不受业务事务影响。

@Component
public class TaskHeartbeatService {

    private final ScheduledExecutorService heartbeatExecutor =
        Executors.newScheduledThreadPool(4, r -> {
            Thread t = new Thread(r, "heartbeat-worker");
            t.setDaemon(true);
            return t;
        });

    private final ConcurrentMap<String, ScheduledFuture<?>> activeHeartbeats
        = new ConcurrentHashMap<>();

    /**
     * 开始心跳 - 每15秒向Redis写入时间戳
     */
    public void startHeartbeat(String taskId) {
        ScheduledFuture<?> future = heartbeatExecutor.scheduleAtFixedRate(
            () -> sendHeartbeat(taskId),
            0, 15, TimeUnit.SECONDS  // 初始0延迟,每15秒一次
        );
        activeHeartbeats.put(taskId, future);
    }

    /**
     * REQUIRES_NEW事务 - 心跳写入独立于业务事务
     * 即使业务事务回滚,心跳依然能写入
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void sendHeartbeat(String taskId) {
        String key = "task:" + taskId + ":heartbeat";
        redisTemplate.opsForValue().set(key,
            String.valueOf(System.currentTimeMillis()),
            90, TimeUnit.SECONDS  // TTL=90s, 3倍心跳间隔
        );
    }

    public void stopHeartbeat(String taskId) {
        ScheduledFuture<?> future = activeHeartbeats.remove(taskId);
        if (future != null) future.cancel(false);
    }
}
心跳时序:

  时间轴 ────────────────────────────────────────────→
  0s     15s     30s     45s     60s     75s     90s
  │       │       │       │       │       │       │
  ♥       ♥       ♥       ♥       ♥       ♥       ♥  ← 正常心跳
  │       │       │       │
  ♥       ♥       ♥       ✗  ← 45s心跳丢失
                          │
                    60s后(距最后心跳):
                    ⚠️ 标记为疑似僵尸
                    90s后(TTL过期):
                    ☠️ 确认僵尸,触发回收

  关键参数:
  · 心跳间隔: 15秒 (足够频繁又不浪费资源)
  · Redis TTL: 90秒 (3倍间隔,容忍2次心跳丢失)
  · 僵尸阈值: 60秒无心跳 → 疑似僵尸
  · 确认阈值: 90秒无心跳 → 确认僵尸

五、僵尸任务检测与回收

僵尸检测流程 (每30秒扫描一次):

  ┌────────────────────────────────────────────────────────┐
  │ ZombieTaskDetector.scan()                              │
  │                                                        │
  │ 1. 查询所有状态=RUNNING的任务                          │
  │ 2. 检查每个任务的最后心跳时间                          │
  │ 3. 分类处理:                                           │
  └────────────────────────┬───────────────────────────────┘
                           │
            ┌──────────────┼──────────────┐
            ▼              ▼              ▼
    lastHB < 60s     60s ≤ lastHB < 90s   lastHB ≥ 90s
    ┌──────────┐     ┌──────────────┐     ┌─────────────┐
    │ 正常     │     │ 疑似僵尸     │     │ 确认僵尸    │
    │ 继续监控 │     │ · 记录告警   │     │ · 回收处理  │
    └──────────┘     │ · 尝试ping  │     └──────┬──────┘
                     └──────────────┘            │
                                                 ▼
                                    ┌────── retries < maxRetries? ──────┐
                                    │ YES                    NO          │
                                    ▼                        ▼          │
                           ┌─────────────┐          ┌─────────────┐    │
                           │ 重新调度    │          │ 标记FAILED  │    │
                           │ status→     │          │ 发送告警    │    │
                           │ RETRYING    │          │ 人工介入    │    │
                           └─────────────┘          └─────────────┘    │
                                                                        │
                           └────────────────────────────────────────────┘

六、指数退避 + 随机抖动

重试间隔采用指数退避策略,避免"重试风暴"压垮系统:

public class ExponentialBackoffCalculator {

    private static final Duration BASE_DELAY = Duration.ofMinutes(1);  // 基础1分钟
    private static final Duration MAX_DELAY = Duration.ofHours(1);     // 上限1小时
    private static final double JITTER_FACTOR = 0.2;                   // ±20%抖动

    /**
     * 计算第N次重试的等待时间
     * 公式: min(BASE * 2^(attempt-1), MAX) * (1 ± JITTER)
     *
     * attempt=1: 1min  * (0.8~1.2) = 48s ~ 72s
     * attempt=2: 2min  * (0.8~1.2) = 96s ~ 144s
     * attempt=3: 4min  * (0.8~1.2) = 192s ~ 288s
     * attempt=4: 8min  * (0.8~1.2) = 384s ~ 576s
     * attempt=5: 16min * (0.8~1.2) = 768s ~ 1152s
     * attempt=6: 32min * (0.8~1.2) = 1536s ~ 2304s
     * attempt=7: 60min (cap)       = 2880s ~ 4320s
     */
    public Duration calculateDelay(int attempt) {
        long baseMs = BASE_DELAY.toMillis();
        long exponentialMs = baseMs * (1L << (attempt - 1));
        long cappedMs = Math.min(exponentialMs, MAX_DELAY.toMillis());

        // 添加±20%随机抖动,防止多任务同时重试
        double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1)
                        * JITTER_FACTOR;
        return Duration.ofMillis((long)(cappedMs * jitter));
    }
}
指数退避可视化:

  等待时间
  60min ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬─── CAP上限
  │                                                   │
  32min ─                                      ╱──────╱
  │                                    ╱──────╱
  16min ─                      ╱──────╱
  │                    ╱──────╱
  8min  ─          ╱──╱
  4min  ─      ╱──╱    ←── ±20% jitter范围
  2min  ─  ╱──╱
  1min  ─╱─╱
  └──────┬──────┬──────┬──────┬──────┬──────┬──────→ 重试次数
         1      2      3      4      5      6      7

  为什么需要抖动?
  ┌────────────────────────────────────────────────┐
  │ 无抖动: 100个任务在同一时刻重试 → 瞬间高峰    │
  │ 有抖动: 100个任务分散在±20%时间窗内重试       │
  │         → 负载均匀分布,避免"惊群效应"        │
  └────────────────────────────────────────────────┘

七、RocketMQ 重试调度

@Component
public class TaskRetryProducer {

    private final RocketMQTemplate rocketMQ;
    private final ExponentialBackoffCalculator backoff;

    /**
     * 发送延迟重试消息到RocketMQ
     */
    public void scheduleRetry(String taskId, int attempt) {
        Duration delay = backoff.calculateDelay(attempt);

        TaskRetryMessage message = new TaskRetryMessage(taskId, attempt, Instant.now());

        rocketMQ.syncSendDelayTimeMills(
            "TASK_RETRY_TOPIC",
            MessageBuilder.withPayload(message).build(),
            delay.toMillis()
        );

        log.info("任务[{}]第{}次重试已调度,将在{}后执行",
            taskId, attempt, formatDuration(delay));
    }
}

@Component
@RocketMQMessageListener(topic = "TASK_RETRY_TOPIC", consumerGroup = "task-retry-group")
public class TaskRetryConsumer implements RocketMQListener<TaskRetryMessage> {

    @Override
    public void onMessage(TaskRetryMessage msg) {
        log.info("执行任务[{}]第{}次重试", msg.taskId(), msg.attempt());
        agentTaskLifecycleService.retryTask(msg.taskId());
    }
}

八、断点续传 (AgentTaskCheckpointService)

长时间运行的Agent任务可以在执行过程中保存检查点,失败后从最近的检查点恢复而不是从头开始。

断点续传流程:

  正常执行:
  ┌────┐   ┌────┐   ┌────┐   ┌────┐   ┌────┐
  │步骤1│→ │步骤2│→ │步骤3│→ │步骤4│→ │步骤5│→ 完成
  └──┬─┘   └──┬─┘   └──┬─┘   └────┘   └────┘
     │        │        │
     ▼ CP1    ▼ CP2    ▼ CP3    ← 每步保存检查点(Checkpoint)

  异常中断:
  ┌────┐   ┌────┐   ┌────┐
  │步骤1│→ │步骤2│→ │步骤3│→ ✗ 崩溃!
  └────┘   └────┘   └──┬─┘
                        │
                        ▼ CP3已保存

  断点恢复:
                        ┌────┐   ┌────┐
  从CP3恢复 ──────────→│步骤4│→ │步骤5│→ 完成 ✅
                        └────┘   └────┘
  跳过了步骤1~3,节省大量时间和LLM调用成本!
@Service
public class AgentTaskCheckpointService {

    private final DistributedCheckpointStore checkpointStore;

    /**
     * 保存检查点 - Agent执行过程中定期调用
     */
    public void saveCheckpoint(String taskId, CheckpointData data) {
        checkpointStore.save(
            "checkpoint:" + taskId + ":" + data.stepId(),
            serialize(data),
            Duration.ofHours(24)  // 检查点保留24小时
        );
        // 更新任务进度
        checkpointStore.save(
            "checkpoint:" + taskId + ":latest",
            data.stepId(),
            Duration.ofHours(24)
        );
    }

    /**
     * 加载最新检查点 - 重试时调用
     */
    public Optional<CheckpointData> loadLatestCheckpoint(String taskId) {
        String latestStep = checkpointStore.get("checkpoint:" + taskId + ":latest");
        if (latestStep == null) return Optional.empty();
        String data = checkpointStore.get("checkpoint:" + taskId + ":" + latestStep);
        return Optional.ofNullable(data).map(this::deserialize);
    }

    /**
     * 检查点数据结构
     */
    public record CheckpointData(
        String stepId,                // 步骤标识
        int stepIndex,                // 第几步
        Map<String, Object> state,    // 中间状态
        List<Message> messages,       // 已积累的对话历史
        Instant savedAt               // 保存时间
    ) {}
}

九、完整生命周期时序

任务完整生命周期:

  t=0s    用户提交任务
  │       ├── AgentTaskLifecycleService.submitTask()
  │       ├── 创建TaskRecord (status=CREATED)
  │       ├── 分配到可用节点
  │       └── status → RUNNING
  │
  t=0s    启动心跳
  │       └── TaskHeartbeatService.startHeartbeat(taskId)
  │           每15秒写入Redis: task:{id}:heartbeat → timestamp
  │
  t=Ns    Agent执行中...
  │       ├── 定期保存Checkpoint
  │       ├── 通过SSE推送进度
  │       └── 心跳持续发送 ♥♥♥
  │
  ════════ 正常完成路径 ════════
  t=Xs    Agent执行完成
  │       ├── status → COMPLETED
  │       ├── 停止心跳
  │       └── 清理Checkpoint
  │
  ════════ 异常路径 ════════
  t=Ys    Agent异常/节点宕机
  │       ├── 心跳停止 (TTL倒计时)
  │       ├── 60s后: ZombieDetector标记疑似僵尸
  │       ├── 90s后: TTL过期,确认僵尸
  │       ├── 检查retryCount < maxRetries?
  │       │   ├── YES: status → RETRYING
  │       │   │       发送RocketMQ延迟消息
  │       │   │       等待指数退避时间后重试
  │       │   │       从Checkpoint恢复执行
  │       │   └── NO:  status → FAILED
  │       │            发送告警通知
  │       └── 更新TaskRecord

十、面试话术

面试话术:"我们的Agent任务可能运行几分钟到几小时,所以设计了完整的生命周期管理。核心是心跳+僵尸检测:每个运行中的任务每15秒向Redis写入心跳,TTL设为90秒(3倍间隔容忍偶尔丢包)。后台有ZombieDetector每30秒扫描,超过60秒没心跳标记疑似僵尸,90秒确认后触发回收。心跳写入用REQUIRES_NEW独立事务,保证即使业务事务长时间持有锁或回滚,心跳依然正常发送。"
面试话术:"重试策略采用指数退避加随机抖动:1分钟→2分钟→4分钟...最大1小时封顶,每次加±20%随机偏移防止惊群。重试通过RocketMQ延迟消息调度,不占用应用线程。同时支持断点续传——Agent执行中定期保存Checkpoint到Redis,重试时从最近检查点恢复而不是从头开始,对于需要多轮LLM调用的复杂任务,这能节省大量时间和费用。"