引言 为什么你的 AI Agent 无法投入生产?

想象这个场景:你花了一周构建了一个 AI 客服 Agent,它在测试环境运行完美——能准确理解用户意图、调用正确的工具、给出专业回答。但上线第一天就出了问题:

  • 服务器重启后,所有进行中的对话全部丢失
  • 用户要求"稍等一下,我去查下订单",Agent 无法等待
  • 某个工具调用超时,整个工作流崩溃,用户需要重新描述问题
  • 你想看看 Agent 为什么做出了错误决策,但没有任何执行日志
核心问题

传统 LLM 应用采用无状态线性链式架构——每个请求从头开始,无法保留中间状态,不支持暂停/恢复,缺少可观察性。这种架构可以做出漂亮的 Demo,但无法应对生产环境的复杂性。

本教程将带你掌握 LangGraph 1.0 的状态图编排能力,这是一个由 LangChain 团队开发的生产级 AI 工作流框架,已被 NVIDIA、IBM 等公司用于构建关键业务系统。学完本教程后,你将能够:

✓ 设计状态图

使用 TypedDict 定义状态契约,用节点和边构建清晰的执行流程

✓ 实现 Human-in-the-loop

在关键决策点暂停,等待人类审批后再继续执行

✓ 添加持久化

使用 PostgreSQL 检查点,崩溃后从断点恢复而不丢失进度

✓ 调试与观测

通过 LangGraph Studio 可视化执行轨迹,快速定位问题

01 环境准备与核心概念

首先创建项目并安装依赖:

# 创建项目目录
mkdir langgraph-tutorial && cd langgraph-tutorial

# 初始化 Python 项目
python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate

# 安装依赖
pip install langgraph langchain-openai langchain-community
pip install psycopg2-binary  # PostgreSQL 驱动

创建 .env 文件配置 API 密钥:

OPENAI_API_KEY=sk-xxx
DATABASE_URL=postgresql://user:pass@localhost:5432/langgraph_demo
关于数据库

检查点持久化需要 PostgreSQL。如果只想快速体验,可以暂时跳过数据库配置,使用内存模式(重启后数据丢失)。

LangGraph 核心概念图:状态图的节点、边、状态流转
图 1-1:LangGraph 状态图核心概念——节点处理状态,边控制流转

02 定义状态架构 (State Schema)

状态是 LangGraph 的核心抽象。所有节点共享同一个状态对象,节点接收状态、返回更新。我们使用 Python 的 TypedDict 定义状态结构:

from typing import TypedDict, Annotated, List, Literal
import operator

class ResearchState(TypedDict):
    # 用户输入的研究主题
    query: str
    # 累积消息历史(使用 reducer 确保追加而非覆盖)
    messages: Annotated[List[str], operator.add]
    # 搜索到的来源 URL 列表
    sources: Annotated[List[str], operator.add]
    # LLM 生成的综合内容
    synthesis: str
    # 人类审批结果
    is_approved: bool
    # 错误信息(如果有)
    error: str
关键点:Reducer 函数

注意 Annotated[List[str], operator.add] 的使用。如果不指定 reducer,状态更新会覆盖原有值;使用 operator.add 可以确保列表被追加。这对于消息历史等累积型字段至关重要。

03 定义节点 (Nodes) —— 纯函数处理逻辑

节点是状态图的处理单元。每个节点是一个纯函数:接收状态字典,返回要更新的字段。节点应该无副作用——不直接修改外部状态,所有变更通过返回值体现。

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

def search_node(state: ResearchState) -> dict:
    """搜索网络获取相关资料"""
    query = state["query"]
    # 实际项目中这里调用搜索 API(如 Tavily、Exa)
    sources = [
        f"https://example.com/result1?q={query}",
        f"https://example.com/result2?q={query}",
    ]
    return {"sources": sources}

def synthesize_node(state: ResearchState) -> dict:
    """使用 LLM 综合搜索结果生成报告"""
    context = "\n".join(state["sources"])
    prompt = f"基于以下资料综合研究报告:{context}"
    response = llm.invoke(prompt)
    return {"synthesis": response.content}

def human_review_node(state: ResearchState) -> dict:
    """等待人类审批(实际项目中通过 API 实现)"""
    # 这里会触发 interrupt_before 暂停
    # 审批后通过 update_state 设置 is_approved
    return {"is_approved": state.get("is_approved", False)}

04 构建状态图 (StateGraph)

有了状态和节点,现在使用 StateGraph 构建完整的执行流程。关键点在于条件边——它允许根据状态动态决定下一步。

from langgraph.graph import StateGraph, START, END

# 1. 初始化图
builder = StateGraph(ResearchState)

# 2. 添加节点
builder.add_node("search", search_node)
builder.add_node("synthesize", synthesize_node)
builder.add_node("review", human_review_node)
builder.add_node("publish", publish_node)
builder.add_node("revise", revise_node)

# 3. 定义边(固定流程)
builder.add_edge(START, "search")
builder.add_edge("search", "synthesize")
builder.add_edge("synthesize", "review")

# 4. 条件边:根据审批结果决定走向
def route_after_review(state: ResearchState) -> Literal["publish", "revise"]:
    if state["is_approved"]:
        return "publish"
    else:
        return "revise"

builder.add_conditional_edges(
    "review",
    route_after_review,
    {"publish": "publish", "revise": "revise"}
)

# 5. 循环:修订后回到综合步骤
builder.add_edge("revise", "synthesize")
builder.add_edge("publish", END)

# 6. 编译(添加持久化)
from langgraph.checkpoint.postgres import PostgresSaver

graph = builder.compile(
    checkpointer=PostgresSaver.from_conn_string(
        "postgresql://user:pass@localhost:5432/langgraph_demo"
    ),
    interrupt_before=["review"]  # 在审批节点前暂停
)
LangGraph 状态图执行流程:搜索→综合→审批→发布/修订
图 4-1:完整的状态图执行流程,包含条件路由和循环

05 执行与 Human-in-the-loop 审批

现在执行工作流。注意 interrupt_before=["review"] 配置——这会让图在执行到 review 节点前自动暂停,等待人类决策。

# 配置线程(用于持久化会话)
config = {"configurable": {"thread_id": "session-001"}}

# 启动工作流
result = graph.invoke({
    "query": "2026 年 AI Agent 工作流引擎技术趋势",
    "messages": [],
    "sources": [],
    "synthesis": "",
    "is_approved": False,
}, config)

# 检查暂停状态
snapshot = graph.get_state(config)
print(f"下一步执行:{snapshot.next}")  # 输出:['review']

# 查看当前状态
current_state = snapshot.values
print(f"已收集来源:{len(current_state['sources'])} 个")

暂停后,你可以通过外部系统(Web 界面、Slack 机器人等)展示当前结果,等待用户审批:

# 用户批准后更新状态
graph.update_state(config, {
    "is_approved": True,
    "messages": ["用户已批准,继续发布"]
})

# 从断点继续执行
final_result = graph.invoke(None, config)
生产价值

Human-in-the-loop 是生产系统的关键特性——在内容发布、资金操作、敏感决策等场景,必须保留人类最终控制权。

06 检查点持久化与故障恢复

LangGraph 的检查点机制会在每个 Super-Step 后自动保存状态到数据库。这意味着:

  • 服务器重启后,从最近的检查点恢复
  • 用户间隔几天继续对话,历史状态完整保留
  • 调试时可以回放任意时间点的状态
# 查看历史检查点
from langgraph.checkpoint.postgres import PostgresSaver

saver = PostgresSaver.from_conn_string(DATABASE_URL)

# 获取某个线程的所有检查点
checkpoints = saver.list({"configurable": {"thread_id": "session-001"}})

for checkpoint in checkpoints:
    print(f"时间:{checkpoint['created_at']}")
    print(f"状态:{checkpoint['channel_values']}")
    print("---")

# 回滚到特定检查点
target_checkpoint = checkpoints[-2]  # 倒数第二个
graph.update_state(config, target_checkpoint["channel_values"])
LangGraph 检查点机制:时间线上的状态序列
图 6-1:检查点沿时间线组织,支持回溯和分支

07 可观测性:使用 LangGraph Studio 调试

LangGraph Studio 是官方可视化工具,可以实时查看状态图的执行过程:

# 安装 LangGraph Studio
pip install langgraph-cli

# 启动开发服务器
langgraph dev

# 在浏览器打开 http://localhost:8123

Studio 提供以下能力:

  • 可视化执行轨迹:看到状态如何在节点间流转
  • 状态检查:点击任意节点查看当时的状态值
  • 时间旅行调试:回放到历史检查点重新执行
  • 人工审批界面:直接在 UI 中批准或拒绝

08 生产模式:错误处理与优化

最后,补充生产环境必备的错误处理和优化策略:

import logging
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

# 重试装饰器
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def search_with_retry(query: str):
    return search_api(query)

# 在节点内捕获异常
def safe_search_node(state: ResearchState) -> dict:
    try:
        sources = search_with_retry(state["query"])
        return {"sources": sources}
    except Exception as e:
        logger.error(f"搜索失败:{e}")
        return {"error": str(e)}

# 设置最大迭代次数防止无限循环
graph = builder.compile(
    checkpointer=saver,
    interrupt_before=["review"]
)

# 执行时限制最大步数
result = graph.invoke(
    input_data,
    config,
    stream_mode="updates",  # 流式输出中间状态
    recursion_limit=25  # 防止无限循环
)
无限循环陷阱

曾有团队的生产系统因条件边逻辑错误陷入无限循环,11 天内消耗了 $47,000 的 API 费用。务必设置 recursion_limit 并监控执行步数。

FAQ 常见问题解答

Q: LangGraph 和 CrewAI 有什么区别?

CrewAI 采用基于角色的抽象(Agent 有 Role/Goal/Backstory),适合快速原型和多 Agent 协作场景。LangGraph 采用更底层的状态图抽象,提供精确的流程控制和持久化能力,适合生产环境的关键业务系统。简单说:CrewAI 像"乐高积木",LangGraph 像"电路板"。

Q: 是否需要 PostgreSQL?可以用其他数据库吗?

LangGraph 支持多种检查点后端:PostgreSQL(生产推荐)、Redis(低延迟场景)、SQLite(开发测试)。你也可以实现自定义的 BaseCheckpointSaver 对接其他存储系统。

Q: 如何并行执行多个分支?

使用 LangGraph 1.0 新增的 Send API 实现动态 Fan-Out。例如同时搜索多个子主题,然后聚合结果。具体可以参考官方文档的"Parallel Execution"章节。

Q: 适合什么样的项目规模?

如果你的工作流只有 2-3 个简单步骤,使用 LangChain 的线性链就足够了。LangGraph 的价值体现在:5+ 步骤、需要条件分支、需要人类审批、需要故障恢复的复杂场景。