npx skills add https://github.com/agusmdev/burntop --skill fastapi-core-service本技能涵盖创建基础服务类,该类充当路由器和存储库之间的中介。服务包含业务逻辑并协调存储库调用。
创建 src/app/core/service.py:
from collections.abc import Sequence
from typing import Generic
from uuid import UUID
from fastapi_filter.contrib.sqlalchemy import Filter
from fastapi_pagination import Params
from fastapi_pagination.bases import AbstractPage
from app.core.repository import (
AbstractRepository,
CreateSchemaType,
ModelType,
UpdateSchemaType,
)
class BaseService(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
"""
基础服务类,提供标准的 CRUD 操作。
服务:
- 封装存储库操作
- 包含业务逻辑
- 协调多个存储库调用
- 处理横切关注点(日志记录、事件等)
服务不应:
- 编写 SQL 查询(委托给存储库)
- 处理 HTTP 相关事宜(这是路由器的职责)
- 直接访问数据库会话
类型参数:
ModelType: SQLAlchemy 模型类
CreateSchemaType: 用于创建操作的 Pydantic 模式
UpdateSchemaType: 用于更新操作的 Pydantic 模式
用法:
class ItemService(BaseService[Item, ItemCreate, ItemUpdate]):
pass
"""
def __init__(
self,
repository: AbstractRepository[ModelType, CreateSchemaType, UpdateSchemaType],
):
"""
使用存储库初始化服务。
参数:
repository: 用于数据访问的存储库实例
"""
self._repository = repository
# ==================== 基础 CRUD ====================
async def create(self, obj_in: CreateSchemaType) -> ModelType:
"""
创建新记录。
可重写以在创建前后添加业务逻辑。
参数:
obj_in: 创建数据
返回:
创建的模型实例
"""
return await self._repository.create(obj_in)
async def get_by_id(self, id: UUID) -> ModelType | None:
"""
通过 ID 获取单个记录。
参数:
id: 记录的 UUID
返回:
如果找到则返回模型实例,否则返回 None
"""
return await self._repository.get_by_id(id)
async def get_all(self) -> Sequence[ModelType]:
"""
获取所有记录。
返回:
模型实例序列
"""
return await self._repository.get_all()
async def update(
self,
id: UUID,
obj_in: UpdateSchemaType,
exclude_unset: bool = True,
) -> ModelType | None:
"""
更新现有记录。
可重写以在更新前后添加业务逻辑。
参数:
id: 要更新的记录的 UUID
obj_in: 更新数据
exclude_unset: 仅更新显式设置的字段
返回:
如果找到则返回更新后的模型实例,否则返回 None
"""
return await self._repository.update(id, obj_in, exclude_unset)
async def delete(self, id: UUID) -> bool:
"""
永久删除记录。
参数:
id: 要删除的记录的 UUID
返回:
如果已删除则返回 True,如果未找到则返回 False
"""
return await self._repository.delete(id)
# ==================== 分页与过滤 ====================
async def get_paginated(
self,
params: Params,
filter_spec: Filter | None = None,
) -> AbstractPage[ModelType]:
"""
获取分页记录,支持可选过滤。
此方法从路由器层接收过滤器并将其传递给存储库。
参数:
params: 分页参数
filter_spec: 可选的过滤器规范
返回:
分页结果
"""
return await self._repository.get_paginated(params, filter_spec)
async def get_filtered(
self,
filter_spec: Filter,
) -> Sequence[ModelType]:
"""
获取所有匹配过滤器的记录。
参数:
filter_spec: 过滤器规范
返回:
匹配的记录
"""
return await self._repository.get_filtered(filter_spec)
async def count(self, filter_spec: Filter | None = None) -> int:
"""
统计记录数。
参数:
filter_spec: 可选的过滤器规范
返回:
匹配的记录数
"""
return await self._repository.count(filter_spec)
# ==================== 批量操作 ====================
async def bulk_create(
self,
objs_in: Sequence[CreateSchemaType],
) -> Sequence[ModelType]:
"""
创建多个记录。
参数:
objs_in: 创建数据序列
返回:
创建的模型实例
"""
return await self._repository.bulk_create(objs_in)
async def bulk_upsert(
self,
objs_in: Sequence[CreateSchemaType],
index_elements: Sequence[str],
update_fields: Sequence[str] | None = None,
) -> Sequence[ModelType]:
"""
批量更新或插入记录。
参数:
objs_in: 要更新或插入的记录
index_elements: 唯一约束列
update_fields: 冲突时要更新的字段
返回:
更新或插入后的模型实例
"""
return await self._repository.bulk_upsert(
objs_in, index_elements, update_fields
)
async def bulk_delete(self, ids: Sequence[UUID]) -> int:
"""
删除多个记录。
参数:
ids: 要删除的 UUID
返回:
删除的数量
"""
return await self._repository.bulk_delete(ids)
# ==================== 软删除 ====================
async def soft_delete(self, id: UUID) -> bool:
"""
软删除记录。
参数:
id: 记录的 UUID
返回:
如果软删除成功则返回 True
"""
return await self._repository.soft_delete(id)
async def restore(self, id: UUID) -> bool:
"""
恢复软删除的记录。
参数:
id: 记录的 UUID
返回:
如果恢复成功则返回 True
"""
return await self._repository.restore(id)
async def get_by_id_with_deleted(self, id: UUID) -> ModelType | None:
"""
获取记录,包括软删除的记录。
参数:
id: 记录的 UUID
返回:
如果找到则返回模型实例
"""
return await self._repository.get_by_id_with_deleted(id)
# ==================== 实用工具 ====================
async def exists(self, id: UUID) -> bool:
"""
检查记录是否存在。
参数:
id: 要检查的 UUID
返回:
如果存在则返回 True
"""
return await self._repository.exists(id)
async def get_by_ids(self, ids: Sequence[UUID]) -> Sequence[ModelType]:
"""
通过多个 ID 获取记录。
参数:
ids: 要获取的 UUID
返回:
找到的模型实例
"""
return await self._repository.get_by_ids(ids)
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
# src/app/items/service.py
from uuid import UUID
from app.core.service import BaseService
from app.exceptions import ConflictError, NotFoundError
from app.items.models import Item
from app.items.repository import ItemRepository
from app.items.schemas import ItemCreate, ItemUpdate
class ItemService(BaseService[Item, ItemCreate, ItemUpdate]):
"""Item 实体的服务,包含业务逻辑。"""
def __init__(self, repository: ItemRepository):
super().__init__(repository)
# 为 IDE 支持提供类型提示
self._repository: ItemRepository = repository
async def create(self, obj_in: ItemCreate) -> Item:
"""
创建项目,并检查名称是否重复。
抛出:
ConflictError: 如果存在同名项目
"""
# 业务逻辑:检查名称是否重复
existing = await self._repository.get_by_name(obj_in.name)
if existing:
raise ConflictError(
resource="Item",
field="name",
value=obj_in.name,
)
return await super().create(obj_in)
async def get_by_id_or_raise(self, id: UUID) -> Item:
"""
通过 ID 获取项目,如果未找到则抛出 NotFoundError。
在需要确保项目存在时很有用。
抛出:
NotFoundError: 如果项目未找到
"""
item = await self.get_by_id(id)
if not item:
raise NotFoundError(resource="Item", id=id)
return item
async def update(
self,
id: UUID,
obj_in: ItemUpdate,
exclude_unset: bool = True,
) -> Item | None:
"""
更新项目,并检查名称是否重复。
抛出:
ConflictError: 如果新名称与现有项目冲突
"""
# 业务逻辑:更新时检查名称唯一性
if obj_in.name is not None:
existing = await self._repository.get_by_name(obj_in.name)
if existing and existing.id != id:
raise ConflictError(
resource="Item",
field="name",
value=obj_in.name,
)
return await super().update(id, obj_in, exclude_unset)
async def get_by_id_or_raise(self, id: UUID) -> ModelType:
"""获取记录,如果未找到则抛出 NotFoundError。"""
instance = await self.get_by_id(id)
if not instance:
raise NotFoundError(resource=self._model_name, id=id)
return instance
async def create(self, obj_in: CreateSchemaType) -> ModelType:
"""创建前进行验证。"""
await self._validate_create(obj_in)
return await super().create(obj_in)
async def _validate_create(self, obj_in: CreateSchemaType) -> None:
"""在子类中重写以添加验证逻辑。"""
pass
class OrderService(BaseService[Order, OrderCreate, OrderUpdate]):
def __init__(
self,
repository: OrderRepository,
item_repository: ItemRepository,
user_repository: UserRepository,
):
super().__init__(repository)
self._item_repo = item_repository
self._user_repo = user_repository
async def create(self, obj_in: OrderCreate) -> Order:
# 验证用户是否存在
user = await self._user_repo.get_by_id(obj_in.user_id)
if not user:
raise NotFoundError(resource="User", id=obj_in.user_id)
# 验证所有项目是否存在
items = await self._item_repo.get_by_ids(obj_in.item_ids)
if len(items) != len(obj_in.item_ids):
raise ValidationError("Some items not found")
return await super().create(obj_in)
class ItemService(BaseService[Item, ItemCreate, ItemUpdate]):
async def create(self, obj_in: ItemCreate) -> Item:
item = await super().create(obj_in)
await self._on_created(item)
return item
async def _on_created(self, item: Item) -> None:
"""项目创建后调用的钩子。"""
# 发送通知、更新缓存、触发事件等
logger.info(f"Item created: {item.id}")
默认情况下,每个存储库方法都会提交其事务。对于涉及多次写入的复杂操作,请考虑事务管理:
# 在存储库中,添加一个不提交的方法:
async def create_no_commit(self, obj_in: CreateSchemaType) -> ModelType:
data = obj_in.model_dump()
instance = self._model(**data)
self._session.add(instance)
await self._session.flush() # 获取 ID 但不提交
return instance
# 在服务中:
async def create_order_with_items(self, order: OrderCreate) -> Order:
async with self._session.begin(): # 事务
order = await self._order_repo.create_no_commit(order)
for item in order.items:
await self._order_item_repo.create_no_commit(item)
# 退出时提交
return order
每周安装数
1
代码仓库
GitHub 星标数
3
首次出现
1 天前
安全审计
安装于
zencoder1
amp1
cline1
openclaw1
opencode1
cursor1
This skill covers creating the base service class that acts as an intermediary between routers and repositories. Services contain business logic and orchestrate repository calls.
Create src/app/core/service.py:
from collections.abc import Sequence
from typing import Generic
from uuid import UUID
from fastapi_filter.contrib.sqlalchemy import Filter
from fastapi_pagination import Params
from fastapi_pagination.bases import AbstractPage
from app.core.repository import (
AbstractRepository,
CreateSchemaType,
ModelType,
UpdateSchemaType,
)
class BaseService(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
"""
Base service class providing standard CRUD operations.
Services:
- Wrap repository operations
- Contain business logic
- Orchestrate multiple repository calls
- Handle cross-cutting concerns (logging, events, etc.)
Services should NOT:
- Write SQL queries (delegate to repository)
- Handle HTTP concerns (that's the router's job)
- Access the database session directly
Type Parameters:
ModelType: SQLAlchemy model class
CreateSchemaType: Pydantic schema for create operations
UpdateSchemaType: Pydantic schema for update operations
Usage:
class ItemService(BaseService[Item, ItemCreate, ItemUpdate]):
pass
"""
def __init__(
self,
repository: AbstractRepository[ModelType, CreateSchemaType, UpdateSchemaType],
):
"""
Initialize service with repository.
Args:
repository: Repository instance for data access
"""
self._repository = repository
# ==================== Basic CRUD ====================
async def create(self, obj_in: CreateSchemaType) -> ModelType:
"""
Create a new record.
Override to add business logic before/after creation.
Args:
obj_in: Creation data
Returns:
Created model instance
"""
return await self._repository.create(obj_in)
async def get_by_id(self, id: UUID) -> ModelType | None:
"""
Get a single record by ID.
Args:
id: UUID of the record
Returns:
Model instance if found, None otherwise
"""
return await self._repository.get_by_id(id)
async def get_all(self) -> Sequence[ModelType]:
"""
Get all records.
Returns:
Sequence of model instances
"""
return await self._repository.get_all()
async def update(
self,
id: UUID,
obj_in: UpdateSchemaType,
exclude_unset: bool = True,
) -> ModelType | None:
"""
Update an existing record.
Override to add business logic before/after update.
Args:
id: UUID of record to update
obj_in: Update data
exclude_unset: Only update explicitly set fields
Returns:
Updated model instance if found, None otherwise
"""
return await self._repository.update(id, obj_in, exclude_unset)
async def delete(self, id: UUID) -> bool:
"""
Permanently delete a record.
Args:
id: UUID of record to delete
Returns:
True if deleted, False if not found
"""
return await self._repository.delete(id)
# ==================== Pagination & Filtering ====================
async def get_paginated(
self,
params: Params,
filter_spec: Filter | None = None,
) -> AbstractPage[ModelType]:
"""
Get paginated records with optional filtering.
This method receives the filter from the router layer and
passes it to the repository.
Args:
params: Pagination parameters
filter_spec: Optional filter specification
Returns:
Paginated result
"""
return await self._repository.get_paginated(params, filter_spec)
async def get_filtered(
self,
filter_spec: Filter,
) -> Sequence[ModelType]:
"""
Get all records matching filter.
Args:
filter_spec: Filter specification
Returns:
Matching records
"""
return await self._repository.get_filtered(filter_spec)
async def count(self, filter_spec: Filter | None = None) -> int:
"""
Count records.
Args:
filter_spec: Optional filter specification
Returns:
Number of matching records
"""
return await self._repository.count(filter_spec)
# ==================== Bulk Operations ====================
async def bulk_create(
self,
objs_in: Sequence[CreateSchemaType],
) -> Sequence[ModelType]:
"""
Create multiple records.
Args:
objs_in: Sequence of creation data
Returns:
Created model instances
"""
return await self._repository.bulk_create(objs_in)
async def bulk_upsert(
self,
objs_in: Sequence[CreateSchemaType],
index_elements: Sequence[str],
update_fields: Sequence[str] | None = None,
) -> Sequence[ModelType]:
"""
Upsert multiple records.
Args:
objs_in: Records to upsert
index_elements: Unique constraint columns
update_fields: Fields to update on conflict
Returns:
Upserted model instances
"""
return await self._repository.bulk_upsert(
objs_in, index_elements, update_fields
)
async def bulk_delete(self, ids: Sequence[UUID]) -> int:
"""
Delete multiple records.
Args:
ids: UUIDs to delete
Returns:
Number deleted
"""
return await self._repository.bulk_delete(ids)
# ==================== Soft Delete ====================
async def soft_delete(self, id: UUID) -> bool:
"""
Soft delete a record.
Args:
id: UUID of record
Returns:
True if soft deleted
"""
return await self._repository.soft_delete(id)
async def restore(self, id: UUID) -> bool:
"""
Restore a soft-deleted record.
Args:
id: UUID of record
Returns:
True if restored
"""
return await self._repository.restore(id)
async def get_by_id_with_deleted(self, id: UUID) -> ModelType | None:
"""
Get record including soft-deleted.
Args:
id: UUID of record
Returns:
Model instance if found
"""
return await self._repository.get_by_id_with_deleted(id)
# ==================== Utility ====================
async def exists(self, id: UUID) -> bool:
"""
Check if record exists.
Args:
id: UUID to check
Returns:
True if exists
"""
return await self._repository.exists(id)
async def get_by_ids(self, ids: Sequence[UUID]) -> Sequence[ModelType]:
"""
Get multiple records by IDs.
Args:
ids: UUIDs to fetch
Returns:
Found model instances
"""
return await self._repository.get_by_ids(ids)
# src/app/items/service.py
from uuid import UUID
from app.core.service import BaseService
from app.exceptions import ConflictError, NotFoundError
from app.items.models import Item
from app.items.repository import ItemRepository
from app.items.schemas import ItemCreate, ItemUpdate
class ItemService(BaseService[Item, ItemCreate, ItemUpdate]):
"""Service for Item entity with business logic."""
def __init__(self, repository: ItemRepository):
super().__init__(repository)
# Type hint for IDE support
self._repository: ItemRepository = repository
async def create(self, obj_in: ItemCreate) -> Item:
"""
Create item with duplicate name check.
Raises:
ConflictError: If item with same name exists
"""
# Business logic: check for duplicate name
existing = await self._repository.get_by_name(obj_in.name)
if existing:
raise ConflictError(
resource="Item",
field="name",
value=obj_in.name,
)
return await super().create(obj_in)
async def get_by_id_or_raise(self, id: UUID) -> Item:
"""
Get item by ID or raise NotFoundError.
Useful when you need to ensure the item exists.
Raises:
NotFoundError: If item not found
"""
item = await self.get_by_id(id)
if not item:
raise NotFoundError(resource="Item", id=id)
return item
async def update(
self,
id: UUID,
obj_in: ItemUpdate,
exclude_unset: bool = True,
) -> Item | None:
"""
Update item with duplicate name check.
Raises:
ConflictError: If new name conflicts with existing item
"""
# Business logic: check name uniqueness on update
if obj_in.name is not None:
existing = await self._repository.get_by_name(obj_in.name)
if existing and existing.id != id:
raise ConflictError(
resource="Item",
field="name",
value=obj_in.name,
)
return await super().update(id, obj_in, exclude_unset)
async def get_by_id_or_raise(self, id: UUID) -> ModelType:
"""Get record or raise NotFoundError."""
instance = await self.get_by_id(id)
if not instance:
raise NotFoundError(resource=self._model_name, id=id)
return instance
async def create(self, obj_in: CreateSchemaType) -> ModelType:
"""Create with pre-validation."""
await self._validate_create(obj_in)
return await super().create(obj_in)
async def _validate_create(self, obj_in: CreateSchemaType) -> None:
"""Override in subclass to add validation logic."""
pass
class OrderService(BaseService[Order, OrderCreate, OrderUpdate]):
def __init__(
self,
repository: OrderRepository,
item_repository: ItemRepository,
user_repository: UserRepository,
):
super().__init__(repository)
self._item_repo = item_repository
self._user_repo = user_repository
async def create(self, obj_in: OrderCreate) -> Order:
# Validate user exists
user = await self._user_repo.get_by_id(obj_in.user_id)
if not user:
raise NotFoundError(resource="User", id=obj_in.user_id)
# Validate all items exist
items = await self._item_repo.get_by_ids(obj_in.item_ids)
if len(items) != len(obj_in.item_ids):
raise ValidationError("Some items not found")
return await super().create(obj_in)
class ItemService(BaseService[Item, ItemCreate, ItemUpdate]):
async def create(self, obj_in: ItemCreate) -> Item:
item = await super().create(obj_in)
await self._on_created(item)
return item
async def _on_created(self, item: Item) -> None:
"""Hook called after item creation."""
# Send notification, update cache, emit event, etc.
logger.info(f"Item created: {item.id}")
By default, each repository method commits its transaction. For complex operations spanning multiple writes, consider transaction management:
# In repository, add a method that doesn't commit:
async def create_no_commit(self, obj_in: CreateSchemaType) -> ModelType:
data = obj_in.model_dump()
instance = self._model(**data)
self._session.add(instance)
await self._session.flush() # Get ID without commit
return instance
# In service:
async def create_order_with_items(self, order: OrderCreate) -> Order:
async with self._session.begin(): # Transaction
order = await self._order_repo.create_no_commit(order)
for item in order.items:
await self._order_item_repo.create_no_commit(item)
# Commits on exit
return order
Weekly Installs
1
Repository
GitHub Stars
3
First Seen
1 day ago
Security Audits
Gen Agent Trust HubPassSocketFailSnykPass
Installed on
zencoder1
amp1
cline1
openclaw1
opencode1
cursor1
Lark Drive API 使用指南:飞书云文档、Wiki、表格 Token 处理与文件管理
23,400 周安装