langgraph-architecture by existential-birds/beagle
npx skills add https://github.com/existential-birds/beagle --skill langgraph-architecture| 场景 | 替代方案 | 原因 |
|---|---|---|
| 单一 LLM 调用 | 直接 API 调用 | 开销不合理 |
| 线性流水线 | LangChain LCEL | 更简单的抽象 |
| 无状态工具使用 | 函数调用 | 不需要持久化 |
| 简单的 RAG | LangChain 检索器 | 内置模式 |
| 批处理 |
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
| 异步任务 |
| 不同的执行模型 |
| TypedDict | Pydantic |
|---|---|
| 轻量级,更快 | 运行时验证 |
| 类字典访问 | 属性访问 |
| 无验证开销 | 类型强制转换 |
| 序列化更简单 | 复杂的嵌套模型 |
推荐:在大多数情况下使用 TypedDict。当需要验证或复杂的嵌套结构时使用 Pydantic。
| 使用场景 | 归约器 | 示例 |
|---|---|---|
| 聊天消息 | add_messages | 处理 ID、RemoveMessage |
| 简单追加 | operator.add | Annotated[list, operator.add] |
| 保留最新值 | None (LastValue) | field: str |
| 自定义合并 | Lambda | Annotated[list, lambda a, b: ...] |
| 覆盖列表 | Overwrite | 绕过归约器 |
# 小状态 (< 1MB) - 放入状态
class State(TypedDict):
messages: Annotated[list, add_messages]
context: str
# 大数据 - 使用存储
class State(TypedDict):
messages: Annotated[list, add_messages]
document_ref: str # 存储引用
def node(state, *, store: BaseStore):
doc = store.get(namespace, state["document_ref"])
# 处理而不使检查点膨胀
单一图适用于:
子图适用于:
| 条件边 | 命令 |
|---|---|
| 基于状态的路由 | 路由 + 状态更新 |
| 独立的路由器函数 | 节点内的决策 |
| 可视化更清晰 | 更灵活 |
| 标准模式 | 动态目的地 |
# 条件边 - 当路由是重点时
def router(state) -> Literal["a", "b"]:
return "a" if condition else "b"
builder.add_conditional_edges("node", router)
# 命令 - 当需要结合路由与更新时
def node(state) -> Command:
return Command(goto="next", update={"step": state["step"] + 1})
静态边 (add_edge):
动态路由 (add_conditional_edges, Command, Send):
| 检查点选择器 | 使用场景 | 特性 |
|---|---|---|
InMemorySaver | 仅测试 | 重启后丢失 |
SqliteSaver | 开发 | 单文件,本地 |
PostgresSaver | 生产 | 可扩展,并发 |
| 自定义 | 特殊需求 | 实现 BaseCheckpointSaver |
# 完全持久化 (默认)
graph = builder.compile(checkpointer=checkpointer)
# 子图选项
subgraph = sub_builder.compile(
checkpointer=None, # 从父图继承
checkpointer=True, # 独立检查点
checkpointer=False, # 无检查点 (原子性运行)
)
最适合:
┌─────────────┐
│ 监督者 │
└──────┬──────┘
┌────────┬───┴───┬────────┐
▼ ▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│智能体1│ │智能体2│ │智能体3│ │智能体4│
└──────┘ └──────┘ └──────┘ └──────┘
最适合:
┌──────┐ ┌──────┐
│智能体1│◄───►│智能体2│
└──┬───┘ └───┬──┘
│ │
▼ ▼
┌──────┐ ┌──────┐
│智能体3│◄───►│智能体4│
└──────┘ └──────┘
最适合:
┌────────┐ ┌────────┐ ┌────────┐
│研究│───►│规划│───►│执行│
└────────┘ └────────┘ └────────┘
| 模式 | 使用场景 | 数据 |
|---|---|---|
updates | UI 更新 | 仅节点输出 |
values | 状态检查 | 每一步的完整状态 |
messages | 聊天用户体验 | LLM 令牌 |
custom | 进度/日志 | 通过 StreamWriter 的自定义数据 |
debug | 调试 | 任务 + 检查点 |
# 从子图流式传输
async for chunk in graph.astream(
input,
stream_mode="updates",
subgraphs=True # 包含子图事件
):
namespace, data = chunk # namespace 表示深度
| 策略 | 使用场景 |
|---|---|
interrupt_before | 操作前审批 |
interrupt_after | 完成后审查 |
节点中的 interrupt() | 动态、上下文相关的暂停 |
# 简单恢复 (同一线程)
graph.invoke(None, config)
# 带值恢复
graph.invoke(Command(resume="approved"), config)
# 恢复特定中断
graph.invoke(Command(resume={interrupt_id: value}), config)
# 修改状态并恢复
graph.update_state(config, {"field": "new_value"})
graph.invoke(None, config)
# 每个节点的重试
RetryPolicy(
initial_interval=0.5,
backoff_factor=2.0,
max_interval=60.0,
max_attempts=3,
retry_on=lambda e: isinstance(e, (APIError, TimeoutError))
)
# 多个策略 (首次匹配优先)
builder.add_node("node", fn, retry_policy=[
RetryPolicy(retry_on=RateLimitError, max_attempts=5),
RetryPolicy(retry_on=Exception, max_attempts=2),
])
def node_with_fallback(state):
try:
return primary_operation(state)
except PrimaryError:
return fallback_operation(state)
# 或者对复杂的回退路由使用条件边
def route_on_error(state) -> Literal["retry", "fallback", "__end__"]:
if state.get("error") and state["attempts"] < 3:
return "retry"
elif state.get("error"):
return "fallback"
return END
# 设置递归限制
config = {"recursion_limit": 50}
graph.invoke(input, config)
# 在状态中跟踪剩余步骤
class State(TypedDict):
remaining_steps: RemainingSteps
def check_budget(state):
if state["remaining_steps"] < 5:
return "wrap_up"
return "continue"
实施前检查:
每周安装量
185
仓库
GitHub 星标数
40
首次出现
2026年1月20日
安全审计
安装于
opencode144
codex143
gemini-cli142
github-copilot128
cursor120
claude-code115
| Scenario | Alternative | Why |
|---|---|---|
| Single LLM call | Direct API call | Overhead not justified |
| Linear pipeline | LangChain LCEL | Simpler abstraction |
| Stateless tool use | Function calling | No persistence needed |
| Simple RAG | LangChain retrievers | Built-in patterns |
| Batch processing | Async tasks | Different execution model |
| TypedDict | Pydantic |
|---|---|
| Lightweight, faster | Runtime validation |
| Dict-like access | Attribute access |
| No validation overhead | Type coercion |
| Simpler serialization | Complex nested models |
Recommendation : Use TypedDict for most cases. Use Pydantic when you need validation or complex nested structures.
| Use Case | Reducer | Example |
|---|---|---|
| Chat messages | add_messages | Handles IDs, RemoveMessage |
| Simple append | operator.add | Annotated[list, operator.add] |
| Keep latest | None (LastValue) | field: str |
| Custom merge | Lambda | Annotated[list, lambda a, b: ...] |
| Overwrite list |
# SMALL STATE (< 1MB) - Put in state
class State(TypedDict):
messages: Annotated[list, add_messages]
context: str
# LARGE DATA - Use Store
class State(TypedDict):
messages: Annotated[list, add_messages]
document_ref: str # Reference to store
def node(state, *, store: BaseStore):
doc = store.get(namespace, state["document_ref"])
# Process without bloating checkpoints
Single Graph when:
Subgraphs when:
| Conditional Edges | Command |
|---|---|
| Routing based on state | Routing + state update |
| Separate router function | Decision in node |
| Clearer visualization | More flexible |
| Standard patterns | Dynamic destinations |
# Conditional Edge - when routing is the focus
def router(state) -> Literal["a", "b"]:
return "a" if condition else "b"
builder.add_conditional_edges("node", router)
# Command - when combining routing with updates
def node(state) -> Command:
return Command(goto="next", update={"step": state["step"] + 1})
Static Edges (add_edge):
Dynamic Routing (add_conditional_edges, Command, Send):
| Checkpointer | Use Case | Characteristics |
|---|---|---|
InMemorySaver | Testing only | Lost on restart |
SqliteSaver | Development | Single file, local |
PostgresSaver | Production | Scalable, concurrent |
| Custom | Special needs | Implement BaseCheckpointSaver |
# Full persistence (default)
graph = builder.compile(checkpointer=checkpointer)
# Subgraph options
subgraph = sub_builder.compile(
checkpointer=None, # Inherit from parent
checkpointer=True, # Independent checkpointing
checkpointer=False, # No checkpointing (runs atomically)
)
Best for:
Clear hierarchy
Centralized decision making
Different agent specializations
┌─────────────┐
│ Supervisor │
└──────┬──────┘
┌────────┬───┴───┬────────┐
▼ ▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │Agent1│ │Agent2│ │Agent3│ │Agent4│ └──────┘ └──────┘ └──────┘ └──────┘
Best for:
Collaborative agents
No clear hierarchy
Flexible communication
┌──────┐ ┌──────┐ │Agent1│◄───►│Agent2│ └──┬───┘ └───┬──┘ │ │ ▼ ▼ ┌──────┐ ┌──────┐ │Agent3│◄───►│Agent4│ └──────┘ └──────┘
Best for:
Sequential specialization
Clear stage transitions
Different capabilities per stage
┌────────┐ ┌────────┐ ┌────────┐ │Research│───►│Planning│───►│Execute │ └────────┘ └────────┘ └────────┘
| Mode | Use Case | Data |
|---|---|---|
updates | UI updates | Node outputs only |
values | State inspection | Full state each step |
messages | Chat UX | LLM tokens |
custom | Progress/logs | Your data via StreamWriter |
debug | Debugging | Tasks + checkpoints |
# Stream from subgraphs
async for chunk in graph.astream(
input,
stream_mode="updates",
subgraphs=True # Include subgraph events
):
namespace, data = chunk # namespace indicates depth
| Strategy | Use Case |
|---|---|
interrupt_before | Approval before action |
interrupt_after | Review after completion |
interrupt() in node | Dynamic, contextual pauses |
# Simple resume (same thread)
graph.invoke(None, config)
# Resume with value
graph.invoke(Command(resume="approved"), config)
# Resume specific interrupt
graph.invoke(Command(resume={interrupt_id: value}), config)
# Modify state and resume
graph.update_state(config, {"field": "new_value"})
graph.invoke(None, config)
# Per-node retry
RetryPolicy(
initial_interval=0.5,
backoff_factor=2.0,
max_interval=60.0,
max_attempts=3,
retry_on=lambda e: isinstance(e, (APIError, TimeoutError))
)
# Multiple policies (first match wins)
builder.add_node("node", fn, retry_policy=[
RetryPolicy(retry_on=RateLimitError, max_attempts=5),
RetryPolicy(retry_on=Exception, max_attempts=2),
])
def node_with_fallback(state):
try:
return primary_operation(state)
except PrimaryError:
return fallback_operation(state)
# Or use conditional edges for complex fallback routing
def route_on_error(state) -> Literal["retry", "fallback", "__end__"]:
if state.get("error") and state["attempts"] < 3:
return "retry"
elif state.get("error"):
return "fallback"
return END
# Set recursion limit
config = {"recursion_limit": 50}
graph.invoke(input, config)
# Track remaining steps in state
class State(TypedDict):
remaining_steps: RemainingSteps
def check_budget(state):
if state["remaining_steps"] < 5:
return "wrap_up"
return "continue"
Before implementing:
Weekly Installs
185
Repository
GitHub Stars
40
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
opencode144
codex143
gemini-cli142
github-copilot128
cursor120
claude-code115
AI Elements:基于shadcn/ui的AI原生应用组件库,快速构建对话界面
62,200 周安装
Angular最佳实践指南:性能优化、变更检测与包大小优化
173 周安装
More Trees自动化工具包 - 通过Rube MCP实现Composio生态自动化操作
1 周安装
Mopinion自动化集成:通过Rube MCP和Composio实现反馈管理自动化
1 周安装
Mapbox自动化工具:通过Rube MCP和Composio实现地理信息系统自动化操作
1 周安装
Listclean自动化工具:通过Rube MCP和Composio实现高效数据清理
1 周安装
Linkup自动化:通过Rube MCP和Composio工具包实现Linkup操作自动化
1 周安装
Overwrite| Bypass reducer |