langgraph-implementation by existential-birds/beagle
npx skills add https://github.com/existential-birds/beagle --skill langgraph-implementation

LangGraph 使用基于图的架构构建有状态、多参与者的智能体应用:
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import MessagesState, add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, Send, interrupt, RetryPolicy
from typing import Annotated
from typing_extensions import TypedDict
class State(TypedDict):
    """Graph state schema; Annotated metadata attaches a reducer to a channel."""
    counter: int  # LastValue channel - stores the last written value
    messages: Annotated[list, operator.add]  # reducer - appends lists (NOTE(review): requires `import operator`, not shown above)
    items: Annotated[list, lambda a, b: a + [b] if b else a]  # custom reducer
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from langgraph.graph.message import MessagesState
class State(MessagesState):
    # Inherits from MessagesState: messages: Annotated[list[AnyMessage], add_messages]
    user_id: str  # extra channel alongside the inherited messages
    context: dict
from pydantic import BaseModel
class State(BaseModel):
    """Pydantic-based state schema."""
    messages: Annotated[list, add_messages]
    validated_field: str  # Pydantic validates on assignment
builder = StateGraph(State)

# Add nodes - functions that take the state and return a partial update
builder.add_node("process", process_fn)
builder.add_node("decide", decide_fn)

# Add edges
builder.add_edge(START, "process")
builder.add_edge("process", "decide")
builder.add_edge("decide", END)

# Compile
graph = builder.compile()
def my_node(state: State) -> dict:
    """A node receives the full state and returns a partial update."""
    return {"counter": state["counter"] + 1}

# Accessing the run config
def my_node(state: State, config: RunnableConfig) -> dict:
    # thread_id identifies the checkpoint thread for this run
    thread_id = config["configurable"]["thread_id"]
    return {"result": process(state, thread_id)}

# Using the runtime context (v0.6+)
def my_node(state: State, runtime: Runtime[Context]) -> dict:
    user_id = runtime.context.get("user_id")
    return {"result": user_id}
from typing import Literal
def router(state: State) -> Literal["agent", "tools", "__end__"]:
    """Conditional-edge router: go to "tools" when the last message has tool calls."""
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        return "tools"
    return END  # or "__end__"

builder.add_conditional_edges("agent", router)

# With path_map for visualization
builder.add_conditional_edges(
    "agent",
    router,
    path_map={"agent": "agent", "tools": "tools", "__end__": END}
)
from langgraph.types import Command
def dynamic_node(state: State) -> Command[Literal["next", "__end__"]]:
    """Route dynamically via Command: a goto target plus an optional state update."""
    if state["should_continue"]:
        return Command(goto="next", update={"step": state["step"] + 1})
    return Command(goto=END)

# Destinations must be declared for visualization
builder.add_node("dynamic", dynamic_node, destinations=["next", END])

from langgraph.types import Send

def fan_out(state: State) -> list[Send]:
    """Route to multiple node instances, each with a different input."""
    return [Send("worker", {"item": item}) for item in state["items"]]

builder.add_conditional_edges(START, fan_out)
builder.add_edge("worker", "aggregate")  # workers converge here
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver # 开发环境
from langgraph.checkpoint.postgres import PostgresSaver # 生产环境
# In-memory (testing only)
graph = builder.compile(checkpointer=InMemorySaver())

# SQLite (development)
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

# Thread-based invocation: thread_id selects the checkpoint thread
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"messages": [...]}, config)

# Get the current state
state = graph.get_state(config)

# Get the state history
for state in graph.get_state_history(config):
    print(state.values, state.next)

# Update the state manually, attributed to a node via as_node
graph.update_state(config, {"key": "new_value"}, as_node="node_name")
from langgraph.types import interrupt, Command
def review_node(state: State) -> dict:
    # Pause and surface the payload to the client; resumes with the client's reply
    human_input = interrupt({"question": "请审阅", "data": state["draft"]})
    return {"approved": human_input["approved"]}

# Resume with a Command
graph.invoke(Command(resume={"approved": True}), config)

graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_review"],  # pause before the node runs
    interrupt_after=["agent"],  # pause after the node runs
)

# Check for pending interrupts
state = graph.get_state(config)
if state.next:  # there are pending nodes
    # Resume
    graph.invoke(None, config)
# Stream modes: "values", "updates", "custom", "messages", "debug"

# Updates only (per-node outputs)
for chunk in graph.stream(input, stream_mode="updates"):
    print(chunk)  # {"node_name": {"key": "value"}}

# Full state after each step
for chunk in graph.stream(input, stream_mode="values"):
    print(chunk)

# Multiple modes: the stream yields (mode, chunk) pairs
for mode, chunk in graph.stream(input, stream_mode=["updates", "messages"]):
    if mode == "messages":
        print("令牌:", chunk)

# Custom streaming from inside a node
from langgraph.config import get_stream_writer

def my_node(state):
    writer = get_stream_writer()
    writer({"progress": 0.5})  # custom event
    return {"result": "done"}
# Define a subgraph
sub_builder = StateGraph(SubState)
sub_builder.add_node("step", step_fn)
sub_builder.add_edge(START, "step")
subgraph = sub_builder.compile()

# Use it as a node in the parent graph
parent_builder = StateGraph(ParentState)
parent_builder.add_node("subprocess", subgraph)
parent_builder.add_edge(START, "subprocess")

# Subgraph checkpointing
subgraph = sub_builder.compile(
    checkpointer=None,  # inherit from the parent (default)
    # checkpointer=True,  # use persistent checkpointing
    # checkpointer=False,  # disable checkpointing
)
from langgraph.types import RetryPolicy, CachePolicy
# Per-node retry/backoff and result-caching policies
retry = RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_attempts=3,
    retry_on=ValueError,  # or a callable: lambda e: isinstance(e, ValueError)
)
cache = CachePolicy(ttl=3600)  # cache for 1 hour
builder.add_node("risky", risky_fn, retry_policy=retry, cache_policy=cache)
from langgraph.prebuilt import create_react_agent, ToolNode
# Simple agent
graph = create_react_agent(
    model="anthropic:claude-3-5-sonnet",
    tools=[my_tool],
    prompt="你是一个乐于助人的助手",
    checkpointer=InMemorySaver(),
)

# Custom tool node
tool_node = ToolNode([tool1, tool2])
builder.add_node("tools", tool_node)

def should_continue(state) -> Literal["tools", "__end__"]:
    """Loop back to the tool node while the last message requests tools."""
    if state["messages"][-1].tool_calls:
        return "tools"
    return END

builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")

# Multiple nodes execute in parallel when they share the same trigger
builder.add_edge(START, "node_a")
builder.add_edge(START, "node_b")  # runs in parallel with node_a
builder.add_edge(["node_a", "node_b"], "join")  # wait for both
查看 PATTERNS.md 以获取高级模式,包括多智能体系统、分层图和复杂工作流。
每周安装次数
119
代码仓库
GitHub 星标数
40
首次出现
2026年1月20日
安全审计
安装于
gemini-cli97
codex96
opencode96
claude-code87
cursor86
github-copilot83
LangGraph builds stateful, multi-actor agent applications using a graph-based architecture:
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import MessagesState, add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, Send, interrupt, RetryPolicy
from typing import Annotated
from typing_extensions import TypedDict
class State(TypedDict):
    """Graph state schema; Annotated metadata attaches a reducer to a channel."""
    counter: int  # LastValue - stores last value
    messages: Annotated[list, operator.add]  # Reducer - appends lists (NOTE(review): requires `import operator`, not shown above)
    items: Annotated[list, lambda a, b: a + [b] if b else a]  # Custom reducer
from langgraph.graph.message import MessagesState
class State(MessagesState):
    # Inherits: messages: Annotated[list[AnyMessage], add_messages]
    user_id: str  # extra channel alongside the inherited messages
    context: dict
from pydantic import BaseModel
class State(BaseModel):
    """Pydantic-based state schema."""
    messages: Annotated[list, add_messages]
    validated_field: str  # Pydantic validates on assignment
builder = StateGraph(State)

# Add nodes - functions that take state, return partial updates
builder.add_node("process", process_fn)
builder.add_node("decide", decide_fn)

# Add edges
builder.add_edge(START, "process")
builder.add_edge("process", "decide")
builder.add_edge("decide", END)

# Compile
graph = builder.compile()
def my_node(state: State) -> dict:
    """Return a partial update: bump the "counter" channel by one."""
    bumped = state["counter"] + 1
    return {"counter": bumped}
# With config access
def my_node(state: State, config: RunnableConfig) -> dict:
    # thread_id identifies the checkpoint thread for this run
    thread_id = config["configurable"]["thread_id"]
    return {"result": process(state, thread_id)}

# With Runtime context (v0.6+)
def my_node(state: State, runtime: Runtime[Context]) -> dict:
    user_id = runtime.context.get("user_id")
    return {"result": user_id}
from typing import Literal
def router(state: State) -> Literal["agent", "tools", "__end__"]:
    """Route to "tools" when the newest message carries tool calls, else finish."""
    newest = state["messages"][-1]
    requested = getattr(newest, "tool_calls", None)
    return "tools" if requested else END
builder.add_conditional_edges("agent", router)

# With path_map for visualization
builder.add_conditional_edges(
    "agent",
    router,
    path_map={"agent": "agent", "tools": "tools", "__end__": END}
)
from langgraph.types import Command
def dynamic_node(state: State) -> Command[Literal["next", "__end__"]]:
    """Route via Command: a goto target plus an optional state update."""
    if state["should_continue"]:
        return Command(goto="next", update={"step": state["step"] + 1})
    return Command(goto=END)

# Must declare destinations for visualization
builder.add_node("dynamic", dynamic_node, destinations=["next", END])
from langgraph.types import Send
def fan_out(state: State) -> list[Send]:
    """Map step: spawn one "worker" invocation per item in state["items"]."""
    dispatches = []
    for entry in state["items"]:
        dispatches.append(Send("worker", {"item": entry}))
    return dispatches

builder.add_conditional_edges(START, fan_out)
builder.add_edge("worker", "aggregate")  # workers converge downstream
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver # Development
from langgraph.checkpoint.postgres import PostgresSaver # Production
# In-memory (testing only)
graph = builder.compile(checkpointer=InMemorySaver())

# SQLite (development)
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

# Thread-based invocation: thread_id selects the checkpoint thread
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"messages": [...]}, config)

# Get current state
state = graph.get_state(config)

# Get state history
for state in graph.get_state_history(config):
    print(state.values, state.next)

# Update state manually, attributed to a node via as_node
graph.update_state(config, {"key": "new_value"}, as_node="node_name")
from langgraph.types import interrupt, Command
def review_node(state: State) -> dict:
    # Pause and surface value to client; resumes with the client's reply
    human_input = interrupt({"question": "Please review", "data": state["draft"]})
    return {"approved": human_input["approved"]}

# Resume with Command
graph.invoke(Command(resume={"approved": True}), config)

graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_review"],  # Pause before node
    interrupt_after=["agent"],  # Pause after node
)

# Check pending interrupts
state = graph.get_state(config)
if state.next:  # Has pending nodes
    # Resume
    graph.invoke(None, config)
# Stream modes: "values", "updates", "custom", "messages", "debug"

# Updates only (node outputs)
for chunk in graph.stream(input, stream_mode="updates"):
    print(chunk)  # {"node_name": {"key": "value"}}

# Full state after each step
for chunk in graph.stream(input, stream_mode="values"):
    print(chunk)

# Multiple modes: the stream yields (mode, chunk) pairs
for mode, chunk in graph.stream(input, stream_mode=["updates", "messages"]):
    if mode == "messages":
        print("Token:", chunk)

# Custom streaming from within nodes
from langgraph.config import get_stream_writer

def my_node(state):
    writer = get_stream_writer()
    writer({"progress": 0.5})  # Custom event
    return {"result": "done"}
# Define subgraph
sub_builder = StateGraph(SubState)
sub_builder.add_node("step", step_fn)
sub_builder.add_edge(START, "step")
subgraph = sub_builder.compile()

# Use as node in parent
parent_builder = StateGraph(ParentState)
parent_builder.add_node("subprocess", subgraph)
parent_builder.add_edge(START, "subprocess")

# Subgraph checkpointing
subgraph = sub_builder.compile(
    checkpointer=None,  # Inherit from parent (default)
    # checkpointer=True,  # Use persistent checkpointing
    # checkpointer=False,  # Disable checkpointing
)
from langgraph.types import RetryPolicy, CachePolicy
# Per-node retry/backoff and result-caching policies
retry = RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_attempts=3,
    retry_on=ValueError,  # Or callable: lambda e: isinstance(e, ValueError)
)
cache = CachePolicy(ttl=3600)  # Cache for 1 hour
builder.add_node("risky", risky_fn, retry_policy=retry, cache_policy=cache)
from langgraph.prebuilt import create_react_agent, ToolNode
# Simple agent
graph = create_react_agent(
    model="anthropic:claude-3-5-sonnet",
    tools=[my_tool],
    prompt="You are a helpful assistant",
    checkpointer=InMemorySaver(),
)

# Custom tool node
tool_node = ToolNode([tool1, tool2])
builder.add_node("tools", tool_node)
def should_continue(state) -> Literal["tools", "__end__"]:
    """Keep looping to the tool node while the last message requests tools."""
    return "tools" if state["messages"][-1].tool_calls else END
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")

# Multiple nodes execute in parallel when they share the same trigger
builder.add_edge(START, "node_a")
builder.add_edge(START, "node_b")  # Runs parallel with node_a
builder.add_edge(["node_a", "node_b"], "join")  # Wait for both
See PATTERNS.md for advanced patterns including multi-agent systems, hierarchical graphs, and complex workflows.
Weekly Installs
119
Repository
GitHub Stars
40
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
gemini-cli97
codex96
opencode96
claude-code87
cursor86
github-copilot83
AI Elements:基于shadcn/ui的AI原生应用组件库,快速构建对话界面
66,200 周安装