langgraph-fundamentals by langchain-ai/langchain-skills
npx skills add https://github.com/langchain-ai/langchain-skills --skill langgraph-fundamentals

图在执行前必须经过 compile() 编译。
构建新图时遵循以下 5 个步骤:
| 何时使用 LangGraph | 何时使用替代方案 |
|---|---|
| 需要对智能体编排进行细粒度控制 | 快速原型制作 → LangChain 智能体 |
| 构建具有分支/循环的复杂工作流 | 简单的无状态工作流 → LangChain 直接调用 |
| 需要人在回路、持久化 | 开箱即用的功能 → Deep Agents |
| 需求 | 解决方案 | 示例 |
|---|---|---|
| 覆盖值 | 无归约器(默认) | 计数器等简单字段 |
| 追加到列表 | 归约器 (operator.add / concat) | 消息历史、日志 |
| 自定义逻辑 | 自定义归约器函数 | 复杂合并 |
class State(TypedDict): name: str # 默认:更新时覆盖 messages: Annotated[list, operator.add] # 追加到列表 total: Annotated[int, operator.add] # 对整数求和
</python>
<typescript>
使用 StateSchema 和 ReducedValue 来累积数组。
```typescript
import { StateSchema, ReducedValue, MessagesValue } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
name: z.string(), // 默认:覆盖
messages: MessagesValue, // 消息的内置类型
items: new ReducedValue(
z.array(z.string()).default(() => []),
{ reducer: (current, update) => current.concat(update) }
),
});
from typing import Annotated import operator
class State(TypedDict): messages: Annotated[list, operator.add]
</python>
<typescript>
没有 ReducedValue,数组会被覆盖而不是追加。
```typescript
// 错误:数组将被覆盖
const State = new StateSchema({
items: z.array(z.string()), // 没有归约器!
});
// 节点 1: { items: ["A"] }, 节点 2: { items: ["B"] }
// 最终结果: { items: ["B"] } // A 丢失了!
// 正确做法:使用 ReducedValue
const State = new StateSchema({
items: new ReducedValue(
z.array(z.string()).default(() => []),
{ reducer: (current, update) => current.concat(update) }
),
});
// 最终结果: { items: ["A", "B"] }
def my_node(state: State) -> dict: return {"field": "updated"}
</python>
<typescript>
只返回部分更新,而不是完整的状态对象。
```typescript
// 错误:返回整个状态
const myNode = async (state: typeof State.State) => {
state.field = "updated";
return state; // 不要这样做!
};
// 正确做法:返回部分更新
const myNode = async (state: typeof State.State) => {
return { field: "updated" };
};
节点函数接受以下参数:
| 签名 | 何时使用 |
|---|---|
def node(state: State) | 仅需要状态的简单节点 |
def node(state: State, config: RunnableConfig) | 需要 thread_id、tags 或可配置值 |
def node(state: State, runtime: Runtime[Context]) | 需要运行时上下文、存储或 stream_writer |
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
def plain_node(state: State):
return {"results": "done"}
def node_with_config(state: State, config: RunnableConfig):
thread_id = config["configurable"]["thread_id"]
return {"results": f"Thread: {thread_id}"}
def node_with_runtime(state: State, runtime: Runtime[Context]):
user_id = runtime.context.user_id
return {"results": f"User: {user_id}"}
| 签名 | 何时使用 |
|---|---|
(state) => {...} | 仅需要状态的简单节点 |
(state, config) => {...} | 需要 thread_id、tags 或可配置值 |
import { GraphNode, StateSchema } from "@langchain/langgraph";
const plainNode: GraphNode<typeof State> = (state) => {
return { results: "done" };
};
const nodeWithConfig: GraphNode<typeof State> = (state, config) => {
const threadId = config?.configurable?.thread_id;
return { results: `Thread: ${threadId}` };
};
| 需求 | 边类型 | 何时使用 |
|---|---|---|
| 总是前往同一节点 | add_edge() | 固定的、确定性的流程 |
| 基于状态路由 | add_conditional_edges() | 动态分支 |
| 更新状态并路由 | Command | 在单个节点中组合逻辑 |
| 扇出到多个节点 | Send | 带动态输入的并行处理 |
class State(TypedDict): input: str output: str
def process_input(state: State) -> dict: return {"output": f"Processed: {state['input']}"}
def finalize(state: State) -> dict: return {"output": state["output"].upper()}
graph = ( StateGraph(State) .add_node("process", process_input) .add_node("finalize", finalize) .add_edge(START, "process") .add_edge("process", "finalize") .add_edge("finalize", END) .compile() )
result = graph.invoke({"input": "hello"}) print(result["output"]) # "PROCESSED: HELLO"
</python>
<typescript>
使用 addEdge 连接节点,并在调用前进行 compile 编译。
```typescript
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
input: z.string(),
output: z.string().default(""),
});
const processInput = async (state: typeof State.State) => {
return { output: `Processed: ${state.input}` };
};
const finalize = async (state: typeof State.State) => {
return { output: state.output.toUpperCase() };
};
const graph = new StateGraph(State)
.addNode("process", processInput)
.addNode("finalize", finalize)
.addEdge(START, "process")
.addEdge("process", "finalize")
.addEdge("finalize", END)
.compile();
const result = await graph.invoke({ input: "hello" });
console.log(result.output); // "PROCESSED: HELLO"
class State(TypedDict): query: str route: str result: str
def classify(state: State) -> dict: if "weather" in state["query"].lower(): return {"route": "weather"} return {"route": "general"}
def route_query(state: State) -> Literal["weather", "general"]: return state["route"]
graph = ( StateGraph(State) .add_node("classify", classify) .add_node("weather", lambda s: {"result": "Sunny, 72F"}) .add_node("general", lambda s: {"result": "General response"}) .add_edge(START, "classify") .add_conditional_edges("classify", route_query, ["weather", "general"]) .add_edge("weather", END) .add_edge("general", END) .compile() )
</python>
<typescript>
addConditionalEdges 根据函数返回值进行路由。
```typescript
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
query: z.string(),
route: z.string().default(""),
result: z.string().default(""),
});
const classify = async (state: typeof State.State) => {
if (state.query.toLowerCase().includes("weather")) {
return { route: "weather" };
}
return { route: "general" };
};
const routeQuery = (state: typeof State.State) => state.route;
const graph = new StateGraph(State)
.addNode("classify", classify)
.addNode("weather", async () => ({ result: "Sunny, 72F" }))
.addNode("general", async () => ({ result: "General response" }))
.addEdge(START, "classify")
.addConditionalEdges("classify", routeQuery, ["weather", "general"])
.addEdge("weather", END)
.addEdge("general", END)
.compile();
Command 将状态更新和路由组合在单个返回值中。字段:
- update:要应用的状态更新(类似于从节点返回字典)
- goto:接下来要导航到的节点名称
- resume:在 interrupt() 之后恢复的值 — 参见人在回路技能

class State(TypedDict): count: int result: str
def node_a(state: State) -> Command[Literal["node_b", "node_c"]]: """在一个返回值中更新状态并决定下一个节点。""" new_count = state["count"] + 1 if new_count > 5: return Command(update={"count": new_count}, goto="node_c") return Command(update={"count": new_count}, goto="node_b")
graph = ( StateGraph(State) .add_node("node_a", node_a) .add_node("node_b", lambda s: {"result": "B"}) .add_node("node_c", lambda s: {"result": "C"}) .add_edge(START, "node_a") .add_edge("node_b", END) .add_edge("node_c", END) .compile() )
</python>
<typescript>
返回带有 update 和 goto 的 Command,以将状态更改与路由结合起来。
```typescript
import { StateGraph, StateSchema, START, END, Command } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
count: z.number().default(0),
result: z.string().default(""),
});
const nodeA = async (state: typeof State.State) => {
const newCount = state.count + 1;
if (newCount > 5) {
return new Command({ update: { count: newCount }, goto: "node_c" });
}
return new Command({ update: { count: newCount }, goto: "node_b" });
};
const graph = new StateGraph(State)
.addNode("node_a", nodeA, { ends: ["node_b", "node_c"] })
.addNode("node_b", async () => ({ result: "B" }))
.addNode("node_c", async () => ({ result: "C" }))
.addEdge(START, "node_a")
.addEdge("node_b", END)
.addEdge("node_c", END)
.compile();
Python : 使用 Command[Literal["node_a", "node_b"]] 作为返回类型注解来声明有效的 goto 目标。
TypeScript : 将 { ends: ["node_a", "node_b"] } 作为第三个参数传递给 addNode 来声明有效的 goto 目标。
警告 : Command 只添加动态边 — 使用 add_edge / addEdge 定义的静态边仍然会执行。如果 node_a 返回 Command(goto="node_c") 并且你还有 graph.add_edge("node_a", "node_b"),那么 node_b 和 node_c 都会运行。
使用 Send 进行扇出:从条件边返回 [Send("worker", {...})] 以生成并行工作器。需要在结果字段上使用归约器。
class OrchestratorState(TypedDict): tasks: list[str] results: Annotated[list, operator.add] summary: str
def orchestrator(state: OrchestratorState): """将任务扇出给工作器。""" return [Send("worker", {"task": task}) for task in state["tasks"]]
def worker(state: dict) -> dict: return {"results": [f"Completed: {state['task']}"]}
def synthesize(state: OrchestratorState) -> dict: return {"summary": f"Processed {len(state['results'])} tasks"}
graph = ( StateGraph(OrchestratorState) .add_node("worker", worker) .add_node("synthesize", synthesize) .add_conditional_edges(START, orchestrator, ["worker"]) .add_edge("worker", "synthesize") .add_edge("synthesize", END) .compile() )
result = graph.invoke({"tasks": ["Task A", "Task B", "Task C"]})
</python>
<typescript>
使用 Send API 将任务扇出给并行工作器并聚合结果。
```typescript
import { Send, StateGraph, StateSchema, ReducedValue, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
tasks: z.array(z.string()),
results: new ReducedValue(
z.array(z.string()).default(() => []),
{ reducer: (curr, upd) => curr.concat(upd) }
),
summary: z.string().default(""),
});
const orchestrator = (state: typeof State.State) => {
return state.tasks.map((task) => new Send("worker", { task }));
};
const worker = async (state: { task: string }) => {
return { results: [`Completed: ${state.task}`] };
};
const synthesize = async (state: typeof State.State) => {
return { summary: `Processed ${state.results.length} tasks` };
};
const graph = new StateGraph(State)
.addNode("worker", worker)
.addNode("synthesize", synthesize)
.addConditionalEdges(START, orchestrator, ["worker"])
.addEdge("worker", "synthesize")
.addEdge("synthesize", END)
.compile();
class State(TypedDict): results: Annotated[list, operator.add] # 累积
</python>
<typescript>
使用 ReducedValue 来累积并行工作器的结果。
```typescript
// 错误:没有归约器
const State = new StateSchema({ results: z.array(z.string()) });
// 正确做法
const State = new StateSchema({
results: new ReducedValue(z.array(z.string()).default(() => []), { reducer: (curr, upd) => curr.concat(upd) }),
});
调用 graph.invoke(input, config) 以运行图直至完成并返回最终状态。
| 模式 | 它流式传输的内容 | 使用场景 |
|---|---|---|
values | 每个步骤后的完整状态 | 监控完整状态 |
updates | 状态增量 | 跟踪增量更新 |
messages | LLM 令牌 + 元数据 | 聊天界面 |
custom | 用户定义的数据 | 进度指示器 |
def my_node(state): writer = get_stream_writer() writer("Processing step 1...") # 执行工作 writer("Complete!") return {"result": "done"}
for chunk in graph.stream({"data": "test"}, stream_mode="custom"): print(chunk)
</python>
<typescript>
使用流写入器从节点内部发出自定义进度更新。
```typescript
import { getWriter } from "@langchain/langgraph";
const myNode = async (state: typeof State.State) => {
const writer = getWriter();
writer("Processing step 1...");
// 执行工作
writer("Complete!");
return { result: "done" };
};
for await (const chunk of graph.stream({ data: "test" }, { streamMode: "custom" })) {
console.log(chunk);
}
将错误类型与正确的处理程序匹配:
| 错误类型 | 谁修复 | 策略 | 示例 |
|---|---|---|---|
| 暂时性错误(网络、速率限制) | 系统 | RetryPolicy(max_attempts=3) | add_node(..., retry_policy=...) |
| LLM 可恢复错误(工具故障) | LLM | ToolNode(tools, handle_tool_errors=True) | 错误作为 ToolMessage 返回 |
| 用户可修复错误(信息缺失) | 人类 | interrupt({"message": ...}) | 收集缺失数据(参见 HITL 技能) |
| 意外错误 | 开发者 | 让其冒泡 | raise |
workflow.add_node( "search_documentation", search_documentation, retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0) )
</python>
<typescript>
对暂时性错误使用 retryPolicy。
```typescript
workflow.addNode(
"searchDocumentation",
searchDocumentation,
{
retryPolicy: { maxAttempts: 3, initialInterval: 1.0 },
},
);
tool_node = ToolNode(tools, handle_tool_errors=True)
workflow.add_node("tools", tool_node)
</python>
<typescript>
使用来自 @langchain/langgraph/prebuilt 的 ToolNode 来处理工具执行和错误。当 handleToolErrors 为 true 时,错误会作为 ToolMessages 返回,以便 LLM 可以恢复。
```typescript
import { ToolNode } from "@langchain/langgraph/prebuilt";
const toolNode = new ToolNode(tools, { handleToolErrors: true });
workflow.addNode("tools", toolNode);
graph = builder.compile() graph.invoke({"input": "test"})
</python>
<typescript>
必须使用 compile() 来获取可执行图。
```typescript
// 错误
await builder.invoke({ input: "test" });
// 正确做法
const graph = builder.compile();
await graph.invoke({ input: "test" });
def should_continue(state): return END if state["count"] > 10 else "node_b" builder.add_conditional_edges("node_a", should_continue)
</python>
<typescript>
使用带有 END 返回值的条件边来中断循环。
```typescript
// 错误:永远循环
builder.addEdge("node_a", "node_b").addEdge("node_b", "node_a");
// 正确做法
builder.addConditionalEdges("node_a", (state) => state.count > 10 ? END : "node_b");
def node_a(state) -> Command[Literal["node_b", "node_c"]]: return Command(goto="node_b")
builder.add_edge("node_a", START) # 错误! builder.add_edge("node_a", "entry") # 改用命名的入口节点
return {"items": ["item"]} # 列表归约器需要列表,而不是字符串
```typescript
// 始终 await graph.invoke() - 它返回一个 Promise
const result = await graph.invoke({ input: "test" });
// TS Command 节点需要 { ends } 来声明路由目标
builder.addNode("router", routerFn, { ends: ["node_b", "node_c"] });
每周安装量
2.7K
代码仓库
GitHub 星标数
431
首次出现
2026年3月3日
安全审计
安装于
claude-code:2.2K
codex:2.2K
cursor:2.2K
opencode:2.1K
github-copilot:2.1K
gemini-cli:2.1K
Graphs must be compile()d before execution.
Follow these 5 steps when building a new graph:
| Use LangGraph When | Use Alternatives When |
|---|---|
| Need fine-grained control over agent orchestration | Quick prototyping → LangChain agents |
| Building complex workflows with branching/loops | Simple stateless workflows → LangChain direct |
| Require human-in-the-loop, persistence | Batteries-included features → Deep Agents |
| Need | Solution | Example |
|---|---|---|
| Overwrite value | No reducer (default) | Simple fields like counters |
| Append to list | Reducer (operator.add / concat) | Message history, logs |
| Custom logic | Custom reducer function | Complex merging |
class State(TypedDict): name: str # Default: overwrites on update messages: Annotated[list, operator.add] # Appends to list total: Annotated[int, operator.add] # Sums integers
</python>
<typescript>
Use StateSchema with ReducedValue for accumulating arrays.
```typescript
import { StateSchema, ReducedValue, MessagesValue } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
name: z.string(), // Default: overwrites
messages: MessagesValue, // Built-in for messages
items: new ReducedValue(
z.array(z.string()).default(() => []),
{ reducer: (current, update) => current.concat(update) }
),
});
from typing import Annotated import operator
class State(TypedDict): messages: Annotated[list, operator.add]
</python>
<typescript>
Without ReducedValue, arrays are overwritten not appended.
```typescript
// WRONG: Array will be overwritten
const State = new StateSchema({
items: z.array(z.string()), // No reducer!
});
// Node 1: { items: ["A"] }, Node 2: { items: ["B"] }
// Final: { items: ["B"] } // A is lost!
// CORRECT: Use ReducedValue
const State = new StateSchema({
items: new ReducedValue(
z.array(z.string()).default(() => []),
{ reducer: (current, update) => current.concat(update) }
),
});
// Final: { items: ["A", "B"] }
def my_node(state: State) -> dict: return {"field": "updated"}
</python>
<typescript>
Return partial updates only, not the full state object.
```typescript
// WRONG: Returning entire state
const myNode = async (state: typeof State.State) => {
state.field = "updated";
return state; // Don't do this!
};
// CORRECT: Return partial updates
const myNode = async (state: typeof State.State) => {
return { field: "updated" };
};
Node functions accept these arguments:
| Signature | When to Use |
|---|---|
def node(state: State) | Simple nodes that only need state |
def node(state: State, config: RunnableConfig) | Need thread_id, tags, or configurable values |
def node(state: State, runtime: Runtime[Context]) | Need runtime context, store, or stream_writer |
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
def plain_node(state: State):
return {"results": "done"}
def node_with_config(state: State, config: RunnableConfig):
thread_id = config["configurable"]["thread_id"]
return {"results": f"Thread: {thread_id}"}
def node_with_runtime(state: State, runtime: Runtime[Context]):
user_id = runtime.context.user_id
return {"results": f"User: {user_id}"}
| Signature | When to Use |
|---|---|
(state) => {...} | Simple nodes that only need state |
(state, config) => {...} | Need thread_id, tags, or configurable values |
import { GraphNode, StateSchema } from "@langchain/langgraph";
const plainNode: GraphNode<typeof State> = (state) => {
return { results: "done" };
};
const nodeWithConfig: GraphNode<typeof State> = (state, config) => {
const threadId = config?.configurable?.thread_id;
return { results: `Thread: ${threadId}` };
};
| Need | Edge Type | When to Use |
|---|---|---|
| Always go to same node | add_edge() | Fixed, deterministic flow |
| Route based on state | add_conditional_edges() | Dynamic branching |
| Update state AND route | Command | Combine logic in single node |
| Fan-out to multiple nodes | Send | Parallel processing with dynamic inputs |
class State(TypedDict): input: str output: str
def process_input(state: State) -> dict: return {"output": f"Processed: {state['input']}"}
def finalize(state: State) -> dict: return {"output": state["output"].upper()}
graph = ( StateGraph(State) .add_node("process", process_input) .add_node("finalize", finalize) .add_edge(START, "process") .add_edge("process", "finalize") .add_edge("finalize", END) .compile() )
result = graph.invoke({"input": "hello"}) print(result["output"]) # "PROCESSED: HELLO"
</python>
<typescript>
Chain nodes with addEdge and compile before invoking.
```typescript
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
input: z.string(),
output: z.string().default(""),
});
const processInput = async (state: typeof State.State) => {
return { output: `Processed: ${state.input}` };
};
const finalize = async (state: typeof State.State) => {
return { output: state.output.toUpperCase() };
};
const graph = new StateGraph(State)
.addNode("process", processInput)
.addNode("finalize", finalize)
.addEdge(START, "process")
.addEdge("process", "finalize")
.addEdge("finalize", END)
.compile();
const result = await graph.invoke({ input: "hello" });
console.log(result.output); // "PROCESSED: HELLO"
class State(TypedDict): query: str route: str result: str
def classify(state: State) -> dict: if "weather" in state["query"].lower(): return {"route": "weather"} return {"route": "general"}
def route_query(state: State) -> Literal["weather", "general"]: return state["route"]
graph = ( StateGraph(State) .add_node("classify", classify) .add_node("weather", lambda s: {"result": "Sunny, 72F"}) .add_node("general", lambda s: {"result": "General response"}) .add_edge(START, "classify") .add_conditional_edges("classify", route_query, ["weather", "general"]) .add_edge("weather", END) .add_edge("general", END) .compile() )
</python>
<typescript>
addConditionalEdges routes based on function return value.
```typescript
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
query: z.string(),
route: z.string().default(""),
result: z.string().default(""),
});
const classify = async (state: typeof State.State) => {
if (state.query.toLowerCase().includes("weather")) {
return { route: "weather" };
}
return { route: "general" };
};
const routeQuery = (state: typeof State.State) => state.route;
const graph = new StateGraph(State)
.addNode("classify", classify)
.addNode("weather", async () => ({ result: "Sunny, 72F" }))
.addNode("general", async () => ({ result: "General response" }))
.addEdge(START, "classify")
.addConditionalEdges("classify", routeQuery, ["weather", "general"])
.addEdge("weather", END)
.addEdge("general", END)
.compile();
Command combines state updates and routing in a single return value. Fields:
update : State updates to apply (like returning a dict from a node)goto : Node name(s) to navigate to nextresume : Value to resume after interrupt() — see human-in-the-loop skillclass State(TypedDict): count: int result: str
def node_a(state: State) -> Command[Literal["node_b", "node_c"]]: """Update state AND decide next node in one return.""" new_count = state["count"] + 1 if new_count > 5: return Command(update={"count": new_count}, goto="node_c") return Command(update={"count": new_count}, goto="node_b")
graph = ( StateGraph(State) .add_node("node_a", node_a) .add_node("node_b", lambda s: {"result": "B"}) .add_node("node_c", lambda s: {"result": "C"}) .add_edge(START, "node_a") .add_edge("node_b", END) .add_edge("node_c", END) .compile() )
</python>
<typescript>
Return Command with update and goto to combine state change with routing.
```typescript
import { StateGraph, StateSchema, START, END, Command } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
count: z.number().default(0),
result: z.string().default(""),
});
const nodeA = async (state: typeof State.State) => {
const newCount = state.count + 1;
if (newCount > 5) {
return new Command({ update: { count: newCount }, goto: "node_c" });
}
return new Command({ update: { count: newCount }, goto: "node_b" });
};
const graph = new StateGraph(State)
.addNode("node_a", nodeA, { ends: ["node_b", "node_c"] })
.addNode("node_b", async () => ({ result: "B" }))
.addNode("node_c", async () => ({ result: "C" }))
.addEdge(START, "node_a")
.addEdge("node_b", END)
.addEdge("node_c", END)
.compile();
Python : Use Command[Literal["node_a", "node_b"]] as the return type annotation to declare valid goto destinations.
TypeScript : Pass { ends: ["node_a", "node_b"] } as the third argument to addNode to declare valid goto destinations.
Warning : Command only adds dynamic edges — static edges defined with add_edge / addEdge still execute. If node_a returns Command(goto="node_c") and you also have graph.add_edge("node_a", "node_b"), both node_b and node_c will run.
Fan-out with Send: return [Send("worker", {...})] from a conditional edge to spawn parallel workers. Requires a reducer on the results field.
class OrchestratorState(TypedDict): tasks: list[str] results: Annotated[list, operator.add] summary: str
def orchestrator(state: OrchestratorState): """Fan out tasks to workers.""" return [Send("worker", {"task": task}) for task in state["tasks"]]
def worker(state: dict) -> dict: return {"results": [f"Completed: {state['task']}"]}
def synthesize(state: OrchestratorState) -> dict: return {"summary": f"Processed {len(state['results'])} tasks"}
graph = ( StateGraph(OrchestratorState) .add_node("worker", worker) .add_node("synthesize", synthesize) .add_conditional_edges(START, orchestrator, ["worker"]) .add_edge("worker", "synthesize") .add_edge("synthesize", END) .compile() )
result = graph.invoke({"tasks": ["Task A", "Task B", "Task C"]})
</python>
<typescript>
Fan out tasks to parallel workers using the Send API and aggregate results.
```typescript
import { Send, StateGraph, StateSchema, ReducedValue, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
tasks: z.array(z.string()),
results: new ReducedValue(
z.array(z.string()).default(() => []),
{ reducer: (curr, upd) => curr.concat(upd) }
),
summary: z.string().default(""),
});
const orchestrator = (state: typeof State.State) => {
return state.tasks.map((task) => new Send("worker", { task }));
};
const worker = async (state: { task: string }) => {
return { results: [`Completed: ${state.task}`] };
};
const synthesize = async (state: typeof State.State) => {
return { summary: `Processed ${state.results.length} tasks` };
};
const graph = new StateGraph(State)
.addNode("worker", worker)
.addNode("synthesize", synthesize)
.addConditionalEdges(START, orchestrator, ["worker"])
.addEdge("worker", "synthesize")
.addEdge("synthesize", END)
.compile();
class State(TypedDict): results: Annotated[list, operator.add] # Accumulates
</python>
<typescript>
Use ReducedValue to accumulate parallel worker results.
```typescript
// WRONG: No reducer
const State = new StateSchema({ results: z.array(z.string()) });
// CORRECT
const State = new StateSchema({
results: new ReducedValue(z.array(z.string()).default(() => []), { reducer: (curr, upd) => curr.concat(upd) }),
});
Call graph.invoke(input, config) to run a graph to completion and return the final state.
| Mode | What it Streams | Use Case |
|---|---|---|
values | Full state after each step | Monitor complete state |
updates | State deltas | Track incremental updates |
messages | LLM tokens + metadata | Chat UIs |
custom | User-defined data | Progress indicators |
def my_node(state): writer = get_stream_writer() writer("Processing step 1...") # Do work writer("Complete!") return {"result": "done"}
for chunk in graph.stream({"data": "test"}, stream_mode="custom"): print(chunk)
</python>
<typescript>
Emit custom progress updates from within nodes using the stream writer.
```typescript
import { getWriter } from "@langchain/langgraph";
const myNode = async (state: typeof State.State) => {
const writer = getWriter();
writer("Processing step 1...");
// Do work
writer("Complete!");
return { result: "done" };
};
for await (const chunk of graph.stream({ data: "test" }, { streamMode: "custom" })) {
console.log(chunk);
}
Match the error type to the right handler:
| Error Type | Who Fixes | Strategy | Example |
|---|---|---|---|
| Transient (network, rate limits) | System | RetryPolicy(max_attempts=3) | add_node(..., retry_policy=...) |
| LLM-recoverable (tool failures) | LLM | ToolNode(tools, handle_tool_errors=True) | Error returned as ToolMessage |
| User-fixable (missing info) | Human | interrupt({"message": ...}) | Collect missing data (see HITL skill) |
| Unexpected | Developer | Let bubble up | raise |
workflow.add_node( "search_documentation", search_documentation, retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0) )
</python>
<typescript>
Use retryPolicy for transient errors.
```typescript
workflow.addNode(
"searchDocumentation",
searchDocumentation,
{
retryPolicy: { maxAttempts: 3, initialInterval: 1.0 },
},
);
tool_node = ToolNode(tools, handle_tool_errors=True)
workflow.add_node("tools", tool_node)
</python>
<typescript>
Use ToolNode from @langchain/langgraph/prebuilt to handle tool execution and errors. When handleToolErrors is true, errors are returned as ToolMessages so the LLM can recover.
```typescript
import { ToolNode } from "@langchain/langgraph/prebuilt";
const toolNode = new ToolNode(tools, { handleToolErrors: true });
workflow.addNode("tools", toolNode);
graph = builder.compile() graph.invoke({"input": "test"})
</python>
<typescript>
Must compile() to get executable graph.
```typescript
// WRONG
await builder.invoke({ input: "test" });
// CORRECT
const graph = builder.compile();
await graph.invoke({ input: "test" });
def should_continue(state): return END if state["count"] > 10 else "node_b" builder.add_conditional_edges("node_a", should_continue)
</python>
<typescript>
Use conditional edges with END return to break loops.
```typescript
// WRONG: Loops forever
builder.addEdge("node_a", "node_b").addEdge("node_b", "node_a");
// CORRECT
builder.addConditionalEdges("node_a", (state) => state.count > 10 ? END : "node_b");
def node_a(state) -> Command[Literal["node_b", "node_c"]]: return Command(goto="node_b")
builder.add_edge("node_a", START) # WRONG! builder.add_edge("node_a", "entry") # Use a named entry node instead
return {"items": ["item"]} # List for list reducer, not a string
```typescript
// Always await graph.invoke() - it returns a Promise
const result = await graph.invoke({ input: "test" });
// TS Command nodes need { ends } to declare routing destinations
builder.addNode("router", routerFn, { ends: ["node_b", "node_c"] });
Weekly Installs
2.7K
Repository
GitHub Stars
431
First Seen
Mar 3, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
claude-code: 2.2K
codex: 2.2K
cursor: 2.2K
opencode: 2.1K
github-copilot: 2.1K
gemini-cli: 2.1K
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
102,200 周安装
| Let bubble up |
raise |