python-backend by yonatangross/orchestkit
npx skills add https://github.com/yonatangross/orchestkit --skill python-backend使用 asyncio、FastAPI、SQLAlchemy 2.0 和连接池构建生产级 Python 后端的模式。每个类别在 rules/ 目录下都有独立的规则文件,可按需加载。
| 类别 | 规则数量 | 影响程度 | 使用场景 |
|---|---|---|---|
| Asyncio | 3 | 高 | TaskGroup、结构化并发、取消处理 |
| FastAPI | 3 | 高 | 依赖项、中间件、后台任务 |
| SQLAlchemy | 3 | 高 | 异步会话、关系、迁移 |
| 连接池 | 3 | 中 | 数据库连接池、HTTP 会话、调优 |
总计:4 个类别共 12 条规则
# FastAPI + SQLAlchemy 异步会话
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
# 带超时的 Asyncio TaskGroup
async def fetch_all(urls: list[str]) -> list[dict]:
async with asyncio.timeout(30):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(url)) for url in urls]
return [t.result() for t in tasks]
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
使用结构化并发、TaskGroup 和 Python 3.11+ 特性的现代 Python asyncio 模式。
gather(),提供结构化并发和自动取消asyncio.timeout() 上下文管理器,用于组合式超时控制except* 配合 ExceptionGroup 处理多个任务失败asyncio.to_thread() 用于将同步代码桥接到异步环境| 决策点 | 推荐方案 |
|---|---|
| 任务生成 | 使用 TaskGroup 而非 gather() |
| 超时控制 | 使用 asyncio.timeout() 上下文管理器 |
| 并发限制 | 使用 asyncio.Semaphore |
| 同步桥接 | 使用 asyncio.to_thread() |
| 取消处理 | 始终重新抛出 CancelledError |
适用于生产环境的 FastAPI 模式,涵盖生命周期、依赖项、中间件和设置。
asynccontextmanager 的 Lifespan 用于启动/关闭时的资源管理Depends() 实现的 依赖注入.env 和字段验证的 Pydantic Settings| 决策点 | 推荐方案 |
|---|---|
| 生命周期 | 使用 asynccontextmanager(而非事件) |
| 依赖项 | 基于类的服务配合依赖注入 |
| 设置 | 使用 .env 文件的 Pydantic Settings |
| 响应 | 使用 ORJSONResponse 以获得性能优势 |
| 健康检查 | 检查所有关键依赖项 |
使用 SQLAlchemy 2.0、AsyncSession 并与 FastAPI 集成的异步数据库模式。
expire_on_commit=Falselazy="raise" 以防止意外的 N+1 查询selectinload 进行集合的预加载| 决策点 | 推荐方案 |
|---|---|
| 会话作用域 | 每个请求一个 AsyncSession |
| 延迟加载 | lazy="raise" + 显式加载 |
| 预加载 | 对集合使用 selectinload |
| expire_on_commit | 设为 False(防止延迟加载错误) |
| 连接池 | 设置 pool_pre_ping=True |
为高性能异步 Python 应用程序提供的数据库和 HTTP 连接池。
pool_size、max_overflow、pool_pre_ping 的 SQLAlchemy 连接池min_size/max_size 和连接生命周期的 直接 asyncpg 连接池TCPConnector 限制和 DNS 缓存的 aiohttp 会话pool_size = (concurrent_requests / avg_queries_per_request) * 1.5
# 切勿在新代码中使用 gather() - 缺乏结构化并发
# 切勿吞没 CancelledError - 这会破坏 TaskGroup 和超时机制
# 切勿使用同步调用阻塞事件循环(如 time.sleep, requests.get)
# 切勿使用全局可变状态来管理数据库会话
# 切勿跳过依赖注入(在路由中直接创建会话)
# 切勿在任务间共享 AsyncSession(会导致竞态条件)
# 切勿在异步代码中使用同步 Session(会阻塞事件循环)
# 切勿为每个请求创建 engine/连接池
# 切勿忘记在关闭时关闭连接池
ork:architecture-patterns - 整洁架构和分层分离ork:async-jobs - 用于后台处理的 Celery/ARQstreaming-api-patterns - SSE/WebSocket 异步模式ork:database-patterns - 数据库模式设计每周安装量
81
代码仓库
GitHub 星标数
132
首次出现
2026年2月14日
安全审计
已安装于
gemini-cli75
codex75
github-copilot75
opencode75
cursor73
kimi-cli70
Patterns for building production Python backends with asyncio, FastAPI, SQLAlchemy 2.0, and connection pooling. Each category has individual rule files in rules/ loaded on-demand.
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Asyncio | 3 | HIGH | TaskGroup, structured concurrency, cancellation handling |
| FastAPI | 3 | HIGH | Dependencies, middleware, background tasks |
| SQLAlchemy | 3 | HIGH | Async sessions, relationships, migrations |
| Pooling | 3 | MEDIUM | Database pools, HTTP sessions, tuning |
Total: 12 rules across 4 categories
# FastAPI + SQLAlchemy async session
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
# Asyncio TaskGroup with timeout
async def fetch_all(urls: list[str]) -> list[dict]:
async with asyncio.timeout(30):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(url)) for url in urls]
return [t.result() for t in tasks]
Modern Python asyncio patterns using structured concurrency, TaskGroup, and Python 3.11+ features.
gather() with structured concurrency and auto-cancellationasyncio.timeout() context manager for composable timeoutsexcept* with ExceptionGroup for handling multiple task failuresasyncio.to_thread() for bridging sync code to async| Decision | Recommendation |
|---|---|
| Task spawning | TaskGroup not gather() |
| Timeouts | asyncio.timeout() context manager |
| Concurrency limit | asyncio.Semaphore |
| Sync bridge | asyncio.to_thread() |
| Cancellation | Always re-raise CancelledError |
Production-ready FastAPI patterns for lifespan, dependencies, middleware, and settings.
asynccontextmanager for startup/shutdown resource managementDepends().env and field validation| Decision | Recommendation |
|---|---|
| Lifespan | asynccontextmanager (not events) |
| Dependencies | Class-based services with DI |
| Settings | Pydantic Settings with .env |
| Response | ORJSONResponse for performance |
| Health | Check all critical dependencies |
Async database patterns with SQLAlchemy 2.0, AsyncSession, and FastAPI integration.
expire_on_commit=Falselazy="raise" on relationships to prevent accidental N+1 queriesselectinload for eager loading collections| Decision | Recommendation |
|---|---|
| Session scope | One AsyncSession per request |
| Lazy loading | lazy="raise" + explicit loads |
| Eager loading | selectinload for collections |
| expire_on_commit | False (prevents lazy load errors) |
| Pool | pool_pre_ping=True |
Database and HTTP connection pooling for high-performance async Python applications.
pool_size, max_overflow, pool_pre_pingmin_size/max_size and connection lifecycleTCPConnector limits and DNS cachingpool_size = (concurrent_requests / avg_queries_per_request) * 1.5
# NEVER use gather() for new code - no structured concurrency
# NEVER swallow CancelledError - breaks TaskGroup and timeout
# NEVER block the event loop with sync calls (time.sleep, requests.get)
# NEVER use global mutable state for db sessions
# NEVER skip dependency injection (create sessions in routes)
# NEVER share AsyncSession across tasks (race condition)
# NEVER use sync Session in async code (blocks event loop)
# NEVER create engine/pool per request
# NEVER forget to close pools on shutdown
ork:architecture-patterns - Clean architecture and layer separationork:async-jobs - Celery/ARQ for background processingstreaming-api-patterns - SSE/WebSocket async patternsork:database-patterns - Database schema designWeekly Installs
81
Repository
GitHub Stars
132
First Seen
Feb 14, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
gemini-cli75
codex75
github-copilot75
opencode75
cursor73
kimi-cli70
Firestore 基础入门指南 - 配置、安全规则、SDK 使用与索引优化
1,300 周安装