重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
fastapi-development by manutej/luxor-claude-marketplace
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill fastapi-development一个全面的技能,用于使用 FastAPI 构建现代化、高性能的 Python API。掌握异步/等待模式、Pydantic 数据验证、依赖注入、身份验证、数据库集成以及生产就绪的部署策略。
在以下情况下使用此技能:
FastAPI 建立在三个基本原则之上:
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
# 基本安装
pip install fastapi
# 包含用于生产的 ASGI 服务器
pip install "fastapi[all]"
# 或单独安装
pip install fastapi uvicorn[standard]
# 附加依赖项
pip install python-multipart # 用于表单数据
pip install python-jose[cryptography] # 用于 JWT
pip install passlib[bcrypt] # 用于密码哈希
pip install sqlalchemy # 用于 SQL 数据库
pip install databases # 用于异步数据库支持
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
# 运行命令:uvicorn main:app --reload
from pydantic import BaseModel, Field, EmailStr, HttpUrl
from typing import Optional, List
from datetime import datetime
class User(BaseModel):
id: int
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
full_name: Optional[str] = None
is_active: bool = True
created_at: datetime = Field(default_factory=datetime.utcnow)
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
password: str = Field(..., min_length=8)
full_name: Optional[str] = None
class UserResponse(BaseModel):
id: int
username: str
email: EmailStr
full_name: Optional[str]
is_active: bool
class Config:
orm_mode = True # 用于 SQLAlchemy 模型
class Image(BaseModel):
url: HttpUrl
name: str
class Item(BaseModel):
name: str
description: Optional[str] = None
price: float = Field(..., gt=0)
tax: Optional[float] = None
tags: List[str] = []
images: Optional[List[Image]] = None
# 请求示例:
# {
# "name": "Laptop",
# "price": 999.99,
# "tags": ["electronics", "computers"],
# "images": [
# {"url": "http://example.com/img1.jpg", "name": "Front view"}
# ]
# }
from pydantic import BaseModel, Field, validator
class Product(BaseModel):
name: str = Field(..., example="MacBook Pro")
price: float = Field(..., gt=0, example=1999.99)
discount: Optional[float] = Field(None, ge=0, le=100, example=10.0)
@validator('discount')
def discount_check(cls, v, values):
if v and 'price' in values:
discounted = values['price'] * (1 - v/100)
if discounted < 0:
raise ValueError('Discounted price cannot be negative')
return v
class Config:
schema_extra = {
"example": {
"name": "MacBook Pro 16",
"price": 2499.99,
"discount": 15.0
}
}
from fastapi import FastAPI, Path, Query, Body
from typing import Optional
app = FastAPI()
# 带路径参数的 GET
@app.get("/items/{item_id}")
async def read_item(
item_id: int = Path(..., title="The ID of the item", ge=1),
q: Optional[str] = Query(None, max_length=50)
):
return {"item_id": item_id, "q": q}
# 带请求体的 POST
@app.post("/items/")
async def create_item(item: Item):
return {"item": item, "message": "Item created"}
# 用于更新的 PUT
@app.put("/items/{item_id}")
async def update_item(
item_id: int,
item: Item = Body(...),
):
return {"item_id": item_id, "item": item}
# DELETE
@app.delete("/items/{item_id}")
async def delete_item(item_id: int):
return {"message": f"Item {item_id} deleted"}
# 用于部分更新的 PATCH
@app.patch("/items/{item_id}")
async def partial_update_item(
item_id: int,
item: dict = Body(...)
):
return {"item_id": item_id, "updated_fields": item}
from fastapi import Query
from typing import List, Optional
@app.get("/search/")
async def search_items(
q: str = Query(..., min_length=3, max_length=50),
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=1, le=100),
sort_by: Optional[str] = Query(None, regex="^(name|price|date)$"),
tags: List[str] = Query([], description="Filter by tags")
):
return {
"q": q,
"skip": skip,
"limit": limit,
"sort_by": sort_by,
"tags": tags
}
from typing import List
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
# 哈希密码,保存到数据库
db_user = {
"id": 1,
"username": user.username,
"email": user.email,
"full_name": user.full_name,
"is_active": True
}
return db_user
@app.get("/users/", response_model=List[UserResponse])
async def list_users(skip: int = 0, limit: int = 100):
users = [...] # 从数据库获取
return users
# 从响应中排除字段
class UserInDB(User):
hashed_password: str
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
user = get_user_from_db(user_id) # 返回 UserInDB
return user # 密码自动被排除
# 在以下情况下使用 async def:
# - 使用异步驱动进行数据库查询时
# - 使用 httpx/aiohttp 调用外部 API 时
# - 使用异步 I/O 操作时
# - 使用异步库时
@app.get("/async-example")
async def async_endpoint():
# 可以在内部使用 await
result = await async_database_query()
external_data = await async_http_call()
return {"result": result, "external": external_data}
# 在以下情况下使用 def:
# - 使用同步库时
# - 执行 CPU 密集型操作时
# - 不需要异步操作时
@app.get("/sync-example")
def sync_endpoint():
# 常规同步代码
result = synchronous_database_query()
return {"result": result}
import asyncio
from databases import Database
DATABASE_URL = "postgresql://user:password@localhost/dbname"
database = Database(DATABASE_URL)
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
query = "SELECT * FROM users WHERE id = :user_id"
user = await database.fetch_one(query, {"user_id": user_id})
return user
@app.post("/users/")
async def create_user(user: UserCreate):
query = """
INSERT INTO users (username, email, hashed_password)
VALUES (:username, :email, :password)
RETURNING *
"""
hashed_password = hash_password(user.password)
new_user = await database.fetch_one(
query,
{
"username": user.username,
"email": user.email,
"password": hashed_password
}
)
return new_user
import asyncio
import httpx
async def fetch_user(user_id: int):
async with httpx.AsyncClient() as client:
response = await client.get(f"https://api.example.com/users/{user_id}")
return response.json()
@app.get("/users/batch")
async def get_multiple_users(user_ids: List[int] = Query(...)):
# 并发获取所有用户
users = await asyncio.gather(*[fetch_user(uid) for uid in user_ids])
return {"users": users}
from fastapi import Depends
from typing import Optional
# 简单的依赖函数
async def common_parameters(
q: Optional[str] = None,
skip: int = 0,
limit: int = 100
):
return {"q": q, "skip": skip, "limit": limit}
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
return commons
@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
return commons
class Pagination:
def __init__(
self,
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100)
):
self.skip = skip
self.limit = limit
@app.get("/items/")
async def list_items(pagination: Pagination = Depends()):
return {
"skip": pagination.skip,
"limit": pagination.limit,
"items": [] # 带分页获取
}
from sqlalchemy.orm import Session
from typing import Generator
def get_db() -> Generator[Session, None, None]:
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
db: Session = Depends(get_db)
):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
from fastapi import Header, HTTPException
async def verify_token(x_token: str = Header(...)):
if x_token != "secret-token":
raise HTTPException(status_code=400, detail="Invalid token")
return x_token
async def verify_key(x_key: str = Header(...)):
if x_key != "secret-key":
raise HTTPException(status_code=400, detail="Invalid key")
return x_key
async def verify_credentials(
token: str = Depends(verify_token),
key: str = Depends(verify_key)
):
return {"token": token, "key": key}
@app.get("/protected/")
async def protected_route(credentials: dict = Depends(verify_credentials)):
return {"message": "Access granted", "credentials": credentials}
async def log_requests():
print("Request received")
app = FastAPI(dependencies=[Depends(log_requests)])
# 此依赖项对所有端点运行
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user_from_db(username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
):
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
from fastapi import Security
from fastapi.security import APIKeyHeader
API_KEY = "your-api-key"
api_key_header = APIKeyHeader(name="X-API-Key")
async def verify_api_key(api_key: str = Security(api_key_header)):
if api_key != API_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid API Key"
)
return api_key
@app.get("/secure-data")
async def get_secure_data(api_key: str = Depends(verify_api_key)):
return {"data": "This is secure data"}
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={
"items:read": "Read items",
"items:write": "Create and update items",
"users:read": "Read user information"
}
)
async def get_current_user_with_scopes(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme)
):
# 验证令牌并检查作用域
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
# 解码 JWT 并验证作用域...
return user
@app.get("/items/", dependencies=[Security(get_current_user_with_scopes, scopes=["items:read"])])
async def read_items():
return [{"item": "Item 1"}, {"item": "Item 2"}]
@app.post("/items/", dependencies=[Security(get_current_user_with_scopes, scopes=["items:write"])])
async def create_item(item: Item):
return {"item": item}
from sqlalchemy import create_engine, Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 模型
class UserModel(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
Base.metadata.create_all(bind=engine)
from sqlalchemy.orm import Session
# 创建
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = UserModel(
username=user.username,
email=user.email,
hashed_password=get_password_hash(user.password)
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
# 读取
@app.get("/users/{user_id}", response_model=UserResponse)
async def read_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# 更新
@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
user_update: UserCreate,
db: Session = Depends(get_db)
):
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.username = user_update.username
user.email = user_update.email
if user_update.password:
user.hashed_password = get_password_hash(user_update.password)
db.commit()
db.refresh(user)
return user
# 删除
@app.delete("/users/{user_id}")
async def delete_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
db.delete(user)
db.commit()
return {"message": "User deleted successfully"}
from fastapi import BackgroundTasks
def send_email(email: str, message: str):
# 模拟发送邮件
print(f"Sending email to {email}: {message}")
def process_file(filename: str):
# 模拟文件处理
print(f"Processing file: {filename}")
@app.post("/send-notification/")
async def send_notification(
email: str,
background_tasks: BackgroundTasks
):
background_tasks.add_task(send_email, email, "Welcome!")
return {"message": "Notification scheduled"}
@app.post("/upload/")
async def upload_file(
file: str,
background_tasks: BackgroundTasks
):
# 先保存文件
background_tasks.add_task(process_file, file)
return {"message": "File uploaded, processing in background"}
from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
class CustomException(Exception):
def __init__(self, name: str):
self.name = name
@app.exception_handler(CustomException)
async def custom_exception_handler(request: Request, exc: CustomException):
return JSONResponse(
status_code=418,
content={"message": f"Oops! {exc.name} did something wrong."},
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": exc.errors()},
)
@app.get("/items/{item_id}")
async def read_item(item_id: str):
if item_id == "error":
raise CustomException(name="Item")
return {"item_id": item_id}
from fastapi.testclient import TestClient
client = TestClient(app)
def test_read_main():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Hello World"}
def test_create_item():
response = client.post(
"/items/",
json={"name": "Test Item", "price": 10.5}
)
assert response.status_code == 200
assert response.json()["name"] == "Test Item"
def test_authentication():
response = client.post(
"/token",
data={"username": "testuser", "password": "testpass"}
)
assert response.status_code == 200
assert "access_token" in response.json()
import pytest
from httpx import AsyncClient
@pytest.mark.anyio
async def test_read_items():
async with AsyncClient(app=app, base_url="http://test") as ac:
response = await ac.get("/items/")
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.anyio
async def test_create_user():
async with AsyncClient(app=app, base_url="http://test") as ac:
response = await ac.post(
"/users/",
json={
"username": "newuser",
"email": "new@example.com",
"password": "securepass123"
}
)
assert response.status_code == 200
assert response.json()["username"] == "newuser"
# routers/users.py
from fastapi import APIRouter, Depends
router = APIRouter(
prefix="/users",
tags=["users"],
dependencies=[Depends(verify_token)],
responses={404: {"description": "Not found"}},
)
@router.get("/")
async def list_users():
return [{"username": "user1"}, {"username": "user2"}]
@router.get("/{user_id}")
async def get_user(user_id: int):
return {"user_id": user_id}
# main.py
from routers import users
app = FastAPI()
app.include_router(users.router)
my_fastapi_project/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── users.py
│ │ └── items.py
│ ├── dependencies/
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ └── database.py
│ └── utils/
│ ├── __init__.py
│ └── security.py
├── tests/
│ ├── __init__.py
│ ├── test_users.py
│ └── test_items.py
├── requirements.txt
├── .env
└── README.md
from pydantic import BaseSettings
class Settings(BaseSettings):
app_name: str = "My FastAPI App"
database_url: str
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
class Config:
env_file = ".env"
settings = Settings()
app = FastAPI(
title="My API",
description="This is a very custom API",
version="1.0.0",
openapi_tags=[
{
"name": "users",
"description": "Operations with users.",
},
{
"name": "items",
"description": "Manage items.",
},
]
)
@app.post(
"/items/",
response_model=Item,
tags=["items"],
summary="Create an item",
description="Create an item with all the information",
response_description="The created item",
)
async def create_item(item: Item):
return item
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY ./app /app
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
gunicorn main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
技能版本:1.0.0 最后更新:2025年10月 技能类别:后端开发、API 开发、Python 兼容性:FastAPI 0.100+、Python 3.7+、Pydantic 2.0+
每周安装次数
51
代码仓库
GitHub 星标数
47
首次出现
2026年1月22日
安全审计
安装于
gemini-cli43
opencode42
codex41
github-copilot39
cursor37
claude-code35
A comprehensive skill for building modern, high-performance Python APIs with FastAPI. Master async/await patterns, Pydantic data validation, dependency injection, authentication, database integration, and production-ready deployment strategies.
Use this skill when:
FastAPI is built on three foundational principles:
# Basic installation
pip install fastapi
# With ASGI server for production
pip install "fastapi[all]"
# Or install separately
pip install fastapi uvicorn[standard]
# Additional dependencies
pip install python-multipart # For form data
pip install python-jose[cryptography] # For JWT
pip install passlib[bcrypt] # For password hashing
pip install sqlalchemy # For SQL databases
pip install databases # For async database support
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
# Run with: uvicorn main:app --reload
from pydantic import BaseModel, Field, EmailStr, HttpUrl
from typing import Optional, List
from datetime import datetime
class User(BaseModel):
id: int
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
full_name: Optional[str] = None
is_active: bool = True
created_at: datetime = Field(default_factory=datetime.utcnow)
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
password: str = Field(..., min_length=8)
full_name: Optional[str] = None
class UserResponse(BaseModel):
id: int
username: str
email: EmailStr
full_name: Optional[str]
is_active: bool
class Config:
orm_mode = True # For SQLAlchemy models
class Image(BaseModel):
url: HttpUrl
name: str
class Item(BaseModel):
name: str
description: Optional[str] = None
price: float = Field(..., gt=0)
tax: Optional[float] = None
tags: List[str] = []
images: Optional[List[Image]] = None
# Request example:
# {
# "name": "Laptop",
# "price": 999.99,
# "tags": ["electronics", "computers"],
# "images": [
# {"url": "http://example.com/img1.jpg", "name": "Front view"}
# ]
# }
from pydantic import BaseModel, Field, validator
class Product(BaseModel):
name: str = Field(..., example="MacBook Pro")
price: float = Field(..., gt=0, example=1999.99)
discount: Optional[float] = Field(None, ge=0, le=100, example=10.0)
@validator('discount')
def discount_check(cls, v, values):
if v and 'price' in values:
discounted = values['price'] * (1 - v/100)
if discounted < 0:
raise ValueError('Discounted price cannot be negative')
return v
class Config:
schema_extra = {
"example": {
"name": "MacBook Pro 16",
"price": 2499.99,
"discount": 15.0
}
}
from fastapi import FastAPI, Path, Query, Body
from typing import Optional
app = FastAPI()
# GET with path parameter
@app.get("/items/{item_id}")
async def read_item(
item_id: int = Path(..., title="The ID of the item", ge=1),
q: Optional[str] = Query(None, max_length=50)
):
return {"item_id": item_id, "q": q}
# POST with request body
@app.post("/items/")
async def create_item(item: Item):
return {"item": item, "message": "Item created"}
# PUT for updates
@app.put("/items/{item_id}")
async def update_item(
item_id: int,
item: Item = Body(...),
):
return {"item_id": item_id, "item": item}
# DELETE
@app.delete("/items/{item_id}")
async def delete_item(item_id: int):
return {"message": f"Item {item_id} deleted"}
# PATCH for partial updates
@app.patch("/items/{item_id}")
async def partial_update_item(
item_id: int,
item: dict = Body(...)
):
return {"item_id": item_id, "updated_fields": item}
from fastapi import Query
from typing import List, Optional
@app.get("/search/")
async def search_items(
q: str = Query(..., min_length=3, max_length=50),
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=1, le=100),
sort_by: Optional[str] = Query(None, regex="^(name|price|date)$"),
tags: List[str] = Query([], description="Filter by tags")
):
return {
"q": q,
"skip": skip,
"limit": limit,
"sort_by": sort_by,
"tags": tags
}
from typing import List
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
# Hash password, save to DB
db_user = {
"id": 1,
"username": user.username,
"email": user.email,
"full_name": user.full_name,
"is_active": True
}
return db_user
@app.get("/users/", response_model=List[UserResponse])
async def list_users(skip: int = 0, limit: int = 100):
users = [...] # Fetch from database
return users
# Exclude fields from response
class UserInDB(User):
hashed_password: str
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
user = get_user_from_db(user_id) # Returns UserInDB
return user # Password excluded automatically
# Use async def when:
# - Making database queries with async driver
# - Calling external APIs with httpx/aiohttp
# - Using async I/O operations
# - Working with async libraries
@app.get("/async-example")
async def async_endpoint():
# Can use await inside
result = await async_database_query()
external_data = await async_http_call()
return {"result": result, "external": external_data}
# Use def when:
# - Working with synchronous libraries
# - Performing CPU-bound operations
# - No async operations needed
@app.get("/sync-example")
def sync_endpoint():
# Regular synchronous code
result = synchronous_database_query()
return {"result": result}
import asyncio
from databases import Database
DATABASE_URL = "postgresql://user:password@localhost/dbname"
database = Database(DATABASE_URL)
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
query = "SELECT * FROM users WHERE id = :user_id"
user = await database.fetch_one(query, {"user_id": user_id})
return user
@app.post("/users/")
async def create_user(user: UserCreate):
query = """
INSERT INTO users (username, email, hashed_password)
VALUES (:username, :email, :password)
RETURNING *
"""
hashed_password = hash_password(user.password)
new_user = await database.fetch_one(
query,
{
"username": user.username,
"email": user.email,
"password": hashed_password
}
)
return new_user
import asyncio
import httpx
async def fetch_user(user_id: int):
async with httpx.AsyncClient() as client:
response = await client.get(f"https://api.example.com/users/{user_id}")
return response.json()
@app.get("/users/batch")
async def get_multiple_users(user_ids: List[int] = Query(...)):
# Fetch all users concurrently
users = await asyncio.gather(*[fetch_user(uid) for uid in user_ids])
return {"users": users}
from fastapi import Depends
from typing import Optional
# Simple dependency function
async def common_parameters(
q: Optional[str] = None,
skip: int = 0,
limit: int = 100
):
return {"q": q, "skip": skip, "limit": limit}
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
return commons
@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
return commons
class Pagination:
def __init__(
self,
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100)
):
self.skip = skip
self.limit = limit
@app.get("/items/")
async def list_items(pagination: Pagination = Depends()):
return {
"skip": pagination.skip,
"limit": pagination.limit,
"items": [] # Fetch with pagination
}
from sqlalchemy.orm import Session
from typing import Generator
def get_db() -> Generator[Session, None, None]:
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
db: Session = Depends(get_db)
):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
from fastapi import Header, HTTPException
async def verify_token(x_token: str = Header(...)):
if x_token != "secret-token":
raise HTTPException(status_code=400, detail="Invalid token")
return x_token
async def verify_key(x_key: str = Header(...)):
if x_key != "secret-key":
raise HTTPException(status_code=400, detail="Invalid key")
return x_key
async def verify_credentials(
token: str = Depends(verify_token),
key: str = Depends(verify_key)
):
return {"token": token, "key": key}
@app.get("/protected/")
async def protected_route(credentials: dict = Depends(verify_credentials)):
return {"message": "Access granted", "credentials": credentials}
async def log_requests():
print("Request received")
app = FastAPI(dependencies=[Depends(log_requests)])
# This dependency runs for ALL endpoints
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user_from_db(username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
):
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
from fastapi import Security
from fastapi.security import APIKeyHeader
API_KEY = "your-api-key"
api_key_header = APIKeyHeader(name="X-API-Key")
async def verify_api_key(api_key: str = Security(api_key_header)):
if api_key != API_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid API Key"
)
return api_key
@app.get("/secure-data")
async def get_secure_data(api_key: str = Depends(verify_api_key)):
return {"data": "This is secure data"}
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={
"items:read": "Read items",
"items:write": "Create and update items",
"users:read": "Read user information"
}
)
async def get_current_user_with_scopes(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme)
):
# Verify token and check scopes
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
# Decode JWT and verify scopes...
return user
@app.get("/items/", dependencies=[Security(get_current_user_with_scopes, scopes=["items:read"])])
async def read_items():
return [{"item": "Item 1"}, {"item": "Item 2"}]
@app.post("/items/", dependencies=[Security(get_current_user_with_scopes, scopes=["items:write"])])
async def create_item(item: Item):
return {"item": item}
from sqlalchemy import create_engine, Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Models
class UserModel(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
Base.metadata.create_all(bind=engine)
from sqlalchemy.orm import Session
# Create
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = UserModel(
username=user.username,
email=user.email,
hashed_password=get_password_hash(user.password)
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
# Read
@app.get("/users/{user_id}", response_model=UserResponse)
async def read_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# Update
@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
user_update: UserCreate,
db: Session = Depends(get_db)
):
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.username = user_update.username
user.email = user_update.email
if user_update.password:
user.hashed_password = get_password_hash(user_update.password)
db.commit()
db.refresh(user)
return user
# Delete
@app.delete("/users/{user_id}")
async def delete_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
db.delete(user)
db.commit()
return {"message": "User deleted successfully"}
from fastapi import BackgroundTasks
def send_email(email: str, message: str):
# Simulate sending email
print(f"Sending email to {email}: {message}")
def process_file(filename: str):
# Simulate file processing
print(f"Processing file: {filename}")
@app.post("/send-notification/")
async def send_notification(
email: str,
background_tasks: BackgroundTasks
):
background_tasks.add_task(send_email, email, "Welcome!")
return {"message": "Notification scheduled"}
@app.post("/upload/")
async def upload_file(
file: str,
background_tasks: BackgroundTasks
):
# Save file first
background_tasks.add_task(process_file, file)
return {"message": "File uploaded, processing in background"}
from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
class CustomException(Exception):
def __init__(self, name: str):
self.name = name
@app.exception_handler(CustomException)
async def custom_exception_handler(request: Request, exc: CustomException):
return JSONResponse(
status_code=418,
content={"message": f"Oops! {exc.name} did something wrong."},
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": exc.errors()},
)
@app.get("/items/{item_id}")
async def read_item(item_id: str):
if item_id == "error":
raise CustomException(name="Item")
return {"item_id": item_id}
from fastapi.testclient import TestClient
client = TestClient(app)
def test_read_main():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Hello World"}
def test_create_item():
response = client.post(
"/items/",
json={"name": "Test Item", "price": 10.5}
)
assert response.status_code == 200
assert response.json()["name"] == "Test Item"
def test_authentication():
response = client.post(
"/token",
data={"username": "testuser", "password": "testpass"}
)
assert response.status_code == 200
assert "access_token" in response.json()
import pytest
from httpx import AsyncClient
@pytest.mark.anyio
async def test_read_items():
async with AsyncClient(app=app, base_url="http://test") as ac:
response = await ac.get("/items/")
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.anyio
async def test_create_user():
async with AsyncClient(app=app, base_url="http://test") as ac:
response = await ac.post(
"/users/",
json={
"username": "newuser",
"email": "new@example.com",
"password": "securepass123"
}
)
assert response.status_code == 200
assert response.json()["username"] == "newuser"
# routers/users.py
from fastapi import APIRouter, Depends
router = APIRouter(
prefix="/users",
tags=["users"],
dependencies=[Depends(verify_token)],
responses={404: {"description": "Not found"}},
)
@router.get("/")
async def list_users():
return [{"username": "user1"}, {"username": "user2"}]
@router.get("/{user_id}")
async def get_user(user_id: int):
return {"user_id": user_id}
# main.py
from routers import users
app = FastAPI()
app.include_router(users.router)
my_fastapi_project/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── users.py
│ │ └── items.py
│ ├── dependencies/
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ └── database.py
│ └── utils/
│ ├── __init__.py
│ └── security.py
├── tests/
│ ├── __init__.py
│ ├── test_users.py
│ └── test_items.py
├── requirements.txt
├── .env
└── README.md
from pydantic import BaseSettings
class Settings(BaseSettings):
app_name: str = "My FastAPI App"
database_url: str
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
class Config:
env_file = ".env"
settings = Settings()
app = FastAPI(
title="My API",
description="This is a very custom API",
version="1.0.0",
openapi_tags=[
{
"name": "users",
"description": "Operations with users.",
},
{
"name": "items",
"description": "Manage items.",
},
]
)
@app.post(
"/items/",
response_model=Item,
tags=["items"],
summary="Create an item",
description="Create an item with all the information",
response_description="The created item",
)
async def create_item(item: Item):
return item
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY ./app /app
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
gunicorn main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
Skill Version : 1.0.0 Last Updated : October 2025 Skill Category : Backend Development, API Development, Python Compatible With : FastAPI 0.100+, Python 3.7+, Pydantic 2.0+
Weekly Installs
51
Repository
GitHub Stars
47
First Seen
Jan 22, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
gemini-cli43
opencode42
codex41
github-copilot39
cursor37
claude-code35
Lark Drive API 使用指南:飞书云文档、Wiki、表格 Token 处理与文件管理
48,300 周安装