langgraph-persistence by langchain-ai/langchain-skills
npx skills add https://github.com/langchain-ai/langchain-skills --skill langgraph-persistence
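Two memory types: short-term memory, which a checkpointer saves per thread, and long-term memory, which a Store shares across threads. The available checkpointers: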
| Checkpointer | Use Case | Production Ready |
|---|---|---|
| InMemorySaver | Testing, development | No |
| SqliteSaver | Local development | Partial |
| PostgresSaver | Production | Yes |
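<python>
```python
import operator
from typing import Annotated
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph

class State(TypedDict):
    # operator.add is the reducer: new messages are appended to the list
    messages: Annotated[list, operator.add]

def add_message(state: State) -> dict:
    return {"messages": ["Bot response"]}

checkpointer = InMemorySaver()
graph = (
    StateGraph(State)
    .add_node("respond", add_message)
    .add_edge(START, "respond")
    .add_edge("respond", END)
    .compile(checkpointer=checkpointer)  # Pass at compile time
)

# ALWAYS provide thread_id
config = {"configurable": {"thread_id": "conversation-1"}}
result1 = graph.invoke({"messages": ["Hello"]}, config)
print(len(result1["messages"]))  # 2
result2 = graph.invoke({"messages": ["How are you?"]}, config)
print(len(result2["messages"]))  # 4 (previous + new)
```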
</python>
<typescript>
Set up a basic graph with in-memory checkpointing and thread-based state persistence.
```typescript
import { MemorySaver, StateGraph, StateSchema, MessagesValue, START, END } from "@langchain/langgraph";
import { HumanMessage } from "@langchain/core/messages";
const State = new StateSchema({ messages: MessagesValue });
const addMessage = async (state: typeof State.State) => {
return { messages: [{ role: "assistant", content: "Bot response" }] };
};
const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
.addNode("respond", addMessage)
.addEdge(START, "respond")
.addEdge("respond", END)
.compile({ checkpointer });
// ALWAYS provide thread_id
const config = { configurable: { thread_id: "conversation-1" } };
const result1 = await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
console.log(result1.messages.length); // 2
const result2 = await graph.invoke({ messages: [new HumanMessage("How are you?")] }, config);
console.log(result2.messages.length); // 4 (previous + new)
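```
</typescript>
<python>
```python
from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/db"
) as checkpointer:
    checkpointer.setup()  # only needed on first use to create tables
    # `builder` is your uncompiled StateGraph; use the graph inside this block
    graph = builder.compile(checkpointer=checkpointer)
```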
</python>
<typescript>
Configure PostgreSQL-backed checkpointing for production deployments.
```typescript
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
const checkpointer = PostgresSaver.fromConnString(
"postgresql://user:pass@localhost/db"
);
await checkpointer.setup(); // only needed on first use to create tables
const graph = builder.compile({ checkpointer });
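```
</typescript>
<python>
```python
# Different threads maintain separate state
alice_config = {"configurable": {"thread_id": "user-alice"}}
bob_config = {"configurable": {"thread_id": "user-bob"}}

graph.invoke({"messages": ["Hi from Alice"]}, alice_config)
graph.invoke({"messages": ["Hi from Bob"]}, bob_config)
# Alice's state is isolated from Bob's
```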
</python>
<typescript>
Demonstrate isolated state between different thread IDs.
```typescript
// Different threads maintain separate state
const aliceConfig = { configurable: { thread_id: "user-alice" } };
const bobConfig = { configurable: { thread_id: "user-bob" } };
await graph.invoke({ messages: [new HumanMessage("Hi from Alice")] }, aliceConfig);
await graph.invoke({ messages: [new HumanMessage("Hi from Bob")] }, bobConfig);
// Alice's state is isolated from Bob's
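```
</typescript>
<python>
```python
config = {"configurable": {"thread_id": "session-1"}}
result = graph.invoke({"messages": ["start"]}, config)

# Browse checkpoint history
states = list(graph.get_state_history(config))

# Replay from a past checkpoint
past = states[-2]
result = graph.invoke(None, past.config)  # None = resume from checkpoint

# Or fork: update state at a past checkpoint, then resume
fork_config = graph.update_state(past.config, {"messages": ["edited"]})
result = graph.invoke(None, fork_config)
```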
</python>
<typescript>
Time travel: browse checkpoint history and replay or fork from a past state.
```typescript
const config = { configurable: { thread_id: "session-1" } };
const result = await graph.invoke({ messages: ["start"] }, config);
// Browse checkpoint history (async iterable, collect to array)
const states: Awaited<ReturnType<typeof graph.getState>>[] = [];
for await (const state of graph.getStateHistory(config)) {
states.push(state);
}
// Replay from a past checkpoint
const past = states[states.length - 2];
const replayed = await graph.invoke(null, past.config); // null = resume from checkpoint
// Or fork: update state at a past checkpoint, then resume
const forkConfig = await graph.updateState(past.config, { messages: ["edited"] });
const forked = await graph.invoke(null, forkConfig);
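```
</typescript>
<python>
```python
config = {"configurable": {"thread_id": "session-1"}}
# Modify state before resuming
graph.update_state(config, {"data": "manually_updated"})
# Resume with updated state
result = graph.invoke(None, config)
```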
</python>
<typescript>
Manually update graph state before resuming execution.
```typescript
const config = { configurable: { thread_id: "session-1" } };
// Modify state before resuming
await graph.updateState(config, { data: "manually_updated" });
// Resume with updated state
const result = await graph.invoke(null, config);
```
</typescript>

When compiling a subgraph, the `checkpointer` parameter controls its persistence behavior. This matters most for subgraphs that use interrupts, need multi-turn memory, or run in parallel.
| Feature | checkpointer=False | None (default) | True |
|---|---|---|---|
| Interrupts (HITL) | No | Yes | Yes |
| Multi-turn memory | No | No | Yes |
| Multiple calls (different subgraphs) | Yes | Yes | Warning (namespace conflicts possible) |
| Multiple calls (same subgraph) | Yes | Yes | No |
| State inspection | No | Warning (current invocation only) | Yes |
- `checkpointer=False`: the subgraph doesn't need interrupts or persistence. Simplest option, no checkpoint overhead.
- `None` (default, omit `checkpointer`): the subgraph needs `interrupt()` but not multi-turn memory. Each invocation starts fresh but can pause and resume. Parallel execution works because each invocation gets a unique namespace.
- `checkpointer=True`: the subgraph needs to remember state across invocations (multi-turn conversations). Each call picks up where the last one left off.

Warning: stateful subgraphs (`checkpointer=True`) do NOT support calling the same subgraph instance multiple times within a single node. The calls write to the same checkpoint namespace and conflict.
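The decision rules above can be condensed into a small helper. This is only a sketch: `choose_subgraph_checkpointer` and its parameter names are hypothetical and not part of LangGraph. The value it returns is what you would pass as `checkpointer=` when compiling the subgraph.

```python
from typing import Optional


def choose_subgraph_checkpointer(
    needs_interrupts: bool,
    needs_multi_turn_memory: bool,
) -> Optional[bool]:
    """Map subgraph requirements to a `checkpointer` compile argument.

    Hypothetical helper that encodes the table above:
    - True  -> stateful subgraph that remembers state across invocations
    - None  -> default; supports interrupts, each invocation starts fresh
    - False -> no checkpointing at all
    """
    if needs_multi_turn_memory:
        return True
    if needs_interrupts:
        return None
    return False


print(choose_subgraph_checkpointer(needs_interrupts=False, needs_multi_turn_memory=False))  # False
print(choose_subgraph_checkpointer(needs_interrupts=True, needs_multi_turn_memory=False))   # None
print(choose_subgraph_checkpointer(needs_interrupts=True, needs_multi_turn_memory=True))    # True
```

A result of `None` corresponds to omitting the argument, as in `subgraph_builder.compile()`.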
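<python>
```python
# No interrupts needed: opt out of checkpointing
subgraph = subgraph_builder.compile(checkpointer=False)
# Need interrupts but not cross-invocation persistence (default)
subgraph = subgraph_builder.compile()
# Need cross-invocation persistence (stateful)
subgraph = subgraph_builder.compile(checkpointer=True)
```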
</python>
<typescript>
Choose the right checkpointer mode for your subgraph.
```typescript
// No interrupts needed: opt out of checkpointing
const statelessSubgraph = subgraphBuilder.compile({ checkpointer: false });
// Need interrupts but not cross-invocation persistence (default)
const defaultSubgraph = subgraphBuilder.compile();
// Need cross-invocation persistence (stateful)
const statefulSubgraph = subgraphBuilder.compile({ checkpointer: true });
```
</typescript>

When multiple different stateful subgraphs run in parallel, wrap each one in its own StateGraph with a unique node name for stable namespace isolation:
<python>
```python
from langgraph.graph import START, MessagesState, StateGraph

# `create_agent` is the agent factory the source assumes is already imported.
def create_sub_agent(model, *, name, **kwargs):
    """Wrap an agent with a unique node name for namespace isolation."""
    agent = create_agent(model=model, name=name, **kwargs)
    return (
        StateGraph(MessagesState)
        .add_node(name, agent)  # unique name -> stable namespace
        .add_edge(START, name)
        .compile()
    )

fruit_agent = create_sub_agent(
    "gpt-4.1-mini", name="fruit_agent",
    tools=[fruit_info], prompt="...", checkpointer=True,
)
veggie_agent = create_sub_agent(
    "gpt-4.1-mini", name="veggie_agent",
    tools=[veggie_info], prompt="...", checkpointer=True,
)
```
</python>
<typescript>
```typescript
import { StateGraph, StateSchema, MessagesValue, START } from "@langchain/langgraph";
function createSubAgent(model: string, { name, ...kwargs }: { name: string; [key: string]: any }) {
const agent = createAgent({ model, name, ...kwargs });
return new StateGraph(new StateSchema({ messages: MessagesValue }))
.addNode(name, agent) // unique name -> stable namespace
.addEdge(START, name)
.compile();
}
const fruitAgent = createSubAgent("gpt-4.1-mini", {
name: "fruit_agent", tools: [fruitInfo], prompt: "...", checkpointer: true,
});
const veggieAgent = createSubAgent("gpt-4.1-mini", {
name: "veggie_agent", tools: [veggieInfo], prompt: "...", checkpointer: true,
});
```
</typescript>

Note: Subgraphs added as nodes (via `add_node`) already get name-based namespaces automatically and don't need this wrapper.
<python>
```python
from langgraph.runtime import Runtime
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
# Save user preference (available across ALL threads)
store.put(("alice", "preferences"), "language", {"preference": "short responses"})

# Node with store: access via runtime
def respond(state, runtime: Runtime):
    prefs = runtime.store.get((state["user_id"], "preferences"), "language")
    return {"response": f"Using preference: {prefs.value}"}

# Compile with BOTH checkpointer and store
graph = builder.compile(checkpointer=checkpointer, store=store)

# Both threads access same long-term memory
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-1"}})
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-2"}})  # Same preferences!
```
</python>
<typescript>
Use a Store for cross-thread memory to share user preferences across conversations.
```typescript
import { MemoryStore } from "@langchain/langgraph";
const store = new MemoryStore();
// Save user preference (available across ALL threads)
await store.put(["alice", "preferences"], "language", { preference: "short responses" });
// Node with store — access via runtime
const respond = async (state: typeof State.State, runtime: any) => {
const item = await runtime.store?.get(["alice", "preferences"], "language");
return { response: `Using preference: ${item?.value?.preference}` };
};
// Compile with BOTH checkpointer and store
const graph = builder.compile({ checkpointer, store });
// Both threads access same long-term memory
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-1" } });
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-2" } }); // Same preferences!
```
</typescript>
<ex-store-operations>
<python>
```python
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
store.put(("user-123", "facts"), "location", {"city": "San Francisco"})  # Put
item = store.get(("user-123", "facts"), "location")  # Get
results = store.search(("user-123", "facts"), filter={"city": "San Francisco"})  # Search
store.delete(("user-123", "facts"), "location")  # Delete
```
</python>
</ex-store-operations>
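To make the namespace model concrete, here is a toy, dictionary-backed stand-in for a store. `ToyStore` is hypothetical and only mimics the put/get/search/delete shape used above; it is not how `InMemoryStore` is implemented. What it illustrates is that items are keyed by namespace tuple and key, never by `thread_id`, which is why every thread sees the same data.

```python
from typing import Any, Dict, List, Optional, Tuple

Namespace = Tuple[str, ...]


class ToyStore:
    """Hypothetical dict-backed store keyed by (namespace, key)."""

    def __init__(self) -> None:
        self._items: Dict[Tuple[Namespace, str], Dict[str, Any]] = {}

    def put(self, namespace: Namespace, key: str, value: Dict[str, Any]) -> None:
        self._items[(namespace, key)] = value

    def get(self, namespace: Namespace, key: str) -> Optional[Dict[str, Any]]:
        return self._items.get((namespace, key))

    def search(
        self, namespace: Namespace, filter: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Return values in `namespace` whose fields match every filter entry."""
        wanted = filter or {}
        return [
            value
            for (ns, _key), value in self._items.items()
            if ns == namespace and all(value.get(k) == v for k, v in wanted.items())
        ]

    def delete(self, namespace: Namespace, key: str) -> None:
        self._items.pop((namespace, key), None)


store = ToyStore()
store.put(("user-123", "facts"), "location", {"city": "San Francisco"})
store.put(("user-123", "facts"), "pet", {"species": "cat"})

# Lookups never mention a thread_id, so any thread gets the same answer.
print(store.get(("user-123", "facts"), "location"))
print(store.search(("user-123", "facts"), filter={"city": "San Francisco"}))

store.delete(("user-123", "facts"), "location")
print(store.get(("user-123", "facts"), "location"))  # None
```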
---
## Fixes
<fix-thread-id-required>
<python>
Always provide thread_id in config to enable state persistence.
```python
# WRONG: No thread_id - state NOT persisted!
graph.invoke({"messages": ["Hello"]})
graph.invoke({"messages": ["What did I say?"]}) # Doesn't remember!
# CORRECT: Always provide thread_id
config = {"configurable": {"thread_id": "session-1"}}
graph.invoke({"messages": ["Hello"]}, config)
graph.invoke({"messages": ["What did I say?"]}, config) # Remembers!
```
</python>
<typescript>
```typescript
// CORRECT: Always provide thread_id
const config = { configurable: { thread_id: "session-1" } };
await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
await graph.invoke({ messages: [new HumanMessage("What did I say?")] }, config); // Remembers!
```
</typescript>
</fix-thread-id-required>
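Why the `thread_id` matters can be seen in a toy model of a checkpointer. `ToyCheckpointer` and `run_turn` are hypothetical names; the sketch only imitates the idea of saving state per thread and is not LangGraph's implementation.

```python
from typing import Dict, List


class ToyCheckpointer:
    """Hypothetical checkpointer: one saved message list per thread_id."""

    def __init__(self) -> None:
        self._saved: Dict[str, List[str]] = {}

    def load(self, thread_id: str) -> List[str]:
        return list(self._saved.get(thread_id, []))

    def save(self, thread_id: str, messages: List[str]) -> None:
        self._saved[thread_id] = list(messages)


def run_turn(checkpointer: ToyCheckpointer, thread_id: str, user_message: str) -> List[str]:
    """Load the thread's history, append one exchange, and save it back."""
    messages = checkpointer.load(thread_id)
    messages += [user_message, "Bot response"]
    checkpointer.save(thread_id, messages)
    return messages


cp = ToyCheckpointer()
run_turn(cp, "session-1", "Hello")
same_thread = run_turn(cp, "session-1", "What did I say?")
other_thread = run_turn(cp, "session-2", "What did I say?")

print(len(same_thread))   # 4: the earlier exchange was restored
print(len(other_thread))  # 2: a different thread starts empty
```

The thread ID is the only key the saved history is filed under, so a call that uses a different ID, or none, has nothing to restore.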
<fix-inmemory-not-for-production>
<python>
Use PostgresSaver instead of InMemorySaver for production persistence.
```python
# WRONG: Data lost on process restart
checkpointer = InMemorySaver() # In-memory only!
# CORRECT: Use persistent storage for production
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
    checkpointer.setup()  # only needed on first use to create tables
    graph = builder.compile(checkpointer=checkpointer)
```
</python>
<typescript>
```typescript
// CORRECT: Use persistent storage for production
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
const checkpointer = PostgresSaver.fromConnString("postgresql://...");
await checkpointer.setup(); // only needed on first use to create tables
```
</typescript>
</fix-inmemory-not-for-production>
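The difference between in-memory and durable storage can be shown without LangGraph at all. The sketch below uses the standard-library `sqlite3` module with a made-up one-table schema; `save_messages` and `load_messages` are hypothetical helpers, not the schema or API of `SqliteSaver` or `PostgresSaver`. A restart is simulated by discarding the in-memory dictionary.

```python
import json
import os
import sqlite3
import tempfile
from typing import List

SCHEMA = "CREATE TABLE IF NOT EXISTS checkpoints (thread_id TEXT PRIMARY KEY, data TEXT)"


def save_messages(db_path: str, thread_id: str, messages: List[str]) -> None:
    """Persist one thread's messages to a SQLite file (toy schema)."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(SCHEMA)
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints (thread_id, data) VALUES (?, ?)",
            (thread_id, json.dumps(messages)),
        )
        conn.commit()
    finally:
        conn.close()


def load_messages(db_path: str, thread_id: str) -> List[str]:
    """Read a thread's messages back; return [] if nothing was saved."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(SCHEMA)
        row = conn.execute(
            "SELECT data FROM checkpoints WHERE thread_id = ?", (thread_id,)
        ).fetchone()
    finally:
        conn.close()
    return json.loads(row[0]) if row else []


db_path = os.path.join(tempfile.mkdtemp(), "checkpoints.db")

in_memory = {"session-1": ["Hello", "Bot response"]}
save_messages(db_path, "session-1", in_memory["session-1"])

in_memory = {}  # simulate a process restart: the dict is gone

print(in_memory.get("session-1", []))       # []
print(load_messages(db_path, "session-1"))  # ['Hello', 'Bot response']
```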
<fix-update-state-with-reducers>
<python>
Use Overwrite to replace state values instead of passing through reducers.
```python
from langgraph.types import Overwrite
# State with reducer: items: Annotated[list, operator.add]
# Current state: {"items": ["A", "B"]}
# update_state PASSES THROUGH reducers
graph.update_state(config, {"items": ["C"]}) # Result: ["A", "B", "C"] - Appended!
# To REPLACE instead, use Overwrite
graph.update_state(config, {"items": Overwrite(["C"])}) # Result: ["C"] - Replaced
```
</python>
<typescript>
```typescript
// State with reducer: items uses concat reducer
// Current state: { items: ["A", "B"] }

// updateState PASSES THROUGH reducers
await graph.updateState(config, { items: ["C"] }); // Result: ["A", "B", "C"] - Appended!

// To REPLACE instead, use Overwrite
await graph.updateState(config, { items: new Overwrite(["C"]) }); // Result: ["C"] - Replaced
```
</typescript>
</fix-update-state-with-reducers>
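The append-versus-replace behavior can be modeled in a few lines of plain Python. This is a sketch under stated assumptions: `ToyOverwrite` and `apply_update` are hypothetical stand-ins that imitate the semantics described above, not LangGraph's `Overwrite` or its internal update logic.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class ToyOverwrite:
    """Hypothetical marker meaning 'replace the value, skip the reducer'."""

    value: Any


def apply_update(
    state: Dict[str, Any],
    update: Dict[str, Any],
    reducers: Dict[str, Callable[[Any, Any], Any]],
) -> Dict[str, Any]:
    """Return a new state with `update` merged in, key by key."""
    new_state = dict(state)
    for key, incoming in update.items():
        if isinstance(incoming, ToyOverwrite):
            new_state[key] = incoming.value  # bypass the reducer
        elif key in reducers and key in new_state:
            new_state[key] = reducers[key](new_state[key], incoming)
        else:
            new_state[key] = incoming  # no reducer: last write wins
    return new_state


reducers = {"items": operator.add}
state = {"items": ["A", "B"]}

appended = apply_update(state, {"items": ["C"]}, reducers)
replaced = apply_update(state, {"items": ToyOverwrite(["C"])}, reducers)

print(appended["items"])  # ['A', 'B', 'C']
print(replaced["items"])  # ['C']
```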
<fix-store-injection>
<python>
Access store via the Runtime object in graph nodes.
```python
# WRONG: Store not available in node
def my_node(state):
    store.put(...)  # NameError! store not defined

# CORRECT: Access store via runtime
from langgraph.runtime import Runtime

def my_node(state, runtime: Runtime):
    runtime.store.put(...)  # Correct store instance
```
</python>
<typescript>
```typescript
// CORRECT: Access store via runtime
const myNode = async (state, runtime) => {
  await runtime.store?.put(...); // Correct store instance
};
```
</typescript>
</fix-store-injection>
<boundaries>
### What You Should NOT Do
- Use `InMemorySaver` in production — data lost on restart; use `PostgresSaver`
- Forget `thread_id` — state won't persist without it
- Expect `update_state` to bypass reducers — it passes through them; use `Overwrite` to replace
- Run the same stateful subgraph (`checkpointer=True`) in parallel within one node — namespace conflict
- Access store directly in a node — use `runtime.store` via the `Runtime` param
</boundaries>