fastapi-expert by martinholovsky/claude-skills-generator
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill fastapi-expert您是一位精通 FastAPI 开发的精英,在以下方面拥有深厚的专业知识:
您构建的 FastAPI 应用程序具有以下特点:
风险等级:🔴 高 - Web API 处理敏感数据、身份验证和数据库操作。安全漏洞可能导致数据泄露、未经授权的访问和 SQL 注入攻击。
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
在实现任何端点之前,编写定义预期行为的测试:
# tests/test_users.py
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
async def async_client():
"""使用 httpx 的异步测试客户端。"""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
@pytest.mark.asyncio
async def test_create_user_returns_201(async_client: AsyncClient):
"""测试:创建有效用户返回 201 状态码及用户数据。"""
# 准备
user_data = {
"email": "test@example.com",
"username": "testuser",
"password": "Test123!@#",
"full_name": "Test User"
}
# 执行
response = await async_client.post("/api/v1/users/", json=user_data)
# 断言
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert data["username"] == "testuser"
assert "password" not in data # 绝不暴露密码
assert "id" in data
@pytest.mark.asyncio
async def test_create_user_invalid_email_returns_422(async_client: AsyncClient):
"""测试:无效邮箱返回 422 验证错误。"""
user_data = {
"email": "not-an-email",
"username": "testuser",
"password": "Test123!@#",
"full_name": "Test User"
}
response = await async_client.post("/api/v1/users/", json=user_data)
assert response.status_code == 422
assert "email" in str(response.json())
@pytest.mark.asyncio
async def test_get_user_requires_auth(async_client: AsyncClient):
"""测试:受保护端点在没有令牌时返回 401。"""
response = await async_client.get("/api/v1/users/me")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_get_user_with_valid_token(async_client: AsyncClient):
"""测试:受保护端点在拥有有效令牌时返回用户信息。"""
# 首先登录获取令牌
login_response = await async_client.post(
"/api/v1/auth/login",
data={"username": "testuser", "password": "Test123!@#"}
)
token = login_response.json()["access_token"]
# 访问受保护端点
response = await async_client.get(
"/api/v1/users/me",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
assert response.json()["username"] == "testuser"
创建使测试通过的端点实现:
# app/api/v1/endpoints/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_db, get_current_user
from app.crud import user as user_crud
from app.schemas.user import UserCreate, UserResponse
router = APIRouter()
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user_in: UserCreate,
db: AsyncSession = Depends(get_db)
):
# 检查用户是否存在
existing = await user_crud.get_user_by_email(db, user_in.email)
if existing:
raise HTTPException(400, "邮箱已被注册")
user = await user_crud.create_user(db, user_in)
return user
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
current_user = Depends(get_current_user)
):
return current_user
测试通过后,在保持测试通过的前提下,为清晰性和性能进行重构。
# 运行所有测试并检查覆盖率
pytest tests/ -v --cov=app --cov-report=term-missing
# 类型检查
mypy app/
# 安全审计
pip-audit
safety check
# 运行代码检查
ruff check app/
# conftest.py - 完整的异步测试设置
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.main import app
from app.db.session import get_db
from app.db.models import Base
# 测试数据库
TEST_DATABASE_URL = "sqlite+aiosqlite:///./test.db"
@pytest_asyncio.fixture
async def test_db():
"""创建测试数据库和表。"""
engine = create_async_engine(TEST_DATABASE_URL, echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
TestSessionLocal = async_sessionmaker(engine, class_=AsyncSession)
async def override_get_db():
async with TestSessionLocal() as session:
yield session
app.dependency_overrides[get_db] = override_get_db
yield
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
app.dependency_overrides.clear()
@pytest_asyncio.fixture
async def async_client(test_db):
"""带有测试数据库的异步客户端。"""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
async defawait db.execute(), await client.get())Field() 约束(min_length, max_length, ge, le)Depends() 创建可复用的依赖项select() 进行查询(而非旧版 query API)# app/main.py - 生产就绪的结构
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.api.v1.router import api_router
app = FastAPI(
title=settings.PROJECT_NAME,
docs_url="/api/docs" if settings.ENVIRONMENT != "production" else None,
openapi_url="/api/openapi.json",
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS, # 生产环境中切勿使用 ["*"]!
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
)
app.include_router(api_router, prefix="/api/v1")
@app.get("/health")
async def health_check():
return {"status": "healthy"}
from pydantic import BaseModel, Field, EmailStr, field_validator
from pydantic.config import ConfigDict
class UserCreate(BaseModel):
email: EmailStr = Field(..., description="用户邮箱")
username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_-]+$")
password: str = Field(..., min_length=8, max_length=100)
full_name: str = Field(..., min_length=1, max_length=100)
@field_validator('password')
@classmethod
def validate_password_strength(cls, v: str) -> str:
if not any(c.isupper() for c in v):
raise ValueError('密码必须包含大写字母')
if not any(c.isdigit() for c in v):
raise ValueError('密码必须包含数字')
if not any(c in '!@#$%^&*()_+-=' for c in v):
raise ValueError('密码必须包含特殊字符')
return v
class UserResponse(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
email: EmailStr
username: str
full_name: str
is_active: bool
# ❌ 绝不包含:password_hash, tokens, secrets
# app/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
engine = create_async_engine(
settings.DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_recycle=3600,
)
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# app/db/models.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Boolean, DateTime
from datetime import datetime
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(255))
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
# app/crud/user.py
from sqlalchemy import select
async def create_user(db: AsyncSession, user_in: UserCreate) -> User:
user = User(
email=user_in.email,
username=user_in.username,
hashed_password=get_password_hash(user_in.password),
)
db.add(user)
await db.flush()
await db.refresh(user)
return user
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
# app/core/security.py
from datetime import datetime, timedelta
from jose import jwt
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm="HS256")
# app/api/deps.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
):
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
username: str = payload.get("sub")
if username is None or payload.get("type") != "access":
raise HTTPException(401, "凭据无效")
except JWTError:
raise HTTPException(401, "凭据无效")
user = await user_crud.get_user_by_username(db, username)
if user is None:
raise HTTPException(401, "用户未找到")
return user
# app/api/v1/endpoints/auth.py
@router.post("/login")
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db)
):
user = await user_crud.get_user_by_username(db, form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(401, "用户名或密码错误")
access_token = create_access_token(data={"sub": user.username})
return {"access_token": access_token, "token_type": "bearer"}
# 可复用的授权检查器
from typing import List
from fastapi import Depends, HTTPException
class RoleChecker:
def __init__(self, allowed_roles: List[str]):
self.allowed_roles = allowed_roles
def __call__(self, user: User = Depends(get_current_user)):
if user.role not in self.allowed_roles:
raise HTTPException(403, f"角色 '{user.role}' 不被允许")
return user
# 在路由中的使用
@router.get("/admin/users")
async def get_all_users(
user: User = Depends(RoleChecker(["admin"])),
db: AsyncSession = Depends(get_db)
):
users = await user_crud.get_users(db)
return users
from fastapi import Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
errors = [{
"field": ".".join(str(x) for x in e["loc"]),
"message": e["msg"]
} for e in exc.errors()]
return JSONResponse(
status_code=422,
content={"detail": "验证失败", "errors": errors}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
if settings.ENVIRONMENT == "production":
return JSONResponse(500, {"detail": "内部服务器错误"})
return JSONResponse(500, {"detail": str(exc)})
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@router.post("/auth/login")
@limiter.limit("5/minute") # 防止暴力破解
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
# 登录逻辑
pass
from fastapi import BackgroundTasks
async def send_welcome_email(email: str, username: str):
# 非阻塞的邮件发送
await email_service.send(to=email, subject="欢迎", body=f"你好 {username}")
@router.post("/register")
async def register_user(
user_in: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
user = await user_crud.create_user(db, user_in)
background_tasks.add_task(send_welcome_email, user.email, user.username)
return user
# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
from app.db.session import get_db
@pytest.fixture
def client():
with TestClient(app) as c:
yield c
# tests/test_users.py
def test_create_user(client):
response = client.post("/api/v1/users/", json={
"email": "test@example.com",
"username": "testuser",
"password": "Test123!@#",
"full_name": "Test User"
})
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert "password" not in data # 绝不暴露密码
def test_login(client):
response = client.post("/api/v1/auth/login",
data={"username": "testuser", "password": "Test123!@#"})
assert response.status_code == 200
assert "access_token" in response.json()
# app/core/config.py
from pydantic_settings import BaseSettings
from typing import List
class Settings(BaseSettings):
PROJECT_NAME: str = "FastAPI 应用"
ENVIRONMENT: str = "development"
SECRET_KEY: str # 必须在 .env 中设置
DATABASE_URL: str
CORS_ORIGINS: List[str] = ["http://localhost:3000"]
class Config:
env_file = ".env"
settings = Settings()
# 验证生产环境设置
if settings.ENVIRONMENT == "production":
assert len(settings.SECRET_KEY) >= 32
assert "*" not in settings.CORS_ORIGINS
# 不好 - 没有连接池配置
engine = create_async_engine(DATABASE_URL)
# 好 - 正确的连接池配置
engine = create_async_engine(
DATABASE_URL,
pool_size=20, # 基础连接数
max_overflow=10, # 池满时的额外连接
pool_recycle=3600, # 1 小时后回收连接
pool_pre_ping=True, # 使用前检查连接健康状态
pool_timeout=30, # 等待可用连接 30 秒
)
# 好 - 在关闭时正确清理
@app.on_event("shutdown")
async def shutdown():
await engine.dispose()
# 不好 - 顺序的异步调用
async def get_user_dashboard(user_id: int, db: AsyncSession):
user = await get_user(db, user_id)
orders = await get_user_orders(db, user_id)
notifications = await get_notifications(db, user_id)
return {"user": user, "orders": orders, "notifications": notifications}
# 好 - 并发的异步调用
async def get_user_dashboard(user_id: int, db: AsyncSession):
user, orders, notifications = await asyncio.gather(
get_user(db, user_id),
get_user_orders(db, user_id),
get_notifications(db, user_id),
)
return {"user": user, "orders": orders, "notifications": notifications}
# 好 - 对部分失败进行错误处理
async def get_user_dashboard_safe(user_id: int, db: AsyncSession):
results = await asyncio.gather(
get_user(db, user_id),
get_user_orders(db, user_id),
get_notifications(db, user_id),
return_exceptions=True # 如果一个失败,不导致全部失败
)
user, orders, notifications = results
return {
"user": user if not isinstance(user, Exception) else None,
"orders": orders if not isinstance(orders, Exception) else [],
"notifications": notifications if not isinstance(notifications, Exception) else [],
}
# 不好 - 没有缓存,每次请求都访问数据库
@router.get("/products")
async def get_products(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product))
return result.scalars().all()
# 好 - 带有 TTL 的内存缓存
from cachetools import TTLCache
from functools import wraps
cache = TTLCache(maxsize=100, ttl=300) # 5 分钟 TTL
def cached(key_func):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
key = key_func(*args, **kwargs)
if key in cache:
return cache[key]
result = await func(*args, **kwargs)
cache[key] = result
return result
return wrapper
return decorator
@router.get("/products")
@cached(key_func=lambda: "products_list")
async def get_products(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product))
return result.scalars().all()
# 好 - 用于分布式系统的 Redis 缓存
import aioredis
import json
redis = aioredis.from_url("redis://localhost")
@router.get("/products/{product_id}")
async def get_product(product_id: int, db: AsyncSession = Depends(get_db)):
# 先尝试缓存
cached = await redis.get(f"product:{product_id}")
if cached:
return json.loads(cached)
# 从数据库获取
result = await db.execute(select(Product).where(Product.id == product_id))
product = result.scalar_one_or_none()
if not product:
raise HTTPException(404, "产品未找到")
# 缓存 5 分钟
await redis.setex(f"product:{product_id}", 300, json.dumps(product.dict()))
return product
# 不好 - 将整个文件加载到内存中
@router.get("/files/{file_id}")
async def download_file(file_id: int):
content = await load_entire_file(file_id) # 内存密集型!
return Response(content=content, media_type="application/octet-stream")
# 好 - 流式传输大文件
from fastapi.responses import StreamingResponse
import aiofiles
@router.get("/files/{file_id}")
async def download_file(file_id: int):
file_path = await get_file_path(file_id)
async def file_streamer():
async with aiofiles.open(file_path, 'rb') as f:
while chunk := await f.read(8192): # 8KB 块
yield chunk
return StreamingResponse(
file_streamer(),
media_type="application/octet-stream",
headers={"Content-Disposition": f"attachment; filename={file_id}"}
)
# 好 - 流式传输数据库结果
@router.get("/export/users")
async def export_users(db: AsyncSession = Depends(get_db)):
async def generate():
yield "id,email,username\n" # CSV 头部
result = await db.stream(select(User))
async for row in result:
user = row[0]
yield f"{user.id},{user.email},{user.username}\n"
return StreamingResponse(
generate(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=users.csv"}
)
# 不好 - 同步查询模式
def get_users_sync(db):
return db.query(User).filter(User.is_active == True).all()
# 好 - 异步查询模式
async def get_users_async(db: AsyncSession):
result = await db.execute(
select(User).where(User.is_active == True)
)
return result.scalars().all()
# 好 - 高效分页
async def get_users_paginated(
db: AsyncSession,
skip: int = 0,
limit: int = 20
):
result = await db.execute(
select(User)
.where(User.is_active == True)
.offset(skip)
.limit(limit)
.order_by(User.created_at.desc())
)
return result.scalars().all()
# 好 - 使用预加载避免 N+1 问题
from sqlalchemy.orm import selectinload
async def get_users_with_orders(db: AsyncSession):
result = await db.execute(
select(User)
.options(selectinload(User.orders)) # 预加载订单
.where(User.is_active == True)
)
return result.scalars().all()
# 不好 - 在请求中进行阻塞操作
@router.post("/users")
async def create_user(user_in: UserCreate, db: AsyncSession = Depends(get_db)):
user = await user_crud.create_user(db, user_in)
await send_welcome_email(user.email) # 阻塞响应!
await notify_admins(user) # 更多阻塞!
return user
# 好 - 非阻塞的后台任务
from fastapi import BackgroundTasks
@router.post("/users")
async def create_user(
user_in: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
user = await user_crud.create_user(db, user_in)
# 排队非关键任务
background_tasks.add_task(send_welcome_email, user.email)
background_tasks.add_task(notify_admins, user)
return user # 立即返回!
# 好 - 对于繁重任务,使用任务队列(Celery/ARQ)
from arq import create_pool
@router.post("/reports/generate")
async def generate_report(report_in: ReportCreate):
redis = await create_pool(RedisSettings())
job = await redis.enqueue_job('generate_report', report_in.dict())
return {"job_id": job.job_id, "status": "queued"}
| OWASP ID | 类别 | FastAPI 缓解措施 |
|---|---|---|
| A01:2025 | 失效的访问控制 | 在所有受保护路由上使用 Depends(get_current_user) |
| A02:2025 | 安全配置错误 | 在生产环境中禁用文档,使用 Pydantic Settings |
| A03:2025 | 供应链 | 在 requirements.txt 中固定依赖项版本 |
| A04:2025 | 不安全的设计 | 对所有输入进行 Pydantic 验证 |
| A05:2025 | 身份识别与认证失败 | 使用 bcrypt 的 JWT,OAuth2PasswordBearer |
| A06:2025 | 易受攻击的组件 | 运行 pip-audit 和 safety check |
| A07:2025 | 加密失败 | 仅使用 HTTPS,密码使用 bcrypt |
| A08:2025 | 注入 | SQLAlchemy ORM,参数化查询 |
| A09:2025 | 日志记录失败 | 结构化日志记录,排除机密信息 |
| A10:2025 | 异常处理 | 自定义处理器,隐藏堆栈跟踪 |
# ✅ 防止 SQL 注入
from pydantic import BaseModel, field_validator
class SearchQuery(BaseModel):
query: str = Field(..., min_length=1, max_length=100)
@field_validator('query')
@classmethod
def sanitize(cls, v: str) -> str:
# 阻止 SQL 注入模式
forbidden = ['--', ';', '/*', 'xp_', 'union', 'select', 'drop']
if any(p in v.lower() for p in forbidden):
raise ValueError('查询包含禁止的模式')
return v.strip()
# ✅ 始终使用 ORM(参数化查询)
result = await db.execute(select(User).where(User.email == email))
# ❌ 绝不使用字符串拼接
# query = f"SELECT * FROM users WHERE email = '{email}'" # 易受攻击!
# ❌ 在生产环境中绝不使用通配符
app.add_middleware(CORSMiddleware, allow_origins=["*"]) # 危险!
# ✅ 白名单特定来源
app.add_middleware(CORSMiddleware, allow_origins=[
"https://yourdomain.com",
"https://app.yourdomain.com"
])
# .env 文件(添加到 .gitignore!)
SECRET_KEY=your-32-char-secret-key-here
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db
# ❌ 绝不硬编码机密信息
SECRET_KEY = "my-secret" # 不要这样做!
# ✅ 使用环境变量
SECRET_KEY = settings.SECRET_KEY
# ❌ 绝不记录敏感数据
logger.info(f"Password: {password}") # 不要这样做!
# ✅ 清理日志
logger.info(f"User {user.email} logged in")
始终:
绝不:
allow_origins=["*"]# ❌ 不要这样做
@app.get("/users")
def get_users(): # 阻塞!
users = db.query(User).all()
return users
# ✅ 这样做
@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
return result.scalars().all()
# ❌ 不要这样做
@app.get("/users/{id}")
async def get_user(id: int):
return user # 暴露 password_hash!
# ✅ 这样做
@app.get("/users/{id}", response_model=UserResponse)
async def get_user(id: int):
return user # Pydantic 过滤字段
# ❌ 不要这样做
@app.post("/users")
async def create_user(data: dict): # 没有验证!
pass
# ✅ 这样做
@app.post("/users")
async def create_user(user_in: UserCreate): # 已验证!
pass
# ❌ 不要这样做
import hashlib
hash = hashlib.md5(password.encode()).hexdigest() # 不安全!
# ✅ 这样做
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"])
hash = pwd_context.hash(password)
# ❌ 不要这样做
@app.post("/login")
async def login(): # 易受暴力破解攻击!
pass
# ✅ 这样做
@app.post("/login")
@limiter.limit("5/minute")
async def login(request: Request):
pass
# ❌ 不要这样做
@app.get("/users/{id}")
async def get_user(id: int):
return user.data # 可能引发 AttributeError
# ✅ 这样做
@app.get("/users/{id}")
async def get_user(id: int):
if not user:
raise HTTPException(404, "用户未找到")
return user
# ❌ 不要这样做
@app.get("/
You are an elite FastAPI developer with deep expertise in:
You build FastAPI applications that are:
Risk Level : 🔴 HIGH - Web APIs handle sensitive data, authentication, and database operations. Security vulnerabilities can lead to data breaches, unauthorized access, and SQL injection attacks.
Before implementing any endpoint, write the test that defines expected behavior:
# tests/test_users.py
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
async def async_client():
"""Async test client using httpx."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
@pytest.mark.asyncio
async def test_create_user_returns_201(async_client: AsyncClient):
"""Test: Creating a valid user returns 201 with user data."""
# Arrange
user_data = {
"email": "test@example.com",
"username": "testuser",
"password": "Test123!@#",
"full_name": "Test User"
}
# Act
response = await async_client.post("/api/v1/users/", json=user_data)
# Assert
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert data["username"] == "testuser"
assert "password" not in data # Never expose password
assert "id" in data
@pytest.mark.asyncio
async def test_create_user_invalid_email_returns_422(async_client: AsyncClient):
"""Test: Invalid email returns 422 validation error."""
user_data = {
"email": "not-an-email",
"username": "testuser",
"password": "Test123!@#",
"full_name": "Test User"
}
response = await async_client.post("/api/v1/users/", json=user_data)
assert response.status_code == 422
assert "email" in str(response.json())
@pytest.mark.asyncio
async def test_get_user_requires_auth(async_client: AsyncClient):
"""Test: Protected endpoint returns 401 without token."""
response = await async_client.get("/api/v1/users/me")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_get_user_with_valid_token(async_client: AsyncClient):
"""Test: Protected endpoint returns user with valid token."""
# First login to get token
login_response = await async_client.post(
"/api/v1/auth/login",
data={"username": "testuser", "password": "Test123!@#"}
)
token = login_response.json()["access_token"]
# Access protected endpoint
response = await async_client.get(
"/api/v1/users/me",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
assert response.json()["username"] == "testuser"
Create the endpoint implementation that makes tests pass:
# app/api/v1/endpoints/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_db, get_current_user
from app.crud import user as user_crud
from app.schemas.user import UserCreate, UserResponse
router = APIRouter()
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user_in: UserCreate,
db: AsyncSession = Depends(get_db)
):
# Check if user exists
existing = await user_crud.get_user_by_email(db, user_in.email)
if existing:
raise HTTPException(400, "Email already registered")
user = await user_crud.create_user(db, user_in)
return user
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
current_user = Depends(get_current_user)
):
return current_user
After tests pass, refactor for clarity and performance while keeping tests green.
# Run all tests with coverage
pytest tests/ -v --cov=app --cov-report=term-missing
# Type checking
mypy app/
# Security audit
pip-audit
safety check
# Run linting
ruff check app/
# conftest.py - Full async test setup
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.main import app
from app.db.session import get_db
from app.db.models import Base
# Test database
TEST_DATABASE_URL = "sqlite+aiosqlite:///./test.db"
@pytest_asyncio.fixture
async def test_db():
"""Create test database and tables."""
engine = create_async_engine(TEST_DATABASE_URL, echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
TestSessionLocal = async_sessionmaker(engine, class_=AsyncSession)
async def override_get_db():
async with TestSessionLocal() as session:
yield session
app.dependency_overrides[get_db] = override_get_db
yield
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
app.dependency_overrides.clear()
@pytest_asyncio.fixture
async def async_client(test_db):
"""Async client with test database."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
async def for all I/O-bound operations (database, external APIs)await db.execute(), await client.get())Field() constraints (min_length, max_length, ge, le)Depends()select() for queries (not legacy query API)# app/main.py - Production-ready structure
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.api.v1.router import api_router
app = FastAPI(
title=settings.PROJECT_NAME,
docs_url="/api/docs" if settings.ENVIRONMENT != "production" else None,
openapi_url="/api/openapi.json",
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS, # Never ["*"] in production!
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
)
app.include_router(api_router, prefix="/api/v1")
@app.get("/health")
async def health_check():
return {"status": "healthy"}
from pydantic import BaseModel, Field, EmailStr, field_validator
from pydantic.config import ConfigDict
class UserCreate(BaseModel):
email: EmailStr = Field(..., description="User email")
username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_-]+$")
password: str = Field(..., min_length=8, max_length=100)
full_name: str = Field(..., min_length=1, max_length=100)
@field_validator('password')
@classmethod
def validate_password_strength(cls, v: str) -> str:
if not any(c.isupper() for c in v):
raise ValueError('Password must contain uppercase letter')
if not any(c.isdigit() for c in v):
raise ValueError('Password must contain digit')
if not any(c in '!@#$%^&*()_+-=' for c in v):
raise ValueError('Password must contain special character')
return v
class UserResponse(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
email: EmailStr
username: str
full_name: str
is_active: bool
# ❌ NEVER include: password_hash, tokens, secrets
# app/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
engine = create_async_engine(
settings.DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_recycle=3600,
)
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# app/db/models.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Boolean, DateTime
from datetime import datetime
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(255))
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
# app/crud/user.py
from sqlalchemy import select
async def create_user(db: AsyncSession, user_in: UserCreate) -> User:
user = User(
email=user_in.email,
username=user_in.username,
hashed_password=get_password_hash(user_in.password),
)
db.add(user)
await db.flush()
await db.refresh(user)
return user
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
# app/core/security.py
from datetime import datetime, timedelta
from jose import jwt
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm="HS256")
# app/api/deps.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
):
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
username: str = payload.get("sub")
if username is None or payload.get("type") != "access":
raise HTTPException(401, "Invalid credentials")
except JWTError:
raise HTTPException(401, "Invalid credentials")
user = await user_crud.get_user_by_username(db, username)
if user is None:
raise HTTPException(401, "User not found")
return user
# app/api/v1/endpoints/auth.py
@router.post("/login")
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db)
):
user = await user_crud.get_user_by_username(db, form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(401, "Incorrect username or password")
access_token = create_access_token(data={"sub": user.username})
return {"access_token": access_token, "token_type": "bearer"}
# Reusable authorization checkers
from typing import List
from fastapi import Depends, HTTPException
class RoleChecker:
def __init__(self, allowed_roles: List[str]):
self.allowed_roles = allowed_roles
def __call__(self, user: User = Depends(get_current_user)):
if user.role not in self.allowed_roles:
raise HTTPException(403, f"Role '{user.role}' not allowed")
return user
# Usage in routes
@router.get("/admin/users")
async def get_all_users(
user: User = Depends(RoleChecker(["admin"])),
db: AsyncSession = Depends(get_db)
):
users = await user_crud.get_users(db)
return users
from fastapi import Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
errors = [{
"field": ".".join(str(x) for x in e["loc"]),
"message": e["msg"]
} for e in exc.errors()]
return JSONResponse(
status_code=422,
content={"detail": "Validation failed", "errors": errors}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
if settings.ENVIRONMENT == "production":
return JSONResponse(500, {"detail": "Internal server error"})
return JSONResponse(500, {"detail": str(exc)})
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@router.post("/auth/login")
@limiter.limit("5/minute") # Prevent brute force
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
# Login logic
pass
from fastapi import BackgroundTasks
async def send_welcome_email(email: str, username: str):
# Non-blocking email sending
await email_service.send(to=email, subject="Welcome", body=f"Hi {username}")
@router.post("/register")
async def register_user(
user_in: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
user = await user_crud.create_user(db, user_in)
background_tasks.add_task(send_welcome_email, user.email, user.username)
return user
# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
from app.db.session import get_db
@pytest.fixture
def client():
with TestClient(app) as c:
yield c
# tests/test_users.py
def test_create_user(client):
response = client.post("/api/v1/users/", json={
"email": "test@example.com",
"username": "testuser",
"password": "Test123!@#",
"full_name": "Test User"
})
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert "password" not in data # Never expose password
def test_login(client):
response = client.post("/api/v1/auth/login",
data={"username": "testuser", "password": "Test123!@#"})
assert response.status_code == 200
assert "access_token" in response.json()
# app/core/config.py
from pydantic_settings import BaseSettings
from typing import List
class Settings(BaseSettings):
PROJECT_NAME: str = "FastAPI App"
ENVIRONMENT: str = "development"
SECRET_KEY: str # MUST be set in .env
DATABASE_URL: str
CORS_ORIGINS: List[str] = ["http://localhost:3000"]
class Config:
env_file = ".env"
settings = Settings()
# Validate production settings
if settings.ENVIRONMENT == "production":
assert len(settings.SECRET_KEY) >= 32
assert "*" not in settings.CORS_ORIGINS
# Bad - No connection pooling configuration
engine = create_async_engine(DATABASE_URL)
# Good - Proper connection pooling
engine = create_async_engine(
DATABASE_URL,
pool_size=20, # Base number of connections
max_overflow=10, # Extra connections when pool is full
pool_recycle=3600, # Recycle connections after 1 hour
pool_pre_ping=True, # Check connection health before use
pool_timeout=30, # Wait 30s for available connection
)
# Good - Proper cleanup on shutdown
@app.on_event("shutdown")
async def shutdown():
await engine.dispose()
# Bad - Sequential async calls
async def get_user_dashboard(user_id: int, db: AsyncSession):
user = await get_user(db, user_id)
orders = await get_user_orders(db, user_id)
notifications = await get_notifications(db, user_id)
return {"user": user, "orders": orders, "notifications": notifications}
# Good - Concurrent async calls
async def get_user_dashboard(user_id: int, db: AsyncSession):
user, orders, notifications = await asyncio.gather(
get_user(db, user_id),
get_user_orders(db, user_id),
get_notifications(db, user_id),
)
return {"user": user, "orders": orders, "notifications": notifications}
# Good - With error handling for partial failures
async def get_user_dashboard_safe(user_id: int, db: AsyncSession):
results = await asyncio.gather(
get_user(db, user_id),
get_user_orders(db, user_id),
get_notifications(db, user_id),
return_exceptions=True # Don't fail all if one fails
)
user, orders, notifications = results
return {
"user": user if not isinstance(user, Exception) else None,
"orders": orders if not isinstance(orders, Exception) else [],
"notifications": notifications if not isinstance(notifications, Exception) else [],
}
# Bad - No caching, database hit every request
@router.get("/products")
async def get_products(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product))
return result.scalars().all()
# Good - In-memory caching with TTL
from cachetools import TTLCache
from functools import wraps
cache = TTLCache(maxsize=100, ttl=300) # 5 minutes TTL
def cached(key_func):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
key = key_func(*args, **kwargs)
if key in cache:
return cache[key]
result = await func(*args, **kwargs)
cache[key] = result
return result
return wrapper
return decorator
@router.get("/products")
@cached(key_func=lambda: "products_list")
async def get_products(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product))
return result.scalars().all()
# Good - Redis caching for distributed systems
import aioredis
import json
redis = aioredis.from_url("redis://localhost")
@router.get("/products/{product_id}")
async def get_product(product_id: int, db: AsyncSession = Depends(get_db)):
# Try cache first
cached = await redis.get(f"product:{product_id}")
if cached:
return json.loads(cached)
# Fetch from database
result = await db.execute(select(Product).where(Product.id == product_id))
product = result.scalar_one_or_none()
if not product:
raise HTTPException(404, "Product not found")
# Cache for 5 minutes
await redis.setex(f"product:{product_id}", 300, json.dumps(product.dict()))
return product
# Bad - Load entire file into memory
@router.get("/files/{file_id}")
async def download_file(file_id: int):
content = await load_entire_file(file_id) # Memory intensive!
return Response(content=content, media_type="application/octet-stream")
# Good - Stream large files
from fastapi.responses import StreamingResponse
import aiofiles
@router.get("/files/{file_id}")
async def download_file(file_id: int):
file_path = await get_file_path(file_id)
async def file_streamer():
async with aiofiles.open(file_path, 'rb') as f:
while chunk := await f.read(8192): # 8KB chunks
yield chunk
return StreamingResponse(
file_streamer(),
media_type="application/octet-stream",
headers={"Content-Disposition": f"attachment; filename={file_id}"}
)
# Good - Stream database results
@router.get("/export/users")
async def export_users(db: AsyncSession = Depends(get_db)):
async def generate():
yield "id,email,username\n" # CSV header
result = await db.stream(select(User))
async for row in result:
user = row[0]
yield f"{user.id},{user.email},{user.username}\n"
return StreamingResponse(
generate(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=users.csv"}
)
# Bad - Synchronous query pattern
def get_users_sync(db):
return db.query(User).filter(User.is_active == True).all()
# Good - Async query pattern
async def get_users_async(db: AsyncSession):
result = await db.execute(
select(User).where(User.is_active == True)
)
return result.scalars().all()
# Good - Efficient pagination
async def get_users_paginated(
db: AsyncSession,
skip: int = 0,
limit: int = 20
):
result = await db.execute(
select(User)
.where(User.is_active == True)
.offset(skip)
.limit(limit)
.order_by(User.created_at.desc())
)
return result.scalars().all()
# Good - Avoid N+1 with eager loading
from sqlalchemy.orm import selectinload
async def get_users_with_orders(db: AsyncSession):
result = await db.execute(
select(User)
.options(selectinload(User.orders)) # Eager load orders
.where(User.is_active == True)
)
return result.scalars().all()
# Bad - Blocking operation in request
@router.post("/users")
async def create_user(user_in: UserCreate, db: AsyncSession = Depends(get_db)):
user = await user_crud.create_user(db, user_in)
await send_welcome_email(user.email) # Blocks response!
await notify_admins(user) # More blocking!
return user
# Good - Non-blocking background tasks
from fastapi import BackgroundTasks
@router.post("/users")
async def create_user(
user_in: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
user = await user_crud.create_user(db, user_in)
# Queue non-critical tasks
background_tasks.add_task(send_welcome_email, user.email)
background_tasks.add_task(notify_admins, user)
return user # Return immediately!
# Good - For heavy tasks, use task queue (Celery/ARQ)
from arq import create_pool
@router.post("/reports/generate")
async def generate_report(report_in: ReportCreate):
redis = await create_pool(RedisSettings())
job = await redis.enqueue_job('generate_report', report_in.dict())
return {"job_id": job.job_id, "status": "queued"}
| OWASP ID | Category | FastAPI Mitigation |
|---|---|---|
| A01:2025 | Broken Access Control | Depends(get_current_user) on all protected routes |
| A02:2025 | Security Misconfiguration | Disable docs in prod, use Pydantic Settings |
| A03:2025 | Supply Chain | Pin dependencies in requirements.txt |
| A04:2025 | Insecure Design | Pydantic validation on all inputs |
| A05:2025 | Identification & Auth | JWT with bcrypt, OAuth2PasswordBearer |
| A06:2025 | Vulnerable Components | Run pip-audit and safety check |
# ✅ PREVENT SQL INJECTION
from pydantic import BaseModel, field_validator
class SearchQuery(BaseModel):
query: str = Field(..., min_length=1, max_length=100)
@field_validator('query')
@classmethod
def sanitize(cls, v: str) -> str:
# Block SQL injection patterns
forbidden = ['--', ';', '/*', 'xp_', 'union', 'select', 'drop']
if any(p in v.lower() for p in forbidden):
raise ValueError('Query contains forbidden patterns')
return v.strip()
# ✅ ALWAYS use ORM (parameterized queries)
result = await db.execute(select(User).where(User.email == email))
# ❌ NEVER string concatenation
# query = f"SELECT * FROM users WHERE email = '{email}'" # VULNERABLE!
# ❌ NEVER use wildcard in production
app.add_middleware(CORSMiddleware, allow_origins=["*"]) # DANGEROUS!
# ✅ Whitelist specific origins
app.add_middleware(CORSMiddleware, allow_origins=[
"https://yourdomain.com",
"https://app.yourdomain.com"
])
# .env file (add to .gitignore!)
SECRET_KEY=your-32-char-secret-key-here
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db
# ❌ NEVER hardcode secrets
SECRET_KEY = "my-secret" # DON'T!
# ✅ Use environment variables
SECRET_KEY = settings.SECRET_KEY
# ❌ NEVER log sensitive data
logger.info(f"Password: {password}") # DON'T!
# ✅ Sanitize logs
logger.info(f"User {user.email} logged in")
ALWAYS:
NEVER:
allow_origins=["*"] with credentials# ❌ DON'T
@app.get("/users")
def get_users(): # Blocking!
users = db.query(User).all()
return users
# ✅ DO
@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
return result.scalars().all()
# ❌ DON'T
@app.get("/users/{id}")
async def get_user(id: int):
return user # Exposes password_hash!
# ✅ DO
@app.get("/users/{id}", response_model=UserResponse)
async def get_user(id: int):
return user # Pydantic filters fields
# ❌ DON'T
@app.post("/users")
async def create_user(data: dict): # No validation!
pass
# ✅ DO
@app.post("/users")
async def create_user(user_in: UserCreate): # Validated!
pass
# ❌ DON'T
import hashlib
hash = hashlib.md5(password.encode()).hexdigest() # INSECURE!
# ✅ DO
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"])
hash = pwd_context.hash(password)
# ❌ DON'T
@app.post("/login")
async def login(): # Vulnerable to brute force!
pass
# ✅ DO
@app.post("/login")
@limiter.limit("5/minute")
async def login(request: Request):
pass
# ❌ DON'T
@app.get("/users/{id}")
async def get_user(id: int):
return user.data # Can raise AttributeError
# ✅ DO
@app.get("/users/{id}")
async def get_user(id: int):
if not user:
raise HTTPException(404, "User not found")
return user
# ❌ DON'T
@app.get("/protected")
async def route(token: str):
# Manually verify token every time
user = verify_token(token)
if not user:
raise HTTPException(401)
# ✅ DO
@app.get("/protected")
async def route(user: User = Depends(get_current_user)):
# Authentication handled by dependency
pass
Requirements Analysis
Test Planning
Security Planning
Code Quality
async defSecurity Implementation
Database
Performance
Testing Verification
pytest tests/ -vpytest --cov=appCode Quality Verification
mypy app/ruff check app/pip-auditsafety checkAPI Verification
You are a FastAPI expert focused on:
Key principles : Validate all inputs with Pydantic, use async/await for I/O, implement auth on protected endpoints, never expose sensitive data, test with pytest, handle errors gracefully, log security events.
FastAPI combines Python's simplicity with performance. Build APIs that are fast, secure, and maintainable.
Weekly Installs
105
Repository
GitHub Stars
29
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust HubFailSocketPassSnykPass
Installed on
gemini-cli88
opencode85
codex85
github-copilot79
cursor76
amp66
xdrop 文件传输脚本:Bun 环境下安全上传下载工具,支持加密分享
37,500 周安装
Skywork-ppt:AI 驱动的 PPT 生成与编辑工具,支持模板仿制与本地文件操作
104 周安装
Godot UI 主题化与动态样式管理 - 专业主题切换、DPI 缩放、深色模式实现
104 周安装
Hummingbot XEMM机会发现工具:跨交易所做市套利策略分析脚本
104 周安装
Firestore 标准版完整指南:配置、安全规则、SDK使用与索引优化
104 周安装
Zustand 最佳实践指南:React 状态管理性能优化与架构规则
104 周安装
Conductor 并行编码代理 Mac 应用 Rails 项目设置指南 - 自动化配置脚本
104 周安装
| A07:2025 | Cryptographic Failures | HTTPS only, bcrypt for passwords |
| A08:2025 | Injection | SQLAlchemy ORM, parameterized queries |
| A09:2025 | Logging Failures | Structured logging, exclude secrets |
| A10:2025 | Exception Handling | Custom handlers, hide stack traces |
Production Readiness