langgraph-human-in-the-loop by langchain-ai/langchain-skills
npx skills add https://github.com/langchain-ai/langchain-skills --skill langgraph-human-in-the-loop
- `interrupt(value)`: pauses execution and surfaces a value to the caller
- `Command(resume=value)`: resumes execution, providing the value back to `interrupt()`

Three things are required for interrupts to work:
- Compile with `checkpointer=InMemorySaver()` (dev) or `PostgresSaver` (prod)
- Pass `{"configurable": {"thread_id": "..."}}` to every `invoke`/`stream` call
- The value passed to `interrupt()` must be JSON-serializable

`interrupt(value)` pauses the graph. The value surfaces in the result under `__interrupt__`. `Command(resume=value)` resumes, and the resume value becomes the return value of `interrupt()`.

**Critical**: when the graph resumes, the node restarts from the beginning, so all code before `interrupt()` re-runs.
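Because the value passed to `interrupt()` has to be JSON-serializable, it can help to check a payload before handing it over. The following is a minimal sketch using only the standard library; `ensure_json_serializable` is a hypothetical helper introduced here for illustration and is not part of LangGraph.

```python
import json
from datetime import datetime, timezone


def ensure_json_serializable(payload):
    """Return payload unchanged if json.dumps accepts it, else raise TypeError."""
    try:
        json.dumps(payload)
    except (TypeError, ValueError) as exc:
        raise TypeError(f"interrupt payload is not JSON-serializable: {exc}") from exc
    return payload


due = datetime(2026, 3, 3, tzinfo=timezone.utc)

# A datetime converted to an ISO string passes the check
ok_payload = ensure_json_serializable({"action": "Please review", "due": due.isoformat()})

# A raw datetime object is rejected
try:
    ensure_json_serializable({"action": "Please review", "due": due})
    rejected = False
except TypeError:
    rejected = True
```

Under this assumption, a node would call the helper on its payload right before `interrupt(...)`, so a bad payload fails with a readable message rather than somewhere inside checkpointing.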
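<python>
```python
from typing import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt


class State(TypedDict):
    approved: bool


def approval_node(state: State):
    # Pause and ask for approval
    approved = interrupt("Do you approve this action?")
    # When resumed, Command(resume=...) returns that value here
    return {"approved": approved}


checkpointer = InMemorySaver()
graph = (
    StateGraph(State)
    .add_node("approval", approval_node)
    .add_edge(START, "approval")
    .add_edge("approval", END)
    .compile(checkpointer=checkpointer)
)

config = {"configurable": {"thread_id": "thread-1"}}

# Initial run: hits the interrupt and pauses
result = graph.invoke({"approved": False}, config)
print(result["__interrupt__"])
# [Interrupt(value='Do you approve this action?', ...)]

# Resume with the human's response
result = graph.invoke(Command(resume=True), config)
print(result["approved"])  # True
```
</python>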
<typescript>
Pause execution for human review and resume with Command.
```typescript
import { interrupt, Command, MemorySaver, StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  approved: z.boolean().default(false),
});

const approvalNode = async (state: typeof State.State) => {
  // Pause and ask for approval
  const approved = interrupt("Do you approve this action?");
  // When resumed, Command({ resume }) returns that value here
  return { approved };
};

const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
  .addNode("approval", approvalNode)
  .addEdge(START, "approval")
  .addEdge("approval", END)
  .compile({ checkpointer });

const config = { configurable: { thread_id: "thread-1" } };

// Initial run: hits the interrupt and pauses
let result = await graph.invoke({ approved: false }, config);
console.log(result.__interrupt__);
// [{ value: 'Do you approve this action?', ... }]

// Resume with the human's response
result = await graph.invoke(new Command({ resume: true }), config);
console.log(result.approved); // true
```
</typescript>
A common pattern: interrupt to show a draft, then route based on the human's decision.
<python>
```python
from typing import Literal, TypedDict

from langgraph.graph import END
from langgraph.types import Command, interrupt


class EmailAgentState(TypedDict):
    email_content: str
    draft_response: str
    classification: dict


def human_review(state: EmailAgentState) -> Command[Literal["send_reply", "__end__"]]:
    """Pause for human review using interrupt and route based on decision."""
    classification = state.get("classification", {})

    # interrupt() must come first: any code before it will re-run on resume
    human_decision = interrupt({
        "email_id": state.get("email_content", ""),
        "draft_response": state.get("draft_response", ""),
        "urgency": classification.get("urgency"),
        "action": "Please review and approve/edit this response",
    })

    # Process the human's decision
    if human_decision.get("approved"):
        return Command(
            update={"draft_response": human_decision.get("edited_response", state.get("draft_response", ""))},
            goto="send_reply",
        )
    else:
        # Rejection: the human will handle the email directly
        return Command(update={}, goto=END)
```
</python>
<typescript>
Interrupt for human review, then route to send or end based on the decision.
```typescript
import { interrupt, Command, END, GraphNode } from "@langchain/langgraph";

const humanReview: GraphNode<typeof EmailAgentState> = async (state) => {
  const classification = state.classification!;

  // interrupt() must come first: any code before it will re-run on resume
  const humanDecision = interrupt({
    emailId: state.emailContent,
    draftResponse: state.responseText,
    urgency: classification.urgency,
    action: "Please review and approve/edit this response",
  });

  // Process the human's decision
  if (humanDecision.approved) {
    return new Command({
      update: { responseText: humanDecision.editedResponse || state.responseText },
      goto: "sendReply",
    });
  } else {
    return new Command({ update: {}, goto: END });
  }
};
```
</typescript>
Use `interrupt()` in a loop to validate human input and re-prompt if invalid.
<ex-validation-loop>
<python>
```python
from langgraph.types import interrupt


def get_age_node(state):
    prompt = "What is your age?"
    while True:
        answer = interrupt(prompt)
        # Validate the input
        if isinstance(answer, int) and answer > 0:
            break
        else:
            # Invalid input: ask again with a more specific prompt
            prompt = f"'{answer}' is not a valid age. Please enter a positive number."
    return {"age": answer}
```
Each `Command(resume=...)` call provides the next answer. If invalid, the loop re-interrupts with a clearer message.
```python
config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config)
# __interrupt__: "What is your age?"
retry = graph.invoke(Command(resume="thirty"), config)
# __interrupt__: "'thirty' is not a valid age..."
final = graph.invoke(Command(resume=30), config)
print(final["age"])  # 30
```
</python>
<typescript>
```typescript
const getAgeNode = (state: typeof State.State) => {
  let prompt = "What is your age?";
  while (true) {
    const answer = interrupt(prompt);
    // Validate the input
    if (typeof answer === "number" && answer > 0) {
      return { age: answer };
    } else {
      // Invalid input: ask again with a more specific prompt
      prompt = `'${answer}' is not a valid age. Please enter a positive number.`;
    }
  }
};
```
</typescript>
</ex-validation-loop>
---
## Multiple Interrupts
When parallel branches each call `interrupt()`, resume all of them in a single invocation by mapping each interrupt ID to its resume value.
<ex-multiple-interrupts>
<python>
Resume multiple parallel interrupts by mapping interrupt IDs to values.
```python
from typing import Annotated, TypedDict
import operator

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt


class State(TypedDict):
    vals: Annotated[list[str], operator.add]


def node_a(state):
    answer = interrupt("question_a")
    return {"vals": [f"a:{answer}"]}


def node_b(state):
    answer = interrupt("question_b")
    return {"vals": [f"b:{answer}"]}


graph = (
    StateGraph(State)
    .add_node("a", node_a)
    .add_node("b", node_b)
    .add_edge(START, "a")
    .add_edge(START, "b")
    .add_edge("a", END)
    .add_edge("b", END)
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

# Both parallel nodes hit interrupt() and pause
result = graph.invoke({"vals": []}, config)
# result["__interrupt__"] contains both Interrupt objects with IDs

# Resume all pending interrupts at once using a map of id -> value
resume_map = {
    i.id: f"answer for {i.value}"
    for i in result["__interrupt__"]
}
result = graph.invoke(Command(resume=resume_map), config)
# result["vals"] = ["a:answer for question_a", "b:answer for question_b"]
```
</python>
<typescript>
```typescript
import {
  Annotation, Command, END, INTERRUPT, MemorySaver, START, StateGraph, interrupt, isInterrupted,
} from "@langchain/langgraph";

const State = Annotation.Root({
  vals: Annotation<string[]>({
    reducer: (left, right) => left.concat(Array.isArray(right) ? right : [right]),
    default: () => [],
  }),
});

function nodeA(_state: typeof State.State) {
  const answer = interrupt("question_a") as string;
  return { vals: [`a:${answer}`] };
}

function nodeB(_state: typeof State.State) {
  const answer = interrupt("question_b") as string;
  return { vals: [`b:${answer}`] };
}

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addEdge(START, "a")
  .addEdge(START, "b")
  .addEdge("a", END)
  .addEdge("b", END)
  .compile({ checkpointer: new MemorySaver() });

const config = { configurable: { thread_id: "1" } };
const interruptedResult = await graph.invoke({ vals: [] }, config);

// Resume all pending interrupts at once
const resumeMap: Record<string, string> = {};
if (isInterrupted(interruptedResult)) {
  for (const i of interruptedResult[INTERRUPT]) {
    if (i.id != null) {
      resumeMap[i.id] = `answer for ${i.value}`;
    }
  }
}
const result = await graph.invoke(new Command({ resume: resumeMap }), config);
// result.vals = ["a:answer for question_a", "b:answer for question_b"]
```
</typescript>
</ex-multiple-interrupts>
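When answers come from a form or UI rather than being generated inline, it may be worth confirming that every pending interrupt has an answer before resuming. The sketch below shows one way to build the `id -> value` map defensively. `PendingInterrupt` is a hypothetical stand-in that only mirrors the `id` and `value` attributes used in the example above, and `build_resume_map` is an illustrative helper, not a LangGraph function.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PendingInterrupt:
    """Hypothetical stand-in exposing only `id` and `value`."""
    id: str
    value: str


def build_resume_map(pending, answers):
    """Map each interrupt id to the answer keyed by that interrupt's value."""
    missing = [p.value for p in pending if p.value not in answers]
    if missing:
        raise KeyError(f"no answer for: {missing}")
    return {p.id: answers[p.value] for p in pending}


pending = [
    PendingInterrupt("id-a", "question_a"),
    PendingInterrupt("id-b", "question_b"),
]
resume_map = build_resume_map(pending, {"question_a": "yes", "question_b": "no"})

# An incomplete set of answers is rejected instead of resuming partially
try:
    build_resume_map(pending, {"question_a": "yes"})
    incomplete_rejected = False
except KeyError:
    incomplete_rejected = True
```

The resulting dictionary has the same shape as `resume_map` in the example above and would be passed as `Command(resume=resume_map)`.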
User-fixable errors use `interrupt()` to pause and collect missing data — that's the pattern covered by this skill. For the full 4-tier error handling strategy (RetryPolicy, Command error loops, etc.), see the **fundamentals** skill.
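As a rough illustration of collecting missing data, the sketch below builds a JSON-serializable prompt listing the absent fields and turns the human's answers into a state update. Both helpers and the field names are hypothetical. In a real node the payload would be passed to `interrupt()`, and the resumed value would be fed to `merge_answers`.

```python
def missing_fields_payload(state, required):
    """Build a JSON-serializable prompt listing required fields that are absent."""
    missing = [name for name in required if state.get(name) in (None, "")]
    if not missing:
        return None  # nothing to ask, so no interrupt is needed
    return {"action": "Please provide the missing fields", "missing": missing}


def merge_answers(answers):
    """Return a state update containing only the fields the human filled in."""
    return {name: value for name, value in answers.items() if value not in (None, "")}


state = {"customer_id": "c-42", "email": None}
payload = missing_fields_payload(state, ["customer_id", "email"])
update = merge_answers({"email": "a@example.com", "phone": ""})
complete = missing_fields_payload({"customer_id": "c-42", "email": "a@example.com"}, ["customer_id", "email"])
```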
---
## Side Effects Before Interrupt Must Be Idempotent
When the graph resumes, the node restarts from the **beginning** — ALL code before `interrupt()` re-runs. In subgraphs, BOTH the parent node and the subgraph node re-execute.
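To make the restart behavior concrete, here is a toy model in plain Python. It is a simplified stand-in and not LangGraph's implementation: `ToyRunner` and `Paused` are hypothetical names, and the toy `interrupt` is passed to the node as an argument. Each resume replays the node from its first line and feeds back the stored answers in order.

```python
class Paused(Exception):
    """Raised by the toy interrupt() when no resume value is available yet."""

    def __init__(self, value):
        super().__init__(value)
        self.value = value


class ToyRunner:
    """Replays a node from the top on every resume (simplified model)."""

    def __init__(self, node):
        self.node = node
        self.resume_values = []  # answers supplied so far, in order
        self._cursor = 0

    def interrupt(self, value):
        # Return the stored answer if this interrupt was already answered
        if self._cursor < len(self.resume_values):
            answer = self.resume_values[self._cursor]
            self._cursor += 1
            return answer
        raise Paused(value)

    def run(self, resume=None):
        # In this toy, None means "no resume value supplied"
        if resume is not None:
            self.resume_values.append(resume)
        self._cursor = 0  # the node restarts from its first line
        try:
            return self.node(self.interrupt)
        except Paused as pause:
            return {"__interrupt__": pause.value}


side_effects = []


def approval_node(interrupt):
    side_effects.append("audit row inserted")  # runs on every (re)start
    approved = interrupt("Approve this change?")
    return {"approved": approved}


runner = ToyRunner(approval_node)
first = runner.run()              # pauses at interrupt()
second = runner.run(resume=True)  # node re-runs from the top, then completes
```

After the two calls, `side_effects` holds two entries even though the human approved only once, which is the duplication the rules in this section guard against.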
<idempotency-rules>
**Do:**
- Use **upsert** (not insert) operations before `interrupt()`
- Use **check-before-create** patterns
- Place side effects **after** `interrupt()` when possible
- Separate side effects into their own nodes
**Don't:**
- Create new records before `interrupt()` — duplicates on each resume
- Append to lists before `interrupt()` — duplicate entries on each resume
</idempotency-rules>
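The check-before-create rule can be sketched with an in-memory dictionary standing in for a database table. The names here (`audit_log`, `create_audit_entry_once`) are hypothetical. The idea is to derive a deterministic key so that a second execution of the same node finds the existing entry instead of inserting another.

```python
audit_log = {}  # stands in for a database table keyed by a deterministic ID


def create_audit_entry_once(thread_id, action):
    """Insert the entry only if its deterministic key is not already present."""
    key = f"{thread_id}:{action}"
    if key not in audit_log:
        audit_log[key] = {"thread_id": thread_id, "action": action}
    return key


# Simulate the node body running twice: once initially, once after resume
for _ in range(2):
    create_audit_entry_once("thread-1", "pending_approval")
```

With a real database, the same effect usually comes from a unique constraint on the key combined with an upsert, so the check and the write happen atomically.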
<ex-idempotent-patterns>
<python>
Idempotent operations before interrupt vs non-idempotent (wrong).
```python
# GOOD: Upsert is idempotent, so it is safe before interrupt
def node_a(state: State):
    db.upsert_user(user_id=state["user_id"], status="pending_approval")
    approved = interrupt("Approve this change?")
    return {"approved": approved}


# GOOD: Side effect AFTER interrupt, so it only runs once
def node_a(state: State):
    approved = interrupt("Approve this change?")
    if approved:
        db.create_audit_log(user_id=state["user_id"], action="approved")
    return {"approved": approved}


# BAD: Insert creates duplicates on each resume!
def node_a(state: State):
    audit_id = db.create_audit_log({  # Runs again on resume!
        "user_id": state["user_id"],
        "action": "pending_approval",
    })
    approved = interrupt("Approve this change?")
    return {"approved": approved}
```
</python>
<typescript>
```typescript
// GOOD: Side effect AFTER interrupt, so it only runs once
const nodeA = async (state: typeof State.State) => {
  const approved = interrupt("Approve this change?");
  if (approved) {
    await db.createAuditLog({ userId: state.userId, action: "approved" });
  }
  return { approved };
};

// BAD: Insert creates duplicates on each resume!
const nodeA = async (state: typeof State.State) => {
  await db.createAuditLog({ // Runs again on resume!
    userId: state.userId,
    action: "pending_approval",
  });
  const approved = interrupt("Approve this change?");
  return { approved };
};
```
</typescript>
</ex-idempotent-patterns>
<subgraph-interrupt-re-execution>
### Subgraph re-execution on resume
When a subgraph contains an `interrupt()`, resuming re-executes BOTH the parent node (that invoked the subgraph) AND the subgraph node (that called `interrupt()`):
<python>
```python
def node_in_parent_graph(state: State):
    some_code()  # <-- Re-executes on resume
    subgraph_result = subgraph.invoke(some_input)
    # ...


def node_in_subgraph(state: State):
    some_other_code()  # <-- Also re-executes on resume
    result = interrupt("What's your name?")
    # ...
```
</python>
<typescript>
```typescript
async function nodeInSubgraph(state: State) {
  someOtherCode(); // <-- Also re-executes on resume
  const result = interrupt("What's your name?");
  // ...
}
```
</typescript>
</subgraph-interrupt-re-execution>
---
## Command(resume) Warning
`Command(resume=...)` is the **only** Command pattern intended as input to `invoke()`/`stream()`. Do NOT pass `Command(update=...)` as input — it resumes from the latest checkpoint and the graph appears stuck. See the fundamentals skill for the full antipattern explanation.
---
## Fixes
<fix-checkpointer-required-for-interrupts>
<python>
Checkpointer required for interrupt functionality.
```python
# WRONG
graph = builder.compile()
# CORRECT
graph = builder.compile(checkpointer=InMemorySaver())
```
</python>
<typescript>
```typescript
// CORRECT
const graph = builder.compile({ checkpointer: new MemorySaver() });
```
</typescript>
</fix-checkpointer-required-for-interrupts>
<fix-resume-with-command>
<python>
Use Command to resume from an interrupt (regular dict restarts graph).
```python
# WRONG
graph.invoke({"resume_data": "approve"}, config)
# CORRECT
graph.invoke(Command(resume="approve"), config)
```
</python>
<typescript>
```typescript
// CORRECT
await graph.invoke(new Command({ resume: "approve" }), config);
```
</typescript>
</fix-resume-with-command>
<boundaries>
### What You Should NOT Do
- Use interrupts without a checkpointer — will fail
- Resume without the same thread_id — creates a new thread instead of resuming
- Pass `Command(update=...)` as invoke input — graph appears stuck (use plain dict)
- Perform non-idempotent side effects before `interrupt()` — creates duplicates on resume
- Assume code before `interrupt()` only runs once — it re-runs every resume
</boundaries>
Weekly installs: 2.4K · GitHub stars: 423 · First seen: Mar 3, 2026