npx skills add https://github.com/sergio-bershadsky/ai --skill frappe-service
Create well-structured service layer classes that encapsulate business logic, coordinate between repositories, and provide clean interfaces for controllers and APIs.
```
/frappe-service <service_name> [--doctype <doctype>] [--operations <op1,op2>]
```

Examples:

```
/frappe-service OrderProcessing --doctype "Sales Order"
/frappe-service InventoryManagement --operations allocate,release,transfer
/frappe-service PaymentGateway
```
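The command takes a PascalCase service name, while the generated artifacts are referred to as `<ServiceName>Service` and `<service_name>.py`. A minimal sketch of that mapping, assuming a plain PascalCase to snake_case conversion; the helpers `to_snake_case` and `service_names` are hypothetical and not part of the skill:

```python
import re


def to_snake_case(name: str) -> str:
    """Convert a PascalCase name such as 'OrderProcessing' to 'order_processing'."""
    # Insert an underscore before every capital letter except the first one.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def service_names(service_name: str) -> dict:
    """Derive the class name and file name for a service (illustrative only)."""
    return {
        "class_name": f"{service_name}Service",
        "file_name": f"{to_snake_case(service_name)}.py",
    }


names = service_names("OrderProcessing")
```

Under this assumption, `OrderProcessing` maps to the class `OrderProcessingService` in `order_processing.py`.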
Ask the user for the service details, starting with the service name (e.g., `OrderProcessingService`).

Determine the service pattern:
| Pattern | Use Case | Example |
|---|---|---|
| CRUD Service | Basic DocType operations | CustomerService |
| Workflow Service | State transitions, approvals | ApprovalService |
| Integration Service | External API calls | PaymentGatewayService |
| Orchestration Service | Multi-DocType coordination | OrderFulfillmentService |
| Batch Service | Bulk operations | BulkImportService |
Create <app>/<module>/services/<service_name>.py:

```python
"""
<Service Name> Service

<Detailed description of what this service handles>

Responsibilities:
    - <Responsibility 1>
    - <Responsibility 2>
    - <Responsibility 3>

Usage:
    from <app>.<module>.services.<service_name> import <ServiceName>Service

    service = <ServiceName>Service()
    result = service.create(data)
"""

import frappe
from frappe import _
from frappe.utils import now, today, flt, cint, cstr, getdate
from typing import TYPE_CHECKING, Optional, Any, Callable
from contextlib import contextmanager
from functools import wraps

from <app>.<module>.services.base import BaseService
from <app>.<module>.repositories.<doctype>_repository import <DocType>Repository

if TYPE_CHECKING:
    from frappe.model.document import Document


# ──────────────────────────────────────────────────────────────────────────────
# Decorators
# ──────────────────────────────────────────────────────────────────────────────

def require_permission(doctype: str, ptype: str = "read"):
    """Decorator to check permission before method execution."""
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            if not frappe.has_permission(doctype, ptype):
                frappe.throw(
                    _("Permission denied: {0} {1}").format(ptype, doctype),
                    frappe.PermissionError
                )
            return func(self, *args, **kwargs)
        return wrapper
    return decorator


def with_transaction(func: Callable):
    """Decorator to wrap method in database transaction."""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            result = func(self, *args, **kwargs)
            frappe.db.commit()
            return result
        except Exception:
            # Undo any partial writes, then let the caller see the error
            frappe.db.rollback()
            raise
    return wrapper


def log_operation(operation_name: str):
    """Decorator to log service operation."""
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            frappe.logger().info(f"[{operation_name}] Starting...")
            try:
                result = func(self, *args, **kwargs)
                frappe.logger().info(f"[{operation_name}] Completed successfully")
                return result
            except Exception as e:
                frappe.logger().error(f"[{operation_name}] Failed: {str(e)}")
                raise
        return wrapper
    return decorator


# ──────────────────────────────────────────────────────────────────────────────
# Service Implementation
# ──────────────────────────────────────────────────────────────────────────────

class <ServiceName>Service(BaseService):
    """
    Service for <description>.

    This service handles:
        - <Operation 1>
        - <Operation 2>
        - <Operation 3>

    Architecture:
        Controller/API → Service → Repository → Database

    Example:
        service = <ServiceName>Service()
        order = service.create({"title": "New Order"})
        service.submit(order["name"])
    """

    def __init__(self, user: Optional[str] = None):
        super().__init__(user)
        self.repo = <DocType>Repository()
        # Initialize other repositories as needed
        # self.item_repo = ItemRepository()
        # self.customer_repo = CustomerRepository()

    # ──────────────────────────────────────────────────────────────────────────
    # Public Operations (Business Logic)
    # ──────────────────────────────────────────────────────────────────────────

    @require_permission("<DocType>", "create")
    @with_transaction
    @log_operation("create_<doctype>")
    def create(self, data: dict) -> dict:
        """
        Create a new <DocType>.

        Args:
            data: Document data containing:
                - title (str): Required title
                - date (str): Date in YYYY-MM-DD format
                - description (str): Optional description

        Returns:
            Created document summary

        Raises:
            frappe.ValidationError: If validation fails
            frappe.PermissionError: If user lacks permission

        Example:
            service.create({
                "title": "New Order",
                "date": "2024-01-15"
            })
        """
        # 1. Validate input
        self._validate_create_data(data)

        # 2. Apply business rules
        data = self._apply_defaults(data)
        data = self._apply_business_rules(data)

        # 3. Create via repository
        doc = self.repo.create(data)

        # 4. Post-creation actions
        self._on_create(doc)

        # 5. Return summary
        return doc.get_summary()

    @require_permission("<DocType>", "write")
    @with_transaction
    def update(self, name: str, data: dict) -> dict:
        """
        Update existing <DocType>.

        Args:
            name: Document name
            data: Fields to update

        Returns:
            Updated document summary
        """
        doc = self.repo.get_or_throw(name, for_update=True)

        # Validate update is allowed
        self._validate_can_update(doc)

        # Apply update
        doc.update(data)
        doc.save()

        return doc.get_summary()

    @require_permission("<DocType>", "submit")
    @with_transaction
    @log_operation("submit_<doctype>")
    def submit(self, name: str) -> dict:
        """
        Submit document for processing.

        This triggers:
            1. Pre-submission validation
            2. Document submission
            3. Post-submission actions (e.g., stock updates, GL entries)

        Args:
            name: Document name

        Returns:
            Submitted document summary

        Raises:
            frappe.ValidationError: If submission requirements not met
        """
        doc = self.repo.get_or_throw(name, for_update=True)

        # Pre-submission checks
        self._validate_submission(doc)

        # Submit
        doc.submit()

        # Post-submission processing
        self._on_submit(doc)

        return doc.get_summary()

    @require_permission("<DocType>", "cancel")
    @with_transaction
    @log_operation("cancel_<doctype>")
    def cancel(self, name: str, reason: Optional[str] = None) -> dict:
        """
        Cancel submitted document.

        Args:
            name: Document name
            reason: Cancellation reason (recommended)

        Returns:
            Cancelled document summary
        """
        doc = self.repo.get_or_throw(name, for_update=True)

        # Validate cancellation
        self._validate_cancellation(doc)

        # Store reason
        if reason:
            doc.db_set("cancellation_reason", reason, update_modified=False)

        # Cancel
        doc.cancel()

        # Post-cancellation processing
        self._on_cancel(doc)

        return doc.get_summary()

    # ──────────────────────────────────────────────────────────────────────────
    # Complex Business Operations
    # ──────────────────────────────────────────────────────────────────────────

    @with_transaction
    def process_workflow(
        self,
        name: str,
        action: str,
        comment: Optional[str] = None
    ) -> dict:
        """
        Process workflow action on document.

        Args:
            name: Document name
            action: Workflow action (e.g., "Approve", "Reject")
            comment: Optional comment for the action

        Returns:
            Updated document with new workflow state
        """
        doc = self.repo.get_or_throw(name, for_update=True)

        # Validate action is allowed
        allowed_actions = self._get_allowed_workflow_actions(doc)
        if action not in allowed_actions:
            frappe.throw(
                _("Action '{0}' not allowed. Allowed: {1}").format(
                    action, ", ".join(allowed_actions)
                )
            )

        # Apply workflow action
        from frappe.model.workflow import apply_workflow
        apply_workflow(doc, action)

        # Add comment
        if comment:
            doc.add_comment("Workflow", f"{action}: {comment}")

        return doc.get_summary()

    def calculate_totals(self, name: str) -> dict:
        """
        Calculate document totals (read-only; nothing is saved).

        Args:
            name: Document name

        Returns:
            Calculated totals
        """
        doc = self.repo.get_or_throw(name)

        # The second positional argument of Document.get() is `filters`,
        # not a default, so fall back to an empty list explicitly.
        subtotal = sum(
            flt(item.qty) * flt(item.rate)
            for item in (doc.get("items") or [])
        )
        tax_amount = flt(subtotal) * flt(doc.tax_rate or 0) / 100
        grand_total = flt(subtotal) + flt(tax_amount)

        return {
            "subtotal": subtotal,
            "tax_amount": tax_amount,
            "grand_total": grand_total
        }

    def bulk_operation(
        self,
        names: list[str],
        operation: str,
        **kwargs
    ) -> dict:
        """
        Perform bulk operation on multiple documents.

        Args:
            names: List of document names
            operation: Operation to perform (update_status, submit, cancel)
            **kwargs: Operation-specific arguments

        Returns:
            Results summary
        """
        results = {"success": [], "failed": []}

        for name in names:
            try:
                if operation == "update_status":
                    self.update(name, {"status": kwargs.get("status")})
                elif operation == "submit":
                    self.submit(name)
                elif operation == "cancel":
                    self.cancel(name, kwargs.get("reason"))
                results["success"].append(name)
            except Exception as e:
                # Record the failure and continue with the remaining documents
                results["failed"].append({
                    "name": name,
                    "error": str(e)
                })

        return results

    # ──────────────────────────────────────────────────────────────────────────
    # Query Methods
    # ──────────────────────────────────────────────────────────────────────────

    def get_pending_items(self, limit: int = 50) -> list[dict]:
        """Get items pending action."""
        return self.repo.get_list(
            filters={"status": "Pending", "docstatus": 0},
            fields=["name", "title", "date", "owner", "creation"],
            order_by="creation asc",
            limit=limit
        )

    def get_statistics(self, period: str = "month") -> dict:
        """
        Get statistics for dashboard.

        Args:
            period: Time period (day, week, month, year)

        Returns:
            Statistics dict
        """
        from frappe.utils import add_days, add_months, get_first_day

        today_date = today()

        if period == "day":
            from_date = today_date
        elif period == "week":
            from_date = add_days(today_date, -7)
        elif period == "month":
            from_date = get_first_day(today_date)
        else:  # year
            from_date = add_months(get_first_day(today_date), -12)

        return {
            "total": self.repo.get_count(),
            "period_total": self.repo.get_count(
                {"creation": [">=", from_date]}
            ),
            "by_status": self._get_counts_by_status(),
            "period": period,
            "from_date": from_date
        }

    # ──────────────────────────────────────────────────────────────────────────
    # Private Methods (Internal Logic)
    # ──────────────────────────────────────────────────────────────────────────

    def _validate_create_data(self, data: dict) -> None:
        """Validate data for document creation."""
        self.validate_mandatory(data, ["title"])

        # Custom validations (compare as dates rather than as raw strings)
        if data.get("date") and getdate(data["date"]) < getdate(today()):
            frappe.throw(_("Date cannot be in the past"))

    def _validate_can_update(self, doc: "Document") -> None:
        """Validate document can be updated."""
        if doc.docstatus == 2:
            frappe.throw(_("Cannot update cancelled document"))

        if doc.status == "Completed":
            frappe.throw(_("Cannot update completed document"))

    def _validate_submission(self, doc: "Document") -> None:
        """Validate all requirements for submission."""
        if doc.docstatus != 0:
            frappe.throw(_("Document is not in draft state"))

        # Add more validations as needed
        # if not doc.get("items"):
        #     frappe.throw(_("Cannot submit without items"))

    def _validate_cancellation(self, doc: "Document") -> None:
        """Validate document can be cancelled."""
        if doc.docstatus != 1:
            frappe.throw(_("Only submitted documents can be cancelled"))

        # Check for linked documents
        # linked = self._get_linked_submitted_docs(doc.name)
        # if linked:
        #     frappe.throw(_("Cannot cancel. Linked documents exist: {0}").format(linked))

    def _apply_defaults(self, data: dict) -> dict:
        """Apply default values to data."""
        if not data.get("date"):
            data["date"] = today()

        if not data.get("status"):
            data["status"] = "Draft"

        return data

    def _apply_business_rules(self, data: dict) -> dict:
        """Apply business rules to data."""
        # Example: Set posting date to today if not specified
        # Example: Calculate derived fields
        return data

    def _on_create(self, doc: "Document") -> None:
        """Post-creation hook for additional processing."""
        # Send notification
        # frappe.publish_realtime("new_document", {"name": doc.name})
        pass

    def _on_submit(self, doc: "Document") -> None:
        """Post-submission processing."""
        # Create linked records (GL entries, stock ledger, etc.)
        # Update inventory
        # Send notifications
        pass

    def _on_cancel(self, doc: "Document") -> None:
        """Post-cancellation processing."""
        # Reverse linked records
        # Update inventory
        pass

    def _get_allowed_workflow_actions(self, doc: "Document") -> list[str]:
        """Get allowed workflow actions for document."""
        from frappe.model.workflow import get_transitions
        return [t.action for t in get_transitions(doc)]

    def _get_counts_by_status(self) -> dict:
        """Get document counts grouped by status."""
        result = frappe.db.sql("""
            SELECT status, COUNT(*) as count
            FROM `tab<DocType>`
            WHERE docstatus < 2
            GROUP BY status
        """, as_dict=True)

        return {row.status: row.count for row in result}


# ──────────────────────────────────────────────────────────────────────────────
# Service Factory (for dependency injection)
# ──────────────────────────────────────────────────────────────────────────────

def get_<service_name>_service(user: Optional[str] = None) -> <ServiceName>Service:
    """
    Factory function for <ServiceName>Service.

    Use this instead of direct instantiation for easier testing/mocking.

    Args:
        user: Optional user context

    Returns:
        Service instance
    """
    return <ServiceName>Service(user=user)
```
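The stacked decorators on `create()` run from the outside in: `require_permission` first, then `with_transaction`, then `log_operation`, then the method body. The sketch below reproduces only that ordering, using stand-in decorators that record their labels in a list. It does not use Frappe, and `trace`, `events`, and `DemoService` are hypothetical names introduced for illustration.

```python
from functools import wraps

events = []


def trace(label):
    """Stand-in decorator factory: records when its wrapper is entered and exited."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            events.append(f"enter {label}")
            try:
                return func(*args, **kwargs)
            finally:
                events.append(f"exit {label}")
        return wrapper
    return decorator


class DemoService:
    # Same stacking order as create() in the template above.
    @trace("require_permission")
    @trace("with_transaction")
    @trace("log_operation")
    def create(self, data):
        events.append("create body")
        return data


result = DemoService().create({"title": "New Order"})
```

Because the permission check is outermost, a rejected call never reaches the commit/rollback wrapper, and the commit happens only after the logging wrapper has returned.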
For services that integrate with external APIs:

```python
"""
External Integration Service

Handles communication with external APIs with retry logic,
error handling, and response normalization.
"""

import frappe
from frappe import _
from typing import Optional, Any
import requests
from tenacity import retry, stop_after_attempt, wait_exponential


class <Integration>Service:
    """
    Service for integrating with <External Service>.

    Configuration:
        - API Key: System Settings > <Integration> API Key
        - Base URL: System Settings > <Integration> Base URL
    """

    def __init__(self):
        self.api_key = frappe.db.get_single_value("System Settings", "<integration>_api_key")
        self.base_url = frappe.db.get_single_value("System Settings", "<integration>_base_url")

        if not self.api_key:
            frappe.throw(_("<Integration> API key not configured"))

    # reraise=True surfaces the last underlying exception instead of
    # tenacity's RetryError once all attempts are used up. With no `retry=`
    # condition, any exception (including those from frappe.throw below)
    # triggers another attempt.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True
    )
    def _make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[dict] = None
    ) -> dict:
        """
        Make HTTP request with retry logic.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE)
            endpoint: API endpoint
            data: Request payload

        Returns:
            Response data

        Raises:
            frappe.ValidationError: On API error
        """
        url = f"{self.base_url}/{endpoint}"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        try:
            response = requests.request(
                method=method,
                url=url,
                json=data,
                headers=headers,
                timeout=30
            )
            response.raise_for_status()
            return response.json()

        except requests.exceptions.Timeout:
            frappe.throw(_("Request timed out. Please try again."))
        except requests.exceptions.HTTPError as e:
            error_msg = self._parse_error_response(e.response)
            frappe.throw(_("API Error: {0}").format(error_msg))
        except requests.exceptions.RequestException as e:
            frappe.throw(_("Connection error: {0}").format(str(e)))

    def _parse_error_response(self, response) -> str:
        """Parse error message from API response."""
        try:
            data = response.json()
            return data.get("message") or data.get("error") or response.text
        except Exception:
            return response.text

    # Public API methods

    def create_external_record(self, data: dict) -> dict:
        """Create record in external system."""
        return self._make_request("POST", "records", data)

    def get_external_record(self, external_id: str) -> dict:
        """Get record from external system."""
        return self._make_request("GET", f"records/{external_id}")

    def sync_records(self) -> dict:
        """Sync records with external system."""
        # Implementation
        pass
```
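The `@retry` decorator above is configured for three attempts with exponential waits between them. To make that behavior concrete, here is a standard-library sketch of the same idea. It is not tenacity's implementation, and its delay schedule is only an approximation of `wait_exponential`; `retry_with_backoff`, `base_delay`, `max_delay`, and the injectable `sleep` parameter are assumptions made for illustration.

```python
import time
from functools import wraps


def retry_with_backoff(attempts=3, base_delay=2.0, max_delay=10.0, sleep=time.sleep):
    """Retry the wrapped function, doubling the wait after each failure."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    # Wait base_delay, then 2x, 4x, ... capped at max_delay.
                    sleep(min(base_delay * 2 ** (attempt - 1), max_delay))
        return wrapper
    return decorator


delays = []          # collects the waits instead of actually sleeping
calls = {"count": 0}


@retry_with_backoff(attempts=3, sleep=delays.append)
def flaky_request():
    """Fails twice with a transient error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"status": "ok"}


response = flaky_request()
```

Passing `sleep` as a parameter keeps the example fast and lets a test inspect the waits; in real code the default `time.sleep` would apply.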
## Service Layer Preview
**Service:** <ServiceName>Service
**Module:** <app>.<module>.services.<service_name>
### Architecture:
```
┌─────────────────────┐
│   Controller/API    │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│       Service       │  ← Business Logic
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│     Repository      │  ← Data Access
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│      Database       │
└─────────────────────┘
```
### Operations:
| Method | Permission | Description |
|--------|------------|-------------|
| create() | create | Create new document |
| update() | write | Update document |
| submit() | submit | Submit for processing |
| cancel() | cancel | Cancel document |
| process_workflow() | write | Execute workflow action |
| get_statistics() | read | Dashboard stats |
### Features:
- ✅ Permission decorators
- ✅ Transaction management
- ✅ Operation logging
- ✅ Validation layer
- ✅ Business rules separation
- ✅ Factory function for DI
---
Create this service?
After approval, create service file and run tests.
## Service Created
**Name:** <ServiceName>Service
**Path:** <app>/<module>/services/<service_name>.py
### Features:
- ✅ Base service inheritance
- ✅ Repository integration
- ✅ Permission checking
- ✅ Transaction management
- ✅ Business logic methods
- ✅ Factory function
### Usage:
```python
from <app>.<module>.services.<service_name> import <ServiceName>Service

service = <ServiceName>Service()

# Create
result = service.create({"title": "New Record"})

# Submit
service.submit(result["name"])

# Get statistics
stats = service.get_statistics(period="month")
```
## Rules
1. **Single Responsibility** — Each service handles one domain/aggregate
2. **Use Repositories** — Services call repositories for data access; repositories handle `frappe.db`/`frappe.get_doc`
3. **Transaction Awareness** — Frappe auto-commits on success; use `@with_transaction` only for explicit rollback needs
4. **Permission Checks** — Always check permissions at service boundary
5. **Validation First** — Validate before any business logic
6. **Factory Pattern** — Use factory function for easier testing/mocking
7. **ALWAYS Confirm** — Never create files without explicit user approval
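Rule 6 is easiest to see from the test side. The following sketch, which uses no Frappe code, shows a caller that obtains its service through a factory so that a test can substitute a mock. `OrderService`, `get_order_service`, and `place_order` are hypothetical stand-ins, not names generated by the skill.

```python
from unittest.mock import MagicMock


class OrderService:
    """Hypothetical stand-in for a generated service."""

    def __init__(self, user=None):
        self.user = user

    def create(self, data):
        # The real service would go through a repository and the database.
        raise RuntimeError("would hit the database")


def get_order_service(user=None):
    """Factory function, mirroring get_<service_name>_service() above."""
    return OrderService(user=user)


def place_order(data, service_factory=get_order_service):
    """Caller that asks the factory for a service instead of instantiating one."""
    service = service_factory()
    return service.create(data)


# In a test, swap the factory for one that returns a mock.
fake_service = MagicMock()
fake_service.create.return_value = {"name": "ORD-0001"}

result = place_order({"title": "New Order"}, service_factory=lambda: fake_service)
```

The caller never names the concrete class, so the test needs neither a database nor a Frappe site.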
## Security Guidelines
1. **SQL Injection Prevention:** Use `frappe.db.sql()` with parameterized queries:

   ```python
   # CORRECT: Parameterized
   frappe.db.sql("SELECT name FROM tabUser WHERE email=%s", [email])

   # WRONG: String formatting (SQL injection risk)
   frappe.db.sql(f"SELECT name FROM tabUser WHERE email='{email}'")
   ```

2. **Avoid eval/exec:** Never use `eval()` or `exec()` with user input. Use `frappe.safe_eval()` if code evaluation is absolutely required.
3. **Permission Bypass Awareness:** `frappe.db.set_value()` and `frappe.get_all()` bypass permissions. Use them only for system operations, never for user-facing code.
4. **Input Sanitization:** Validate and sanitize all user inputs. Use type annotations for automatic v15 validation.
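The difference between the two query styles in guideline 1 can be reproduced without a Frappe site. This sketch uses the standard-library `sqlite3` module as a stand-in database; note that `sqlite3` uses `?` placeholders where `frappe.db.sql()` uses `%s`, and the `users` table and its rows are made up for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, email TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("alice", "alice@example.com"), ("bob", "bob@example.com")],
)

# Input crafted to turn the WHERE clause into a condition that is always true.
malicious = "nobody@example.com' OR '1'='1"

# WRONG: the input is spliced into the SQL text and changes the query itself.
unsafe_rows = conn.execute(
    f"SELECT name FROM users WHERE email='{malicious}'"
).fetchall()

# CORRECT: the input is passed separately and treated purely as a value.
safe_rows = conn.execute(
    "SELECT name FROM users WHERE email=?", [malicious]
).fetchall()

conn.close()
```

The formatted query matches every row in the table, while the parameterized query matches none, because no user has that literal string as an email address.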