11 - Agent任务生命周期

从创建到完成的8种状态、心跳检测、僵尸回收与启动恢复

一、任务状态机

              AgentTask 状态机
              ═════════════════

                    ┌──────────┐
         创建任务 → │ PENDING  │ 等待线程池调度
                    └────┬─────┘
                         │ 线程池开始执行
                         ▼
                    ┌──────────┐
                    │ RUNNING  │ 正在执行
                    └──┬──┬──┬─┘
                       │  │  │
           ┌───────────┘  │  └───────────┐
           │              │              │
           ▼              ▼              ▼
    ┌──────────┐  ┌──────────────┐  ┌──────────┐
    │COMPLETED │  │WAITING_APPROVAL│  │ FAILED  │
    │(成功完成)│  │(等待人工审批) │  │(执行失败)│
    └──────────┘  └──────┬───────┘  └────┬─────┘
                        │               │
                   用户审批结果      可重试?
                   ┌───┴───┐           │
                   ▼       ▼           ▼
              APPROVED  REJECTED  ┌──────────┐
                   │       │      │  RETRY   │
                   │       │      │(重试中)  │
                   ▼       │      └────┬─────┘
              回到RUNNING   │           │
                            │           └──→ 回到RUNNING
                            ▼
                       ┌──────────┐
                       │CANCELLED │
                       └──────────┘

  超时 → TIMEOUT
  熔断 → CIRCUIT_OPEN

二、生命周期管理架构

AgentTaskLifecycleService 架构:

  ┌────────────────────────────────────────────────────────┐
  │                  AgentTaskOrchestrator                  │
  │                   (任务调度中心)                        │
  └────────────────────────┬───────────────────────────────┘
                           │
                           ▼
  ┌────────────────────────────────────────────────────────┐
  │             AgentTaskLifecycleService                   │
  │               (生命周期管理门面)                        │
  │                                                        │
  │  · createTask()     创建并持久化任务                   │
  │  · updateStatus()   状态转换 (原子CAS)                 │
  │  · saveCheckpoint() 保存Graph执行快照                  │
  │  · restoreCheckpoint() 恢复中断的执行                  │
  └────────┬───────────────────────────┬───────────────────┘
           │                           │
           ▼                           ▼
  ┌──────────────────┐    ┌──────────────────────┐
  │ AgentTaskRecord  │    │ HeartbeatManager     │
  │ (数据库持久化)    │    │ (心跳检测)            │
  │                  │    │                      │
  │ · taskId         │    │ · 每10s发送心跳      │
  │ · agentId        │    │ · 30s无心跳=僵尸     │
  │ · userId         │    │ · 僵尸→标记TIMEOUT   │
  │ · status         │    └──────────────────────┘
  │ · createdAt      │
  │ · completedAt    │    ┌──────────────────────┐
  │ · retryCount     │    │ TaskScheduler        │
  │ · errorMessage   │    │ (定时任务)            │
  │ · checkpoint     │    │                      │
  └──────────────────┘    │ · 启动恢复: 恢复断电前│
                          │   的RUNNING任务       │
                          │ · 重试扫描: 每15s     │
                          │ · 清理: 每天3AM       │
                          └──────────────────────┘

三、心跳与僵尸检测

心跳检测机制:

  正常流程:
  ┌──────────────────────────────────────────┐
  │ Agent执行线程                             │
  │                                          │
  │ [10s] 心跳 → [10s] 心跳 → [10s] 心跳    │
  │    ↑           ↑           ↑             │
  │    └───────────┴───────────┘             │
  │        后台线程每10s更新 lastHeartbeat    │
  └──────────────────────────────────────────┘

  异常流程 (僵尸任务):
  ┌──────────────────────────────────────────┐
  │ Agent执行线程                             │
  │                                          │
  │ [10s] 心跳 → [10s] 心跳 → ✗ 无心跳      │
  │                              │            │
  │                         30s无心跳         │
  │                              │            │
  │                         检测到僵尸        │
  │                              │            │
  │                    status → TIMEOUT       │
  │                    释放信号量              │
  │                    记录错误日志            │
  └──────────────────────────────────────────┘

四、启动恢复机制

容灾设计: 应用重启时,数据库中可能还残留RUNNING状态的任务(因为崩溃前没有正常标记为COMPLETED/FAILED)。TaskScheduler在启动时自动扫描所有RUNNING任务,将它们标记为FAILED并记录恢复日志。这确保了任务状态的一致性。

五、延迟响应保存

处理"SSE超时但LLM仍在运行"的边界场景:

延迟响应保存:

  场景: SSE超时(120s),但LLM在第130秒才完成

  ┌──────────────────────────────────────────────┐
  │ T=0s    客户端发起请求                        │
  │ T=120s  SSE活动超时 → 客户端断开              │
  │ T=130s  LLM仍在运行,最终生成完整响应         │
  │         ↓                                    │
  │         DelayedResponseEvent 发布             │
  │         → DelayedResponseSaver 接收           │
  │         → 保存完整响应到数据库               │
  │         → 用户下次打开对话可以看到完整回答    │
  └──────────────────────────────────────────────┘

六、面试高频问题

Q: 为什么状态转换用AtomicReference的CAS而不是加锁?
A: CAS(Compare-And-Swap)是无锁的原子操作,比synchronized更轻量。任务状态转换有严格的顺序要求(如PENDING→RUNNING→COMPLETED,不能跳跃),CAS天然保证只有符合预期的状态转换才能成功。例如 status.compareAndSet(RUNNING, COMPLETED) 如果当前状态不是RUNNING就会失败,避免了并发下的状态混乱。
Q: 僵尸任务是怎么产生的?怎么处理?
A: 僵尸任务通常由以下原因产生:1) JVM崩溃(OOM、Kill信号)导致执行线程突然终止;2) 线程死锁导致心跳无法更新;3) Redis连接断开导致状态无法同步。处理方式:后台线程每30秒扫描一次,如果lastHeartbeat超过30秒未更新,将任务标记为TIMEOUT,释放占用的信号量资源,并记录错误日志供运维排查。