npx skills add https://smithery.ai/skills/markusdegen/production-readiness
89% of organizations have implemented observability for agents, but 32% cite quality issues as the primary barrier to production. Production MAS requires first-class treatment of cost optimization, observability, governance and compliance, error handling, and multi-channel deployment.
Multi-agent systems can cost 2-4x more than single agents. Systematic optimization achieves 30-50% reductions.
{
  "token_budgets": {
    "Planner": {
      "max_input_tokens": 4000,
      "max_output_tokens": 2000,
      "model": "claude-3-haiku"
    },
    "Executor": {
      "max_input_tokens": 8000,
      "max_output_tokens": 4000,
      "model": "claude-3-sonnet"
    },
    "Verifier": {
      "max_input_tokens": 6000,
      "max_output_tokens": 1000,
      "model": "claude-3-haiku"
    }
  }
}
def check_budget_before_call(agent_id: str, context: str, estimated_tokens: int) -> str:
    """Compress the context if the estimated input exceeds the agent's budget."""
    budget = token_budgets[agent_id]
    if estimated_tokens > budget["max_input_tokens"]:
        # Compress context before proceeding
        return compress_context(context, budget["max_input_tokens"])
    return context
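The check above needs a token estimate before the model call is made. A crude heuristic sketch (the ~4-characters-per-token ratio is an assumption, not from the source; production code should use the provider's tokenizer):

```python
def estimate_tokens(text: str, chars_per_token: float = 4.0) -> int:
    """Rough token estimate for budget checks.

    The chars_per_token ratio is a heuristic for English text;
    swap in the model provider's tokenizer for accurate counts.
    """
    return max(1, round(len(text) / chars_per_token))
```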
Route tasks to appropriate model tiers:
| Task Type | Model | Cost Savings |
|---|---|---|
| Classification | Haiku | 90% |
| Summarization | Haiku | 90% |
| Simple extraction | Haiku | 90% |
| Complex reasoning | Sonnet | Baseline |
| Critical decisions | Opus | -200% (worth it) |
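The routing table can be sketched as a plain lookup with a conservative fallback (the task-type keys and the mid-tier default are illustrative assumptions):

```python
# Map task types to the cheapest model that handles them well.
MODEL_TIERS = {
    "classification": "claude-3-haiku",
    "summarization": "claude-3-haiku",
    "simple_extraction": "claude-3-haiku",
    "complex_reasoning": "claude-3-sonnet",
    "critical_decision": "claude-3-opus",
}

def route_model(task_type: str) -> str:
    # Unknown task types fall back to the mid-tier model
    # rather than the most expensive one.
    return MODEL_TIERS.get(task_type, "claude-3-sonnet")
```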
| Strategy | Token Savings | Use Case |
|---|---|---|
| Sliding window + summarization | 60-70% | Conversations >5 turns |
| Concise output formats | 70-85% | Agent-to-agent chaining |
| IDs-only formats | 85-95% | Bulk operations |
| Selective context loading | 40-60% | Large knowledge bases |
Problem: Agents accumulate context without pruning.
Impact: Token costs escalate rapidly, and systems fail when they hit context limits.
Fix: Implement max_history_tokens budgets and compress older context.
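A minimal sketch of the sliding-window fix under a max_history_tokens budget, assuming a crude character-based token estimate and a caller-supplied summarize callback (both assumptions for illustration):

```python
def trim_history(messages: list[str], max_history_tokens: int,
                 summarize=lambda msgs: "[summary of earlier turns]") -> list[str]:
    """Keep the most recent messages within budget; replace the
    overflow with a single summary message."""
    def tokens(msg: str) -> int:
        return max(1, len(msg) // 4)  # crude heuristic, not a real tokenizer

    kept, used = [], 0
    for msg in reversed(messages):          # walk backwards from the newest
        if used + tokens(msg) > max_history_tokens:
            break
        kept.append(msg)
        used += tokens(msg)
    kept.reverse()
    dropped = messages[: len(messages) - len(kept)]
    return ([summarize(dropped)] if dropped else []) + kept
```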
Capture complete execution paths:
{
  "trace": {
    "trace_id": "workflow_uuid",
    "spans": [
      {
        "span_id": "uuid",
        "parent_span_id": null,
        "operation": "workflow.start",
        "agent": "Orchestrator",
        "start_time": "ISO8601",
        "end_time": "ISO8601",
        "attributes": {
          "user_request": "...",
          "tokens_input": 150
        }
      },
      {
        "span_id": "uuid",
        "parent_span_id": "prev_uuid",
        "operation": "plan.create",
        "agent": "Planner",
        "attributes": {
          "tokens_input": 1200,
          "tokens_output": 800,
          "model": "claude-3-haiku",
          "latency_ms": 450
        }
      }
    ]
  }
}
| Category | Metrics |
|---|---|
| Cost | Tokens per agent, cost per workflow, model utilization ratio |
| Latency | Per-agent latency, total workflow latency, queue wait time |
| Quality | Success rate, verification pass rate, retry rate |
| Coordination | Message volume, conflict rate, escalation rate |
Log state at key points:
def log_state_transition(
    agent_id: str,
    operation: str,
    state_before: dict,
    state_after: dict
):
    """Log state changes for replay capability."""
    logger.info({
        "event": "state_transition",
        "agent": agent_id,
        "operation": operation,
        "state_diff": compute_diff(state_before, state_after),
        "timestamp": datetime.utcnow().isoformat()
    })
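compute_diff is referenced above but not defined in the source; a minimal flat-dictionary version might look like:

```python
def compute_diff(before: dict, after: dict) -> dict:
    """Return only the keys that were added, removed, or changed."""
    diff = {}
    for key in before.keys() | after.keys():
        if before.get(key) != after.get(key):
            diff[key] = {"before": before.get(key), "after": after.get(key)}
    return diff
```

Logging only the diff keeps state-transition events small even when the full agent state is large; nested state would need a recursive variant.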
Use the MAST taxonomy (Multi-Agent System Failure Taxonomy) for automated failure classification:
| Category | Percentage | Examples |
|---|---|---|
| Specification | 41.77% | Missing inputs, vague outputs, no constraints |
| Alignment | 36.94% | Silent ignoring, role confusion, state conflicts |
| Verification | 21.30% | Weak checks, premature termination, self-grading |
Static permission lists are insufficient. Implement dynamic governance:
1. Certified Identity and Purpose
{
  "agent": {
    "id": "planner_001",
    "name": "TaskPlanner",
    "purpose": "Decompose requirements into task graphs",
    "risk_level": "low",
    "data_access": ["requirements", "constraints"],
    "forbidden_data": ["pii", "credentials"]
  }
}
2. Central Policy Engine
def check_policy(agent_id: str, action: str, resource: str) -> bool:
    """Check if action is permitted for agent."""
    agent = get_agent_config(agent_id)
    checks = [
        action_aligns_with_purpose(action, agent.purpose),
        resource_in_allowed_data(resource, agent.data_access),
        not_in_forbidden(resource, agent.forbidden_data),
        not_chaining_too_many_actions(agent_id)
    ]
    return all(checks)
3. Runtime Enforcement with Audit
def enforce_and_audit(agent_id: str, action: str, resource: str):
    """Intercept, enforce, and audit agent actions."""
    # Check policy
    permitted = check_policy(agent_id, action, resource)
    # Audit regardless of outcome
    audit_log.append({
        "timestamp": datetime.utcnow().isoformat(),
        "agent": agent_id,
        "action": action,
        "resource": resource,
        "permitted": permitted,
        "policy_version": current_policy_version
    })
    if not permitted:
        raise PolicyViolation(f"{agent_id} cannot {action} on {resource}")
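As a usage sketch, the enforcement check can be attached to tools with a decorator. The stub policy table below stands in for the central policy engine purely for illustration:

```python
class PolicyViolation(Exception):
    pass

# Illustrative stub: agent id -> resources it may read.
# A real deployment would call the central policy engine instead.
ALLOWED = {"planner_001": {"requirements", "constraints"}}

def guarded(agent_id: str, resource: str):
    """Decorator sketch: enforce the policy check before a tool runs."""
    def wrap(fn):
        def inner(*args, **kwargs):
            if resource not in ALLOWED.get(agent_id, set()):
                raise PolicyViolation(f"{agent_id} cannot read {resource}")
            return fn(*args, **kwargs)
        return inner
    return wrap

@guarded("planner_001", "requirements")
def read_requirements():
    return ["REQ-1", "REQ-2"]
```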
Key requirements for AI agents:
| Requirement | Implementation |
|---|---|
| Transparency | Document agent capabilities and limitations |
| Human oversight | Escalation paths, human-in-the-loop for critical decisions |
| Risk management | Classify agents by risk, implement proportional controls |
| Technical documentation | Maintain specs, audit logs, test results |
| Accuracy/robustness | Verification processes, failure handling |
SOC 2 Type II audits now scrutinize:
Principle: Feed errors back into context so agents can self-correct. Implement error counters to prevent spin-outs.
When an agent encounters an error, feed it back as context for self-correction:
from collections import defaultdict
from datetime import datetime

class ToolSpinOut(Exception):
    """Raised when a single tool keeps failing."""

class WorkflowSpinOut(Exception):
    """Raised when the workflow accumulates too many errors overall."""

class ErrorContextManager:
    MAX_ERRORS_PER_TOOL = 3
    MAX_TOTAL_ERRORS = 5

    def __init__(self):
        self.error_counts = defaultdict(int)
        self.error_history = []

    def record_error(self, tool: str, error: dict) -> dict:
        """Record error and return context for retry."""
        self.error_counts[tool] += 1
        self.error_history.append({
            "tool": tool,
            "error": error,
            "timestamp": datetime.utcnow().isoformat(),
            "attempt": self.error_counts[tool]
        })
        # Check for spin-out
        if self.error_counts[tool] >= self.MAX_ERRORS_PER_TOOL:
            raise ToolSpinOut(f"{tool} failed {self.MAX_ERRORS_PER_TOOL} times")
        if sum(self.error_counts.values()) >= self.MAX_TOTAL_ERRORS:
            raise WorkflowSpinOut(f"Total errors exceeded {self.MAX_TOTAL_ERRORS}")
        # Return context for self-correction
        return {
            "previous_error": error,
            "attempt_number": self.error_counts[tool],
            "suggestion": f"Previous attempt failed: {error['message']}. Try a different approach."
        }
Feed errors back to the agent in a structured format:
{
"role": "system",
"content": "PREVIOUS ATTEMPT FAILED\n\nError: [error message]\nAttempt: 2 of 3\n\nPlease try a different approach. Consider:\n- Alternative method X\n- Check assumption Y\n- Verify input Z"
}
| Counter | Threshold | Action |
|---|---|---|
| Per-tool errors | 3 | Stop using that tool, try alternative |
| Total errors | 5 | Pause workflow, request human help |
| Retry without progress | 2 | Force different approach |
| Same error repeated | 2 | Escalate immediately |
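The "same error repeated" rule from the table can be sketched as a small detector over recent error signatures:

```python
from collections import deque

class RepeatDetector:
    """Escalate immediately when the same error signature repeats."""

    def __init__(self, threshold: int = 2):
        self.threshold = threshold
        self.recent = deque(maxlen=threshold)  # sliding window of signatures

    def should_escalate(self, error_message: str) -> bool:
        self.recent.append(error_message)
        # Window is full and every entry is identical -> the agent is stuck.
        return (len(self.recent) == self.threshold
                and len(set(self.recent)) == 1)
```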
consecutive_errors = 0
while True:
    try:
        result = await handle_next_step(thread, next_step)
        thread["events"].append({
            "type": next_step["intent"] + "_result",
            "data": result,
        })
        # Success! Reset the error counter
        consecutive_errors = 0
    except Exception as e:
        consecutive_errors += 1
        if consecutive_errors < 3:
            # Feed error back into context and retry
            thread["events"].append({
                "type": "error",
                "data": format_error_for_context(e),
            })
        else:
            # Too many errors - break loop, escalate
            await escalate_to_human(thread, e)
            break
See references/ops-runbook.md for detailed error handling procedures.
Principle: Meet users where they are. Agent triggering should be channel-agnostic.
class WorkflowTrigger:
    """
    Channel-agnostic workflow triggering.
    Same workflow can be started from any channel.
    """
    def trigger(self,
                input_data: dict,
                channel: str,
                user_id: str,
                metadata: dict = None) -> str:
        """
        Trigger workflow from any channel.

        Args:
            input_data: The actual request/task
            channel: Where this came from (slack, email, cli, api, webhook)
            user_id: Who triggered it
            metadata: Channel-specific metadata

        Returns:
            workflow_id for tracking
        """
        # Normalize input regardless of channel
        normalized = self.normalize_input(input_data, channel)
        # Create workflow with channel context
        workflow_id = self.workflow_controller.launch(
            input=normalized,
            context={
                "channel": channel,
                "user_id": user_id,
                "reply_to": self.get_reply_destination(channel, metadata)
            }
        )
        return workflow_id
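normalize_input is referenced above but not defined in the source; a sketch (the payload field names are assumptions) that flattens channel-specific payloads into one canonical shape:

```python
def normalize_input(input_data: dict, channel: str) -> dict:
    """Map channel-specific payloads to one canonical request shape."""
    if channel == "slack":
        text = input_data.get("text", "")
    elif channel == "email":
        # Combine subject and body into a single request string.
        text = input_data.get("subject", "") + "\n" + input_data.get("body", "")
    elif channel in ("api", "webhook"):
        text = input_data.get("request", "")
    else:  # cli and anything else: pass the raw argument through
        text = input_data.get("raw", "")
    return {"request": text.strip(), "source_channel": channel}
```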
| Channel | Trigger Method | Response Method |
|---|---|---|
| Slack | Slash command, mention, DM | Thread reply |
| Email | Send to agent@domain.com | Reply email |
| CLI | mas run <workflow> | Stdout |
| API | POST /workflows | Webhook or poll |
| Webhook | POST from external system | Callback URL |
| Dashboard | Button click | UI notification |
┌─────────┐ ┌─────────────────┐ ┌──────────────┐
│ Slack │────►│ │────►│ │
├─────────┤ │ Unified │ │ Workflow │
│ Email │────►│ Trigger │────►│ Engine │
├─────────┤ │ Interface │ │ │
│ CLI │────►│ │────►│ │
├─────────┤ │ │ │ │
│ API │────►│ │────►│ │
└─────────┘ └─────────────────┘ └──────────────┘
When workflow completes, route response back through original channel:
def complete_workflow(self, workflow_id: str, result: dict):
    """Route result back to originating channel."""
    context = self.get_workflow_context(workflow_id)
    reply_to = context["reply_to"]
    match reply_to["type"]:
        case "slack":
            self.slack_client.post_message(
                reply_to["channel"],
                format_slack_response(result)
            )
        case "email":
            self.email_client.send(
                reply_to["to"],
                format_email_response(result)
            )
        case "webhook":
            self.http_client.post(reply_to["url"], result)
        case "cli":
            print(format_cli_response(result))
channels:
  slack:
    enabled: true
    app_token: ${SLACK_APP_TOKEN}
    triggers:
      - slash_command: /mas
      - mention: "@mas-agent"
    response_format: slack_blocks
  email:
    enabled: true
    inbox: mas-agent@company.com
    response_format: html
  api:
    enabled: true
    auth: bearer_token
    rate_limit: 100/minute
  cli:
    enabled: true
    response_format: text
See references/ops-runbook.md for multi-channel deployment procedures.
For detailed implementation guides:
references/cost-optimization.md - Detailed cost reduction strategies
references/compliance-details.md - Full compliance requirements
references/ops-runbook.md - Operational procedures (includes error handling and multi-channel)
../agent-specification/references/twelve-factor-agents.md - Quick reference for all 12 factors