首页 / AI Agent开发 / Claude Code 工作流编排实战:使用 Tasks API 构建多阶段 Agent 任务规划系统 9 次阅读
Claude Code 工作流编排实战:使用 Tasks API 构建多阶段 Agent 任务规划系统
AI Agent 开发

Claude Code 工作流编排实战:使用 Tasks API 构建多阶段 Agent 任务规划系统

掌握 Tasks API 核心能力,实现复杂任务的自动拆解、状态追踪与错误恢复,打造生产级 Agent 工作流

2026 年 3 月 21 日 · 预计阅读 12 分钟

为什么需要任务规划系统?

想象这个场景:你需要开发一个完整的功能模块 —— 从数据库迁移到 API 实现,再到测试编写。传统做法是逐个任务交给 Claude,但很快你会遇到三个问题:

  • 上下文丢失:每个会话独立,AI 不记得之前的决策
  • 依赖混乱:任务 B 依赖任务 A 的输出,但无法自动等待
  • 错误无法恢复:中间步骤失败,整个流程需要手动重启

这正是 2026 年 Anthropic 推出的 Tasks API 要解决的核心问题。它将复杂工作流拆解为可追踪、可重试、可编排的子任务,让 Agent 从"对话助手"进化为"自动化工长"。

传统单会话模式 vs Tasks API 多阶段任务编排对比图

核心概念:Task、Workflow 与 State

在深入代码之前,先理解三个关键抽象:

Task(任务)
最小执行单元,有明确的输入、输出和状态(pending/running/completed/failed)
Workflow(工作流)
任务的有向无环图(DAG),定义任务间的依赖关系和执行顺序
State(状态)
跨会话持久化的上下文,包含决策历史、中间结果和待办事项
Task 状态流转图:pending → running → completed/failed

环境准备

开始之前,确保你的开发环境满足以下要求:

Claude Code v2.1.80+
Tasks API 需要最新版本,运行 claude --version 验证
Node.js 18+
或使用 Bun 1.0+ 获得更佳性能
Anthropic API Key
Pro/Teams/Enterprise 账户,在 ~/.claude/settings.json 配置

安装项目依赖:

# 创建项目目录
mkdir -p agent-workflow && cd agent-workflow

# 初始化项目并安装 SDK
npm init -y
npm install @anthropic-ai/sdk@latest

# 或使用 Bun(推荐)
bun init -y
bun add @anthropic-ai/sdk

实战步骤

1

配置 CLAUDE.md 持久化上下文

在项目根目录创建 CLAUDE.md,这是任务规划系统的"记忆核心":

# 项目架构

## 核心模块
- `src/tasks/` - 任务定义与执行逻辑
- `src/workflow/` - 工作流编排引擎
- `src/state/` - 状态管理与持久化

## 开发规范
- 所有任务必须定义输入/输出 schema
- 任务失败时自动重试 2 次,间隔指数退避
- 使用 TypeScript 严格模式

## 常用命令
- `bun run task:run` - 执行单个任务
- `bun run workflow:build` - 构建工作流
- `bun run state:export` - 导出当前状态

这个文件会在每次 Claude 会话开始时自动加载,确保 AI 始终理解项目结构和规范。

CLAUDE.md 文件结构示意图
2

定义第一个 Task

创建 src/tasks/fetch-data.ts

import { Task, TaskStatus } from '@anthropic-ai/sdk/tasks';

export interface FetchDataInput {
  source: string;
  format: 'json' | 'csv';
}

export interface FetchDataOutput {
  records: unknown[];
  timestamp: string;
}

export const fetchDataTask: Task<FetchDataInput, FetchDataOutput> = {
  id: 'fetch-data',
  name: '从 API 获取数据',
  description: '从指定数据源拉取数据并格式化为统一结构',

  async execute(input, context) {
    context.logger.info(`开始从 ${input.source} 获取数据`);

    try {
      const response = await fetch(input.source);
      const records = input.format === 'json'
        ? await response.json()
        : parseCSV(await response.text());

      return {
        records,
        timestamp: new Date().toISOString()
      };
    } catch (error) {
      // 自动记录错误上下文,供重试时使用
      throw context.retryableError(error, { maxRetries: 2 });
    }
  }
};

关键点:retryableError 会标记错误可重试,框架自动处理指数退避。

3

创建依赖任务

数据处理任务依赖数据获取任务的输出:

import { Task } from '@anthropic-ai/sdk/tasks';
import { fetchDataTask, FetchDataOutput } from './fetch-data';

export const processDataTask: Task<FetchDataOutput, unknown[]> = {
  id: 'process-data',
  name: '处理和清洗数据',
  description: '移除无效记录、标准化字段、添加衍生指标',

  // 声明依赖:本任务会在 fetchDataTask 完成后自动触发
  dependsOn: [fetchDataTask],

  // 从上游任务接收输出作为输入
  async execute(input, context) {
    const { records } = input;

    // 数据清洗逻辑
    const cleaned = records
      .filter(r => r.id !== null)           // 移除空 ID
      .map(r => ({                          // 标准化字段
        ...r,
        createdAt: new Date(r.created_at),
        score: calculateScore(r.metrics)
      }));

    context.logger.info(`处理完成:${cleaned.length} 条记录`);
    return cleaned;
  }
};
任务依赖关系 DAG 可视化
4

构建 Workflow 编排引擎

将任务组装成完整工作流:

import { Workflow, WorkflowExecutor } from '@anthropic-ai/sdk/workflow';
import { fetchDataTask } from './tasks/fetch-data';
import { processDataTask } from './tasks/process-data';
import { saveDataTask } from './tasks/save-data';

// 定义工作流
const dataPipeline = new Workflow({
  id: 'data-pipeline-v1',
  name: '数据处理流水线',
  tasks: [fetchDataTask, processDataTask, saveDataTask],

  // 配置执行策略
  options: {
    parallel: false,        // 按依赖顺序执行
    stopOnFailure: false,   // 单个失败不阻塞后续
    timeoutMs: 30 * 60 * 1000  // 30 分钟超时
  }
});

// 执行工作流
async function runPipeline() {
  const executor = new WorkflowExecutor(dataPipeline);

  // 订阅进度事件
  executor.on('task:start', (task) => {
    console.log(`▶️ 开始:${task.name}`);
  });

  executor.on('task:complete', (task, output) => {
    console.log(`✅ 完成:${task.name}`);
  });

  executor.on('task:fail', (task, error) => {
    console.error(`❌ 失败:${task.name} - ${error.message}`);
  });

  // 运行并等待完成
  const result = await executor.run({
    source: 'https://api.example.com/data',
    format: 'json'
  });

  console.log(`🎉 工作流完成,处理 ${result.stats.totalRecords} 条记录`);
  return result;
}
5

实现状态持久化

使用 StateManager 实现跨会话记忆:

import { StateManager, MemoryStore } from '@anthropic-ai/sdk/state';

const state = new StateManager({
  store: new MemoryStore({
    persistPath: './.workflow-state.json'  // 本地持久化
  }),

  // 配置快照策略
  snapshot: {
    everyNCommits: 5,     // 每 5 次提交打快照
    maxHistory: 10        // 保留 10 个历史版本
  }
});

// 在工作流关键节点保存状态
async function runWithCheckpoint() {
  const sessionId = await state.createSession('data-pipeline-v1');

  try {
    // 步骤 1 完成后打快照
    const step1Result = await fetchDataTask.execute(input);
    await state.checkpoint(sessionId, 'fetch-complete', { step1Result });

    // 步骤 2 完成后打快照
    const step2Result = await processDataTask.execute(step1Result);
    await state.checkpoint(sessionId, 'process-complete', { step2Result });

  } catch (error) {
    // 失败时从最近快照恢复
    const lastCheckpoint = await state.getLastCheckpoint(sessionId);
    console.log(`从快照恢复:${lastCheckpoint.label}`);
    throw error;
  }
}
状态快照与恢复机制示意图
6

添加错误处理与重试

生产环境需要健壮的错误恢复:

import { RetryPolicy, CircuitBreaker } from '@anthropic-ai/sdk/reliability';

const retryPolicy = new RetryPolicy({
  maxAttempts: 3,
  backoff: {
    type: 'exponential',
    initialMs: 1000,
    maxMs: 30000
  },
  retryableErrors: ['NETWORK_TIMEOUT', 'RATE_LIMITED']
});

const circuitBreaker = new CircuitBreaker({
  failureThreshold: 5,    // 5 次失败后打开
  resetTimeoutMs: 60000,  // 1 分钟后半开
  halfOpenRequests: 1     // 半开时允许 1 次探测
});

// 包装任务执行
async function executeWithReliability(task, input) {
  return circuitBreaker.execute(() =>
    retryPolicy.execute(() => task.execute(input))
  );
}
注意:熔断器打开时,任务会快速失败而不实际执行,避免雪崩效应。
7

使用 Channels 实现实时通知

2026 年 3 月新发布的 Channels 功能支持将任务进度推送到外部:

import { ChannelClient } from '@anthropic-ai/sdk/channels';

const channel = new ChannelClient({
  platform: 'telegram',
  token: process.env.TELEGRAM_BOT_TOKEN,
  chatId: process.env.ADMIN_CHAT_ID
});

// 在工作流执行器中集成
executor.on('task:complete', async (task, output) => {
  await channel.send({
    type: 'notification',
    title: '任务完成',
    body: `${task.name} 执行成功`,
    data: { taskId: task.id, duration: task.durationMs }
  });
});

// 支持双向交互:用户可回复触发自定义操作
channel.onCommand('/retry', async (ctx) => {
  const { taskId } = ctx.payload;
  await executor.retry(taskId);
  await channel.send({ type: 'text', text: `已重试任务 ${taskId}` });
});
Channels 双向通知架构图
8

部署与监控

将工作流部署为独立服务:

// src/server.ts
import { Hono } from 'hono';
import { dataPipeline } from './workflow';

const app = new Hono();

// 触发工作流
app.post('/api/run', async (c) => {
  const input = await c.req.json();
  const result = await dataPipeline.execute(input);
  return c.json(result);
});

// 查询状态
app.get('/api/status/:workflowId', async (c) => {
  const status = await state.getWorkflowStatus(c.req.param('workflowId'));
  return c.json(status);
});

// 健康检查
app.get('/health', (c) => {
  return c.json({ status: 'ok', timestamp: Date.now() });
});

export default {
  port: process.env.PORT || 8787,
  fetch: app.fetch
};

部署到 Cloudflare Workers:

# 部署
npx wrangler deploy

# 查看实时日志
npx wrangler tail

常见问题 FAQ

Q: 如何处理任务间的循环依赖?
Tasks API 要求工作流是有向无环图(DAG)。如果检测到循环依赖,Workflow 构造函数会抛出 CyclicDependencyError。设计时确保依赖关系单向流动。
Q: 任务超时后能否自动恢复?
可以。配置 options.timeoutMs 后,超时任务会被标记为 TIMEOUT 状态。结合 RetryPolicy 可自动重试,或手动调用 executor.retry(taskId)
Q: 如何在本地调试工作流?
使用 WorkflowDebugger
import { WorkflowDebugger } from '@anthropic-ai/sdk/debug';
const debugger = new WorkflowDebugger(dataPipeline);
debugger.visualize();  // 在浏览器打开 DAG 可视化
Q: Channels 支持哪些平台?
官方支持 Telegram、Discord、Slack。社区插件支持微信(通过企业微信)、钉钉、飞书。

总结

  • 使用 Task 抽象将复杂问题拆解为可管理的子单元
  • 通过 dependsOn 声明依赖,框架自动处理执行顺序
  • StateManager 实现跨会话记忆,支持断点续跑
  • RetryPolicy + CircuitBreaker 构建容错系统
  • Channels 实现实时通知与远程控制

现在你已经掌握了构建生产级 Agent 工作流的核心技能。下一步,尝试将现有脚本改造成 Task 编排的工作流,体验自动化带来的效率提升。

选择栏目
今日简报 播客电台 实战教程 AI挣钱计划 关于我
栏目
全球AI日报国内AI日报全球金融日报国内金融日报全球大新闻日报国内大新闻日报Claude Code 玩法日报OpenClaw 动态日报GitHub 热门项目日报AI工具实战AI应用开发编程实战工作流自动化AI原理图解AI Agent开发AI变现案例库AI工具创收AI内容变现AI接单提效变现前沿研究
我的收藏