LangGraph 技术架构与实现详解
本文档旨在全面剖析 LangGraph 的技术架构与底层实现。LangGraph 是一个用于构建有状态、可循环、多参与者(Multi-agent)应用的强大框架,它作为 LangChain 生态的关键扩展,为复杂的 AI 工作流提供了图计算的能力。
1. 顶层架构与设计哲学 (High-Level Architecture)
从最高层面看,LangGraph 的设计目标是将复杂的 AI 应用逻辑,特别是那些包含循环、条件分支和状态依赖的 Agentic 工作流,抽象为一个可计算、可观测、可持久化的有向图。其核心架构思想可以概括为以下几点:
- 图即应用 (Graph as Application):将整个应用程序的工作流程建模为一个有向图(Directed Graph)。图中的节点 (Node) 代表计算单元(如调用 LLM、执行工具、处理数据),而边 (Edge) 代表控制流和数据流的方向。
- 状态机范式 (State Machine Paradigm):整个图的执行过程被视为一个状态机。存在一个全局共享的状态 (State) 对象,在整个图的执行过程中被持久化和传递。每个节点接收当前状态,执行其逻辑,然后返回对状态的更新。这种模式使得应用的状态变迁清晰可追溯。
- 声明式定义 (Declarative Definition):开发者通过声明式的 API 来“绘制”这张图——定义节点、定义边、设置入口和出口。开发者更关注“做什么”(What),而不是“如何执行”(How),具体的调度、并发和状态管理由框架处理。
- 人机协同原生支持 (Human-in-the-Loop Native Support):架构层面内置了中断和恢复的能力。图可以在任意节点暂停,等待外部(如人工审批)输入,然后再从断点处无缝恢复执行,这对于构建需要人工干预的复杂流程至关重要。
下图描绘了 LangGraph 的高层逻辑视图:
+-------------------------------------------------+
| Application Layer |
| (e.g., Multi-agent System, ReAct Loop, CoT) |
+----------------------+--------------------------+
| (Defines the flow)
v
+-------------------------------------------------+
| LangGraph API & Core Logic |
| +-----------------+ +-----------------------+ |
| | Graph Builder | | Execution Engine | |
| | (Nodes, Edges) | | (Scheduler, Executor) | |
| +-----------------+ +-----------------------+ |
| +-----------------+ +-----------------------+ |
| | State Manager | | Checkpoint & Persist | |
| +-----------------+ +-----------------------+ |
+----------------------+--------------------------+
| (Leverages & Extends)
v
+-------------------------------------------------+
| LangChain Ecosystem (LCEL, etc.) |
| +-------+ +-------+ +---------+ +------+ |
| | LLMs | | Tools | | Prompts | | Memory |
| +-------+ +-------+ +---------+ +------+ |
+-------------------------------------------------+
| (Built Upon)
v
+-------------------------------------------------+
| Core Libraries & Python Runtime |
| (NetworkX, Pydantic, Asyncio, etc.) |
+-------------------------------------------------+
┌─────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Agents │ │ Workflows │ │ Tools │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Execution Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Graph │ │ State │ │ Checkpoints│ │
│ │ Executor │ │ Manager │ │ Manager │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Core Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Node │ │ Edge │ │ Channel │ │
│ │ Registry │ │ Resolver │ │ System │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Infrastructure Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Persistence │ │ Concurrency │ │ Observability│ │
│ │ Layer │ │ Control │ │ System │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────┘
2. 核心组件与技术栈 (Core Components & Tech Stack)
为了实现上述顶层设计,LangGraph 精心选择并整合了一系列成熟的技术组件,形成了一个分层、解耦的技术栈。
2.1 技术栈总览
- 运行时环境: Python 3.9+ / Node.js 18+ (官方提供双语言实现)
- 图结构与算法:
networkx
(Python) - 状态定义与验证:
pydantic
(首选) 或 TypedDict
- 异步与并发:
asyncio
/ anyio
(Python), Promise
(TypeScript) - 核心依赖:
LangChain Expression Language (LCEL)
2.2 核心组件详解
图数据结构 (Graph Data Structure)
- High-Level: LangGraph 将工作流表示为一个有向图,允许存在循环。这个图定义了所有可能的计算路径。
- Low-Level: 在 Python 实现中,LangGraph 直接依赖
networkx.DiGraph
作为其底层的图存储和算法引擎。networkx
是一个功能强大的图论库,LangGraph 利用它进行:- 拓扑结构管理: 添加、删除节点和边。
- 路径与遍历: 寻找从入口到终点的执行路径。
- 循环检测: 在编译阶段或运行时识别和管理图中的循环,防止无限循环(除非显式允许)。
- 图算法: 利用
networkx
成熟的算法进行图的分析和优化。
状态管理 (State Management)
- High-Level: 状态是流经图的“血液”。LangGraph 强制要求一个显式、强类型的状态定义,该状态在所有节点间共享和更新。
- Low-Level:
- 模式定义 (Schema Definition): 主要使用
pydantic.BaseModel
来定义状态的结构。这带来了巨大的好处:- 运行时类型安全: 自动验证每次状态更新是否符合预定义的模式。
- 序列化/反序列化: Pydantic 提供了高效的
.model_dump()
和 .model_validate()
方法,为状态的持久化(Checkpointing)和网络传输提供了便利。 - IDE 智能提示: 带来优秀的开发体验。
- 状态更新机制: LangGraph 借鉴了 Reducer 模式(源于 Redux)。每个节点执行后返回的不是一个完整的状态对象,而是一个部分状态更新 (Partial State Update)。LangGraph 的执行引擎负责将这个部分更新安全地合并回主状态。这种设计天然地支持了并行执行分支的状态合并,因为不同分支可以独立更新状态的不同字段,然后由框架原子性地合并。
- 不可变性 (Immutability): 推荐的最佳实践是节点不直接修改传入的状态(in-place mutation),而是返回一个新的、更新后的状态对象或部分状态字典。这使得状态追踪和调试(时间旅行)变得简单。
执行引擎 (Execution Engine)
- High-Level: 负责根据图的结构、当前状态和边的条件,调度和执行节点,驱动整个工作流前进。
- Low-Level:
异步优先 (Async-First): 核心执行逻辑建立在 asyncio
之上。这使得 I/O 密集型操作(如调用外部 LLM API、数据库查询)可以被高效地并发执行,极大地提升了应用的吞吐量。anyio
库被用作兼容层,以支持不同的异步事件循环。
调度逻辑: 执行引擎的简化伪代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| # Simplified Execution Loop
state = initial_state
current_node_name = ENTRY_POINT
while current_node_name != END:
# 1. Fetch the node to execute
node_runnable = graph.nodes[current_node_name]['function']
# 2. Execute the node (potentially in parallel with others)
# The await here is key for async execution
partial_update = await node_runnable.invoke(state)
# 3. Merge the update into the global state
state = merge_state(state, partial_update)
# 4. Persist a checkpoint of the new state
save_checkpoint(state)
# 5. Determine the next node(s) based on conditional edges
current_node_name = self.route(state, graph.out_edges(current_node_name))
|
流式处理 (Streaming): 基于 asyncio
的异步生成器 (async def... yield
),LangGraph 原生支持流式输出。当一个节点(如 LLM)产生流式响应时,执行引擎可以将这些 token 块实时地流向最终用户,而无需等待整个图执行完毕。
控制流 (Control Flow)
- High-Level: LangGraph 提供了强大的控制流机制,包括条件分支和循环,这是构建复杂 Agent 的核心。
- Low-Level:
条件边 (Conditional Edges): 这是实现动态路由的关键。当从一个源节点引出多条边时,可以为每条边关联一个条件函数。这个函数接收当前的状态对象,并返回一个布尔值或一个特定的键。执行引擎在源节点执行完毕后,会调用这些条件函数来决定接下来应该走哪一条边。
1
2
3
4
5
6
| # Conceptual implementation of conditional edges
def add_conditional_edges(source, path_function, path_map):
# path_function(state) returns a key, e.g., "tool_call" or "respond"
# path_map maps these keys to destination nodes:
# {"tool_call": "TOOL_NODE", "respond": "RESPONSE_NODE"}
# LangGraph then creates edges with these conditions attached.
|
循环实现: 循环并非一种特殊的语法,而是通过图的拓扑结构自然实现的。一个条件边可以指向图中的一个前序节点,从而构成一个循环。为了防止死循环,通常会在状态中加入一个计数器或设置一个最大迭代次数,并在条件边函数中进行判断。
检查点与持久化 (Checkpointing & Persistence)
- High-Level: 为了实现长时间运行、可中断和可恢复的应用,LangGraph 提供了强大的检查点机制。
- Low-Level:
- 可插拔后端: LangGraph 设计了一个可插拔的检查点后端系统。虽然默认是内存存储 (
MemorySaver
),但可以轻松替换为生产级的持久化存储,如 Redis
, PostgreSQL
, SQLite
等。 - 时间旅行调试 (Time-Travel Debugging): 由于每次状态更新后都会保存一个快照,这使得“时间旅行”成为可能。开发者可以加载任意历史检查点,查看当时的状态,并从该点重新开始执行。这对于调试复杂的、非确定性的 AI 应用来说是一个杀手级功能。
3. 与 LangChain 的深度集成
LangGraph 并非一个孤立的库,而是深度根植于 LangChain 生态系统,尤其是 LangChain 表达式语言 (LCEL)。
- 万物皆 Runnable: LCEL 的核心是
Runnable
协议,它为链、提示、模型、工具等所有组件提供了一套统一的接口 (invoke
, stream
, batch
, ainvoke
等)。LangGraph 的每个节点本身就是一个 Runnable
,而整个编译后的 Graph 也是一个 Runnable
。 - 无缝复用: 这种设计带来了极大的可组合性。任何 LangChain 组件都可以直接作为 LangGraph 的一个节点,无需任何封装或修改。
- 继承可观测性: LangGraph 自动继承了 LangChain 的可观测性能力。当与 LangSmith 或其他兼容 OpenTelemetry 的追踪系统集成时,图的每一次执行,包括每个节点的输入输出、耗时、内部调用链,都会被完整地记录下来,形成一个可视化的执行轨迹。
4. 总结:技术优势与权衡
优势:
- 站在巨人肩膀上: 巧妙地利用
networkx
, pydantic
, asyncio
等成熟库,专注于解决 AI 工作流这一核心问题,而非重复造轮子。 - 与 LangChain 生态的协同效应: 深度集成带来了无与伦比的组件复用能力和一流的可观测性。
- 专为 Agentic 流程优化: 对循环、状态管理、人机交互和持久化的原生支持,完美匹配了现代 AI Agent 的开发需求。
- 类型安全与健壮性: Pydantic 的使用大大提升了代码的健壮性和可维护性。
权衡与限制:
- 学习曲线: 对于不熟悉 LangChain 或图概念的开发者,存在一定的学习成本。
- 生态锁定: 与 LangChain 深度绑定,虽然带来了好处,但在一定程度上也形成了生态锁定。
- 性能考量: 对于超大规模图(例如,数十万节点),
networkx
的内存占用和调度性能可能成为瓶颈。在这种情况下,可能需要开发者进行手动的图切分或优化。 - 序列化开销: 对于包含复杂、大型非标准对象的 State,其序列化和反序列化(在每个检查点)可能会引入不可忽视的性能开销。
结论:
LangGraph 的技术底座是一个精心设计的“胶水层”架构。它并没有发明全新的底层技术,而是通过对现有顶级开源组件的智慧编排和深度整合,提供了一个优雅、强大且高度专业化的抽象层,精准地解决了构建有状态、可循环的复杂 AI 应用这一核心痛点。它的成功在于其务实的设计哲学:组合优于创造,专注优于全面。
LangGraph 的技术底座可以从几个层面来理解:
核心依赖和技术栈
1. 图数据结构基础
- NetworkX:Python 版本使用 NetworkX 作为图结构的底层实现,用于存储节点、边、拓扑排序、循环检测等图算法
- 有向图(DAG)模型:虽然支持循环,但底层仍基于有向图的数据结构来组织工作流
2. LangChain 生态系统
- LangChain Expression Language (LCEL):继承了 LangChain 的 Runnable 协议,所有节点都实现
invoke()
, stream()
, batch()
等标准接口 - LangChain 组件复用:可以直接使用 LangChain 的各种组件(LLM、工具、检索器等)作为图节点
- 可观测性基础设施:复用 LangChain 的追踪系统(LangSmith、回调机制)
3. 状态管理和类型系统
- Pydantic:用于定义和验证状态模式(State Schema),提供运行时类型检查和序列化能力
- TypedDict(Python)或 zod(TypeScript):作为轻量级的状态定义选项
- Reducer 模式:借鉴了 Redux 的状态更新理念,通过 reducer 函数来合并并行分支的状态更新
4. 异步和并发基础
- asyncio(Python):提供异步执行能力,支持并行节点执行
- anyio:提供跨异步框架的兼容层
- Streaming 支持:基于异步生成器实现流式输出
核心实现机制
1. 图编译和执行引擎
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| # 简化的执行流程
class GraphExecutor:
def __init__(self, graph: nx.DiGraph):
self.graph = graph
self.state_schema = ...
async def execute(self, initial_state):
current_node = self.get_entry_point()
state = initial_state
while current_node != END:
# 执行节点
node_fn = self.graph.nodes[current_node]['function']
state = await node_fn(state)
# 决定下一个节点
edges = self.graph.out_edges(current_node)
current_node = self.route(state, edges)
return state
|
2. 状态管理机制
- 不可变状态更新:每个节点返回新的状态对象,而不是修改原状态
- 部分状态更新:节点可以只更新状态的特定字段,其他字段保持不变
- 状态通道(Channels):更高级的状态管理,支持不同类型的状态合并策略
3. 条件路由和控制流
1
2
3
4
5
6
| # 条件边的实现
def add_conditional_edges(source, path_map, path_fn):
for target, condition in path_map.items():
def edge_condition(state):
return path_fn(state) == condition
graph.add_edge(source, target, condition=edge_condition)
|
4. 检查点和持久化
- 内存检查点:默认的状态快照存储
- 可插拔的持久化后端:支持 Redis、PostgreSQL 等作为检查点存储
- 时间旅行调试:可以回溯到任意历史状态继续执行
设计模式和架构特点
1. 声明式 API
- 通过方法链构建图结构,而不是命令式地编写执行逻辑
- 关注"what"而非"how"
2. 可组合性
- 子图可以作为节点嵌入到父图中
- 节点之间通过标准化的状态接口通信
3. 扩展性
- 自定义节点类型
- 自定义路由逻辑
- 自定义状态通道和 reducer
4. 与 LLM 工作流的适配
1. 技术优势和限制
优势:
- 利用成熟的图算法库,不重复造轮子
- 与 LangChain 生态深度集成
- 类型安全和运行时验证
- 原生支持异步和流式处理
限制:
- 对 LangChain 的依赖较重
- 大规模图(10万+ 节点)的性能需要优化
- 状态序列化在复杂对象时可能有开销
总的来说,LangGraph 是一个精心设计的"胶水层",它将多个成熟的技术组件(图算法、类型系统、异步框架、LLM 工具链)有机结合,专门针对 AI Agent 工作流的需求进行了优化。它的成功在于没有重新发明轮子,而是站在巨人的肩膀上,提供了一个优雅的抽象层。
LangGraph是LangChain生态系统中用于构建有状态、多参与者应用程序的库,其技术底座主要包括以下几个层面:
核心技术架构:
图计算框架:
- 基于有向图(Directed Graph)的计算模型,将应用逻辑表示为节点和边的组合
- 支持循环图结构,允许复杂的工作流控制和状态管理
- 采用状态机模式,每个节点可以读取、修改和传递状态
状态管理系统:
- 实现了持久化的状态存储机制,支持跨节点的状态共享
- 提供状态快照和回滚功能
- 支持并发状态访问和修改的安全机制
执行引擎:
- 异步执行框架,支持并发节点处理
- 智能调度器,根据图结构和依赖关系优化执行顺序
- 支持条件分支和动态路由
底层技术依赖:
Python生态系统:
- 基于Python构建,充分利用其丰富的AI/ML库生态
- 使用asyncio进行异步编程支持
- 集成了类型提示(Type Hints)以提供更好的开发体验
LangChain集成:
- 深度集成LangChain的组件体系(LLMs、工具、内存等)
- 复用LangChain的抽象层和接口设计
- 支持LangChain的可观测性和调试工具
序列化和持久化:
- 支持多种状态持久化后端(内存、文件系统、数据库等)
- 使用JSON/Pickle等序列化机制保存和恢复状态
- 提供检查点(Checkpoint)机制用于故障恢复
关键设计模式:
Actor模型:
- 每个节点可以看作是一个独立的Actor,处理消息和状态
- 支持多Agent协作场景
流式处理:
- 支持流式输入输出,适合实时交互场景
- 可以处理大型数据流而不需要全部加载到内存
可组合性:
- 节点和图可以嵌套组合,支持复杂应用的模块化构建
- 提供丰富的预构建节点类型和工具
技术特色:
人机协作:
- 内置对人工干预的支持,可以在执行过程中暂停等待人工输入
- 支持审批流程和人工验证节点
可观测性:
- 提供详细的执行日志和状态跟踪
- 支持可视化图结构和执行流程
- 集成调试和监控工具
容错性:
LangGraph的技术底座体现了现代AI应用开发的趋势:将复杂的AI工作流抽象为可管理的图结构,同时保持足够的灵活性来处理各种复杂场景。它特别适合构建需要多步推理、工具调用、人机协作的智能应用。
具体技术分层图
下面这张“技术分层图”可以快速看到 LangGraph 究竟踩在了哪几块“地基”之上。再往下读,会把每一层拆开讲:
┌──────────────────────────────────────────────┐
│ 你的业务代码 │
└──────────────────────────────────────────────┘
┌──────────────────────────────────────────────┐
│ LangGraph API(GraphRunnable) │
│ ─────────────────────────────────────────── |
│ • 节点/边 DSL |
│ • State 模型(@pydantic) │
│ • Router / 条件跳转 / 循环语义 │
│ • 调度引擎(async / streaming) │
└──────────────────────────────────────────────┘
┌──────────────────────────────────────────────┐
│ LangChain Expression Language │
│ ────────────────────────────────────────── │
│ • Runnable、LCEL ▶│◇ 组合算子 │
│ • ChatModel / Retriever / Memory 等适配器 │
│ • 调用链路 Tracing (LangSmith, OpenTelemetry)│
└──────────────────────────────────────────────┘
┌──────────────────────────────────────────────┐
│ 通用第三方基础库 & 运行时能力 │
│ ────────────────────────────────────────── │
│ • networkx → 图结构存储/遍历/循环检测 │
│ • pydantic → 状态数据校验 & 类型提示 │
│ • asyncio / anyio → 并发、流式传输 │
│ • typing_extensions / dataclasses 等 │
└──────────────────────────────────────────────┘
┌──────────────────────────────────────────────┐
│ Python 3.9+ / Node.js 18+ │
│ (官方提供双语言实现) │
└──────────────────────────────────────────────┘
一、核心实现思路
图即流程:
• 通过 networkx.DiGraph
把每个 节点(Node)和 边(Edge)抽象成 Python 对象。
• 节点实现统一的 Runnable 接口(来自 LangChain),因此可以是 prompt、LLM、工具调用或子图。
明确定义“状态”:
• 所有节点输入/输出共享一份 State(通常是 pydantic.BaseModel
)。
• LangGraph 在运行时只更新被节点声明会“写入”的字段,天然支持并行分支合并。
调度引擎:
• 每一步运行后,会根据边上绑定的 条件函数 或 Router 决定下一个节点。
• 支持显式循环(while-style)与“跳转表”。
• 底层调度用 asyncio(Python)或 Promise(TS),因此可以边计算边流式输出。
可观测性:
• 默认复用 LangChain 的 Tracing 协议(LangSmith / OpenTelemetry)。
• 每个节点天然是一个可追踪的 RunnableSpan。
二、为什么要依赖 LangChain?
• LangChain 把各种 LLM、VectorStore、Tooling 等适配好了,LangGraph 只管“数据流 + 控制流”;
• LangChain 的 Runnable 协议提供统一 invoke()/stream()/batch()
API,让调度器无需感知底层是同步还是异步、远程还是本地。
三、关键第三方库怎么用?
networkx
– 建图、拓扑排序、循环检测、求子图。pydantic
– 运行时校验 + IDE 友好的类型提示,一行代码搞定深拷贝 & .model_dump()
序列化。asyncio / anyio
– 让一个节点跑 LLM、另一个节点查向量库,全程 await
,再把 token 流一路向上传。
四、技术亮点/坑点
• “可回溯循环”——把循环次数写进 State,可用于阻断死循环。
• “部分更新 State”——大幅降低序列化开销。
• 由于依赖 networkx
,图规模到 10⁵ 以上节点可能需要自行做分段执行或并行拆图。
五、快速代码感受
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
import pydantic, asyncio
class FlowState(pydantic.BaseModel):
history: list[str] = []
user_input: str = ""
llm = ChatOpenAI(model="gpt-3.5-turbo", streaming=True)
# 1. 定义节点
async def answer_q(state: FlowState) -> FlowState:
rsp = await llm.ainvoke(state.user_input)
state.history.append(rsp.content)
return state
def check_continue(state: FlowState) -> str:
return END if state.user_input.lower() == "quit" else "ask"
# 2. 画图
g = StateGraph(FlowState)
g.add_node("ask", answer_q)
g.set_entry_point("ask")
g.add_conditional_edges("ask", check_continue)
graph_runnable = g.compile()
# 3. 调用
asyncio.run(
graph_runnable.ainvoke({"user_input": "LangGraph 的技术底座?"})
)
|
总结一句话:
LangGraph 并没有重新造轮子,而是“把 LangChain 的可组合能力 + networkx 的图能力 + pydantic 的数据能力 + asyncio 的并发能力”拼成了一个专注于 “可循环 & 有状态” LLM 工作流 的执行框架,这些就是它真正的技术底座。
2. 核心组件深度解析
2.1 图结构实现(Graph Core)
StateGraph类架构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| class StateGraph:
def __init__(self, state_schema: Type[BaseModel]):
self.nodes: Dict[str, NodeSpec] = {}
self.edges: Dict[str, EdgeSpec] = {}
self.channels: Dict[str, Channel] = {}
self.state_schema = state_schema
self.compiled_graph: Optional[CompiledGraph] = None
def add_node(self, name: str, func: Callable, **kwargs):
"""节点注册机制"""
self.nodes[name] = NodeSpec(
name=name,
func=func,
input_channels=self._infer_input_channels(func),
output_channels=self._infer_output_channels(func),
metadata=kwargs
)
def add_edge(self, source: str, target: str):
"""边的添加与验证"""
if source not in self.nodes or target not in self.nodes:
raise GraphValidationError(f"Unknown node: {source} or {target}")
self.edges[f"{source}->{target}"] = EdgeSpec(
source=source,
target=target,
condition=None,
edge_type=EdgeType.UNCONDITIONAL
)
def add_conditional_edges(self, source: str, condition: Callable,
mapping: Dict[str, str]):
"""条件边实现"""
for condition_result, target in mapping.items():
self.edges[f"{source}->{target}@{condition_result}"] = EdgeSpec(
source=source,
target=target,
condition=condition,
condition_result=condition_result,
edge_type=EdgeType.CONDITIONAL
)
|
图编译过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
| class GraphCompiler:
def compile(self, graph: StateGraph) -> CompiledGraph:
"""图编译的核心逻辑"""
# 1. 拓扑排序验证
self._validate_topology(graph)
# 2. 状态通道构建
channels = self._build_channels(graph)
# 3. 执行计划生成
execution_plan = self._generate_execution_plan(graph)
# 4. 优化策略应用
optimized_plan = self._optimize_execution_plan(execution_plan)
return CompiledGraph(
nodes=graph.nodes,
edges=graph.edges,
channels=channels,
execution_plan=optimized_plan,
state_schema=graph.state_schema
)
def _validate_topology(self, graph: StateGraph):
"""拓扑验证算法"""
visited = set()
rec_stack = set()
def dfs(node):
if node in rec_stack:
raise CyclicGraphError(f"Cycle detected involving node: {node}")
if node in visited:
return
visited.add(node)
rec_stack.add(node)
for edge in graph.edges.values():
if edge.source == node:
dfs(edge.target)
rec_stack.remove(node)
for node in graph.nodes:
if node not in visited:
dfs(node)
|
2.2 状态管理系统
Channel系统实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| class Channel(ABC):
"""状态通道抽象基类"""
def __init__(self, value_type: Type, reducer: Optional[Callable] = None):
self.value_type = value_type
self.reducer = reducer or self._default_reducer
@abstractmethod
def update(self, old_value: Any, new_value: Any) -> Any:
"""状态更新逻辑"""
pass
def _default_reducer(self, old_value: Any, new_value: Any) -> Any:
"""默认的状态归约策略"""
return new_value
class LastValueChannel(Channel):
"""最后值策略通道"""
def update(self, old_value: Any, new_value: Any) -> Any:
return new_value
class BinaryOperatorChannel(Channel):
"""二元操作通道"""
def __init__(self, value_type: Type, operator: Callable):
super().__init__(value_type)
self.operator = operator
def update(self, old_value: Any, new_value: Any) -> Any:
if old_value is None:
return new_value
return self.operator(old_value, new_value)
class AppendChannel(BinaryOperatorChannel):
"""追加操作通道"""
def __init__(self, value_type: Type):
super().__init__(value_type, lambda a, b: a + [b] if isinstance(a, list) else [a, b])
|
状态管理器实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| class StateManager:
def __init__(self, channels: Dict[str, Channel], state_schema: Type[BaseModel]):
self.channels = channels
self.state_schema = state_schema
self.current_state: Dict[str, Any] = {}
self.state_history: List[Dict[str, Any]] = []
def update_state(self, updates: Dict[str, Any]) -> Dict[str, Any]:
"""状态更新的核心逻辑"""
new_state = self.current_state.copy()
for key, new_value in updates.items():
if key not in self.channels:
raise StateError(f"Unknown state key: {key}")
channel = self.channels[key]
old_value = new_state.get(key)
new_state[key] = channel.update(old_value, new_value)
# 状态验证
self._validate_state(new_state)
# 历史记录
self.state_history.append(self.current_state.copy())
self.current_state = new_state
return new_state
def _validate_state(self, state: Dict[str, Any]):
"""状态一致性验证"""
try:
self.state_schema(**state)
except ValidationError as e:
raise StateValidationError(f"State validation failed: {e}")
|
2.3 执行引擎架构
节点执行器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| class NodeExecutor:
def __init__(self, node_spec: NodeSpec, state_manager: StateManager):
self.node_spec = node_spec
self.state_manager = state_manager
self.execution_context = ExecutionContext()
async def execute(self, input_state: Dict[str, Any],
config: ExecutionConfig) -> Dict[str, Any]:
"""节点执行的核心逻辑"""
try:
# 1. 前置处理
processed_input = self._preprocess_input(input_state)
# 2. 函数调用
if asyncio.iscoroutinefunction(self.node_spec.func):
result = await self.node_spec.func(processed_input)
else:
result = await asyncio.get_event_loop().run_in_executor(
None, self.node_spec.func, processed_input
)
# 3. 结果处理
output_state = self._postprocess_output(result)
# 4. 状态更新
updated_state = self.state_manager.update_state(output_state)
return updated_state
except Exception as e:
await self._handle_execution_error(e, input_state, config)
raise
def _preprocess_input(self, state: Dict[str, Any]) -> Any:
"""输入预处理"""
if len(self.node_spec.input_channels) == 1:
key = list(self.node_spec.input_channels.keys())[0]
return state.get(key)
else:
return {key: state.get(key) for key in self.node_spec.input_channels}
|
图执行器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
| class GraphExecutor:
def __init__(self, compiled_graph: CompiledGraph):
self.graph = compiled_graph
self.checkpoint_manager = CheckpointManager()
self.execution_tracer = ExecutionTracer()
async def invoke(self, input_data: Dict[str, Any],
config: Optional[ExecutionConfig] = None) -> Dict[str, Any]:
"""图执行的主入口"""
config = config or ExecutionConfig()
# 初始化状态
state_manager = StateManager(
self.graph.channels,
self.graph.state_schema
)
state_manager.update_state(input_data)
# 创建执行上下文
context = ExecutionContext(
graph=self.graph,
state_manager=state_manager,
config=config,
trace_id=uuid.uuid4()
)
try:
# 执行图
final_state = await self._execute_graph(context)
return final_state
except Exception as e:
await self._handle_graph_error(e, context)
raise
async def _execute_graph(self, context: ExecutionContext) -> Dict[str, Any]:
"""图执行的核心算法"""
current_nodes = {self.graph.entry_point}
visited_nodes = set()
while current_nodes:
# 并发执行当前层的所有节点
tasks = []
for node_name in current_nodes:
if node_name not in visited_nodes:
node_executor = NodeExecutor(
self.graph.nodes[node_name],
context.state_manager
)
tasks.append(
self._execute_node_with_trace(
node_executor, context, node_name
)
)
visited_nodes.add(node_name)
if tasks:
await asyncio.gather(*tasks)
# 计算下一批要执行的节点
next_nodes = set()
for node_name in current_nodes:
next_nodes.update(
self._get_next_nodes(node_name, context.state_manager.current_state)
)
current_nodes = next_nodes - visited_nodes
return context.state_manager.current_state
def _get_next_nodes(self, current_node: str, state: Dict[str, Any]) -> Set[str]:
"""计算下一步要执行的节点"""
next_nodes = set()
for edge in self.graph.edges.values():
if edge.source == current_node:
if edge.edge_type == EdgeType.UNCONDITIONAL:
next_nodes.add(edge.target)
elif edge.edge_type == EdgeType.CONDITIONAL:
condition_result = edge.condition(state)
if condition_result == edge.condition_result:
next_nodes.add(edge.target)
return next_nodes
|
3. 检查点与持久化机制
3.1 检查点系统架构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| class CheckpointManager:
def __init__(self, storage_backend: StorageBackend):
self.storage = storage_backend
self.checkpoint_strategy = CheckpointStrategy.EVERY_NODE
async def save_checkpoint(self, execution_id: str, node_name: str,
state: Dict[str, Any], metadata: Dict[str, Any]):
"""保存检查点"""
checkpoint = Checkpoint(
execution_id=execution_id,
node_name=node_name,
state=state,
metadata=metadata,
timestamp=datetime.utcnow(),
checkpoint_id=uuid.uuid4()
)
await self.storage.save(checkpoint)
async def restore_checkpoint(self, execution_id: str,
checkpoint_id: Optional[str] = None) -> Checkpoint:
"""恢复检查点"""
if checkpoint_id:
return await self.storage.load_by_id(checkpoint_id)
else:
return await self.storage.load_latest(execution_id)
class StorageBackend(ABC):
@abstractmethod
async def save(self, checkpoint: Checkpoint):
pass
@abstractmethod
async def load_by_id(self, checkpoint_id: str) -> Checkpoint:
pass
@abstractmethod
async def load_latest(self, execution_id: str) -> Checkpoint:
pass
class RedisStorageBackend(StorageBackend):
def __init__(self, redis_client):
self.redis = redis_client
async def save(self, checkpoint: Checkpoint):
key = f"checkpoint:{checkpoint.execution_id}:{checkpoint.checkpoint_id}"
data = checkpoint.to_dict()
await self.redis.setex(key, 3600, json.dumps(data))
# 维护最新检查点索引
latest_key = f"latest_checkpoint:{checkpoint.execution_id}"
await self.redis.setex(latest_key, 3600, checkpoint.checkpoint_id)
|
3.2 流式执行支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| class StreamingExecutor:
def __init__(self, graph_executor: GraphExecutor):
self.graph_executor = graph_executor
async def stream(self, input_data: Dict[str, Any],
config: Optional[ExecutionConfig] = None):
"""流式执行图"""
async for event in self._stream_execution(input_data, config):
yield event
async def _stream_execution(self, input_data: Dict[str, Any],
config: ExecutionConfig):
"""流式执行的核心逻辑"""
# 创建执行上下文
context = self._create_streaming_context(input_data, config)
current_nodes = {self.graph_executor.graph.entry_point}
visited_nodes = set()
while current_nodes:
for node_name in current_nodes:
if node_name not in visited_nodes:
# 发送节点开始事件
yield StreamEvent(
event_type=EventType.NODE_START,
node_name=node_name,
state=context.state_manager.current_state.copy(),
timestamp=datetime.utcnow()
)
# 执行节点
node_executor = NodeExecutor(
self.graph_executor.graph.nodes[node_name],
context.state_manager
)
updated_state = await node_executor.execute(
context.state_manager.current_state,
config
)
# 发送节点完成事件
yield StreamEvent(
event_type=EventType.NODE_COMPLETE,
node_name=node_name,
state=updated_state,
timestamp=datetime.utcnow()
)
visited_nodes.add(node_name)
# 计算下一批节点
next_nodes = set()
for node_name in current_nodes:
next_nodes.update(
self.graph_executor._get_next_nodes(
node_name,
context.state_manager.current_state
)
)
current_nodes = next_nodes - visited_nodes
|
4. 高级特性实现
4.1 循环与分支控制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| class LoopController:
def __init__(self, max_iterations: int = 100):
self.max_iterations = max_iterations
self.iteration_count = defaultdict(int)
def should_continue(self, loop_id: str, condition_func: Callable,
state: Dict[str, Any]) -> bool:
"""循环控制逻辑"""
self.iteration_count[loop_id] += 1
if self.iteration_count[loop_id] > self.max_iterations:
raise LoopLimitExceededError(
f"Loop {loop_id} exceeded maximum iterations: {self.max_iterations}"
)
return condition_func(state)
class ConditionalBranch:
def __init__(self, condition: Callable, branches: Dict[Any, str]):
self.condition = condition
self.branches = branches
def evaluate(self, state: Dict[str, Any]) -> str:
"""条件分支评估"""
result = self.condition(state)
if result in self.branches:
return self.branches[result]
elif 'default' in self.branches:
return self.branches['default']
else:
raise BranchEvaluationError(
f"No branch found for condition result: {result}"
)
|
4.2 错误处理与重试机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| class ErrorHandler:
def __init__(self, retry_policy: RetryPolicy,
fallback_strategy: FallbackStrategy):
self.retry_policy = retry_policy
self.fallback_strategy = fallback_strategy
async def handle_error(self, error: Exception, context: ExecutionContext) -> bool:
"""错误处理的核心逻辑"""
if self.retry_policy.should_retry(error, context):
await self._execute_retry(context)
return True
else:
await self._execute_fallback(error, context)
return False
async def _execute_retry(self, context: ExecutionContext):
"""重试逻辑"""
delay = self.retry_policy.get_delay(context.retry_count)
await asyncio.sleep(delay)
context.retry_count += 1
async def _execute_fallback(self, error: Exception, context: ExecutionContext):
"""回退策略执行"""
if self.fallback_strategy.has_fallback():
fallback_result = await self.fallback_strategy.execute(error, context)
context.state_manager.update_state(fallback_result)
else:
raise error
class ExponentialBackoffRetryPolicy:
def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
self.max_retries = max_retries
self.base_delay = base_delay
def should_retry(self, error: Exception, context: ExecutionContext) -> bool:
return (context.retry_count < self.max_retries and
isinstance(error, RetryableError))
def get_delay(self, retry_count: int) -> float:
return self.base_delay * (2 ** retry_count)
|
5. 性能优化策略
5.1 并发执行优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| class ConcurrencyController:
def __init__(self, max_concurrent_nodes: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent_nodes)
self.node_dependencies = {}
async def execute_nodes_concurrently(self, nodes: List[str],
context: ExecutionContext):
"""并发执行节点"""
# 计算依赖关系
dependency_graph = self._build_dependency_graph(nodes, context.graph)
# 按层级并发执行
execution_levels = self._topological_sort(dependency_graph)
for level in execution_levels:
tasks = []
for node_name in level:
task = self._execute_node_with_semaphore(node_name, context)
tasks.append(task)
if tasks:
await asyncio.gather(*tasks)
async def _execute_node_with_semaphore(self, node_name: str,
context: ExecutionContext):
"""带信号量控制的节点执行"""
async with self.semaphore:
node_executor = NodeExecutor(
context.graph.nodes[node_name],
context.state_manager
)
return await node_executor.execute(
context.state_manager.current_state,
context.config
)
|
5.2 内存管理优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| class MemoryManager:
def __init__(self, max_memory_mb: int = 1024):
self.max_memory_bytes = max_memory_mb * 1024 * 1024
self.state_cache = {}
self.cache_policy = LRUCachePolicy(max_size=1000)
def optimize_state_storage(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""状态存储优化"""
# 大对象压缩
optimized_state = {}
for key, value in state.items():
if self._is_large_object(value):
optimized_state[key] = self._compress_object(value)
else:
optimized_state[key] = value
return optimized_state
def _is_large_object(self, obj: Any) -> bool:
"""判断是否为大对象"""
return sys.getsizeof(obj) > 1024 * 1024 # 1MB
def _compress_object(self, obj: Any) -> CompressedObject:
"""对象压缩"""
serialized = pickle.dumps(obj)
compressed = gzip.compress(serialized)
return CompressedObject(data=compressed, original_type=type(obj))
|
6. 可观测性与监控
6.1 分布式追踪
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| class DistributedTracer:
def __init__(self, tracer_provider):
self.tracer = tracer_provider.get_tracer("langgraph")
async def trace_graph_execution(self, graph_name: str,
execution_func: Callable):
"""图执行追踪"""
with self.tracer.start_as_current_span(
f"graph_execution_{graph_name}"
) as span:
span.set_attribute("graph.name", graph_name)
span.set_attribute("graph.version", "1.0")
try:
result = await execution_func()
span.set_attribute("execution.status", "success")
return result
except Exception as e:
span.set_attribute("execution.status", "error")
span.set_attribute("error.message", str(e))
span.record_exception(e)
raise
async def trace_node_execution(self, node_name: str,
execution_func: Callable):
"""节点执行追踪"""
with self.tracer.start_as_current_span(
f"node_execution_{node_name}"
) as span:
span.set_attribute("node.name", node_name)
start_time = time.time()
try:
result = await execution_func()
execution_time = time.time() - start_time
span.set_attribute("node.execution_time", execution_time)
return result
except Exception as e:
span.record_exception(e)
raise
|
6.2 性能监控
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| class PerformanceMonitor:
def __init__(self):
self.metrics_collector = MetricsCollector()
self.performance_history = deque(maxlen=1000)
async def monitor_execution(self, context: ExecutionContext):
"""执行性能监控"""
metrics = ExecutionMetrics(
execution_id=context.execution_id,
start_time=datetime.utcnow(),
memory_usage=self._get_memory_usage(),
cpu_usage=self._get_cpu_usage()
)
try:
yield metrics
finally:
metrics.end_time = datetime.utcnow()
metrics.duration = (metrics.end_time - metrics.start_time).total_seconds()
self.performance_history.append(metrics)
await self.metrics_collector.record_metrics(metrics)
def _get_memory_usage(self) -> float:
"""获取内存使用量"""
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024 # MB
def _get_cpu_usage(self) -> float:
"""获取CPU使用率"""
return psutil.cpu_percent(interval=0.1)
|