fastapi by martinholovsky/claude-skills-generator
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill fastapi
风险等级 : 高
理由 : FastAPI 应用程序处理身份验证、数据库访问、文件上传和外部 API 通信。Starlette 中的 DoS 漏洞、注入风险以及不当的验证可能危及可用性和安全性。
您是一位专业的 FastAPI 开发人员,负责创建安全、高性能的 REST API 和 WebSocket 服务。您需要配置适当的验证、身份验证和安全标头。
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
| 组件 | 版本 | 备注 |
|---|---|---|
| FastAPI | 0.115.3+ | 修复 CVE-2024-47874 |
| Starlette | 0.40.0+ | 修复 DoS 漏洞 |
| Pydantic | 2.5+ | 更好的验证 |
| Python | 3.11+ | 性能 |
[project]
dependencies = [
"fastapi>=0.115.3",
"starlette>=0.40.0",
"pydantic>=2.5",
"python-jose[cryptography]>=3.3",
"passlib[argon2]>=1.7",
"python-multipart>=0.0.6",
"slowapi>=0.1.9",
"secure>=0.3",
]
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from secure import SecureHeaders
app = FastAPI(
title="Secure API",
docs_url=None if PRODUCTION else "/docs", # 生产环境禁用
redoc_url=None,
)
# 安全标头
secure_headers = SecureHeaders()
@app.middleware("http")
async def add_security_headers(request, call_next):
response = await call_next(request)
secure_headers.framework.fastapi(response)
return response
# 限制性 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.example.com"], # 切勿使用 ["*"]
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, field_validator, EmailStr
class UserCreate(BaseModel):
username: str = Field(min_length=3, max_length=50, pattern=r'^[a-zA-Z0-9_-]+$')
email: EmailStr
password: str = Field(min_length=12)
@field_validator('password')
@classmethod
def validate_password(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('必须包含大写字母')
if not any(c.isdigit() for c in v):
raise ValueError('必须包含数字')
return v
@app.post("/users")
async def create_user(user: UserCreate):
# 输入已由 Pydantic 验证
return await user_service.create(user)
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict) -> str:
    """Return a signed JWT carrying ``data`` plus an ``exp`` expiry claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is left untouched.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``; it expires ``ACCESS_TOKEN_EXPIRE_MINUTES`` from now.
    """
    # Local import keeps this block self-contained.
    from datetime import timezone

    to_encode = data.copy()
    # datetime.utcnow() is deprecated and returns a naive value; an aware UTC
    # timestamp cannot be misread as local time.
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = await user_service.get(user_id)
if user is None:
raise credentials_exception
return user
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/login")
@limiter.limit("5/minute") # 对身份验证端点严格限制
async def login(request: Request, credentials: LoginRequest):
return await auth_service.login(credentials)
@app.get("/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
return await data_service.get_all()
from fastapi import UploadFile, File, HTTPException
import magic
ALLOWED_TYPES = {"image/jpeg", "image/png", "application/pdf"}
MAX_SIZE = 10 * 1024 * 1024 # 10MB
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """Validate an uploaded file and store it under a server-generated name.

    The upload is rejected with HTTP 400 when it exceeds ``MAX_SIZE`` bytes
    or when its content-sniffed MIME type is not in ``ALLOWED_TYPES``.

    Args:
        file: The multipart upload to validate and persist.

    Returns:
        A dict with the generated ``filename`` of the stored file.

    Raises:
        HTTPException: 400 if the file is too large or of a disallowed type.
    """
    # Read at most one byte past the limit so an oversized upload is detected
    # without loading the entire body into memory.
    content = await file.read(MAX_SIZE + 1)
    if len(content) > MAX_SIZE:
        raise HTTPException(400, "文件过大")

    # Inspect the magic bytes rather than trusting the declared type or name.
    mime_type = magic.from_buffer(content, mime=True)
    if mime_type not in ALLOWED_TYPES:
        raise HTTPException(400, f"不允许的文件类型: {mime_type}")

    # Choose the extension from the detected type, never from the
    # client-supplied filename, which may be missing or carry a misleading
    # suffix.
    extension_by_type = {
        "image/jpeg": ".jpg",
        "image/png": ".png",
        "application/pdf": ".pdf",
    }
    safe_name = f"{uuid4()}{extension_by_type.get(mime_type, '')}"

    file_path = UPLOAD_DIR / safe_name
    file_path.write_bytes(content)
    return {"filename": safe_name}
始终从定义预期行为的测试开始:
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.mark.asyncio
async def test_create_item_success():
"""测试使用有效数据成功创建项目。"""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/items",
json={"name": "Test Item", "price": 29.99},
headers={"Authorization": "Bearer valid_token"}
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Test Item"
assert "id" in data
@pytest.mark.asyncio
async def test_create_item_validation_error():
"""测试验证拒绝无效价格。"""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/items",
json={"name": "Test", "price": -10},
headers={"Authorization": "Bearer valid_token"}
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_create_item_unauthorized():
"""测试端点需要身份验证。"""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post("/items", json={"name": "Test", "price": 10})
assert response.status_code == 401
仅编写使测试通过所需的代码:
@app.post("/items", status_code=201)
async def create_item(
    item: ItemCreate,
    user: User = Depends(get_current_user)
) -> ItemResponse:
    """Create an item for the authenticated user and return it.

    Args:
        item: The validated request payload.
        user: The caller, resolved by the ``get_current_user`` dependency.

    Returns:
        An ``ItemResponse`` built from the object returned by
        ``item_service.create``.
    """
    created = await item_service.create(item, user.id)
    # from_orm() is deprecated in Pydantic v2; model_validate with
    # from_attributes=True is its replacement.
    return ItemResponse.model_validate(created, from_attributes=True)
在保持测试通过的同时提高代码质量。提取通用模式、改进命名、优化查询。
# 运行所有测试并计算覆盖率
pytest --cov=app --cov-report=term-missing
# 类型检查
mypy app --strict
# 安全扫描
bandit -r app -ll
# 提交前必须全部通过
# 错误做法 - 每个请求创建新连接
@app.get("/users/{user_id}")
async def get_user(user_id: int):
conn = await asyncpg.connect(DATABASE_URL)
try:
return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
finally:
await conn.close()
# 正确做法 - 使用连接池
from contextlib import asynccontextmanager
pool: asyncpg.Pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared asyncpg pool at startup and close it at shutdown.

    The pool is stored in the module-level ``pool`` so request handlers can
    acquire connections from it.
    """
    global pool
    pool = await asyncpg.create_pool(
        DATABASE_URL,
        min_size=5,
        max_size=20,
        command_timeout=60,
    )
    try:
        yield
    finally:
        # Close the pool even when the application exits through an error.
        await pool.close()
app = FastAPI(lifespan=lifespan)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with pool.acquire() as conn:
return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
# 错误做法 - 顺序调用外部 API
@app.get("/dashboard")
async def get_dashboard(user_id: int):
profile = await fetch_profile(user_id) # 100ms
orders = await fetch_orders(user_id) # 150ms
notifications = await fetch_notifications(user_id) # 80ms
return {"profile": profile, "orders": orders, "notifications": notifications}
# 总计: ~330ms
# 正确做法 - 并发调用
@app.get("/dashboard")
async def get_dashboard(user_id: int):
profile, orders, notifications = await asyncio.gather(
fetch_profile(user_id),
fetch_orders(user_id),
fetch_notifications(user_id)
)
return {"profile": profile, "orders": orders, "notifications": notifications}
# 总计: ~150ms (最慢的调用)
# 错误做法 - 每次请求都重新计算昂贵的数据
@app.get("/stats")
async def get_stats():
return await compute_expensive_stats() # 每次 500ms
# 正确做法 - 使用 Redis 缓存
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise the Redis-backed response cache for the app's lifetime."""
    redis = aioredis.from_url("redis://localhost")
    FastAPICache.init(RedisBackend(redis), prefix="api-cache")
    try:
        yield
    finally:
        # Release the Redis connection on shutdown instead of leaking it.
        await redis.close()
@app.get("/stats")
@cache(expire=300) # 缓存 5 分钟
async def get_stats():
return await compute_expensive_stats()
# 正确做法 - 对于简单情况使用内存缓存
from functools import lru_cache
from datetime import datetime, timedelta
_cache = {}
_cache_time = {}


async def get_cached_config(key: str, ttl: int = 60):
    """Return the config value for ``key``, caching it in process memory.

    Args:
        key: Name of the config entry, passed to ``fetch_config`` on a miss.
        ttl: Seconds a freshly fetched value stays valid.

    Returns:
        The cached value while it is unexpired, otherwise the value just
        returned by ``fetch_config``.
    """
    # Local import keeps this block self-contained.
    from datetime import timezone

    # datetime.utcnow() is deprecated and naive; use an aware UTC timestamp.
    now = datetime.now(timezone.utc)
    expires_at = _cache_time.get(key)
    if key in _cache and expires_at is not None and expires_at > now:
        return _cache[key]

    value = await fetch_config(key)
    _cache[key] = value
    _cache_time[key] = now + timedelta(seconds=ttl)
    return value
# 错误做法 - 返回所有记录
@app.get("/items")
async def list_items():
return await db.fetch("SELECT * FROM items") # 可能数百万条
# 正确做法 - 基于游标的分页
from pydantic import BaseModel
class PaginatedResponse(BaseModel):
items: list
next_cursor: str | None
has_more: bool
@app.get("/items")
async def list_items(
    cursor: str | None = None,
    limit: int = Query(default=20, ge=1, le=100)
) -> PaginatedResponse:
    """Return one page of items ordered by ``id``.

    Args:
        cursor: Opaque cursor from a previous page; only rows with an ``id``
            greater than the decoded cursor are returned. ``None`` starts
            from the beginning.
        limit: Page size, between 1 and 100.

    Returns:
        A ``PaginatedResponse`` with the page's rows, a cursor for the last
        returned row (``None`` when the page is empty) and ``has_more``.
    """
    # One extra row is requested to learn whether another page exists. The
    # limit is bound as a query parameter instead of being formatted into SQL.
    if cursor:
        rows = await db.fetch(
            "SELECT * FROM items WHERE id > $1 ORDER BY id LIMIT $2",
            decode_cursor(cursor),
            limit + 1,
        )
    else:
        rows = await db.fetch(
            "SELECT * FROM items ORDER BY id LIMIT $1",
            limit + 1,
        )

    has_more = len(rows) > limit
    items = rows[:limit]
    return PaginatedResponse(
        items=items,
        next_cursor=encode_cursor(items[-1]["id"]) if items else None,
        has_more=has_more,
    )
# 错误做法 - 阻塞响应以进行慢速操作
@app.post("/reports")
async def create_report(request: ReportRequest):
report = await generate_report(request) # 耗时 30 秒
await send_email(request.email, report)
return {"status": "completed"}
# 正确做法 - 立即返回,在后台处理
from fastapi import BackgroundTasks
@app.post("/reports", status_code=202)
async def create_report(
request: ReportRequest,
background_tasks: BackgroundTasks
):
report_id = str(uuid4())
background_tasks.add_task(process_report, report_id, request)
return {"report_id": report_id, "status": "processing"}
async def process_report(report_id: str, request: ReportRequest):
report = await generate_report(request)
await save_report(report_id, report)
await send_email(request.email, report)
@app.get("/reports/{report_id}")
async def get_report_status(report_id: str):
return await get_report(report_id)
| CVE ID | 严重性 | 描述 | 缓解措施 |
|---|---|---|---|
| CVE-2024-47874 | 高 | Starlette 多部分 DoS 导致内存耗尽 | 升级 Starlette 0.40.0+ |
| CVE-2024-12868 | 高 | 通过 fastapi 依赖项的下游 DoS | 升级 FastAPI 0.115.3+ |
| CVE-2023-30798 | 高 | Starlette <0.25 DoS | 升级 FastAPI 0.92+ |
| 类别 | 风险 | 缓解措施 |
|---|---|---|
| A01 访问控制 | 高 | 用于身份验证的依赖注入,权限装饰器 |
| A02 加密失败 | 高 | 使用适当算法的 JWT,Argon2 密码 |
| A03 注入 | 高 | Pydantic 验证,参数化查询 |
| A04 不安全设计 | 中 | 类型安全,验证层 |
| A05 配置错误 | 高 | 安全标头,生产环境禁用文档 |
| A06 易受攻击的组件 | 严重 | 保持 Starlette/FastAPI 更新 |
| A07 身份验证失败 | 高 | 对身份验证进行速率限制,安全的 JWT |
from fastapi import HTTPException
from fastapi.responses import JSONResponse
import logging
logger = logging.getLogger(__name__)
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Log an unhandled exception and return a generic 500 response.

    The full details go to the log only; the client receives a fixed message
    so internal information is not exposed.
    """
    # Pass the exception object explicitly so the traceback is logged without
    # depending on an ambient active exception; %-style args defer formatting.
    logger.error("未处理的错误: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={"detail": "内部服务器错误"},
    )
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    """Render an ``HTTPException`` as a JSON body with its status code.

    Headers attached to the exception are forwarded, so challenges such as
    the ``WWW-Authenticate`` header on 401 responses reach the client.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
        headers=getattr(exc, "headers", None),
    )
import pytest
from fastapi.testclient import TestClient
def test_rate_limiting():
client = TestClient(app)
# 超出速率限制
for _ in range(10):
response = client.post("/login", json={"username": "test", "password": "test"})
assert response.status_code == 429
def test_invalid_jwt_rejected():
client = TestClient(app)
response = client.get(
"/protected",
headers={"Authorization": "Bearer invalid.token.here"}
)
assert response.status_code == 401
def test_sql_injection_prevented():
client = TestClient(app)
response = client.get("/users", params={"search": "'; DROP TABLE users; --"})
assert response.status_code in [200, 400]
# 不应导致 500 (SQL 错误)
def test_file_upload_type_validation():
client = TestClient(app)
# 尝试上传伪装成图像的可执行文件
response = client.post(
"/upload",
files={"file": ("test.jpg", b"MZ\x90\x00", "image/jpeg")} # EXE 魔术字节
)
assert response.status_code == 400
# 切勿使用
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True)
# 始终使用
app.add_middleware(CORSMiddleware, allow_origins=["https://app.example.com"])
# 切勿使用 - 允许暴力破解
@app.post("/login")
async def login(creds): ...
# 始终使用
@app.post("/login")
@limiter.limit("5/minute")
async def login(request, creds): ...
# 切勿使用
app = FastAPI()
# 始终使用
app = FastAPI(
docs_url=None if os.environ.get("ENV") == "production" else "/docs",
redoc_url=None
)
# NEVER
jwt.encode(data, "secret", algorithm="HS256") # Hardcoded weak secret
# ALWAYS
# NOTE(review): RS256 is an asymmetric algorithm, so the key passed here would
# need to be an RSA private key rather than a shared secret; the variable name
# JWT_SECRET and the HS256 setup elsewhere in this document suggest otherwise.
# Confirm which is intended.
jwt.encode(data, os.environ["JWT_SECRET"], algorithm="RS256") # Env var, strong algo
# 切勿使用
if file.filename.endswith('.jpg'): ...
# 始终使用
mime = magic.from_buffer(content, mime=True)
if mime not in ALLOWED_TYPES: ...
您的目标是创建具有以下特点的 FastAPI 应用程序:
安全提醒 :
每周安装次数
104
代码仓库
GitHub 星标数
30
首次出现
Jan 20, 2026
安全审计
安装于
codex: 85
gemini-cli: 84
opencode: 82
cursor: 80
github-copilot: 77
claude-code: 66
Risk Level : HIGH
Justification : FastAPI applications handle authentication, database access, file uploads, and external API communication. DoS vulnerabilities in Starlette, injection risks, and improper validation can compromise availability and security.
You are an expert FastAPI developer creating secure, performant REST APIs and WebSocket services. You configure proper validation, authentication, and security headers.
| Component | Version | Notes |
|---|---|---|
| FastAPI | 0.115.3+ | CVE-2024-47874 fix |
| Starlette | 0.40.0+ | DoS vulnerability fix |
| Pydantic | 2.5+ | Better validation |
| Python | 3.11+ | Performance |
[project]
dependencies = [
"fastapi>=0.115.3",
"starlette>=0.40.0",
"pydantic>=2.5",
"python-jose[cryptography]>=3.3",
"passlib[argon2]>=1.7",
"python-multipart>=0.0.6",
"slowapi>=0.1.9",
"secure>=0.3",
]
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from secure import SecureHeaders
app = FastAPI(
title="Secure API",
docs_url=None if PRODUCTION else "/docs", # Disable in prod
redoc_url=None,
)
# Security headers
secure_headers = SecureHeaders()
@app.middleware("http")
async def add_security_headers(request, call_next):
    """Apply the configured security headers to every outgoing response."""
    outgoing = await call_next(request)
    # Hands the response to secure_headers so it can set its headers on it.
    secure_headers.framework.fastapi(outgoing)
    return outgoing
# Restrictive CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.example.com"], # Never ["*"]
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, field_validator, EmailStr
class UserCreate(BaseModel):
    """Request payload for registering a new user."""

    # 3-50 characters drawn from letters, digits, underscore and hyphen.
    username: str = Field(min_length=3, max_length=50, pattern=r'^[a-zA-Z0-9_-]+$')
    email: EmailStr
    # At least 12 characters; composition is checked by validate_password.
    password: str = Field(min_length=12)

    @field_validator('password')
    @classmethod
    def validate_password(cls, v):
        """Reject passwords lacking an uppercase letter or a digit."""
        if not any(map(str.isupper, v)):
            raise ValueError('Must contain uppercase')
        if not any(map(str.isdigit, v)):
            raise ValueError('Must contain digit')
        return v
@app.post("/users")
async def create_user(user: UserCreate):
# Input already validated by Pydantic
return await user_service.create(user)
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict) -> str:
    """Return a signed JWT carrying ``data`` plus an ``exp`` expiry claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is left untouched.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``; it expires ``ACCESS_TOKEN_EXPIRE_MINUTES`` from now.
    """
    # Local import keeps this block self-contained.
    from datetime import timezone

    to_encode = data.copy()
    # datetime.utcnow() is deprecated and returns a naive value; an aware UTC
    # timestamp cannot be misread as local time.
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Resolve the bearer token to a user, or fail with HTTP 401.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user returned by ``user_service.get`` for the token's ``sub``
        claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or no user is found for it.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception

    user_id: str = claims.get("sub")
    if user_id is None:
        raise credentials_exception

    user = await user_service.get(user_id)
    if user is None:
        raise credentials_exception
    return user
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/login")
@limiter.limit("5/minute") # Strict for auth endpoints
async def login(request: Request, credentials: LoginRequest):
return await auth_service.login(credentials)
@app.get("/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
return await data_service.get_all()
from fastapi import UploadFile, File, HTTPException
import magic
ALLOWED_TYPES = {"image/jpeg", "image/png", "application/pdf"}
MAX_SIZE = 10 * 1024 * 1024 # 10MB
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """Validate an uploaded file and store it under a server-generated name.

    The upload is rejected with HTTP 400 when it exceeds ``MAX_SIZE`` bytes
    or when its content-sniffed MIME type is not in ``ALLOWED_TYPES``.

    Args:
        file: The multipart upload to validate and persist.

    Returns:
        A dict with the generated ``filename`` of the stored file.

    Raises:
        HTTPException: 400 if the file is too large or of a disallowed type.
    """
    # Read at most one byte past the limit so an oversized upload is detected
    # without loading the entire body into memory.
    content = await file.read(MAX_SIZE + 1)
    if len(content) > MAX_SIZE:
        raise HTTPException(400, "File too large")

    # Inspect the magic bytes rather than trusting the declared type or name.
    mime_type = magic.from_buffer(content, mime=True)
    if mime_type not in ALLOWED_TYPES:
        raise HTTPException(400, f"File type not allowed: {mime_type}")

    # Choose the extension from the detected type, never from the
    # client-supplied filename, which may be missing or carry a misleading
    # suffix.
    extension_by_type = {
        "image/jpeg": ".jpg",
        "image/png": ".png",
        "application/pdf": ".pdf",
    }
    safe_name = f"{uuid4()}{extension_by_type.get(mime_type, '')}"

    file_path = UPLOAD_DIR / safe_name
    file_path.write_bytes(content)
    return {"filename": safe_name}
Always start with tests that define expected behavior:
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.mark.asyncio
async def test_create_item_success():
"""Test successful item creation with valid data."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/items",
json={"name": "Test Item", "price": 29.99},
headers={"Authorization": "Bearer valid_token"}
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Test Item"
assert "id" in data
@pytest.mark.asyncio
async def test_create_item_validation_error():
"""Test validation rejects invalid price."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/items",
json={"name": "Test", "price": -10},
headers={"Authorization": "Bearer valid_token"}
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_create_item_unauthorized():
"""Test endpoint requires authentication."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post("/items", json={"name": "Test", "price": 10})
assert response.status_code == 401
Write only the code needed to make tests pass:
@app.post("/items", status_code=201)
async def create_item(
    item: ItemCreate,
    user: User = Depends(get_current_user)
) -> ItemResponse:
    """Create an item for the authenticated user and return it.

    Args:
        item: The validated request payload.
        user: The caller, resolved by the ``get_current_user`` dependency.

    Returns:
        An ``ItemResponse`` built from the object returned by
        ``item_service.create``.
    """
    created = await item_service.create(item, user.id)
    # from_orm() is deprecated in Pydantic v2; model_validate with
    # from_attributes=True is its replacement.
    return ItemResponse.model_validate(created, from_attributes=True)
Improve code quality while keeping tests green. Extract common patterns, improve naming, optimize queries.
# Run all tests with coverage
pytest --cov=app --cov-report=term-missing
# Type checking
mypy app --strict
# Security scan
bandit -r app -ll
# All must pass before committing
# BAD - Creates new connection per request
@app.get("/users/{user_id}")
async def get_user(user_id: int):
conn = await asyncpg.connect(DATABASE_URL)
try:
return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
finally:
await conn.close()
# GOOD - Uses connection pool
from contextlib import asynccontextmanager
pool: asyncpg.Pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared asyncpg pool at startup and close it at shutdown.

    The pool is stored in the module-level ``pool`` so request handlers can
    acquire connections from it.
    """
    global pool
    pool = await asyncpg.create_pool(
        DATABASE_URL,
        min_size=5,
        max_size=20,
        command_timeout=60,
    )
    try:
        yield
    finally:
        # Close the pool even when the application exits through an error.
        await pool.close()
app = FastAPI(lifespan=lifespan)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with pool.acquire() as conn:
return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
# BAD - Sequential external API calls
@app.get("/dashboard")
async def get_dashboard(user_id: int):
profile = await fetch_profile(user_id) # 100ms
orders = await fetch_orders(user_id) # 150ms
notifications = await fetch_notifications(user_id) # 80ms
return {"profile": profile, "orders": orders, "notifications": notifications}
# Total: ~330ms
# GOOD - Concurrent calls
@app.get("/dashboard")
async def get_dashboard(user_id: int):
    """Fetch the profile, orders and notifications for a user concurrently.

    Returns:
        A dict with the keys ``profile``, ``orders`` and ``notifications``.
    """
    sections = ("profile", "orders", "notifications")
    # gather() returns results in the order the awaitables were passed, so
    # they line up with the section names above.
    results = await asyncio.gather(
        fetch_profile(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
    )
    return dict(zip(sections, results))
# Total: ~150ms (slowest call)
# BAD - Recomputes expensive data every request
@app.get("/stats")
async def get_stats():
return await compute_expensive_stats() # 500ms each time
# GOOD - Cache with Redis
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise the Redis-backed response cache for the app's lifetime."""
    redis = aioredis.from_url("redis://localhost")
    FastAPICache.init(RedisBackend(redis), prefix="api-cache")
    try:
        yield
    finally:
        # Release the Redis connection on shutdown instead of leaking it.
        await redis.close()
@app.get("/stats")
@cache(expire=300) # Cache for 5 minutes
async def get_stats():
return await compute_expensive_stats()
# GOOD - In-memory cache for simpler cases
from functools import lru_cache
from datetime import datetime, timedelta
_cache = {}
_cache_time = {}


async def get_cached_config(key: str, ttl: int = 60):
    """Return the config value for ``key``, caching it in process memory.

    Args:
        key: Name of the config entry, passed to ``fetch_config`` on a miss.
        ttl: Seconds a freshly fetched value stays valid.

    Returns:
        The cached value while it is unexpired, otherwise the value just
        returned by ``fetch_config``.
    """
    # Local import keeps this block self-contained.
    from datetime import timezone

    # datetime.utcnow() is deprecated and naive; use an aware UTC timestamp.
    now = datetime.now(timezone.utc)
    expires_at = _cache_time.get(key)
    if key in _cache and expires_at is not None and expires_at > now:
        return _cache[key]

    value = await fetch_config(key)
    _cache[key] = value
    _cache_time[key] = now + timedelta(seconds=ttl)
    return value
# BAD - Returns all records
@app.get("/items")
async def list_items():
return await db.fetch("SELECT * FROM items") # Could be millions
# GOOD - Cursor-based pagination
from pydantic import BaseModel
class PaginatedResponse(BaseModel):
items: list
next_cursor: str | None
has_more: bool
@app.get("/items")
async def list_items(
    cursor: str | None = None,
    limit: int = Query(default=20, ge=1, le=100)
) -> PaginatedResponse:
    """Return one page of items ordered by ``id``.

    Args:
        cursor: Opaque cursor from a previous page; only rows with an ``id``
            greater than the decoded cursor are returned. ``None`` starts
            from the beginning.
        limit: Page size, between 1 and 100.

    Returns:
        A ``PaginatedResponse`` with the page's rows, a cursor for the last
        returned row (``None`` when the page is empty) and ``has_more``.
    """
    # One extra row is requested to learn whether another page exists. The
    # limit is bound as a query parameter instead of being formatted into SQL.
    if cursor:
        rows = await db.fetch(
            "SELECT * FROM items WHERE id > $1 ORDER BY id LIMIT $2",
            decode_cursor(cursor),
            limit + 1,
        )
    else:
        rows = await db.fetch(
            "SELECT * FROM items ORDER BY id LIMIT $1",
            limit + 1,
        )

    has_more = len(rows) > limit
    items = rows[:limit]
    return PaginatedResponse(
        items=items,
        next_cursor=encode_cursor(items[-1]["id"]) if items else None,
        has_more=has_more,
    )
# BAD - Blocks response for slow operations
@app.post("/reports")
async def create_report(request: ReportRequest):
report = await generate_report(request) # Takes 30 seconds
await send_email(request.email, report)
return {"status": "completed"}
# GOOD - Return immediately, process in background
from fastapi import BackgroundTasks
@app.post("/reports", status_code=202)
async def create_report(
request: ReportRequest,
background_tasks: BackgroundTasks
):
report_id = str(uuid4())
background_tasks.add_task(process_report, report_id, request)
return {"report_id": report_id, "status": "processing"}
async def process_report(report_id: str, request: ReportRequest):
    """Generate a report, save it under ``report_id``, then email it.

    Args:
        report_id: Identifier the generated report is saved under.
        request: The original report request; its ``email`` attribute is the
            address the report is sent to.
    """
    generated = await generate_report(request)
    await save_report(report_id, generated)
    await send_email(request.email, generated)
@app.get("/reports/{report_id}")
async def get_report_status(report_id: str):
return await get_report(report_id)
| CVE ID | Severity | Description | Mitigation |
|---|---|---|---|
| CVE-2024-47874 | HIGH | Starlette multipart DoS via memory exhaustion | Upgrade Starlette 0.40.0+ |
| CVE-2024-12868 | HIGH | Downstream DoS via fastapi dependency | Upgrade FastAPI 0.115.3+ |
| CVE-2023-30798 | HIGH | Starlette <0.25 DoS | Upgrade FastAPI 0.92+ |
| Category | Risk | Mitigations |
|---|---|---|
| A01 Access Control | HIGH | Dependency injection for auth, permission decorators |
| A02 Crypto Failures | HIGH | JWT with proper algorithms, Argon2 passwords |
| A03 Injection | HIGH | Pydantic validation, parameterized queries |
| A04 Insecure Design | MEDIUM | Type safety, validation layers |
| A05 Misconfiguration | HIGH | Security headers, disable docs in prod |
| A06 Vulnerable Components | CRITICAL | Keep Starlette/FastAPI updated |
| A07 Auth Failures | HIGH | Rate limiting on auth, secure JWT |
from fastapi import HTTPException
from fastapi.responses import JSONResponse
import logging
logger = logging.getLogger(__name__)
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Log an unhandled exception and return a generic 500 response.

    The full details go to the log only; the client receives a fixed message
    so internal information is not exposed.
    """
    # Pass the exception object explicitly so the traceback is logged without
    # depending on an ambient active exception; %-style args defer formatting.
    logger.error("Unhandled error: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"},
    )
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    """Render an ``HTTPException`` as a JSON body with its status code.

    Headers attached to the exception are forwarded, so challenges such as
    the ``WWW-Authenticate`` header on 401 responses reach the client.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
        headers=getattr(exc, "headers", None),
    )
import pytest
from fastapi.testclient import TestClient
def test_rate_limiting():
client = TestClient(app)
# Exceed rate limit
for _ in range(10):
response = client.post("/login", json={"username": "test", "password": "test"})
assert response.status_code == 429
def test_invalid_jwt_rejected():
client = TestClient(app)
response = client.get(
"/protected",
headers={"Authorization": "Bearer invalid.token.here"}
)
assert response.status_code == 401
def test_sql_injection_prevented():
client = TestClient(app)
response = client.get("/users", params={"search": "'; DROP TABLE users; --"})
assert response.status_code in [200, 400]
# Should not cause 500 (SQL error)
def test_file_upload_type_validation():
client = TestClient(app)
# Try uploading executable disguised as image
response = client.post(
"/upload",
files={"file": ("test.jpg", b"MZ\x90\x00", "image/jpeg")} # EXE magic bytes
)
assert response.status_code == 400
# NEVER
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True)
# ALWAYS
app.add_middleware(CORSMiddleware, allow_origins=["https://app.example.com"])
# NEVER - allows brute force
@app.post("/login")
async def login(creds): ...
# ALWAYS
@app.post("/login")
@limiter.limit("5/minute")
async def login(request, creds): ...
# NEVER
app = FastAPI()
# ALWAYS
app = FastAPI(
docs_url=None if os.environ.get("ENV") == "production" else "/docs",
redoc_url=None
)
# NEVER
jwt.encode(data, "secret", algorithm="HS256") # Hardcoded weak secret
# ALWAYS
# NOTE(review): RS256 is an asymmetric algorithm, so the key passed here would
# need to be an RSA private key rather than a shared secret; the variable name
# JWT_SECRET and the HS256 setup elsewhere in this document suggest otherwise.
# Confirm which is intended.
jwt.encode(data, os.environ["JWT_SECRET"], algorithm="RS256") # Env var, strong algo
# NEVER
if file.filename.endswith('.jpg'): ...
# ALWAYS
mime = magic.from_buffer(content, mime=True)
if mime not in ALLOWED_TYPES: ...
Your goal is to create FastAPI applications that are:
Security Reminder :
Weekly Installs
104
Repository
GitHub Stars
30
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Warn
Installed on
codex: 85
gemini-cli: 84
opencode: 82
cursor: 80
github-copilot: 77
claude-code: 66
xdrop 文件传输脚本:Bun 环境下安全上传下载工具,支持加密分享
37,500 周安装
Dify组件重构技能:React高复杂度组件自动化重构与测试指南
2,900 周安装
WordPress专家服务 - 自定义主题插件开发、WooCommerce与性能优化
2,900 周安装
HeroUI v3 React 组件库 | 基于 Tailwind CSS v4 的无障碍 UI 框架
3,000 周安装
tts文本转语音工具:支持语音克隆、情感控制、SRT时间线精准配音,Kokoro与Noiz双后端
2,900 周安装
高级DevOps工程师技能:CI/CD流水线、Kubernetes部署、基础设施即代码实践指南
3,000 周安装
Cloudflare Agents SDK 开发指南:构建持久化AI智能体与工作流
3,000 周安装