api-security-review by bobmatnyc/claude-mpm-skills
npx skills add https://github.com/bobmatnyc/claude-mpm-skills --skill api-security-review
用于 API 端点开发的全面安全检查清单。确保在部署前已正确实施身份验证、授权、输入验证、输出安全性和安全日志记录。
每个 API 端点必须在处理请求之前验证用户的身份。
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
export async function GET(request: Request) {
// 1. 验证请求
const { userId } = await auth();
if (!userId) {
return NextResponse.json(
{ error: "Unauthorized" },
{ status: 401 }
);
}
// 继续处理已验证的请求...
}
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
import jwt from 'jsonwebtoken';
import { Request, Response, NextFunction } from 'express';
/**
 * Express middleware that authenticates a request from its
 * `Authorization: Bearer <token>` header.
 *
 * On success the decoded JWT payload is stored on `req.user` and `next()` is
 * called. Every authentication failure (missing header, wrong scheme, invalid
 * or expired token) is answered with 401; 403 is left for authenticated callers
 * who lack permission.
 *
 * @param req - Incoming request; only the `authorization` header is read.
 * @param res - Response used to short-circuit with a bare status code.
 * @param next - Invoked only after the token has been verified.
 */
function authenticateToken(req: Request, res: Response, next: NextFunction) {
  const secret = process.env.JWT_SECRET;
  if (!secret) {
    // Server misconfiguration, not a client error: fail closed without verifying.
    return res.sendStatus(500);
  }

  const authHeader = req.headers['authorization'];
  // Require the Bearer scheme explicitly so that e.g. "Basic abc" is not
  // treated as a JWT just because it has a second word.
  const [scheme = '', token = ''] =
    typeof authHeader === 'string' ? authHeader.split(' ') : [];
  if (scheme.toLowerCase() !== 'bearer' || !token) {
    return res.sendStatus(401);
  }

  // NOTE(review): consider passing an explicit `algorithms` allow-list to
  // jwt.verify once the signing algorithm in use is confirmed.
  jwt.verify(token, secret, (err, user) => {
    // Malformed, expired, or badly signed tokens are authentication failures.
    if (err) return res.sendStatus(401);
    req.user = user;
    next();
  });
}
app.get('/api/protected', authenticateToken, (req, res) => {
// 请求已通过身份验证
});
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = await verify_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
@app.get("/api/protected")
async def protected_route(current_user: User = Depends(get_current_user)):
return {"user": current_user.email}
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def protected_view(request):
# request.user 已通过身份验证
return Response({'user': request.user.email})
身份验证证明用户是谁。授权证明用户有权访问资源。
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
import { db } from '@/lib/db';
import { eq } from 'drizzle-orm';
/**
 * Returns a single resource, but only to the authenticated user who owns it.
 *
 * @param request - Incoming request (not read by this handler).
 * @param params - Route parameters; `params.id` is the resource ID to look up.
 * @returns 401 when unauthenticated, 404 when no resource has that ID,
 *          403 when the caller is not the owner, otherwise the resource as JSON.
 */
export async function GET(
  request: Request,
  { params }: { params: { id: string } }
) {
  // Every rejection has the same shape: `{ error }` plus an HTTP status.
  const reject = (error: string, status: number) =>
    NextResponse.json({ error }, { status });

  // 1. Authenticate
  const { userId } = await auth();
  if (!userId) return reject("Unauthorized", 401);

  // 2. Fetch resource
  const found = await db.query.resources.findFirst({
    where: eq(resources.id, params.id)
  });
  if (!found) return reject("Not found", 404);

  // 3. Authorize - check ownership
  const isOwner = found.ownerId === userId;
  if (!isOwner) return reject("Forbidden", 403);

  // 4. Return authorized data
  return NextResponse.json(found);
}
enum Role {
USER = 'user',
ADMIN = 'admin',
MODERATOR = 'moderator'
}
function requireRole(allowedRoles: Role[]) {
return async (request: Request) => {
const { userId } = await auth();
if (!userId) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
const user = await db.query.users.findFirst({
where: eq(users.clerkId, userId)
});
if (!user || !allowedRoles.includes(user.role)) {
return NextResponse.json({ error: "Forbidden" }, { status: 403 });
}
return null; // 已授权
};
}
export async function DELETE(request: Request) {
const authError = await requireRole([Role.ADMIN, Role.MODERATOR])(request);
if (authError) return authError;
// 用户已授权为管理员或版主
}
// 关键:防止跨租户数据泄露
// ❌ 错误 - 没有租户检查
const orders = await db.query.orders.findMany({
where: eq(orders.userId, userId)
});
// ✅ CORRECT - Tenant isolation
// Resolve the caller's own user row first; its tenantId is the tenant boundary
// applied to the orders query below.
const user = await db.query.users.findFirst({
  where: eq(users.clerkId, userId)
});
if (!user) {
  // Without a user row no tenant can be established: fail closed instead of
  // dereferencing `user.tenantId` on undefined.
  throw new Error('No user record found for the authenticated user');
}
// The result is deliberately NOT named `orders`: a `const orders` here would
// shadow the `orders` table used inside its own initializer and throw a
// ReferenceError before the query runs.
const tenantOrders = await db.query.orders.findMany({
  where: and(
    eq(orders.userId, userId),
    eq(orders.tenantId, user.tenantId) // CRITICAL: tenant boundary
  )
});
import { z } from 'zod';
import { NextResponse } from 'next/server';
const updateUserSchema = z.object({
id: z.string().uuid(),
email: z.string().email().optional(),
age: z.number().int().min(0).max(150).optional(),
role: z.enum(['user', 'admin', 'moderator']).optional(),
});
export async function PATCH(request: Request) {
const { userId } = await auth();
if (!userId) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
// 解析并验证输入
const body = await request.json();
const result = updateUserSchema.safeParse(body);
if (!result.success) {
return NextResponse.json({
error: "Validation failed",
details: result.error.issues
}, { status: 400 });
}
// 可以安全使用已验证的数据
const validatedData = result.data;
// ... 更新逻辑
}
from pydantic import BaseModel, EmailStr, Field, validator
from fastapi import HTTPException
class UpdateUser(BaseModel):
    """Request body for updating a user.

    `id` must be a lowercase hyphenated UUID string, `age` must be within
    0..150, and `role` must be one of user/admin/moderator. `email`, `age`
    and `role` are optional.
    """

    id: str = Field(..., regex=r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$')
    email: EmailStr | None = None
    age: int | None = Field(None, ge=0, le=150)
    role: str | None = Field(None, regex=r'^(user|admin|moderator)$')

    @validator('email')
    def email_must_not_be_disposable(cls, v):
        """Reject addresses whose domain is a known disposable-mail provider.

        Raises:
            ValueError: if the domain is, or is a subdomain of, a blocked domain.
        """
        if v:
            # Compare the domain part only. A plain substring test would also
            # reject "x@nottempmail.com" or "tempmail.com@corp.example".
            domain = v.rsplit('@', 1)[-1].lower()
            for blocked in ('tempmail.com', '10minutemail.com'):
                if domain == blocked or domain.endswith('.' + blocked):
                    raise ValueError('Disposable email addresses not allowed')
        return v
@app.patch("/api/users")
async def update_user(user_data: UpdateUser, current_user: User = Depends(get_current_user)):
# user_data 已验证
return {"status": "updated"}
// ❌ 绝对不要:使用字符串插值的原始 SQL
const userId = request.params.id;
const query = `SELECT * FROM users WHERE id = '${userId}'`; // 有漏洞!
db.execute(query);
// ✅ 始终:使用 ORM 或参数化查询
import { eq } from 'drizzle-orm';
const user = await db.query.users.findFirst({
where: eq(users.id, userId)
});
// ✅ 或者:参数化原始查询
const [user] = await db.execute(
'SELECT * FROM users WHERE id = ?',
[userId]
);
const MAX_FILE_SIZE = 5 * 1024 * 1024; // 5MB
const ALLOWED_TYPES = ['image/jpeg', 'image/png', 'image/gif'];
/**
 * Upload handler that validates the `file` form field before processing it.
 *
 * @param request - Incoming request, expected to carry form data with a `file` field.
 * @returns 400 when the body is not form data, the field is missing or is not
 *          a file, the file exceeds `MAX_FILE_SIZE`, or its declared type is
 *          not in `ALLOWED_TYPES`; otherwise falls through to the placeholder.
 */
export async function POST(request: Request) {
  let formData: FormData;
  try {
    formData = await request.formData();
  } catch {
    // A body that is not valid form data is a client error, not a server fault.
    return NextResponse.json({ error: "No file provided" }, { status: 400 });
  }

  // `get` returns a string for plain text fields; a cast to File would let a
  // string pass the presence check, so test the runtime type instead.
  const file = formData.get('file');
  if (!(file instanceof File)) {
    return NextResponse.json({ error: "No file provided" }, { status: 400 });
  }

  // Validate file size
  if (file.size > MAX_FILE_SIZE) {
    return NextResponse.json({
      error: "File too large. Max 5MB"
    }, { status: 400 });
  }

  // Validate file type
  // NOTE(review): `file.type` is supplied by the client; if the content itself
  // matters, verify it (e.g. by magic bytes) before trusting this value.
  if (!ALLOWED_TYPES.includes(file.type)) {
    return NextResponse.json({
      error: "Invalid file type. Only JPEG, PNG, GIF allowed"
    }, { status: 400 });
  }

  // Process file...
}
// ❌ 错误 - 暴露敏感字段
const user = await db.query.users.findFirst({
where: eq(users.id, userId)
});
return NextResponse.json(user); // 包含密码哈希、内部 ID 等
// ✅ 正确 - 显式选择安全字段
const user = await db.query.users.findFirst({
where: eq(users.id, userId),
columns: {
id: true,
email: true,
name: true,
createdAt: true,
// 排除:passwordHash, internalNotes, apiKey 等
}
});
return NextResponse.json(user);
// ✅ 更好 - 使用 DTO
interface PublicUserDTO {
id: string;
email: string;
name: string;
createdAt: Date;
}
function toPublicUser(user: User): PublicUserDTO {
return {
id: user.id,
email: user.email,
name: user.name,
createdAt: user.createdAt
};
}
return NextResponse.json(toPublicUser(user));
/**
 * Returns a shallow copy of `data` that is safe to write to logs.
 *
 * - `email` is masked to the first two characters of the local part plus the
 *   domain (`al***@example.com`); a value without `@` is masked with no domain.
 * - `ssn` is reduced to its last four characters (`***-**-6789`).
 * - Credential-like fields are removed entirely.
 *
 * The input object is never mutated. `null`/`undefined` input yields `{}`.
 *
 * @param data - Arbitrary object that may contain PII or secrets.
 * @returns The sanitized shallow copy.
 */
function sanitizeForLogging(data: any) {
  const sanitized = { ...data };

  // Mask email
  if (sanitized.email) {
    // Coerce first so a non-string value cannot throw inside a logging helper.
    const email = String(sanitized.email);
    const at = email.indexOf('@');
    const local = at === -1 ? email : email.slice(0, at);
    const masked = `${local.slice(0, 2)}***`;
    // Without an '@' there is no domain to keep; never emit "@undefined".
    sanitized.email = at === -1 ? masked : `${masked}@${email.slice(at + 1)}`;
  }

  // Mask SSN
  if (sanitized.ssn) {
    sanitized.ssn = `***-**-${String(sanitized.ssn).slice(-4)}`;
  }

  // Remove sensitive fields
  for (const key of ['passwordHash', 'apiKey', 'password', 'token', 'resetToken']) {
    delete sanitized[key];
  }

  return sanitized;
}
console.log('User updated:', sanitizeForLogging(user));
// ❌ 错误 - 泄露系统信息
try {
await db.execute(query);
} catch (error) {
return NextResponse.json({
error: error.message, // 可能暴露 SQL、文件路径等
stack: error.stack // 生产环境中绝对不要暴露
}, { status: 500 });
}
// ✅ 正确 - 通用错误并记录日志
try {
await db.execute(query);
} catch (error) {
console.error('Database error:', error); // 内部记录完整错误
return NextResponse.json({
error: "An error occurred processing your request"
}, { status: 500 });
}
enum SecurityEvent {
AUTH_FAILURE = 'auth_failure',
UNAUTHORIZED_ACCESS = 'unauthorized_access',
PERMISSION_DENIED = 'permission_denied',
RATE_LIMIT_EXCEEDED = 'rate_limit_exceeded',
INVALID_INPUT = 'invalid_input'
}
function logSecurityEvent(event: SecurityEvent, details: any) {
console.log(JSON.stringify({
timestamp: new Date().toISOString(),
event,
userId: details.userId || 'anonymous',
ip: details.ip,
endpoint: details.endpoint,
details: sanitizeForLogging(details)
}));
}
// 用法
export async function GET(request: Request) {
const { userId } = await auth();
if (!userId) {
logSecurityEvent(SecurityEvent.AUTH_FAILURE, {
ip: request.headers.get('x-forwarded-for'),
endpoint: request.url
});
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
}
import { v4 as uuidv4 } from 'uuid';
/**
 * Tags each request with a fresh request ID, logs a structured line for it,
 * and forwards the request with the ID in an `X-Request-ID` header.
 *
 * @param request - Incoming request; its method, URL and headers are read.
 * @returns The response obtained by fetching `request.url` with the
 *          original headers plus `X-Request-ID`.
 */
export async function middleware(request: Request) {
  const requestId = uuidv4();
  console.log(JSON.stringify({
    requestId,
    method: request.method,
    url: request.url,
    timestamp: new Date().toISOString()
  }));

  // Pass request ID through headers.
  // Copy via the Headers constructor: object-spreading a Headers instance
  // copies nothing, which would silently drop every original header.
  const forwardedHeaders = new Headers(request.headers);
  forwardedHeaders.set('X-Request-ID', requestId);

  // NOTE(review): this re-fetches request.url without forwarding the method or
  // body; confirm that is intended for the framework this runs in.
  const response = await fetch(request.url, {
    headers: forwardedHeaders
  });
  return response;
}
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
import { db } from '@/lib/db';
import { eq, and } from 'drizzle-orm';
import { z } from 'zod';
import { ratelimit } from '@/lib/ratelimit';
// 1. 输入验证模式
const updateResourceSchema = z.object({
name: z.string().min(1).max(100),
description: z.string().max(500).optional(),
isPublic: z.boolean().optional()
});
// 2. 用于安全输出的 DTO
interface ResourceDTO {
id: string;
name: string;
description: string;
isPublic: boolean;
createdAt: Date;
}
function toResourceDTO(resource: any): ResourceDTO {
return {
id: resource.id,
name: resource.name,
description: resource.description,
isPublic: resource.isPublic,
createdAt: resource.createdAt
// 排除:ownerId, internalNotes 等
};
}
/**
 * Updates a resource owned by the caller, applying every check in order:
 * rate limit, authentication, input validation, existence, ownership.
 *
 * @param request - Incoming request carrying a JSON body.
 * @param params - Route parameters; `params.id` is the resource to update.
 * @returns 429 when rate limited, 401 when unauthenticated, 400 for malformed
 *          or invalid input, 404 when the resource does not exist (or vanished
 *          before the update), 403 when the caller is not the owner, 500 on
 *          unexpected errors, otherwise the updated resource as a DTO.
 */
export async function PATCH(
  request: Request,
  { params }: { params: { id: string } }
) {
  try {
    // 3. Rate limiting
    const { success } = await ratelimit.limit(request.headers.get('x-forwarded-for') || 'anonymous');
    if (!success) {
      return NextResponse.json({ error: "Too many requests" }, { status: 429 });
    }

    // 4. Authentication
    const { userId } = await auth();
    if (!userId) {
      console.log('Auth failure:', { endpoint: request.url });
      return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
    }

    // 5. Input validation
    let body: unknown;
    try {
      body = await request.json();
    } catch {
      // Malformed JSON is a client error; do not let it surface as a 500.
      return NextResponse.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const result = updateResourceSchema.safeParse(body);
    if (!result.success) {
      return NextResponse.json({
        error: "Validation failed",
        details: result.error.issues
      }, { status: 400 });
    }

    // 6. Fetch and verify existence
    const resource = await db.query.resources.findFirst({
      where: eq(resources.id, params.id)
    });
    if (!resource) {
      return NextResponse.json({ error: "Not found" }, { status: 404 });
    }

    // 7. Authorization check
    if (resource.ownerId !== userId) {
      console.log('Permission denied:', { userId, resourceId: params.id });
      return NextResponse.json({ error: "Forbidden" }, { status: 403 });
    }

    // 8. Update resource
    // Repeat the ownership condition in the UPDATE itself so the write stays
    // scoped to the owner even if the row changed after the check above.
    const [updatedResource] = await db.update(resources)
      .set({
        ...result.data,
        updatedAt: new Date()
      })
      .where(and(
        eq(resources.id, params.id),
        eq(resources.ownerId, userId)
      ))
      .returning();
    if (!updatedResource) {
      // Row was deleted or re-assigned between the check and the update.
      return NextResponse.json({ error: "Not found" }, { status: 404 });
    }

    // 9. Return safe response
    return NextResponse.json(toResourceDTO(updatedResource));
  } catch (error) {
    // 10. Safe error handling: full detail to the log, generic message to the client.
    console.error('Error updating resource:', error);
    return NextResponse.json({
      error: "An error occurred"
    }, { status: 500 });
  }
}
import { Request, Response, NextFunction } from 'express';
import { z } from 'zod';
// 验证中间件工厂
function validateSchema(schema: z.ZodSchema) {
return (req: Request, res: Response, next: NextFunction) => {
const result = schema.safeParse(req.body);
if (!result.success) {
return res.status(400).json({
error: 'Validation failed',
details: result.error.issues
});
}
req.body = result.data;
next();
};
}
// 授权中间件
/**
 * Express middleware that loads the resource named by `req.params.id` and
 * lets the request continue only when the authenticated user owns it.
 *
 * Must run after an authentication middleware that sets `req.user`.
 * On success the loaded resource is attached as `req.resource`.
 *
 * @param req - Incoming request; reads `req.params.id` and `req.user.id`.
 * @param res - Response used for the 401/404/403 rejections.
 * @param next - Called with no argument on success, or with the error when
 *               the lookup fails.
 */
async function requireOwnership(req: Request, res: Response, next: NextFunction) {
  try {
    if (!req.user) {
      // Authentication did not run or did not set a user: fail closed.
      return res.status(401).json({ error: 'Unauthorized' });
    }

    const resource = await db.resources.findById(req.params.id);
    if (!resource) {
      return res.status(404).json({ error: 'Not found' });
    }
    if (resource.ownerId !== req.user.id) {
      return res.status(403).json({ error: 'Forbidden' });
    }

    req.resource = resource;
    next();
  } catch (err) {
    // Hand lookup failures to the error handler explicitly; a rejected promise
    // from async middleware would otherwise leave the request unanswered.
    next(err);
  }
}
// 用法
const updateSchema = z.object({ name: z.string() });
app.patch('/api/resources/:id',
authenticate,
validateSchema(updateSchema),
requireOwnership,
async (req, res) => {
// 所有检查已通过
const updated = await updateResource(req.resource, req.body);
res.json(toDTO(updated));
}
);
from fastapi import Depends, HTTPException
from typing import Annotated
async def verify_ownership(
resource_id: str,
current_user: User = Depends(get_current_user)
):
resource = await db.resources.get(resource_id)
if not resource:
raise HTTPException(status_code=404, detail="Not found")
if resource.owner_id != current_user.id:
raise HTTPException(status_code=403, detail="Forbidden")
return resource
@app.patch("/api/resources/{resource_id}")
async def update_resource(
data: UpdateResourceSchema,
resource: Resource = Depends(verify_ownership)
):
# 资源所有权已验证
updated = await resource.update(data.dict())
return ResourceDTO.from_orm(updated)
// ❌ 有漏洞
export async function GET(request: Request) {
const { userId } = await auth();
const resourceId = new URL(request.url).searchParams.get('id');
// 缺少所有权检查!
const resource = await db.query.resources.findFirst({
where: eq(resources.id, resourceId)
});
return NextResponse.json(resource);
}
// ✅ 已修复
/**
 * Returns the resource named by the `id` query parameter, but only when it
 * belongs to the authenticated caller.
 *
 * Ownership is part of the query, so a resource owned by someone else looks
 * the same as a missing one (404).
 *
 * @param request - Incoming request; the `id` query parameter is read from its URL.
 * @returns 401 when unauthenticated, 400 when `id` is missing, 404 when no
 *          owned resource matches, otherwise the resource as JSON.
 */
export async function GET(request: Request) {
  const { userId } = await auth();
  if (!userId) {
    // Without this guard a null userId would flow into the ownership filter.
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  const resourceId = new URL(request.url).searchParams.get('id');
  if (!resourceId) {
    return NextResponse.json({ error: "Missing id" }, { status: 400 });
  }

  const resource = await db.query.resources.findFirst({
    where: and(
      eq(resources.id, resourceId),
      eq(resources.ownerId, userId) // Ownership check
    )
  });
  if (!resource) {
    return NextResponse.json({ error: "Not found" }, { status: 404 });
  }
  return NextResponse.json(resource);
}
// ❌ 有漏洞 - 用户可以设置任何字段
export async function PATCH(request: Request) {
const body = await request.json();
// 用户可以发送:{ role: 'admin', isVerified: true }
await db.update(users)
.set(body) // 危险!
.where(eq(users.id, userId));
}
// ✅ 已修复 - 显式允许的字段
const allowedFields = z.object({
name: z.string().optional(),
bio: z.string().optional()
// role, isVerified 不允许
});
/**
 * Updates the caller's own user row, accepting only the fields declared in
 * `allowedFields`.
 *
 * @param request - Incoming request carrying a JSON body.
 * @returns 401 when unauthenticated, 400 for malformed or invalid input,
 *          otherwise a small success payload.
 */
export async function PATCH(request: Request) {
  // The row to update comes from the authenticated session, never from the body.
  const { userId } = await auth();
  if (!userId) {
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return NextResponse.json({ error: "Invalid JSON body" }, { status: 400 });
  }

  // safeParse instead of parse: invalid input becomes a 400, not a thrown 500.
  const result = allowedFields.safeParse(body);
  if (!result.success) {
    return NextResponse.json({
      error: "Validation failed",
      details: result.error.issues
    }, { status: 400 });
  }

  await db.update(users)
    .set(result.data)
    .where(eq(users.id, userId));
  return NextResponse.json({ status: "updated" });
}
// ❌ 有漏洞
return NextResponse.json(user); // 所有字段都暴露了
// ✅ 已修复
return NextResponse.json({
id: user.id,
name: user.name,
email: user.email
// passwordHash, resetToken 等已排除
});
import { Ratelimit } from '@upstash/ratelimit';
import { Redis } from '@upstash/redis';
const ratelimit = new Ratelimit({
redis: Redis.fromEnv(),
limiter: Ratelimit.slidingWindow(10, '10 s'),
});
export async function POST(request: Request) {
const identifier = request.headers.get('x-forwarded-for') || 'anonymous';
const { success } = await ratelimit.limit(identifier);
if (!success) {
return NextResponse.json({ error: "Too many requests" }, { status: 429 });
}
// 处理请求...
}
## [端点名称] 的安全审查
### 身份验证
- [ ] 在处理前已验证用户身份
- [ ] 无效令牌已用 401 拒绝
- [ ] 已检查令牌过期
### 授权
- [ ] 已验证资源所有权
- [ ] 已实现角色/权限检查
- [ ] 已强制执行跨租户数据隔离
### 输入验证
- [ ] 所有输入已使用模式(Zod/Pydantic)验证
- [ ] 文件上传大小/类型已限制
- [ ] 已防止 SQL 注入(使用 ORM)
- [ ] 已实施 XSS 预防
### 输出安全性
- [ ] 响应中已排除敏感字段
- [ ] 日志中已掩码 PII
- [ ] 错误消息未泄露系统信息
### 速率限制
- [ ] 已为每个端点配置速率限制
- [ ] 已实施 DDoS 保护
### 日志记录
- [ ] 已记录失败的身份验证尝试
- [ ] 已记录权限拒绝
- [ ] 请求 ID 用于可追踪性
在对 API 变更进行代码审查和部署之前使用此技能,以确保全面的安全覆盖。
每周安装数
116
代码仓库
GitHub 星标数
18
首次出现
2026 年 1 月 23 日
安全审计
安装于
opencode96
codex93
claude-code93
gemini-cli91
cursor88
github-copilot88
Comprehensive security checklist for API endpoint development. Ensures proper authentication, authorization, input validation, output safety, and security logging are implemented before deployment.
Every API endpoint must verify the user's identity before processing requests.
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
/**
 * Route handler skeleton that rejects unauthenticated callers before doing
 * any work.
 *
 * @param request - Incoming request (not read by this skeleton).
 * @returns A 401 JSON response when `auth()` yields no `userId`; otherwise
 *          control falls through to the placeholder below.
 */
export async function GET(request: Request) {
  // 1. Authenticate request
  const session = await auth();
  const isAnonymous = !session.userId;
  if (isAnonymous) {
    const payload = { error: "Unauthorized" };
    return NextResponse.json(payload, { status: 401 });
  }
  // Continue with authenticated request...
}
import jwt from 'jsonwebtoken';
import { Request, Response, NextFunction } from 'express';
/**
 * Express middleware that authenticates a request from its
 * `Authorization: Bearer <token>` header.
 *
 * On success the decoded JWT payload is stored on `req.user` and `next()` is
 * called. Every authentication failure (missing header, wrong scheme, invalid
 * or expired token) is answered with 401; 403 is left for authenticated callers
 * who lack permission.
 *
 * @param req - Incoming request; only the `authorization` header is read.
 * @param res - Response used to short-circuit with a bare status code.
 * @param next - Invoked only after the token has been verified.
 */
function authenticateToken(req: Request, res: Response, next: NextFunction) {
  const secret = process.env.JWT_SECRET;
  if (!secret) {
    // Server misconfiguration, not a client error: fail closed without verifying.
    return res.sendStatus(500);
  }

  const authHeader = req.headers['authorization'];
  // Require the Bearer scheme explicitly so that e.g. "Basic abc" is not
  // treated as a JWT just because it has a second word.
  const [scheme = '', token = ''] =
    typeof authHeader === 'string' ? authHeader.split(' ') : [];
  if (scheme.toLowerCase() !== 'bearer' || !token) {
    return res.sendStatus(401);
  }

  // NOTE(review): consider passing an explicit `algorithms` allow-list to
  // jwt.verify once the signing algorithm in use is confirmed.
  jwt.verify(token, secret, (err, user) => {
    // Malformed, expired, or badly signed tokens are authentication failures.
    if (err) return res.sendStatus(401);
    req.user = user;
    next();
  });
}
app.get('/api/protected', authenticateToken, (req, res) => {
// Request is authenticated
});
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to a user, or reject the request.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.

    Returns:
        Whatever truthy value ``verify_token`` produces for the token.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``verify_token`` returns a falsy value.
    """
    user = await verify_token(token)
    if user:
        return user
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
@app.get("/api/protected")
async def protected_route(current_user: User = Depends(get_current_user)):
return {"user": current_user.email}
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def protected_view(request):
# request.user is authenticated
return Response({'user': request.user.email})
Authentication proves WHO the user is. Authorization proves the user has permission to access the resource.
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
import { db } from '@/lib/db';
import { eq } from 'drizzle-orm';
export async function GET(
request: Request,
{ params }: { params: { id: string } }
) {
// 1. Authenticate
const { userId } = await auth();
if (!userId) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
// 2. Fetch resource
const resource = await db.query.resources.findFirst({
where: eq(resources.id, params.id)
});
if (!resource) {
return NextResponse.json({ error: "Not found" }, { status: 404 });
}
// 3. Authorize - Check ownership
if (resource.ownerId !== userId) {
return NextResponse.json({ error: "Forbidden" }, { status: 403 });
}
// 4. Return authorized data
return NextResponse.json(resource);
}
enum Role {
USER = 'user',
ADMIN = 'admin',
MODERATOR = 'moderator'
}
/**
 * Builds a guard that admits only authenticated users whose stored role is
 * one of `allowedRoles`.
 *
 * @param allowedRoles - Roles permitted to proceed.
 * @returns An async guard taking the request. It resolves to a 401 response
 *          when there is no `userId`, a 403 response when no user row matches
 *          or the role is not allowed, and `null` when the caller is authorized.
 */
function requireRole(allowedRoles: Role[]) {
  async function guard(request: Request) {
    const session = await auth();
    const userId = session.userId;
    if (!userId) {
      return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
    }

    // Roles come from the application's own user table, keyed by Clerk ID.
    const account = await db.query.users.findFirst({
      where: eq(users.clerkId, userId)
    });
    const roleAllowed = account ? allowedRoles.includes(account.role) : false;
    if (!roleAllowed) {
      return NextResponse.json({ error: "Forbidden" }, { status: 403 });
    }

    return null; // Authorized
  }
  return guard;
}
export async function DELETE(request: Request) {
const authError = await requireRole([Role.ADMIN, Role.MODERATOR])(request);
if (authError) return authError;
// User is authorized as admin or moderator
}
// CRITICAL: Prevent cross-tenant data leaks
// ❌ WRONG - No tenant check
const orders = await db.query.orders.findMany({
where: eq(orders.userId, userId)
});
// ✅ CORRECT - Tenant isolation
// Resolve the caller's own user row first; its tenantId is the tenant boundary
// applied to the orders query below.
const user = await db.query.users.findFirst({
  where: eq(users.clerkId, userId)
});
if (!user) {
  // Without a user row no tenant can be established: fail closed instead of
  // dereferencing `user.tenantId` on undefined.
  throw new Error('No user record found for the authenticated user');
}
// The result is deliberately NOT named `orders`: a `const orders` here would
// shadow the `orders` table used inside its own initializer and throw a
// ReferenceError before the query runs.
const tenantOrders = await db.query.orders.findMany({
  where: and(
    eq(orders.userId, userId),
    eq(orders.tenantId, user.tenantId) // CRITICAL: tenant boundary
  )
});
import { z } from 'zod';
import { NextResponse } from 'next/server';
const updateUserSchema = z.object({
id: z.string().uuid(),
email: z.string().email().optional(),
age: z.number().int().min(0).max(150).optional(),
role: z.enum(['user', 'admin', 'moderator']).optional(),
});
export async function PATCH(request: Request) {
const { userId } = await auth();
if (!userId) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
// Parse and validate input
const body = await request.json();
const result = updateUserSchema.safeParse(body);
if (!result.success) {
return NextResponse.json({
error: "Validation failed",
details: result.error.issues
}, { status: 400 });
}
// Safe to use validated data
const validatedData = result.data;
// ... update logic
}
from pydantic import BaseModel, EmailStr, Field, validator
from fastapi import HTTPException
class UpdateUser(BaseModel):
    """Request body for updating a user.

    `id` must be a lowercase hyphenated UUID string, `age` must be within
    0..150, and `role` must be one of user/admin/moderator. `email`, `age`
    and `role` are optional.
    """

    id: str = Field(..., regex=r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$')
    email: EmailStr | None = None
    age: int | None = Field(None, ge=0, le=150)
    role: str | None = Field(None, regex=r'^(user|admin|moderator)$')

    @validator('email')
    def email_must_not_be_disposable(cls, v):
        """Reject addresses whose domain is a known disposable-mail provider.

        Raises:
            ValueError: if the domain is, or is a subdomain of, a blocked domain.
        """
        if v:
            # Compare the domain part only. A plain substring test would also
            # reject "x@nottempmail.com" or "tempmail.com@corp.example".
            domain = v.rsplit('@', 1)[-1].lower()
            for blocked in ('tempmail.com', '10minutemail.com'):
                if domain == blocked or domain.endswith('.' + blocked):
                    raise ValueError('Disposable email addresses not allowed')
        return v
@app.patch("/api/users")
async def update_user(user_data: UpdateUser, current_user: User = Depends(get_current_user)):
# user_data is validated
return {"status": "updated"}
// ❌ NEVER: Raw SQL with string interpolation
const userId = request.params.id;
const query = `SELECT * FROM users WHERE id = '${userId}'`; // VULNERABLE!
db.execute(query);
// ✅ ALWAYS: Use ORM or parameterized queries
import { eq } from 'drizzle-orm';
const user = await db.query.users.findFirst({
where: eq(users.id, userId)
});
// ✅ OR: Parameterized raw query
const [user] = await db.execute(
'SELECT * FROM users WHERE id = ?',
[userId]
);
const MAX_FILE_SIZE = 5 * 1024 * 1024; // 5MB
const ALLOWED_TYPES = ['image/jpeg', 'image/png', 'image/gif'];
/**
 * Upload handler that validates the `file` form field before processing it.
 *
 * @param request - Incoming request, expected to carry form data with a `file` field.
 * @returns 400 when the body is not form data, the field is missing or is not
 *          a file, the file exceeds `MAX_FILE_SIZE`, or its declared type is
 *          not in `ALLOWED_TYPES`; otherwise falls through to the placeholder.
 */
export async function POST(request: Request) {
  let formData: FormData;
  try {
    formData = await request.formData();
  } catch {
    // A body that is not valid form data is a client error, not a server fault.
    return NextResponse.json({ error: "No file provided" }, { status: 400 });
  }

  // `get` returns a string for plain text fields; a cast to File would let a
  // string pass the presence check, so test the runtime type instead.
  const file = formData.get('file');
  if (!(file instanceof File)) {
    return NextResponse.json({ error: "No file provided" }, { status: 400 });
  }

  // Validate file size
  if (file.size > MAX_FILE_SIZE) {
    return NextResponse.json({
      error: "File too large. Max 5MB"
    }, { status: 400 });
  }

  // Validate file type
  // NOTE(review): `file.type` is supplied by the client; if the content itself
  // matters, verify it (e.g. by magic bytes) before trusting this value.
  if (!ALLOWED_TYPES.includes(file.type)) {
    return NextResponse.json({
      error: "Invalid file type. Only JPEG, PNG, GIF allowed"
    }, { status: 400 });
  }

  // Process file...
}
// ❌ WRONG - Exposing sensitive fields
const user = await db.query.users.findFirst({
where: eq(users.id, userId)
});
return NextResponse.json(user); // Includes password hash, internal IDs, etc.
// ✅ CORRECT - Explicitly select safe fields
const user = await db.query.users.findFirst({
where: eq(users.id, userId),
columns: {
id: true,
email: true,
name: true,
createdAt: true,
// Exclude: passwordHash, internalNotes, apiKey, etc.
}
});
return NextResponse.json(user);
// ✅ BETTER - Use DTOs
interface PublicUserDTO {
id: string;
email: string;
name: string;
createdAt: Date;
}
function toPublicUser(user: User): PublicUserDTO {
return {
id: user.id,
email: user.email,
name: user.name,
createdAt: user.createdAt
};
}
return NextResponse.json(toPublicUser(user));
/**
 * Returns a shallow copy of `data` that is safe to write to logs.
 *
 * - `email` is masked to the first two characters of the local part plus the
 *   domain (`al***@example.com`); a value without `@` is masked with no domain.
 * - `ssn` is reduced to its last four characters (`***-**-6789`).
 * - Credential-like fields are removed entirely.
 *
 * The input object is never mutated. `null`/`undefined` input yields `{}`.
 *
 * @param data - Arbitrary object that may contain PII or secrets.
 * @returns The sanitized shallow copy.
 */
function sanitizeForLogging(data: any) {
  const sanitized = { ...data };

  // Mask email
  if (sanitized.email) {
    // Coerce first so a non-string value cannot throw inside a logging helper.
    const email = String(sanitized.email);
    const at = email.indexOf('@');
    const local = at === -1 ? email : email.slice(0, at);
    const masked = `${local.slice(0, 2)}***`;
    // Without an '@' there is no domain to keep; never emit "@undefined".
    sanitized.email = at === -1 ? masked : `${masked}@${email.slice(at + 1)}`;
  }

  // Mask SSN
  if (sanitized.ssn) {
    sanitized.ssn = `***-**-${String(sanitized.ssn).slice(-4)}`;
  }

  // Remove sensitive fields
  for (const key of ['passwordHash', 'apiKey', 'password', 'token', 'resetToken']) {
    delete sanitized[key];
  }

  return sanitized;
}
console.log('User updated:', sanitizeForLogging(user));
// ❌ WRONG - Leaking system information
try {
await db.execute(query);
} catch (error) {
return NextResponse.json({
error: error.message, // Might expose SQL, file paths, etc.
stack: error.stack // NEVER expose in production
}, { status: 500 });
}
// ✅ CORRECT - Generic error with logging
try {
await db.execute(query);
} catch (error) {
console.error('Database error:', error); // Log full error internally
return NextResponse.json({
error: "An error occurred processing your request"
}, { status: 500 });
}
enum SecurityEvent {
AUTH_FAILURE = 'auth_failure',
UNAUTHORIZED_ACCESS = 'unauthorized_access',
PERMISSION_DENIED = 'permission_denied',
RATE_LIMIT_EXCEEDED = 'rate_limit_exceeded',
INVALID_INPUT = 'invalid_input'
}
/**
 * Writes one structured JSON line describing a security-relevant event.
 *
 * @param event - Which kind of security event occurred.
 * @param details - Context for the event. `userId`, `ip` and `endpoint` are
 *                  lifted to top-level fields; the whole object is also
 *                  included after passing through `sanitizeForLogging`.
 */
function logSecurityEvent(event: SecurityEvent, details: any) {
  const { userId, ip, endpoint } = details;
  const entry = {
    timestamp: new Date().toISOString(),
    event,
    // Events raised before authentication have no user to attribute them to.
    userId: userId || 'anonymous',
    ip,
    endpoint,
    details: sanitizeForLogging(details)
  };
  console.log(JSON.stringify(entry));
}
// Usage
export async function GET(request: Request) {
const { userId } = await auth();
if (!userId) {
logSecurityEvent(SecurityEvent.AUTH_FAILURE, {
ip: request.headers.get('x-forwarded-for'),
endpoint: request.url
});
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
}
import { v4 as uuidv4 } from 'uuid';
/**
 * Tags each request with a fresh request ID, logs a structured line for it,
 * and forwards the request with the ID in an `X-Request-ID` header.
 *
 * @param request - Incoming request; its method, URL and headers are read.
 * @returns The response obtained by fetching `request.url` with the
 *          original headers plus `X-Request-ID`.
 */
export async function middleware(request: Request) {
  const requestId = uuidv4();
  console.log(JSON.stringify({
    requestId,
    method: request.method,
    url: request.url,
    timestamp: new Date().toISOString()
  }));

  // Pass request ID through headers.
  // Copy via the Headers constructor: object-spreading a Headers instance
  // copies nothing, which would silently drop every original header.
  const forwardedHeaders = new Headers(request.headers);
  forwardedHeaders.set('X-Request-ID', requestId);

  // NOTE(review): this re-fetches request.url without forwarding the method or
  // body; confirm that is intended for the framework this runs in.
  const response = await fetch(request.url, {
    headers: forwardedHeaders
  });
  return response;
}
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
import { db } from '@/lib/db';
import { eq, and } from 'drizzle-orm';
import { z } from 'zod';
import { ratelimit } from '@/lib/ratelimit';
// 1. Input validation schema
const updateResourceSchema = z.object({
name: z.string().min(1).max(100),
description: z.string().max(500).optional(),
isPublic: z.boolean().optional()
});
// 2. DTO for safe output
/** Shape of a resource as exposed in API responses. */
interface ResourceDTO {
  id: string;
  name: string;
  description: string;
  isPublic: boolean;
  createdAt: Date;
}

/**
 * Projects a resource record onto the fields that are safe to return.
 *
 * @param resource - Record to project; anything not listed in `ResourceDTO`
 *                   (for example `ownerId` or `internalNotes`) is left out.
 * @returns A new object holding only the five DTO fields.
 */
function toResourceDTO(resource: any): ResourceDTO {
  // Pick the allowed fields explicitly; never spread the source record.
  const { id, name, description, isPublic, createdAt } = resource;
  return { id, name, description, isPublic, createdAt };
}
/**
 * Updates a resource owned by the caller, applying every check in order:
 * rate limit, authentication, input validation, existence, ownership.
 *
 * @param request - Incoming request carrying a JSON body.
 * @param params - Route parameters; `params.id` is the resource to update.
 * @returns 429 when rate limited, 401 when unauthenticated, 400 for malformed
 *          or invalid input, 404 when the resource does not exist (or vanished
 *          before the update), 403 when the caller is not the owner, 500 on
 *          unexpected errors, otherwise the updated resource as a DTO.
 */
export async function PATCH(
  request: Request,
  { params }: { params: { id: string } }
) {
  try {
    // 3. Rate limiting
    const { success } = await ratelimit.limit(request.headers.get('x-forwarded-for') || 'anonymous');
    if (!success) {
      return NextResponse.json({ error: "Too many requests" }, { status: 429 });
    }

    // 4. Authentication
    const { userId } = await auth();
    if (!userId) {
      console.log('Auth failure:', { endpoint: request.url });
      return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
    }

    // 5. Input validation
    let body: unknown;
    try {
      body = await request.json();
    } catch {
      // Malformed JSON is a client error; do not let it surface as a 500.
      return NextResponse.json({ error: "Invalid JSON body" }, { status: 400 });
    }
    const result = updateResourceSchema.safeParse(body);
    if (!result.success) {
      return NextResponse.json({
        error: "Validation failed",
        details: result.error.issues
      }, { status: 400 });
    }

    // 6. Fetch and verify existence
    const resource = await db.query.resources.findFirst({
      where: eq(resources.id, params.id)
    });
    if (!resource) {
      return NextResponse.json({ error: "Not found" }, { status: 404 });
    }

    // 7. Authorization check
    if (resource.ownerId !== userId) {
      console.log('Permission denied:', { userId, resourceId: params.id });
      return NextResponse.json({ error: "Forbidden" }, { status: 403 });
    }

    // 8. Update resource
    // Repeat the ownership condition in the UPDATE itself so the write stays
    // scoped to the owner even if the row changed after the check above.
    const [updatedResource] = await db.update(resources)
      .set({
        ...result.data,
        updatedAt: new Date()
      })
      .where(and(
        eq(resources.id, params.id),
        eq(resources.ownerId, userId)
      ))
      .returning();
    if (!updatedResource) {
      // Row was deleted or re-assigned between the check and the update.
      return NextResponse.json({ error: "Not found" }, { status: 404 });
    }

    // 9. Return safe response
    return NextResponse.json(toResourceDTO(updatedResource));
  } catch (error) {
    // 10. Safe error handling: full detail to the log, generic message to the client.
    console.error('Error updating resource:', error);
    return NextResponse.json({
      error: "An error occurred"
    }, { status: 500 });
  }
}
import { Request, Response, NextFunction } from 'express';
import { z } from 'zod';
// Validation middleware factory
/**
 * Builds an Express middleware that validates `req.body` against `schema`.
 *
 * @param schema - Schema whose `safeParse` decides whether the body is valid.
 * @returns Middleware that answers 400 with the validation issues on failure;
 *          on success it replaces `req.body` with the parsed data and calls
 *          `next()`.
 */
function validateSchema(schema: z.ZodSchema) {
  return function validate(req: Request, res: Response, next: NextFunction) {
    const parsed = schema.safeParse(req.body);
    if (parsed.success) {
      // Downstream handlers see the parsed data rather than the raw body.
      req.body = parsed.data;
      next();
      return;
    }
    return res.status(400).json({
      error: 'Validation failed',
      details: parsed.error.issues
    });
  };
}
// Authorization middleware
/**
 * Express middleware that loads the resource named by `req.params.id` and
 * lets the request continue only when the authenticated user owns it.
 *
 * Must run after an authentication middleware that sets `req.user`.
 * On success the loaded resource is attached as `req.resource`.
 *
 * @param req - Incoming request; reads `req.params.id` and `req.user.id`.
 * @param res - Response used for the 401/404/403 rejections.
 * @param next - Called with no argument on success, or with the error when
 *               the lookup fails.
 */
async function requireOwnership(req: Request, res: Response, next: NextFunction) {
  try {
    if (!req.user) {
      // Authentication did not run or did not set a user: fail closed.
      return res.status(401).json({ error: 'Unauthorized' });
    }

    const resource = await db.resources.findById(req.params.id);
    if (!resource) {
      return res.status(404).json({ error: 'Not found' });
    }
    if (resource.ownerId !== req.user.id) {
      return res.status(403).json({ error: 'Forbidden' });
    }

    req.resource = resource;
    next();
  } catch (err) {
    // Hand lookup failures to the error handler explicitly; a rejected promise
    // from async middleware would otherwise leave the request unanswered.
    next(err);
  }
}
// Usage
const updateSchema = z.object({ name: z.string() });
app.patch('/api/resources/:id',
authenticate,
validateSchema(updateSchema),
requireOwnership,
async (req, res) => {
// All checks passed
const updated = await updateResource(req.resource, req.body);
res.json(toDTO(updated));
}
);
from fastapi import Depends, HTTPException
from typing import Annotated
async def verify_ownership(
resource_id: str,
current_user: User = Depends(get_current_user)
):
resource = await db.resources.get(resource_id)
if not resource:
raise HTTPException(status_code=404, detail="Not found")
if resource.owner_id != current_user.id:
raise HTTPException(status_code=403, detail="Forbidden")
return resource
@app.patch("/api/resources/{resource_id}")
async def update_resource(
data: UpdateResourceSchema,
resource: Resource = Depends(verify_ownership)
):
# Resource ownership verified
updated = await resource.update(data.dict())
return ResourceDTO.from_orm(updated)
// ❌ VULNERABLE
export async function GET(request: Request) {
const { userId } = await auth();
const resourceId = new URL(request.url).searchParams.get('id');
// Missing ownership check!
const resource = await db.query.resources.findFirst({
where: eq(resources.id, resourceId)
});
return NextResponse.json(resource);
}
// ✅ FIXED
/**
 * Returns the resource named by the `id` query parameter, but only when it
 * belongs to the authenticated caller.
 *
 * Ownership is part of the query, so a resource owned by someone else looks
 * the same as a missing one (404).
 *
 * @param request - Incoming request; the `id` query parameter is read from its URL.
 * @returns 401 when unauthenticated, 400 when `id` is missing, 404 when no
 *          owned resource matches, otherwise the resource as JSON.
 */
export async function GET(request: Request) {
  const { userId } = await auth();
  if (!userId) {
    // Without this guard a null userId would flow into the ownership filter.
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  const resourceId = new URL(request.url).searchParams.get('id');
  if (!resourceId) {
    return NextResponse.json({ error: "Missing id" }, { status: 400 });
  }

  const resource = await db.query.resources.findFirst({
    where: and(
      eq(resources.id, resourceId),
      eq(resources.ownerId, userId) // Ownership check
    )
  });
  if (!resource) {
    return NextResponse.json({ error: "Not found" }, { status: 404 });
  }
  return NextResponse.json(resource);
}
// ❌ VULNERABLE - User can set any field
export async function PATCH(request: Request) {
const body = await request.json();
// User could send: { role: 'admin', isVerified: true }
await db.update(users)
.set(body) // DANGEROUS!
.where(eq(users.id, userId));
}
// ✅ FIXED - Explicit allowed fields
const allowedFields = z.object({
name: z.string().optional(),
bio: z.string().optional()
// role, isVerified NOT allowed
});
/**
 * Updates the caller's own user row, accepting only the fields declared in
 * `allowedFields`.
 *
 * @param request - Incoming request carrying a JSON body.
 * @returns 401 when unauthenticated, 400 for malformed or invalid input,
 *          otherwise a small success payload.
 */
export async function PATCH(request: Request) {
  // The row to update comes from the authenticated session, never from the body.
  const { userId } = await auth();
  if (!userId) {
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return NextResponse.json({ error: "Invalid JSON body" }, { status: 400 });
  }

  // safeParse instead of parse: invalid input becomes a 400, not a thrown 500.
  const result = allowedFields.safeParse(body);
  if (!result.success) {
    return NextResponse.json({
      error: "Validation failed",
      details: result.error.issues
    }, { status: 400 });
  }

  await db.update(users)
    .set(result.data)
    .where(eq(users.id, userId));
  return NextResponse.json({ status: "updated" });
}
// ❌ VULNERABLE
return NextResponse.json(user); // All fields exposed
// ✅ FIXED
return NextResponse.json({
id: user.id,
name: user.name,
email: user.email
// passwordHash, resetToken, etc. excluded
});
import { Ratelimit } from '@upstash/ratelimit';
import { Redis } from '@upstash/redis';
const ratelimit = new Ratelimit({
redis: Redis.fromEnv(),
limiter: Ratelimit.slidingWindow(10, '10 s'),
});
/**
 * Rate-limited POST handler: rejects the request with 429 when the limiter
 * reports that the caller is over quota.
 *
 * @param request - Incoming request; only the `x-forwarded-for` header is read here.
 * @returns A 429 JSON response when `ratelimit.limit` reports `success: false`;
 *          otherwise control falls through to the placeholder below.
 */
export async function POST(request: Request) {
// Bucket key: the raw `x-forwarded-for` header value, or the shared key
// 'anonymous' when the header is absent or empty (all such callers then share
// one quota).
// NOTE(review): this header can carry a comma-separated proxy chain and may be
// client-controlled unless a trusted proxy overwrites it — confirm for the
// target deployment.
const identifier = request.headers.get('x-forwarded-for') || 'anonymous';
const { success } = await ratelimit.limit(identifier);
if (!success) {
return NextResponse.json({ error: "Too many requests" }, { status: 429 });
}
// Process request...
}
## Security Review for [Endpoint Name]
### Authentication
- [ ] User identity verified before processing
- [ ] Invalid tokens rejected with 401
- [ ] Token expiration checked
### Authorization
- [ ] Resource ownership verified
- [ ] Role/permission checks implemented
- [ ] Cross-tenant data isolation enforced
### Input Validation
- [ ] All inputs validated with schema (Zod/Pydantic)
- [ ] File uploads size/type limited
- [ ] SQL injection prevented (using ORM)
- [ ] XSS prevention in place
### Output Safety
- [ ] Sensitive fields excluded from responses
- [ ] PII masked in logs
- [ ] Error messages don't leak system info
### Rate Limiting
- [ ] Rate limits configured per endpoint
- [ ] DDoS protection in place
### Logging
- [ ] Failed auth attempts logged
- [ ] Permission denials logged
- [ ] Request IDs for traceability
Use this skill during code reviews and before deploying API changes to ensure comprehensive security coverage.
Weekly Installs
116
Repository
GitHub Stars
18
First Seen
Jan 23, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
opencode96
codex93
claude-code93
gemini-cli91
cursor88
github-copilot88
agent-browser 浏览器自动化工具 - Vercel Labs 命令行网页操作与测试
159,700 周安装
Firecrawl CLI 网站爬取工具 - 批量提取网页内容,支持深度限制和路径过滤
13,300 周安装
通用发布工作流技能 - 支持多语言更新日志,自动检测项目配置
12,900 周安装
Firecrawl Search:支持内容抓取的网络搜索工具,JSON格式返回结果
13,400 周安装
Python测试模式指南:pytest、fixtures、模拟与TDD实战教程
13,300 周安装
Firecrawl Scrape:智能网页抓取工具,一键提取LLM优化Markdown内容
13,700 周安装
Gemini Web API 客户端:文本与图像生成工具,支持多轮对话和逆向工程API
13,400 周安装