npx skills add https://github.com/lobbi-docs/claude --skill 'Beanie ODM'此技能提供了使用 Beanie ODM 与异步 Motor 驱动进行 MongoDB 集成的模式,专为 FastAPI 应用程序优化。
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from app.domains.users.models import User
from app.domains.products.models import Product
async def init_database(settings: Settings):
client = AsyncIOMotorClient(settings.mongodb_url)
await init_beanie(
database=client[settings.database_name],
document_models=[
User,
Product,
# 添加所有文档模型
]
)
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
mongodb_url: str = "mongodb://localhost:27017"
database_name: str = "app_db"
class Config:
env_file = ".env"
from beanie import Document, Indexed
from pydantic import Field, EmailStr
from datetime import datetime
from typing import Optional
class User(Document):
email: Indexed(EmailStr, unique=True)
name: str
hashed_password: str
is_active: bool = True
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class Settings:
name = "users" # 集合名称
use_state_management = True
class Config:
json_schema_extra = {
"example": {
"email": "user@example.com",
"name": "John Doe"
}
}
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from beanie import Document, Link, BackLink
from typing import List, Optional
class Author(Document):
name: str
books: List[BackLink["Book"]] = Field(original_field="author")
class Settings:
name = "authors"
class Book(Document):
title: str
author: Link[Author]
categories: List[Link["Category"]] = []
class Settings:
name = "books"
class Category(Document):
name: str
books: List[BackLink[Book]] = Field(original_field="categories")
class Settings:
name = "categories"
from beanie import Document
from pydantic import BaseModel
from typing import List
class Address(BaseModel):
street: str
city: str
country: str
postal_code: str
class Contact(BaseModel):
type: str # "email", "phone"
value: str
is_primary: bool = False
class Customer(Document):
name: str
addresses: List[Address] = []
contacts: List[Contact] = []
class Settings:
name = "customers"
# 创建
user = User(email="user@example.com", name="John")
await user.insert()
# 带验证的创建
user = await User.insert_one(
User(email="user@example.com", name="John")
)
# 通过 ID 读取
user = await User.get(user_id)
# 带过滤器的读取
users = await User.find(User.is_active == True).to_list()
# 更新
user.name = "Jane"
await user.save()
# 部分更新
await user.set({User.name: "Jane", User.updated_at: datetime.utcnow()})
# 删除
await user.delete()
from beanie.operators import In, RegEx, And, Or
# 使用操作符查找
active_users = await User.find(
And(
User.is_active == True,
User.created_at >= start_date
)
).to_list()
# 正则表达式搜索
users = await User.find(
RegEx(User.name, "^John", options="i")
).to_list()
# In 操作符
users = await User.find(
In(User.email, ["a@test.com", "b@test.com"])
).to_list()
# 分页
users = await User.find_all().skip(20).limit(10).to_list()
# 排序
users = await User.find_all().sort(-User.created_at).to_list()
# 投影(选择特定字段)
users = await User.find_all().project(UserSummary).to_list()
from beanie import PydanticObjectId
class UserStats(BaseModel):
total_users: int
active_users: int
avg_age: float
# 聚合管道
pipeline = [
{"$match": {"is_active": True}},
{"$group": {
"_id": None,
"total": {"$sum": 1},
"avg_age": {"$avg": "$age"}
}}
]
result = await User.aggregate(pipeline).to_list()
# 使用 Beanie 聚合
from beanie.odm.queries.aggregation import AggregationQuery
stats = await User.find(User.is_active == True).aggregate([
{"$group": {
"_id": "$department",
"count": {"$sum": 1}
}}
]).to_list()
from beanie import Document, Indexed
from pymongo import IndexModel, ASCENDING, DESCENDING, TEXT
class Product(Document):
# 单字段索引
sku: Indexed(str, unique=True)
# 在 Settings 中定义的复合索引
name: str
category: str
price: float
description: str
class Settings:
name = "products"
indexes = [
# 复合索引
IndexModel(
[("category", ASCENDING), ("price", DESCENDING)],
name="category_price_idx"
),
# 文本索引
IndexModel(
[("name", TEXT), ("description", TEXT)],
name="search_idx"
),
# TTL 索引
IndexModel(
[("expires_at", ASCENDING)],
expireAfterSeconds=0,
name="ttl_idx"
)
]
from beanie import Document
from motor.motor_asyncio import AsyncIOMotorClientSession
async def transfer_funds(
from_account_id: str,
to_account_id: str,
amount: float,
session: AsyncIOMotorClientSession
):
async with await session.start_transaction():
from_account = await Account.get(from_account_id, session=session)
to_account = await Account.get(to_account_id, session=session)
if from_account.balance < amount:
raise ValueError("Insufficient funds")
await from_account.set(
{Account.balance: from_account.balance - amount},
session=session
)
await to_account.set(
{Account.balance: to_account.balance + amount},
session=session
)
from typing import List, Optional
from beanie import PydanticObjectId
class UserService:
async def get_by_id(self, user_id: str) -> Optional[User]:
return await User.get(PydanticObjectId(user_id))
async def get_by_email(self, email: str) -> Optional[User]:
return await User.find_one(User.email == email)
async def get_all(
self,
skip: int = 0,
limit: int = 100,
is_active: Optional[bool] = None
) -> List[User]:
query = User.find_all()
if is_active is not None:
query = User.find(User.is_active == is_active)
return await query.skip(skip).limit(limit).to_list()
async def create(self, data: UserCreate) -> User:
user = User(**data.model_dump())
await user.insert()
return user
async def update(self, user_id: str, data: UserUpdate) -> Optional[User]:
user = await self.get_by_id(user_id)
if not user:
return None
update_data = data.model_dump(exclude_unset=True)
update_data["updated_at"] = datetime.utcnow()
await user.set(update_data)
return user
有关详细模式和迁移指南:
references/migrations.md - 数据库迁移策略references/performance.md - 查询优化技巧references/relationships.md - Link 和 BackLink 模式examples/ 中的工作示例:
examples/document_models.py - 完整的文档定义examples/aggregations.py - 聚合管道示例examples/service.py - 服务层实现每周安装次数
–
代码仓库
GitHub 星标数
9
首次出现时间
–
安全审计
This skill provides patterns for MongoDB integration using Beanie ODM with async Motor driver, optimized for FastAPI applications.
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from app.domains.users.models import User
from app.domains.products.models import Product
async def init_database(settings: Settings):
client = AsyncIOMotorClient(settings.mongodb_url)
await init_beanie(
database=client[settings.database_name],
document_models=[
User,
Product,
# Add all document models
]
)
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
mongodb_url: str = "mongodb://localhost:27017"
database_name: str = "app_db"
class Config:
env_file = ".env"
from beanie import Document, Indexed
from pydantic import Field, EmailStr
from datetime import datetime
from typing import Optional
class User(Document):
email: Indexed(EmailStr, unique=True)
name: str
hashed_password: str
is_active: bool = True
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class Settings:
name = "users" # Collection name
use_state_management = True
class Config:
json_schema_extra = {
"example": {
"email": "user@example.com",
"name": "John Doe"
}
}
from beanie import Document, Link, BackLink
from typing import List, Optional
class Author(Document):
name: str
books: List[BackLink["Book"]] = Field(original_field="author")
class Settings:
name = "authors"
class Book(Document):
title: str
author: Link[Author]
categories: List[Link["Category"]] = []
class Settings:
name = "books"
class Category(Document):
name: str
books: List[BackLink[Book]] = Field(original_field="categories")
class Settings:
name = "categories"
from beanie import Document
from pydantic import BaseModel
from typing import List
class Address(BaseModel):
street: str
city: str
country: str
postal_code: str
class Contact(BaseModel):
type: str # "email", "phone"
value: str
is_primary: bool = False
class Customer(Document):
name: str
addresses: List[Address] = []
contacts: List[Contact] = []
class Settings:
name = "customers"
# Create
user = User(email="user@example.com", name="John")
await user.insert()
# Create with validation
user = await User.insert_one(
User(email="user@example.com", name="John")
)
# Read by ID
user = await User.get(user_id)
# Read with filter
users = await User.find(User.is_active == True).to_list()
# Update
user.name = "Jane"
await user.save()
# Partial update
await user.set({User.name: "Jane", User.updated_at: datetime.utcnow()})
# Delete
await user.delete()
from beanie.operators import In, RegEx, And, Or
# Find with operators
active_users = await User.find(
And(
User.is_active == True,
User.created_at >= start_date
)
).to_list()
# Regex search
users = await User.find(
RegEx(User.name, "^John", options="i")
).to_list()
# In operator
users = await User.find(
In(User.email, ["a@test.com", "b@test.com"])
).to_list()
# Pagination
users = await User.find_all().skip(20).limit(10).to_list()
# Sorting
users = await User.find_all().sort(-User.created_at).to_list()
# Projection (select specific fields)
users = await User.find_all().project(UserSummary).to_list()
from beanie import PydanticObjectId
class UserStats(BaseModel):
total_users: int
active_users: int
avg_age: float
# Aggregation pipeline
pipeline = [
{"$match": {"is_active": True}},
{"$group": {
"_id": None,
"total": {"$sum": 1},
"avg_age": {"$avg": "$age"}
}}
]
result = await User.aggregate(pipeline).to_list()
# Using Beanie aggregation
from beanie.odm.queries.aggregation import AggregationQuery
stats = await User.find(User.is_active == True).aggregate([
{"$group": {
"_id": "$department",
"count": {"$sum": 1}
}}
]).to_list()
from beanie import Document, Indexed
from pymongo import IndexModel, ASCENDING, DESCENDING, TEXT
class Product(Document):
# Single field index
sku: Indexed(str, unique=True)
# Compound index defined in Settings
name: str
category: str
price: float
description: str
class Settings:
name = "products"
indexes = [
# Compound index
IndexModel(
[("category", ASCENDING), ("price", DESCENDING)],
name="category_price_idx"
),
# Text index
IndexModel(
[("name", TEXT), ("description", TEXT)],
name="search_idx"
),
# TTL index
IndexModel(
[("expires_at", ASCENDING)],
expireAfterSeconds=0,
name="ttl_idx"
)
]
from beanie import Document
from motor.motor_asyncio import AsyncIOMotorClientSession
async def transfer_funds(
from_account_id: str,
to_account_id: str,
amount: float,
session: AsyncIOMotorClientSession
):
async with await session.start_transaction():
from_account = await Account.get(from_account_id, session=session)
to_account = await Account.get(to_account_id, session=session)
if from_account.balance < amount:
raise ValueError("Insufficient funds")
await from_account.set(
{Account.balance: from_account.balance - amount},
session=session
)
await to_account.set(
{Account.balance: to_account.balance + amount},
session=session
)
from typing import List, Optional
from beanie import PydanticObjectId
class UserService:
async def get_by_id(self, user_id: str) -> Optional[User]:
return await User.get(PydanticObjectId(user_id))
async def get_by_email(self, email: str) -> Optional[User]:
return await User.find_one(User.email == email)
async def get_all(
self,
skip: int = 0,
limit: int = 100,
is_active: Optional[bool] = None
) -> List[User]:
query = User.find_all()
if is_active is not None:
query = User.find(User.is_active == is_active)
return await query.skip(skip).limit(limit).to_list()
async def create(self, data: UserCreate) -> User:
user = User(**data.model_dump())
await user.insert()
return user
async def update(self, user_id: str, data: UserUpdate) -> Optional[User]:
user = await self.get_by_id(user_id)
if not user:
return None
update_data = data.model_dump(exclude_unset=True)
update_data["updated_at"] = datetime.utcnow()
await user.set(update_data)
return user
For detailed patterns and migration guides:
references/migrations.md - Database migration strategiesreferences/performance.md - Query optimization tipsreferences/relationships.md - Link and BackLink patternsWorking examples in examples/:
examples/document_models.py - Complete document definitionsexamples/aggregations.py - Aggregation pipeline examplesexamples/service.py - Service layer implementationWeekly Installs
–
Repository
GitHub Stars
9
First Seen
–
Security Audits
agent-browser 浏览器自动化工具 - Vercel Labs 命令行网页操作与测试
147,400 周安装