rest-api-design-patterns by manutej/luxor-claude-marketplace
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill rest-api-design-patterns一项全面的技能,用于设计、实现和维护 RESTful API。掌握资源建模、HTTP 方法、版本控制策略、分页、过滤、错误处理以及使用 FastAPI、Express.js 和现代框架构建可扩展、可维护 API 的最佳实践。
在以下情况下使用此技能:
REST(表述性状态转移)是一种用于分布式系统的架构风格,强调:
第 0 级 - POX 沼泽:单一 URI,单一 HTTP 方法(通常是 POST)
/api,所有操作都在 POST 请求体中广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
第 1 级 - 资源:多个 URI,每个代表一个资源
/users、/posts、/products第 2 级 - HTTP 动词:正确使用 HTTP 方法
/users/123、POST /users、PUT /users/123第 3 级 - 超媒体控制(HATEOAS):API 响应包含指向相关资源的链接
"_links": {"self": "/users/123", "posts": "/users/123/posts"}1. 使用名词,而非动词
良好:
GET /users
GET /products
POST /orders
不佳:
GET /getUsers
GET /getAllProducts
POST /createOrder
2. 对集合使用复数名词
良好:
GET /users # 集合
GET /users/123 # 单个资源
不佳:
GET /user
GET /user/123
3. 使用小写字母和连字符
良好:
/user-profiles
/order-items
/payment-methods
不佳:
/userProfiles
/OrderItems
/payment_methods
4. 相关资源的层次结构
良好:
/users/123/posts
/users/123/posts/456
/users/123/posts/456/comments
避免深度嵌套(最多 2-3 层):
/organizations/1/departments/2/teams/3/members/4/tasks/5 # 太深了!
集合资源:
GET /products # 列出所有产品
POST /products # 创建新产品
项目资源:
GET /products/123 # 获取特定产品
PUT /products/123 # 替换产品(完全更新)
PATCH /products/123 # 部分更新
DELETE /products/123 # 删除产品
# 评论属于帖子
GET /posts/42/comments # 列出帖子 42 的评论
POST /posts/42/comments # 在帖子 42 上创建评论
GET /posts/42/comments/7 # 获取特定评论
DELETE /posts/42/comments/7 # 删除特定评论
# 直接访问评论的替代方案
GET /comments/7 # 通过 ID 获取评论(如果你有 ID)
GET /products?category=electronics
GET /products?price_min=100&price_max=500
GET /products?sort=price&order=desc
GET /users?status=active&role=admin
GET /posts?author=123&published=true
对于不适合标准 CRUD 的操作:
POST /users/123/activate # 激活用户账户
POST /orders/456/cancel # 取消订单
POST /payments/789/refund # 退款
POST /documents/321/publish # 发布文档
POST /subscriptions/654/renew # 续订订阅
POST /users/bulk-create # 创建多个用户
PATCH /products/bulk-update # 更新多个产品
DELETE /orders/bulk-delete # 删除多个订单
# 或使用查询参数
DELETE /orders?ids=1,2,3,4,5
特性:
FastAPI 示例:
from fastapi import FastAPI, HTTPException
from typing import List, Optional
app = FastAPI()
# 集合端点
@app.get("/items/")
async def list_items(
skip: int = 0,
limit: int = 10,
category: Optional[str] = None
) -> List[dict]:
"""列出项目,支持分页和过滤。"""
# 过滤和分页
items = get_items_from_db(skip=skip, limit=limit, category=category)
return items
# 单个资源端点
@app.get("/items/{item_id}")
async def get_item(item_id: int) -> dict:
"""通过 ID 获取特定项目。"""
item = get_item_from_db(item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
return item
Express.js 示例:
const express = require('express');
const app = express();
// 集合端点
app.get('/items', async (req, res) => {
try {
const { skip = 0, limit = 10, category } = req.query;
const items = await getItemsFromDB({ skip, limit, category });
res.json(items);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
// 单个资源端点
app.get('/items/:id', async (req, res) => {
try {
const item = await getItemFromDB(req.params.id);
if (!item) {
return res.status(404).json({ error: 'Item not found' });
}
res.json(item);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
特性:
Location 头部FastAPI 示例:
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
class ItemCreate(BaseModel):
name: str
price: float
category: str
description: Optional[str] = None
@app.post("/items/", status_code=status.HTTP_201_CREATED)
async def create_item(item: ItemCreate, response: Response) -> dict:
"""创建新项目。"""
# 验证并创建
new_item = create_item_in_db(item)
# 设置 Location 头部
response.headers["Location"] = f"/items/{new_item.id}"
return new_item
Express.js 示例:
app.use(express.json());
app.post('/items', async (req, res) => {
try {
const { name, price, category, description } = req.body;
// 验证
if (!name || !price || !category) {
return res.status(400).json({
error: 'Missing required fields: name, price, category'
});
}
// 创建资源
const newItem = await createItemInDB({ name, price, category, description });
// 设置 Location 头部并返回 201
res.location(`/items/${newItem.id}`)
.status(201)
.json(newItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
特性:
FastAPI 示例:
class ItemUpdate(BaseModel):
name: str
price: float
category: str
description: str
@app.put("/items/{item_id}")
async def replace_item(item_id: int, item: ItemUpdate) -> dict:
"""替换整个项目(所有字段都是必需的)。"""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
# 替换整个资源
updated_item = replace_item_in_db(item_id, item)
return updated_item
Express.js 示例:
app.put('/items/:id', async (req, res) => {
try {
const { name, price, category, description } = req.body;
// PUT 需要所有字段
if (!name || !price || !category || description === undefined) {
return res.status(400).json({
error: 'PUT requires all fields: name, price, category, description'
});
}
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
// 替换整个资源
const updatedItem = await replaceItemInDB(req.params.id, req.body);
res.json(updatedItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
特性:
FastAPI 示例:
class ItemPatch(BaseModel):
name: Optional[str] = None
price: Optional[float] = None
category: Optional[str] = None
description: Optional[str] = None
@app.patch("/items/{item_id}")
async def update_item(item_id: int, item: ItemPatch) -> dict:
"""部分更新项目(仅更新提供的字段)。"""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
# 仅更新提供的字段
update_data = item.model_dump(exclude_unset=True)
updated_item = update_item_in_db(item_id, update_data)
return updated_item
Express.js 示例:
app.patch('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
// 仅更新提供的字段
const updatedItem = await updateItemInDB(req.params.id, req.body);
res.json(updatedItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
特性:
FastAPI 示例:
@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_item(item_id: int):
"""删除项目。"""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
delete_item_from_db(item_id)
return None # 204 No Content
# 替代方案:返回已删除的资源
@app.delete("/items/{item_id}")
async def delete_item_with_response(item_id: int) -> dict:
"""删除项目并返回它。"""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
delete_item_from_db(item_id)
return existing_item # 200 OK with body
Express.js 示例:
// 204 No Content 方法
app.delete('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
await deleteItemFromDB(req.params.id);
res.status(204).send();
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
// 200 OK with response body 方法
app.delete('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
await deleteItemFromDB(req.params.id);
res.json({ message: 'Item deleted successfully', item: existingItem });
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
在 URI 路径中指定版本 - 清晰、明确、易于理解。
优点:
缺点:
FastAPI 实现:
from fastapi import FastAPI, APIRouter
app = FastAPI()
# 版本 1 路由器
v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/users")
async def get_users_v1():
return {"users": ["user1", "user2"], "version": "1.0"}
@v1_router.get("/users/{user_id}")
async def get_user_v1(user_id: int):
return {"id": user_id, "name": "John", "version": "1.0"}
# 版本 2 路由器
v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/users")
async def get_users_v2(limit: int = 10, offset: int = 0):
"""V2 添加分页"""
return {
"users": ["user1", "user2"],
"pagination": {"limit": limit, "offset": offset},
"version": "2.0"
}
@v2_router.get("/users/{user_id}")
async def get_user_v2(user_id: int):
"""V2 返回更多字段"""
return {
"id": user_id,
"name": "John",
"email": "john@example.com",
"created_at": "2024-01-01",
"version": "2.0"
}
app.include_router(v1_router)
app.include_router(v2_router)
Express.js 实现:
const express = require('express');
const app = express();
// 版本 1 路由
const v1Router = express.Router();
v1Router.get('/users', (req, res) => {
res.json({ users: ['user1', 'user2'], version: '1.0' });
});
v1Router.get('/users/:id', (req, res) => {
res.json({ id: req.params.id, name: 'John', version: '1.0' });
});
// 版本 2 路由
const v2Router = express.Router();
v2Router.get('/users', (req, res) => {
const { limit = 10, offset = 0 } = req.query;
res.json({
users: ['user1', 'user2'],
pagination: { limit, offset },
version: '2.0'
});
});
v2Router.get('/users/:id', (req, res) => {
res.json({
id: req.params.id,
name: 'John',
email: 'john@example.com',
created_at: '2024-01-01',
version: '2.0'
});
});
app.use('/api/v1', v1Router);
app.use('/api/v2', v2Router);
在自定义头部或 Accept 头部中指定版本。
优点:
缺点:
FastAPI 实现:
from fastapi import FastAPI, Header, HTTPException
app = FastAPI()
@app.get("/users")
async def get_users(api_version: str = Header(default="1.0", alias="X-API-Version")):
"""基于头部处理多个版本"""
if api_version == "1.0":
return {"users": ["user1", "user2"], "version": "1.0"}
elif api_version == "2.0":
return {
"users": ["user1", "user2"],
"pagination": {"limit": 10, "offset": 0},
"version": "2.0"
}
else:
raise HTTPException(
status_code=400,
detail=f"Unsupported API version: {api_version}"
)
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
api_version: str = Header(default="1.0", alias="X-API-Version")
):
"""具有版本处理的用户端点"""
if api_version == "1.0":
return {"id": user_id, "name": "John", "version": "1.0"}
elif api_version == "2.0":
return {
"id": user_id,
"name": "John",
"email": "john@example.com",
"created_at": "2024-01-01",
"version": "2.0"
}
else:
raise HTTPException(
status_code=400,
detail=f"Unsupported API version: {api_version}"
)
Express.js 实现:
app.get('/users', (req, res) => {
const version = req.get('X-API-Version') || '1.0';
if (version === '1.0') {
res.json({ users: ['user1', 'user2'], version: '1.0' });
} else if (version === '2.0') {
res.json({
users: ['user1', 'user2'],
pagination: { limit: 10, offset: 0 },
version: '2.0'
});
} else {
res.status(400).json({ error: `Unsupported API version: ${version}` });
}
});
app.get('/users/:id', (req, res) => {
const version = req.get('X-API-Version') || '1.0';
if (version === '1.0') {
res.json({ id: req.params.id, name: 'John', version: '1.0' });
} else if (version === '2.0') {
res.json({
id: req.params.id,
name: 'John',
email: 'john@example.com',
created_at: '2024-01-01',
version: '2.0'
});
} else {
res.status(400).json({ error: `Unsupported API version: ${version}` });
}
});
在 Accept 头部中使用自定义媒体类型指定版本。
FastAPI 实现:
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
@app.get("/users")
async def get_users(request: Request):
"""通过 Accept 头部处理版本控制"""
accept = request.headers.get("accept", "application/vnd.api.v1+json")
if "vnd.api.v1+json" in accept:
return {"users": ["user1", "user2"], "version": "1.0"}
elif "vnd.api.v2+json" in accept:
return {
"users": ["user1", "user2"],
"pagination": {"limit": 10, "offset": 0},
"version": "2.0"
}
else:
raise HTTPException(
status_code=406,
detail="Not Acceptable: Unsupported media type"
)
Express.js 实现:
app.get('/users', (req, res) => {
const accept = req.get('Accept') || 'application/vnd.api.v1+json';
if (accept.includes('vnd.api.v1+json')) {
res.type('application/vnd.api.v1+json')
.json({ users: ['user1', 'user2'], version: '1.0' });
} else if (accept.includes('vnd.api.v2+json')) {
res.type('application/vnd.api.v2+json')
.json({
users: ['user1', 'user2'],
pagination: { limit: 10, offset: 0 },
version: '2.0'
});
} else {
res.status(406).json({ error: 'Not Acceptable: Unsupported media type' });
}
});
将版本作为查询参数(最不推荐)。
GET /users?version=2.0
GET /users/123?v=2
缺点:
简单,但对于大型数据集可能存在性能问题。
FastAPI 实现:
from fastapi import FastAPI, Query
from typing import List
from pydantic import BaseModel
class PaginatedResponse(BaseModel):
items: List[dict]
total: int
limit: int
offset: int
has_more: bool
@app.get("/items", response_model=PaginatedResponse)
async def list_items(
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0)
):
"""基于偏移量的分页"""
# 获取总计数
total = count_items_in_db()
# 获取分页项目
items = get_items_from_db(limit=limit, offset=offset)
# 检查是否有更多项目
has_more = (offset + limit) < total
return {
"items": items,
"total": total,
"limit": limit,
"offset": offset,
"has_more": has_more
}
Express.js 实现:
app.get('/items', async (req, res) => {
try {
const limit = Math.min(parseInt(req.query.limit) || 10, 100);
const offset = parseInt(req.query.offset) || 0;
const total = await countItemsInDB();
const items = await getItemsFromDB({ limit, offset });
const hasMore = (offset + limit) < total;
res.json({
items,
total,
limit,
offset,
has_more: hasMore
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
对于大型数据集更高效,防止在分页期间数据更改导致的问题。
FastAPI 实现:
from typing import Optional
class CursorPaginatedResponse(BaseModel):
items: List[dict]
next_cursor: Optional[str] = None
has_more: bool
@app.get("/items", response_model=CursorPaginatedResponse)
async def list_items_cursor(
limit: int = Query(default=10, ge=1, le=100),
cursor: Optional[str] = None
):
"""基于游标的分页"""
# 获取游标之后的项目
items = get_items_after_cursor(cursor=cursor, limit=limit + 1)
# 检查是否有更多项目
has_more = len(items) > limit
# 从最后一个项目获取下一个游标
next_cursor = None
if has_more:
items = items[:limit] # 移除额外项目
next_cursor = items[-1]["id"] # 使用最后一个项目 ID 作为游标
return {
"items": items,
"next_cursor": next_cursor,
"has_more": has_more
}
Express.js 实现:
app.get('/items', async (req, res) => {
try {
const limit = Math.min(parseInt(req.query.limit) || 10, 100);
const cursor = req.query.cursor || null;
// 获取一个额外项目以检查是否有更多
const items = await getItemsAfterCursor({ cursor, limit: limit + 1 });
const hasMore = items.length > limit;
let nextCursor = null;
if (hasMore) {
items.pop(); // 移除额外项目
nextCursor = items[items.length - 1].id; // 最后一个项目 ID 作为游标
}
res.json({
items,
next_cursor: nextCursor,
has_more: hasMore
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
对于带有页码的用户界面友好。
FastAPI 实现:
class PagePaginatedResponse(BaseModel):
items: List[dict]
page: int
page_size: int
total_pages: int
total_items: int
@app.get("/items", response_model=PagePaginatedResponse)
async def list_items_pages(
page: int = Query(default=1, ge=1),
page_size: int = Query(default=10, ge=1, le=100)
):
"""基于页面的分页"""
# 计算偏移量
offset = (page - 1) * page_size
# 获取总计数和项目
total_items = count_items_in_db()
items = get_items_from_db(limit=page_size, offset=offset)
# 计算总页数
total_pages = (total_items + page_size - 1) // page_size
return {
"items": items,
"page": page,
"page_size": page_size,
"total_pages": total_pages,
"total_items": total_items
}
Express.js 实现:
app.get('/items', async (req, res) => {
try {
const page = Math.max(parseInt(req.query.page) || 1, 1);
const pageSize = Math.min(parseInt(req.query.page_size) || 10, 100);
const offset = (page - 1) * pageSize;
const totalItems = await countItemsInDB();
const items = await getItemsFromDB({ limit: pageSize, offset });
const totalPages = Math.ceil(totalItems / pageSize);
res.json({
items,
page,
page_size: pageSize,
total_pages: totalPages,
total_items: totalItems
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
FastAPI 实现:
from enum import Enum
from typing import Optional, List
class SortOrder(str, Enum):
asc = "asc"
desc = "desc"
@app.get("/products")
async def list_products(
# 过滤
category: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
in_stock: Optional[bool] = None,
tags: Optional[List[str]] = Query(None),
search: Optional[str] = None,
# 排序
sort_by: Optional[str] = Query(default="created_at"),
order: SortOrder = SortOrder.desc,
# 分页
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0)
):
"""高级过滤和排序"""
filters = {
"category": category,
"min_price": min_price,
"max_price": max_price,
"in_stock": in_stock,
"tags": tags,
"search": search
}
# 移除 None 值
filters = {k: v for k, v in filters.items() if v is not None}
# 查询数据库
products = query_products(
filters=filters,
sort_by=sort_by,
order=order.value,
limit=limit,
offset=offset
)
return {
"products": products,
"filters": filters,
"sort": {"by": sort_by, "order": order.value},
"pagination": {"limit": limit, "offset": offset}
}
Express.js 实现:
app.get('/products', async (req, res) => {
try {
const {
category,
min_price,
max_price,
in_stock,
tags,
search,
sort_by = 'created_at',
order = 'desc',
limit = 10,
offset = 0
} = req.query;
// 构建过滤器
const filters = {};
if (category) filters.category = category;
if (min_price) filters.min_price = parseFloat(min_price);
if (max_price) filters.max_price = parseFloat(max_price);
if (in_stock !== undefined) filters.in_stock = in_stock === 'true';
if (tags) filters.tags = Array.isArray(tags) ? tags : [tags];
if (search) filters.search = search;
// 查询数据库
const products = await queryProducts({
filters,
sortBy: sort_by,
order,
limit: Math.min(parseInt(limit), 100),
offset: parseInt(offset)
});
res.json({
products,
filters,
sort: { by: sort_by, order },
pagination: { limit, offset }
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
标准错误响应结构:
{
"error": {
"code": "RESOURCE_NOT_FOUND",
"message": "The requested user was not found",
"details": {
"user_id": "123",
"resource": "user"
},
"timestamp": "2024-01-15T10:30:00Z"
}
}
成功码:
200 OK:成功的 GET、PUT、PATCH、DELETE 并返回响应201 Created:成功的 POST 创建资源202 Accepted:请求已接受进行异步处理204 No Content:成功的 DELETE 且无响应体客户端错误码:
400 Bad Request:无效的请求语法或验证错误401 Unauthorized:需要身份验证403 Forbidden:已认证但未授权404 Not Found:资源不存在405 Method Not Allowed:HTTP 方法不受支持409 Conflict:请求与当前状态冲突422 Unprocessable Entity:验证失败429 Too Many Requests:超出速率限制服务器错误码:
500 Internal Server Error:通用服务器错误502 Bad Gateway:上游服务器响应无效503 Service Unavailable:服务器暂时不可用504 Gateway Timeout:上游服务器超时from fastapi import FastAPI, HTTPException, status, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from datetime import datetime
app = FastAPI()
# 自定义异常
class ResourceNotFoundError(Exception):
def __init__(self, resource: str, resource_id: str):
self.resource = resource
self.resource_id = resource_id
# 全局异常处理器
@app.exception_handler(ResourceNotFoundError)
async def resource_not_found_handler(request: Request, exc: ResourceNotFoundError):
return JSONResponse(
status_code=404,
content={
"error": {
"code": "RESOURCE_NOT_FOUND",
"message": f"The requested {exc.resource} was not found",
"details": {
"resource": exc.resource,
"resource_id": exc.resource_id
},
"timestamp": datetime.utcnow().isoformat() + "Z"
}
}
)
# 验证错误处理器
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=422,
content={
"error": {
"code": "VALIDATION_ERROR",
"message": "Request validation failed",
"details": exc.errors(),
"timestamp": datetime.utcnow().isoformat() + "Z"
}
}
)
# 使用异常
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = get_user_from_db(user_id)
if not user:
raise ResourceNotFoundError("user", str(user_id))
return user
# 手动错误响应
@app.post("/users")
async def create_user(email: str):
if user_exists(email):
raise HTTPException(
status_code=409,
detail={
A comprehensive skill for designing, implementing, and maintaining RESTful APIs. Master resource modeling, HTTP methods, versioning strategies, pagination, filtering, error handling, and best practices for building scalable, maintainable APIs using FastAPI, Express.js, and modern frameworks.
Use this skill when:
REST (Representational State Transfer) is an architectural style for distributed systems that emphasizes:
Level 0 - The Swamp of POX : Single URI, single HTTP method (usually POST)
/api with all operations in POST bodyLevel 1 - Resources : Multiple URIs, each representing a resource
/users, /posts, /productsLevel 2 - HTTP Verbs : Proper use of HTTP methods
/users/123, POST /users, PUT /users/123Level 3 - Hypermedia Controls (HATEOAS) : API responses include links to related resources
"_links": {"self": "/users/123", "posts": "/users/123/posts"}1. Use Nouns, Not Verbs
Good:
GET /users
GET /products
POST /orders
Bad:
GET /getUsers
GET /getAllProducts
POST /createOrder
2. Use Plural Nouns for Collections
Good:
GET /users # Collection
GET /users/123 # Individual resource
Bad:
GET /user
GET /user/123
3. Use Lowercase and Hyphens
Good:
/user-profiles
/order-items
/payment-methods
Bad:
/userProfiles
/OrderItems
/payment_methods
4. Hierarchy for Related Resources
Good:
/users/123/posts
/users/123/posts/456
/users/123/posts/456/comments
Avoid Deep Nesting (max 2-3 levels):
/organizations/1/departments/2/teams/3/members/4/tasks/5 # Too deep!
Collection Resource:
GET /products # List all products
POST /products # Create new product
Item Resource:
GET /products/123 # Get specific product
PUT /products/123 # Replace product (full update)
PATCH /products/123 # Partial update
DELETE /products/123 # Delete product
# Comments belong to posts
GET /posts/42/comments # List comments for post 42
POST /posts/42/comments # Create comment on post 42
GET /posts/42/comments/7 # Get specific comment
DELETE /posts/42/comments/7 # Delete specific comment
# Alternative for accessing comments directly
GET /comments/7 # Get comment by ID (if you have it)
GET /products?category=electronics
GET /products?price_min=100&price_max=500
GET /products?sort=price&order=desc
GET /users?status=active&role=admin
GET /posts?author=123&published=true
For operations that don't fit standard CRUD:
POST /users/123/activate # Activate user account
POST /orders/456/cancel # Cancel order
POST /payments/789/refund # Refund payment
POST /documents/321/publish # Publish document
POST /subscriptions/654/renew # Renew subscription
POST /users/bulk-create # Create multiple users
PATCH /products/bulk-update # Update multiple products
DELETE /orders/bulk-delete # Delete multiple orders
# Or using query parameters
DELETE /orders?ids=1,2,3,4,5
Characteristics:
FastAPI Example:
from fastapi import FastAPI, HTTPException
from typing import List, Optional
app = FastAPI()
# Collection endpoint
@app.get("/items/")
async def list_items(
skip: int = 0,
limit: int = 10,
category: Optional[str] = None
) -> List[dict]:
"""List items with pagination and filtering."""
# Filter and paginate
items = get_items_from_db(skip=skip, limit=limit, category=category)
return items
# Individual resource endpoint
@app.get("/items/{item_id}")
async def get_item(item_id: int) -> dict:
"""Get a specific item by ID."""
item = get_item_from_db(item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
return item
Express.js Example:
const express = require('express');
const app = express();
// Collection endpoint
app.get('/items', async (req, res) => {
try {
const { skip = 0, limit = 10, category } = req.query;
const items = await getItemsFromDB({ skip, limit, category });
res.json(items);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
// Individual resource endpoint
app.get('/items/:id', async (req, res) => {
try {
const item = await getItemFromDB(req.params.id);
if (!item) {
return res.status(404).json({ error: 'Item not found' });
}
res.json(item);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
Characteristics:
Location header with new resource URIFastAPI Example:
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
class ItemCreate(BaseModel):
name: str
price: float
category: str
description: Optional[str] = None
@app.post("/items/", status_code=status.HTTP_201_CREATED)
async def create_item(item: ItemCreate, response: Response) -> dict:
"""Create a new item."""
# Validate and create
new_item = create_item_in_db(item)
# Set Location header
response.headers["Location"] = f"/items/{new_item.id}"
return new_item
Express.js Example:
app.use(express.json());
app.post('/items', async (req, res) => {
try {
const { name, price, category, description } = req.body;
// Validate
if (!name || !price || !category) {
return res.status(400).json({
error: 'Missing required fields: name, price, category'
});
}
// Create resource
const newItem = await createItemInDB({ name, price, category, description });
// Set Location header and return 201
res.location(`/items/${newItem.id}`)
.status(201)
.json(newItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
Characteristics:
FastAPI Example:
class ItemUpdate(BaseModel):
name: str
price: float
category: str
description: str
@app.put("/items/{item_id}")
async def replace_item(item_id: int, item: ItemUpdate) -> dict:
"""Replace an entire item (all fields required)."""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
# Replace entire resource
updated_item = replace_item_in_db(item_id, item)
return updated_item
Express.js Example:
app.put('/items/:id', async (req, res) => {
try {
const { name, price, category, description } = req.body;
// All fields required for PUT
if (!name || !price || !category || description === undefined) {
return res.status(400).json({
error: 'PUT requires all fields: name, price, category, description'
});
}
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
// Replace entire resource
const updatedItem = await replaceItemInDB(req.params.id, req.body);
res.json(updatedItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
Characteristics:
FastAPI Example:
class ItemPatch(BaseModel):
name: Optional[str] = None
price: Optional[float] = None
category: Optional[str] = None
description: Optional[str] = None
@app.patch("/items/{item_id}")
async def update_item(item_id: int, item: ItemPatch) -> dict:
"""Partially update an item (only provided fields)."""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
# Update only provided fields
update_data = item.model_dump(exclude_unset=True)
updated_item = update_item_in_db(item_id, update_data)
return updated_item
Express.js Example:
app.patch('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
// Update only provided fields
const updatedItem = await updateItemInDB(req.params.id, req.body);
res.json(updatedItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
Characteristics:
FastAPI Example:
@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_item(item_id: int):
"""Delete an item."""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
delete_item_from_db(item_id)
return None # 204 No Content
# Alternative: Return deleted resource
@app.delete("/items/{item_id}")
async def delete_item_with_response(item_id: int) -> dict:
"""Delete an item and return it."""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
delete_item_from_db(item_id)
return existing_item # 200 OK with body
Express.js Example:
// 204 No Content approach
app.delete('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
await deleteItemFromDB(req.params.id);
res.status(204).send();
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
// 200 OK with response body approach
app.delete('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
await deleteItemFromDB(req.params.id);
res.json({ message: 'Item deleted successfully', item: existingItem });
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
Version in the URI path - clear, explicit, easy to understand.
Pros:
Cons:
FastAPI Implementation:
from fastapi import FastAPI, APIRouter
app = FastAPI()
# Version 1 router
v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/users")
async def get_users_v1():
return {"users": ["user1", "user2"], "version": "1.0"}
@v1_router.get("/users/{user_id}")
async def get_user_v1(user_id: int):
return {"id": user_id, "name": "John", "version": "1.0"}
# Version 2 router
v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/users")
async def get_users_v2(limit: int = 10, offset: int = 0):
"""V2 adds pagination"""
return {
"users": ["user1", "user2"],
"pagination": {"limit": limit, "offset": offset},
"version": "2.0"
}
@v2_router.get("/users/{user_id}")
async def get_user_v2(user_id: int):
"""V2 returns more fields"""
return {
"id": user_id,
"name": "John",
"email": "john@example.com",
"created_at": "2024-01-01",
"version": "2.0"
}
app.include_router(v1_router)
app.include_router(v2_router)
Express.js Implementation:
const express = require('express');
const app = express();
// Version 1 routes
const v1Router = express.Router();
v1Router.get('/users', (req, res) => {
res.json({ users: ['user1', 'user2'], version: '1.0' });
});
v1Router.get('/users/:id', (req, res) => {
res.json({ id: req.params.id, name: 'John', version: '1.0' });
});
// Version 2 routes
const v2Router = express.Router();
v2Router.get('/users', (req, res) => {
const { limit = 10, offset = 0 } = req.query;
res.json({
users: ['user1', 'user2'],
pagination: { limit, offset },
version: '2.0'
});
});
v2Router.get('/users/:id', (req, res) => {
res.json({
id: req.params.id,
name: 'John',
email: 'john@example.com',
created_at: '2024-01-01',
version: '2.0'
});
});
app.use('/api/v1', v1Router);
app.use('/api/v2', v2Router);
Version specified in custom header or Accept header.
Pros:
Cons:
FastAPI Implementation:
from fastapi import FastAPI, Header, HTTPException
app = FastAPI()
@app.get("/users")
async def get_users(api_version: str = Header(default="1.0", alias="X-API-Version")):
"""Handle multiple versions based on header"""
if api_version == "1.0":
return {"users": ["user1", "user2"], "version": "1.0"}
elif api_version == "2.0":
return {
"users": ["user1", "user2"],
"pagination": {"limit": 10, "offset": 0},
"version": "2.0"
}
else:
raise HTTPException(
status_code=400,
detail=f"Unsupported API version: {api_version}"
)
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
api_version: str = Header(default="1.0", alias="X-API-Version")
):
"""User endpoint with version handling"""
if api_version == "1.0":
return {"id": user_id, "name": "John", "version": "1.0"}
elif api_version == "2.0":
return {
"id": user_id,
"name": "John",
"email": "john@example.com",
"created_at": "2024-01-01",
"version": "2.0"
}
else:
raise HTTPException(
status_code=400,
detail=f"Unsupported API version: {api_version}"
)
Express.js Implementation:
app.get('/users', (req, res) => {
const version = req.get('X-API-Version') || '1.0';
if (version === '1.0') {
res.json({ users: ['user1', 'user2'], version: '1.0' });
} else if (version === '2.0') {
res.json({
users: ['user1', 'user2'],
pagination: { limit: 10, offset: 0 },
version: '2.0'
});
} else {
res.status(400).json({ error: `Unsupported API version: ${version}` });
}
});
app.get('/users/:id', (req, res) => {
const version = req.get('X-API-Version') || '1.0';
if (version === '1.0') {
res.json({ id: req.params.id, name: 'John', version: '1.0' });
} else if (version === '2.0') {
res.json({
id: req.params.id,
name: 'John',
email: 'john@example.com',
created_at: '2024-01-01',
version: '2.0'
});
} else {
res.status(400).json({ error: `Unsupported API version: ${version}` });
}
});
Version specified in Accept header with custom media types.
FastAPI Implementation:
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
@app.get("/users")
async def get_users(request: Request):
"""Handle versioning via Accept header"""
accept = request.headers.get("accept", "application/vnd.api.v1+json")
if "vnd.api.v1+json" in accept:
return {"users": ["user1", "user2"], "version": "1.0"}
elif "vnd.api.v2+json" in accept:
return {
"users": ["user1", "user2"],
"pagination": {"limit": 10, "offset": 0},
"version": "2.0"
}
else:
raise HTTPException(
status_code=406,
detail="Not Acceptable: Unsupported media type"
)
Express.js Implementation:
app.get('/users', (req, res) => {
const accept = req.get('Accept') || 'application/vnd.api.v1+json';
if (accept.includes('vnd.api.v1+json')) {
res.type('application/vnd.api.v1+json')
.json({ users: ['user1', 'user2'], version: '1.0' });
} else if (accept.includes('vnd.api.v2+json')) {
res.type('application/vnd.api.v2+json')
.json({
users: ['user1', 'user2'],
pagination: { limit: 10, offset: 0 },
version: '2.0'
});
} else {
res.status(406).json({ error: 'Not Acceptable: Unsupported media type' });
}
});
Version as query parameter (least recommended).
GET /users?version=2.0
GET /users/123?v=2
Cons:
Simple but can have performance issues with large datasets.
FastAPI Implementation:
from fastapi import FastAPI, Query
from typing import List
from pydantic import BaseModel
class PaginatedResponse(BaseModel):
items: List[dict]
total: int
limit: int
offset: int
has_more: bool
@app.get("/items", response_model=PaginatedResponse)
async def list_items(
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0)
):
"""Offset-based pagination"""
# Get total count
total = count_items_in_db()
# Get paginated items
items = get_items_from_db(limit=limit, offset=offset)
# Check if there are more items
has_more = (offset + limit) < total
return {
"items": items,
"total": total,
"limit": limit,
"offset": offset,
"has_more": has_more
}
Express.js Implementation:
app.get('/items', async (req, res) => {
try {
const limit = Math.min(parseInt(req.query.limit) || 10, 100);
const offset = parseInt(req.query.offset) || 0;
const total = await countItemsInDB();
const items = await getItemsFromDB({ limit, offset });
const hasMore = (offset + limit) < total;
res.json({
items,
total,
limit,
offset,
has_more: hasMore
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
More efficient for large datasets, prevents issues with data changes during pagination.
FastAPI Implementation:
from typing import Optional
class CursorPaginatedResponse(BaseModel):
items: List[dict]
next_cursor: Optional[str] = None
has_more: bool
@app.get("/items", response_model=CursorPaginatedResponse)
async def list_items_cursor(
limit: int = Query(default=10, ge=1, le=100),
cursor: Optional[str] = None
):
"""Cursor-based pagination"""
# Get items after cursor
items = get_items_after_cursor(cursor=cursor, limit=limit + 1)
# Check if there are more items
has_more = len(items) > limit
# Get next cursor from last item
next_cursor = None
if has_more:
items = items[:limit] # Remove extra item
next_cursor = items[-1]["id"] # Use last item ID as cursor
return {
"items": items,
"next_cursor": next_cursor,
"has_more": has_more
}
Express.js Implementation:
app.get('/items', async (req, res) => {
try {
const limit = Math.min(parseInt(req.query.limit) || 10, 100);
const cursor = req.query.cursor || null;
// Get one extra item to check if there are more
const items = await getItemsAfterCursor({ cursor, limit: limit + 1 });
const hasMore = items.length > limit;
let nextCursor = null;
if (hasMore) {
items.pop(); // Remove extra item
nextCursor = items[items.length - 1].id; // Last item ID as cursor
}
res.json({
items,
next_cursor: nextCursor,
has_more: hasMore
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
User-friendly for UIs with page numbers.
FastAPI Implementation:
class PagePaginatedResponse(BaseModel):
items: List[dict]
page: int
page_size: int
total_pages: int
total_items: int
@app.get("/items", response_model=PagePaginatedResponse)
async def list_items_pages(
page: int = Query(default=1, ge=1),
page_size: int = Query(default=10, ge=1, le=100)
):
"""Page-based pagination"""
# Calculate offset
offset = (page - 1) * page_size
# Get total count and items
total_items = count_items_in_db()
items = get_items_from_db(limit=page_size, offset=offset)
# Calculate total pages
total_pages = (total_items + page_size - 1) // page_size
return {
"items": items,
"page": page,
"page_size": page_size,
"total_pages": total_pages,
"total_items": total_items
}
Express.js Implementation:
app.get('/items', async (req, res) => {
try {
const page = Math.max(parseInt(req.query.page) || 1, 1);
const pageSize = Math.min(parseInt(req.query.page_size) || 10, 100);
const offset = (page - 1) * pageSize;
const totalItems = await countItemsInDB();
const items = await getItemsFromDB({ limit: pageSize, offset });
const totalPages = Math.ceil(totalItems / pageSize);
res.json({
items,
page,
page_size: pageSize,
total_pages: totalPages,
total_items: totalItems
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
FastAPI Implementation:
from enum import Enum
from typing import Optional, List
class SortOrder(str, Enum):
asc = "asc"
desc = "desc"
@app.get("/products")
async def list_products(
# Filtering
category: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
in_stock: Optional[bool] = None,
tags: Optional[List[str]] = Query(None),
search: Optional[str] = None,
# Sorting
sort_by: Optional[str] = Query(default="created_at"),
order: SortOrder = SortOrder.desc,
# Pagination
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0)
):
"""Advanced filtering and sorting"""
filters = {
"category": category,
"min_price": min_price,
"max_price": max_price,
"in_stock": in_stock,
"tags": tags,
"search": search
}
# Remove None values
filters = {k: v for k, v in filters.items() if v is not None}
# Query database
products = query_products(
filters=filters,
sort_by=sort_by,
order=order.value,
limit=limit,
offset=offset
)
return {
"products": products,
"filters": filters,
"sort": {"by": sort_by, "order": order.value},
"pagination": {"limit": limit, "offset": offset}
}
Express.js Implementation:
app.get('/products', async (req, res) => {
try {
const {
category,
min_price,
max_price,
in_stock,
tags,
search,
sort_by = 'created_at',
order = 'desc',
limit = 10,
offset = 0
} = req.query;
// Build filters
const filters = {};
if (category) filters.category = category;
if (min_price) filters.min_price = parseFloat(min_price);
if (max_price) filters.max_price = parseFloat(max_price);
if (in_stock !== undefined) filters.in_stock = in_stock === 'true';
if (tags) filters.tags = Array.isArray(tags) ? tags : [tags];
if (search) filters.search = search;
// Query database
const products = await queryProducts({
filters,
sortBy: sort_by,
order,
limit: Math.min(parseInt(limit), 100),
offset: parseInt(offset)
});
res.json({
products,
filters,
sort: { by: sort_by, order },
pagination: { limit, offset }
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
Standard Error Response Structure:
{
"error": {
"code": "RESOURCE_NOT_FOUND",
"message": "The requested user was not found",
"details": {
"user_id": "123",
"resource": "user"
},
"timestamp": "2024-01-15T10:30:00Z"
}
}
Success Codes:
200 OK: Successful GET, PUT, PATCH, DELETE with response201 Created: Successful POST creating a resource202 Accepted: Request accepted for async processing204 No Content: Successful DELETE with no response bodyClient Error Codes:
400 Bad Request: Invalid request syntax or validation error401 Unauthorized: Authentication required403 Forbidden: Authenticated but not authorized404 Not Found: Resource doesn't exist405 Method Not Allowed: HTTP method not supported409 Conflict: Request conflicts with current state422 Unprocessable Entity: Validation failed429 Too Many Requests: Rate limit exceededServer Error Codes:
500 Internal Server Error: Generic server error502 Bad Gateway: Invalid response from upstream server503 Service Unavailable: Server temporarily unavailable504 Gateway Timeout: Upstream server timeoutfrom fastapi import FastAPI, HTTPException, status, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from datetime import datetime
app = FastAPI()
# Custom exception
class ResourceNotFoundError(Exception):
def __init__(self, resource: str, resource_id: str):
self.resource = resource
self.resource_id = resource_id
# Global exception handler
@app.exception_handler(ResourceNotFoundError)
async def resource_not_found_handler(request: Request, exc: ResourceNotFoundError):
return JSONResponse(
status_code=404,
content={
"error": {
"code": "RESOURCE_NOT_FOUND",
"message": f"The requested {exc.resource} was not found",
"details": {
"resource": exc.resource,
"resource_id": exc.resource_id
},
"timestamp": datetime.utcnow().isoformat() + "Z"
}
}
)
# Validation error handler
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=422,
content={
"error": {
"code": "VALIDATION_ERROR",
"message": "Request validation failed",
"details": exc.errors(),
"timestamp": datetime.utcnow().isoformat() + "Z"
}
}
)
# Using exceptions
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = get_user_from_db(user_id)
if not user:
raise ResourceNotFoundError("user", str(user_id))
return user
# Manual error responses
@app.post("/users")
async def create_user(email: str):
if user_exists(email):
raise HTTPException(
status_code=409,
detail={
"code": "DUPLICATE_EMAIL",
"message": "A user with this email already exists",
"details": {"email": email}
}
)
return create_user_in_db(email)
const express = require('express');
const app = express();
app.use(express.json());
// Custom error class
class ResourceNotFoundError extends Error {
constructor(resource, resourceId) {
super(`${resource} not found`);
this.name = 'ResourceNotFoundError';
this.resource = resource;
this.resourceId = resourceId;
this.statusCode = 404;
}
}
class ValidationError extends Error {
constructor(message, details) {
super(message);
this.name = 'ValidationError';
this.details = details;
this.statusCode = 422;
}
}
// Routes
app.get('/users/:id', async (req, res, next) => {
try {
const user = await getUserFromDB(req.params.id);
if (!user) {
throw new ResourceNotFoundError('user', req.params.id);
}
res.json(user);
} catch (error) {
next(error);
}
});
app.post('/users', async (req, res, next) => {
try {
const { email } = req.body;
if (!email) {
throw new ValidationError('Validation failed', {
field: 'email',
message: 'Email is required'
});
}
const userExists = await checkUserExists(email);
if (userExists) {
const error = new Error('Duplicate email');
error.statusCode = 409;
error.code = 'DUPLICATE_EMAIL';
error.details = { email };
throw error;
}
const newUser = await createUserInDB(email);
res.status(201).json(newUser);
} catch (error) {
next(error);
}
});
// Global error handler (must be last)
app.use((err, req, res, next) => {
const statusCode = err.statusCode || 500;
const code = err.code || err.name || 'INTERNAL_SERVER_ERROR';
const errorResponse = {
error: {
code,
message: err.message,
timestamp: new Date().toISOString()
}
};
// Add details if available
if (err.details) {
errorResponse.error.details = err.details;
} else if (err.resource && err.resourceId) {
errorResponse.error.details = {
resource: err.resource,
resource_id: err.resourceId
};
}
// Log error for debugging (don't expose in production)
if (process.env.NODE_ENV !== 'production') {
errorResponse.error.stack = err.stack;
}
res.status(statusCode).json(errorResponse);
});
HATEOAS (Hypermedia as the Engine of Application State) means including links to related resources in API responses.
Benefits:
from fastapi import FastAPI
from pydantic import BaseModel, HttpUrl
from typing import List, Optional
class Link(BaseModel):
rel: str
href: str
method: str = "GET"
class UserResponse(BaseModel):
id: int
name: str
email: str
links: List[Link]
class UserListResponse(BaseModel):
users: List[UserResponse]
links: List[Link]
app = FastAPI()
def build_user_links(user_id: int, base_url: str = "http://api.example.com") -> List[Link]:
"""Build HATEOAS links for a user"""
return [
Link(rel="self", href=f"{base_url}/users/{user_id}", method="GET"),
Link(rel="update", href=f"{base_url}/users/{user_id}", method="PUT"),
Link(rel="delete", href=f"{base_url}/users/{user_id}", method="DELETE"),
Link(rel="posts", href=f"{base_url}/users/{user_id}/posts", method="GET"),
Link(rel="create_post", href=f"{base_url}/users/{user_id}/posts", method="POST")
]
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
"""Get user with HATEOAS links"""
user = get_user_from_db(user_id)
return {
"id": user.id,
"name": user.name,
"email": user.email,
"links": build_user_links(user.id)
}
@app.get("/users", response_model=UserListResponse)
async def list_users():
"""List users with HATEOAS links"""
users = get_users_from_db()
users_with_links = [
{
"id": user.id,
"name": user.name,
"email": user.email,
"links": build_user_links(user.id)
}
for user in users
]
collection_links = [
Link(rel="self", href="http://api.example.com/users", method="GET"),
Link(rel="create", href="http://api.example.com/users", method="POST")
]
return {
"users": users_with_links,
"links": collection_links
}
app.get('/users/:id', async (req, res) => {
try {
const user = await getUserFromDB(req.params.id);
if (!user) {
return res.status(404).json({ error: 'User not found' });
}
const baseUrl = `${req.protocol}://${req.get('host')}`;
res.json({
id: user.id,
name: user.name,
email: user.email,
_links: {
self: { href: `${baseUrl}/users/${user.id}`, method: 'GET' },
update: { href: `${baseUrl}/users/${user.id}`, method: 'PUT' },
delete: { href: `${baseUrl}/users/${user.id}`, method: 'DELETE' },
posts: { href: `${baseUrl}/users/${user.id}/posts`, method: 'GET' },
create_post: { href: `${baseUrl}/users/${user.id}/posts`, method: 'POST' }
}
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
app.get('/users', async (req, res) => {
try {
const users = await getUsersFromDB();
const baseUrl = `${req.protocol}://${req.get('host')}`;
const usersWithLinks = users.map(user => ({
id: user.id,
name: user.name,
email: user.email,
_links: {
self: { href: `${baseUrl}/users/${user.id}`, method: 'GET' },
update: { href: `${baseUrl}/users/${user.id}`, method: 'PUT' },
delete: { href: `${baseUrl}/users/${user.id}`, method: 'DELETE' }
}
}));
res.json({
users: usersWithLinks,
_links: {
self: { href: `${baseUrl}/users`, method: 'GET' },
create: { href: `${baseUrl}/users`, method: 'POST' }
}
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
FastAPI Implementation:
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
import hashlib
app = FastAPI()
def generate_etag(data: dict) -> str:
"""Generate ETag from response data"""
content = str(data).encode('utf-8')
return hashlib.md5(content).hexdigest()
@app.get("/users/{user_id}")
async def get_user_cached(user_id: int, request: Request):
"""Get user with ETag caching"""
user = get_user_from_db(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Generate ETag
etag = generate_etag(user)
# Check If-None-Match header
if_none_match = request.headers.get("if-none-match")
if if_none_match == etag:
return Response(status_code=304) # Not Modified
# Return with ETag header
return JSONResponse(
content=user,
headers={"ETag": etag, "Cache-Control": "max-age=300"}
)
Express.js Implementation:
const crypto = require('crypto');
function generateETag(data) {
const content = JSON.stringify(data);
return crypto.createHash('md5').update(content).digest('hex');
}
app.get('/users/:id', async (req, res) => {
try {
const user = await getUserFromDB(req.params.id);
if (!user) {
return res.status(404).json({ error: 'User not found' });
}
const etag = generateETag(user);
// Check If-None-Match header
if (req.get('If-None-Match') === etag) {
return res.status(304).send(); // Not Modified
}
res.set('ETag', etag)
.set('Cache-Control', 'max-age=300')
.json(user);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
Express.js Implementation:
const rateLimit = require('express-rate-limit');
// Create rate limiter
const apiLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // Limit each IP to 100 requests per windowMs
message: {
error: {
code: 'RATE_LIMIT_EXCEEDED',
message: 'Too many requests, please try again later'
}
},
standardHeaders: true, // Return rate limit info in headers
legacyHeaders: false
});
// Apply to all routes
app.use('/api/', apiLimiter);
// Or create specific limiters
const createAccountLimiter = rateLimit({
windowMs: 60 * 60 * 1000, // 1 hour
max: 5, // 5 requests per hour
message: 'Too many accounts created, please try again later'
});
app.post('/api/users', createAccountLimiter, async (req, res) => {
// Create user
});
FastAPI Implementation:
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()
# Add compression middleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.get("/large-data")
async def get_large_data():
"""This response will be compressed if > 1000 bytes"""
return {"data": [{"id": i, "value": f"item_{i}"} for i in range(1000)]}
Express.js Implementation:
const compression = require('compression');
// Add compression middleware
app.use(compression({
threshold: 1024, // Only compress responses > 1KB
level: 6 // Compression level (0-9)
}));
app.get('/large-data', (req, res) => {
const data = Array.from({ length: 1000 }, (_, i) => ({
id: i,
value: `item_${i}`
}));
res.json({ data });
});
JWT Authentication in FastAPI:
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta
app = FastAPI()
security = HTTPBearer()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: timedelta = None):
"""Create JWT token"""
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify JWT token"""
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired"
)
except jwt.JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials"
)
@app.post("/login")
async def login(email: str, password: str):
"""Login and get access token"""
user = authenticate_user(email, password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password"
)
access_token = create_access_token(
data={"sub": user.email, "user_id": user.id}
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def get_current_user(payload: dict = Depends(verify_token)):
"""Protected endpoint - requires authentication"""
user_id = payload.get("user_id")
user = get_user_from_db(user_id)
return user
JWT Authentication in Express.js:
const jwt = require('jsonwebtoken');
const express = require('express');
const app = express();
const SECRET_KEY = 'your-secret-key';
function createAccessToken(data, expiresIn = '15m') {
return jwt.sign(data, SECRET_KEY, { expiresIn });
}
function verifyToken(req, res, next) {
const authHeader = req.get('Authorization');
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return res.status(401).json({ error: 'Missing or invalid authorization header' });
}
const token = authHeader.substring(7);
try {
const payload = jwt.verify(token, SECRET_KEY);
req.user = payload;
next();
} catch (error) {
if (error.name === 'TokenExpiredError') {
return res.status(401).json({ error: 'Token has expired' });
}
return res.status(401).json({ error: 'Invalid token' });
}
}
app.post('/login', async (req, res) => {
const { email, password } = req.body;
const user = await authenticateUser(email, password);
if (!user) {
return res.status(401).json({ error: 'Incorrect email or password' });
}
const accessToken = createAccessToken({
sub: user.email,
user_id: user.id
});
res.json({ access_token: accessToken, token_type: 'bearer' });
});
app.get('/users/me', verifyToken, async (req, res) => {
const user = await getUserFromDB(req.user.user_id);
res.json(user);
});
FastAPI Validation:
from fastapi import FastAPI
from pydantic import BaseModel, EmailStr, Field, validator
from typing import Optional
class UserCreate(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_-]+$")
password: str = Field(..., min_length=8)
age: Optional[int] = Field(None, ge=0, le=150)
@validator('password')
def password_strength(cls, v):
"""Validate password strength"""
if not any(char.isdigit() for char in v):
raise ValueError('Password must contain at least one digit')
if not any(char.isupper() for char in v):
raise ValueError('Password must contain at least one uppercase letter')
return v
@app.post("/users")
async def create_user(user: UserCreate):
"""Automatically validates input"""
# Input is already validated by Pydantic
hashed_password = hash_password(user.password)
new_user = create_user_in_db(user.email, user.username, hashed_password)
return new_user
FastAPI automatically generates OpenAPI documentation:
from fastapi import FastAPI
from pydantic import BaseModel, Field
app = FastAPI(
title="My API",
description="Comprehensive API for managing resources",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
class Item(BaseModel):
"""Item model with rich documentation"""
name: str = Field(..., description="The name of the item", example="Widget")
price: float = Field(..., description="Price in USD", example=19.99, gt=0)
description: str = Field(None, description="Optional item description")
model_config = {
"json_schema_extra": {
"examples": [
{
"name": "Super Widget",
"price": 29.99,
"description": "An amazing widget"
}
]
}
}
@app.post(
"/items",
response_model=Item,
status_code=201,
summary="Create a new item",
description="Create a new item with name, price, and optional description",
response_description="The created item",
tags=["items"]
)
async def create_item(item: Item):
"""
Create a new item with all the information:
- **name**: The item name (required)
- **price**: The item price in USD (required, must be positive)
- **description**: Optional description of the item
"""
return item
Skill Version : 1.0.0 Last Updated : October 2025 Skill Category : API Design, Backend Development, REST Architecture Compatible With : FastAPI, Express.js, Node.js, Python, HTTP Frameworks
Weekly Installs
101
Repository
GitHub Stars
49
First Seen
Jan 22, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
gemini-cli84
codex84
opencode82
cursor77
github-copilot77
claude-code71
agent-browser 浏览器自动化工具 - Vercel Labs 命令行网页操作与测试
157,400 周安装