重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
fastapi-endpoint by davila7/claude-code-templates
npx skills add https://github.com/davila7/claude-code-templates --skill fastapi-endpoint
当你需要以下操作时,请使用此技能:
进入计划模式。在编写任何代码之前,先探索现有项目以了解:
- 应用入口(main.py、app.py 或 app/__init__.py)
- 路由组织方式(routers/ 目录)
- models/、schemas/、crud/ 或 services/ 目录
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
- pyproject.toml 或 requirements.txt 以了解已安装的依赖项
- 现有模式(Depends(get_db)、中间件、其他)
- 响应封装格式({"data": ..., "meta": ...})
- 测试布局(tests/、test_*.py、*_test.py)
使用 AskUserQuestion 来澄清需求。分轮次提问——不要一次性提出所有问题。
问题:"此端点管理什么资源?"
标题:"资源"
选项:
- "新资源(我将描述字段)" — 从头开始创建新的数据模型
- "现有模型(扩展它)" — 为代码库中已存在的模型添加端点
- "关系端点(嵌套)" — 例如,/users/{id}/orders — 在相关资源上的端点
问题:"你需要哪些 HTTP 方法?"
标题:"方法"
多选:true
选项:
- "完整 CRUD(GET 列表、GET 详情、POST、PUT/PATCH、DELETE)" — 所有标准操作
- "只读(GET 列表 + GET 详情)" — 无变更操作
- "自定义操作(POST /resource/{id}/action)" — 业务逻辑端点,非标准 CRUD
问题:"该资源有哪些字段?(简要描述)"
标题:"字段"
选项:
- "简单(< 6 个字段,基本类型)" — 字符串、整数、布尔值、日期
- "中等(6-15 个字段,一些关系)" — 包含外键或枚举
- "复杂(嵌套对象、多态)" — JSON 字段、判别联合、计算字段
问题:"此端点应如何进行身份验证?"
标题:"身份验证"
选项:
- "JWT Bearer 令牌(推荐)" — 带有 JWT 解码的 OAuth2PasswordBearer
- "API 密钥头" — X-API-Key 头验证
- "无需身份验证(公开)" — 开放端点,无需身份验证
- "使用现有身份验证" — 重用项目中已有的身份验证依赖项
问题:"你需要基于角色的访问控制吗?"
标题:"RBAC"
选项:
- "否 — 任何经过身份验证的用户" — 单一权限级别
- "是 — 角色检查(管理员、用户等)" — 每个端点需要特定角色
- "是 — 所有权检查" — 用户只能访问自己的资源
问题:"列表端点使用什么分页样式?"
标题:"分页"
选项:
- "基于游标(推荐)" — 最适合实时数据,无偏移漂移
- "偏移/限制" — 简单,适用于带页码的管理面板
- "无分页" — 小型数据集,返回所有结果
问题:"你需要响应缓存吗?"
标题:"缓存"
选项:
- "无缓存" — 每次请求都获取新数据
- "Cache-Control 头" — 通过 HTTP 头进行客户端缓存
- "Redis/内存缓存" — 具有 TTL 的服务器端缓存
编写一个具体的实施计划,涵盖:
Create、Update、Response 和 List 模式(含字段类型)
通过 ExitPlanMode 呈现给用户批准。
批准后,按以下顺序实施:
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from uuid import UUID
class ResourceBase(BaseModel):
    """Fields shared by the create payload and the response schema."""

    name: str
    # ... add the remaining fields gathered during the interview
class ResourceCreate(ResourceBase):
    """Request body for creating a resource; adds nothing beyond ResourceBase."""

    pass
class ResourceUpdate(BaseModel):
    """Request body for partial updates; every field is optional."""

    # Defaults to None so a client may omit the field entirely.
    name: str | None = None
class ResourceResponse(ResourceBase):
    """Full resource representation, including database-generated fields."""

    # from_attributes lets Pydantic populate the schema from object attributes
    # (e.g. an ORM row) rather than only from dicts.
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    created_at: datetime
    updated_at: datetime
class ResourceListResponse(BaseModel):
    """Envelope for one page of a paginated resource listing."""

    data: list[ResourceResponse]
    # Opaque cursor for the following page; None when there is no further page.
    next_cursor: str | None = None
    has_more: bool
from sqlalchemy import Column, String, DateTime, func
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
import uuid
from app.database import Base
class Resource(Base):
    """SQLAlchemy model mapped to the ``resources`` table."""

    __tablename__ = "resources"

    # UUID primary key; uuid.uuid4 is called as the column default.
    id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String, nullable=False, index=True)
    # Both timestamps default to the database's now() on insert.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # onupdate=func.now() additionally re-stamps the column on updates.
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from uuid import UUID
async def get_resource(db: AsyncSession, resource_id: UUID) -> Resource | None:
    """Fetch a single resource by id.

    Returns:
        The matching ``Resource``, or ``None`` when no row has that id.
    """
    result = await db.execute(select(Resource).where(Resource.id == resource_id))
    return result.scalar_one_or_none()
async def list_resources(
    db: AsyncSession,
    cursor: str | None = None,
    limit: int = 20,
) -> tuple[list[Resource], str | None]:
    """Return one page of resources, newest first, plus the next-page cursor.

    Args:
        db: Async session used to run the query.
        cursor: Cursor returned by a previous call; only rows created strictly
            before the timestamp it encodes are returned.
        limit: Maximum number of rows in the returned page.

    Returns:
        ``(items, next_cursor)``; ``next_cursor`` is ``None`` on the last page.
    """
    # Fetch one row beyond the page size purely to learn whether more exist.
    query = select(Resource).order_by(Resource.created_at.desc()).limit(limit + 1)
    if cursor:
        query = query.where(Resource.created_at < decode_cursor(cursor))
    result = await db.execute(query)
    rows = list(result.scalars().all())
    page = rows[:limit]
    has_more = len(rows) > limit
    # The cursor must point at the last row actually returned. Encoding the
    # look-ahead row would make the next page's strict "<" filter skip it.
    # NOTE: rows sharing an identical created_at at the page boundary can still
    # be skipped, because the cursor carries the timestamp only.
    next_cursor = encode_cursor(page[-1].created_at) if has_more and page else None
    return page, next_cursor
async def create_resource(db: AsyncSession, data: ResourceCreate) -> Resource:
    """Insert a new resource built from the validated payload and return it."""
    resource = Resource(**data.model_dump())
    db.add(resource)
    await db.commit()
    # Reload the row after commit so the returned object carries current values.
    await db.refresh(resource)
    return resource
async def update_resource(
    db: AsyncSession, resource_id: UUID, data: ResourceUpdate
) -> Resource | None:
    """Apply a partial update to an existing resource.

    Only fields the client explicitly sent are written
    (``model_dump(exclude_unset=True)``); omitted fields are left untouched.

    Returns:
        The refreshed ``Resource``, or ``None`` when ``resource_id`` is unknown.
    """
    resource = await get_resource(db, resource_id)
    if not resource:
        return None
    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(resource, field, value)
    await db.commit()
    await db.refresh(resource)
    return resource
async def delete_resource(db: AsyncSession, resource_id: UUID) -> bool:
    """Delete a resource by id.

    Returns:
        ``True`` when a row was deleted, ``False`` when the id was not found.
    """
    resource = await get_resource(db, resource_id)
    if not resource:
        return False
    await db.delete(resource)
    await db.commit()
    return True
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID
# Router for the resource endpoints; every route below is served under /resources.
router = APIRouter(prefix="/resources", tags=["resources"])
@router.get("", response_model=ResourceListResponse)
async def list_resources_endpoint(
    cursor: str | None = Query(None),
    limit: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),  # only when authentication is required
):
    """List resources one page at a time.

    ``limit`` is constrained to 1-100 by the ``Query`` validator. A cursor that
    cannot be decoded is reported as 400 instead of an unhandled server error.
    """
    try:
        items, next_cursor = await list_resources(db, cursor=cursor, limit=limit)
    except ValueError as err:
        # NOTE(review): presumably the only ValueError source in list_resources
        # is cursor decoding; confirm no other ValueError can reach this point.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid cursor"
        ) from err
    return ResourceListResponse(
        data=items,
        next_cursor=next_cursor,
        has_more=next_cursor is not None,
    )
@router.get("/{resource_id}", response_model=ResourceResponse)
async def get_resource_endpoint(
    resource_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return a single resource, or respond 404 when the id is unknown."""
    resource = await get_resource(db, resource_id)
    if not resource:
        raise HTTPException(status_code=404, detail="Resource not found")
    return resource
@router.post("", response_model=ResourceResponse, status_code=status.HTTP_201_CREATED)
async def create_resource_endpoint(
    data: ResourceCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a resource from the request body and return it with status 201."""
    return await create_resource(db, data)
@router.patch("/{resource_id}", response_model=ResourceResponse)
async def update_resource_endpoint(
    resource_id: UUID,
    data: ResourceUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Partially update a resource, or respond 404 when the id is unknown."""
    resource = await update_resource(db, resource_id, data)
    if not resource:
        raise HTTPException(status_code=404, detail="Resource not found")
    return resource
@router.delete("/{resource_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_resource_endpoint(
    resource_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete a resource; respond 204 on success or 404 when the id is unknown."""
    deleted = await delete_resource(db, resource_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Resource not found")
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
async def client():
    """Yield an httpx ``AsyncClient`` wired to the app through ``ASGITransport``."""
    # NOTE(review): under pytest-asyncio's strict mode an async fixture
    # presumably needs @pytest_asyncio.fixture; confirm the project's asyncio_mode.
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as ac:
        yield ac
@pytest.mark.asyncio
async def test_create_resource(client: AsyncClient, auth_headers: dict):
    """POST /resources responds 201 and echoes the name together with an id."""
    response = await client.post(
        "/resources",
        json={"name": "Test Resource"},
        headers=auth_headers,
    )
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "Test Resource"
    assert "id" in data
@pytest.mark.asyncio
async def test_get_resource_not_found(client: AsyncClient, auth_headers: dict):
    """GET with the all-zero UUID responds 404."""
    response = await client.get(
        "/resources/00000000-0000-0000-0000-000000000000",
        headers=auth_headers,
    )
    assert response.status_code == 404
@pytest.mark.asyncio
async def test_list_resources_pagination(client: AsyncClient, auth_headers: dict):
    """With five resources and limit=2, the first page is full and has a cursor."""
    # Create multiple resources first
    for i in range(5):
        await client.post(
            "/resources",
            json={"name": f"Resource {i}"},
            headers=auth_headers,
        )
    response = await client.get("/resources?limit=2", headers=auth_headers)
    assert response.status_code == 200
    data = response.json()
    assert len(data["data"]) == 2
    assert data["has_more"] is True
    assert data["next_cursor"] is not None
@pytest.mark.asyncio
async def test_create_resource_unauthorized(client: AsyncClient):
    """POST without auth headers is rejected with 401 or 403."""
    response = await client.post("/resources", json={"name": "Test"})
    assert response.status_code in (401, 403)
@pytest.mark.asyncio
async def test_update_resource_partial(client: AsyncClient, auth_headers: dict):
    """PATCH with a new name responds 200 and returns the updated name."""
    # Create
    create_resp = await client.post(
        "/resources",
        json={"name": "Original"},
        headers=auth_headers,
    )
    resource_id = create_resp.json()["id"]
    # Partial update
    response = await client.patch(
        f"/resources/{resource_id}",
        json={"name": "Updated"},
        headers=auth_headers,
    )
    assert response.status_code == 200
    assert response.json()["name"] == "Updated"
@pytest.mark.asyncio
async def test_delete_resource(client: AsyncClient, auth_headers: dict):
    """DELETE responds 204 and a follow-up GET for the same id responds 404."""
    create_resp = await client.post(
        "/resources",
        json={"name": "To Delete"},
        headers=auth_headers,
    )
    resource_id = create_resp.json()["id"]
    response = await client.delete(
        f"/resources/{resource_id}", headers=auth_headers
    )
    assert response.status_code == 204
    # Verify deleted
    get_resp = await client.get(
        f"/resources/{resource_id}", headers=auth_headers
    )
    assert get_resp.status_code == 404
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
# Bearer-token scheme used as a dependency by get_current_user; tokenUrl names the token endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the authenticated user from the bearer token.

    Raises:
        HTTPException: 401 when the token payload has no ``sub`` claim or no
            user matches it.
    """
    credentials_error = HTTPException(
        status_code=401,
        detail="Invalid token",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # NOTE(review): decode_jwt is defined elsewhere; if it raises on a bad or
    # expired token, that exception should be mapped to the same 401 here.
    payload = decode_jwt(token)
    try:
        user_id = payload["sub"]
    except KeyError:
        # A payload without a subject is an invalid credential, not a server error.
        raise credentials_error from None
    user = await db.get(User, user_id)
    if not user:
        raise credentials_error
    return user
def require_role(*roles: str):
    """Build a dependency that only admits users holding one of ``roles``.

    Returns:
        An async dependency that returns the current user, or raises a 403
        ``HTTPException`` when ``current_user.role`` is not in ``roles``.
    """

    async def checker(current_user: User = Depends(get_current_user)):
        if current_user.role not in roles:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return current_user

    return checker
import base64
from datetime import datetime
def encode_cursor(dt: datetime) -> str:
    """Encode a timestamp as a URL-safe pagination cursor.

    The cursor is the ISO-8601 text of ``dt`` wrapped in URL-safe base64.
    """
    return base64.urlsafe_b64encode(dt.isoformat().encode()).decode()
def decode_cursor(cursor: str) -> datetime:
    """Decode a URL-safe base64 cursor back into the datetime it encodes.

    Raises:
        ValueError: If ``cursor`` is not valid base64, not valid UTF-8, or does
            not contain an ISO-8601 timestamp.
    """
    try:
        raw = base64.urlsafe_b64decode(cursor)
        return datetime.fromisoformat(raw.decode())
    except ValueError as err:
        # binascii.Error and UnicodeDecodeError both subclass ValueError, so a
        # single handler covers every failure and reports it uniformly.
        raise ValueError("Invalid pagination cursor") from err
始终使用 FastAPI 的 HTTPException 并附带一致的详细信息。对于验证错误,Pydantic v2 通过 RequestValidationError (422) 自动处理它们。
# 404 — the requested resource does not exist
raise HTTPException(status_code=404, detail="Resource not found")
# 409 — conflict (duplicate)
raise HTTPException(status_code=409, detail="Resource with this name already exists")
# 403 — forbidden
raise HTTPException(status_code=403, detail="Not allowed to modify this resource")
model_config = ConfigDict(from_attributes=True) 以启用 ORM 模式
每周安装次数
62
仓库
GitHub 星标数
24.0K
首次出现
2026年2月15日
安全审计
安装于
opencode 60
github-copilot 60
codex 60
gemini-cli 59
amp 59
kimi-cli 59
Use this skill when you need to:
Enter plan mode. Before writing any code, explore the existing project to understand:
- The app entry point (main.py, app.py, or app/__init__.py)
- How routers are organized (routers/ directory)
- The models/, schemas/, crud/, or services/ directories
- pyproject.toml or requirements.txt for installed dependencies
- Existing patterns (Depends(get_db), middleware, other)
- The response envelope ({"data": ..., "meta": ...})
- The test layout (tests/, test_*.py, *_test.py)
Use AskUserQuestion to clarify requirements. Ask in rounds — do NOT dump all questions at once.
Question: "What resource does this endpoint manage?"
Header: "Resource"
Options:
- "New resource (I'll describe the fields)" — Creating a new data model from scratch
- "Existing model (extend it)" — Adding endpoints for a model that already exists in the codebase
- "Relationship endpoint (nested)" — e.g., /users/{id}/orders — endpoint on a related resource
Question: "Which HTTP methods do you need?"
Header: "Methods"
multiSelect: true
Options:
- "Full CRUD (GET list, GET detail, POST, PUT/PATCH, DELETE)" — All standard operations
- "Read-only (GET list + GET detail)" — No mutations
- "Custom action (POST /resource/{id}/action)" — Business logic endpoint, not standard CRUD
Question: "What fields does the resource have? (describe briefly)"
Header: "Fields"
Options:
- "Simple (< 6 fields, basic types)" — Strings, ints, booleans, dates
- "Medium (6-15 fields, some relations)" — Includes foreign keys or enums
- "Complex (nested objects, polymorphic)" — JSON fields, discriminated unions, computed fields
Question: "How should this endpoint be authenticated?"
Header: "Auth"
Options:
- "JWT Bearer token (Recommended)" — OAuth2PasswordBearer with JWT decode
- "API Key header" — X-API-Key header validation
- "No auth (public)" — Open endpoint, no authentication required
- "Use existing auth" — Reuse the auth dependency already in the project
Question: "Do you need role-based access control?"
Header: "RBAC"
Options:
- "No — any authenticated user" — Single permission level
- "Yes — role check (admin, user, etc.)" — Require specific roles per endpoint
- "Yes — ownership check" — Users can only access their own resources
Question: "What pagination style for list endpoints?"
Header: "Pagination"
Options:
- "Cursor-based (Recommended)" — Best for real-time data, no offset drift
- "Offset/limit" — Simple, good for admin panels with page numbers
- "No pagination" — Small datasets, return all results
Question: "Do you need response caching?"
Header: "Caching"
Options:
- "No caching" — Fresh data on every request
- "Cache-Control headers" — Client-side caching via HTTP headers
- "Redis/in-memory cache" — Server-side caching with TTL
Write a concrete implementation plan covering:
Create, Update, Response, and List schemas with field types
Present via ExitPlanMode for user approval.
After approval, implement following this order:
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from uuid import UUID
class ResourceBase(BaseModel):
    """Fields shared by the create payload and the response schema."""

    name: str
    # ... add the remaining fields gathered during the interview
class ResourceCreate(ResourceBase):
    """Request body for creating a resource; adds nothing beyond ResourceBase."""

    pass
class ResourceUpdate(BaseModel):
    """Request body for partial updates; every field is optional."""

    # Defaults to None so a client may omit the field entirely.
    name: str | None = None
class ResourceResponse(ResourceBase):
    """Full resource representation, including database-generated fields."""

    # from_attributes lets Pydantic populate the schema from object attributes
    # (e.g. an ORM row) rather than only from dicts.
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    created_at: datetime
    updated_at: datetime
class ResourceListResponse(BaseModel):
    """Envelope for one page of a paginated resource listing."""

    data: list[ResourceResponse]
    # Opaque cursor for the following page; None when there is no further page.
    next_cursor: str | None = None
    has_more: bool
from sqlalchemy import Column, String, DateTime, func
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
import uuid
from app.database import Base
class Resource(Base):
    """SQLAlchemy model mapped to the ``resources`` table."""

    __tablename__ = "resources"

    # UUID primary key; uuid.uuid4 is called as the column default.
    id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String, nullable=False, index=True)
    # Both timestamps default to the database's now() on insert.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # onupdate=func.now() additionally re-stamps the column on updates.
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from uuid import UUID
async def get_resource(db: AsyncSession, resource_id: UUID) -> Resource | None:
    """Fetch a single resource by id.

    Returns:
        The matching ``Resource``, or ``None`` when no row has that id.
    """
    result = await db.execute(select(Resource).where(Resource.id == resource_id))
    return result.scalar_one_or_none()
async def list_resources(
    db: AsyncSession,
    cursor: str | None = None,
    limit: int = 20,
) -> tuple[list[Resource], str | None]:
    """Return one page of resources, newest first, plus the next-page cursor.

    Args:
        db: Async session used to run the query.
        cursor: Cursor returned by a previous call; only rows created strictly
            before the timestamp it encodes are returned.
        limit: Maximum number of rows in the returned page.

    Returns:
        ``(items, next_cursor)``; ``next_cursor`` is ``None`` on the last page.
    """
    # Fetch one row beyond the page size purely to learn whether more exist.
    query = select(Resource).order_by(Resource.created_at.desc()).limit(limit + 1)
    if cursor:
        query = query.where(Resource.created_at < decode_cursor(cursor))
    result = await db.execute(query)
    rows = list(result.scalars().all())
    page = rows[:limit]
    has_more = len(rows) > limit
    # The cursor must point at the last row actually returned. Encoding the
    # look-ahead row would make the next page's strict "<" filter skip it.
    # NOTE: rows sharing an identical created_at at the page boundary can still
    # be skipped, because the cursor carries the timestamp only.
    next_cursor = encode_cursor(page[-1].created_at) if has_more and page else None
    return page, next_cursor
async def create_resource(db: AsyncSession, data: ResourceCreate) -> Resource:
    """Insert a new resource built from the validated payload and return it."""
    resource = Resource(**data.model_dump())
    db.add(resource)
    await db.commit()
    # Reload the row after commit so the returned object carries current values.
    await db.refresh(resource)
    return resource
async def update_resource(
    db: AsyncSession, resource_id: UUID, data: ResourceUpdate
) -> Resource | None:
    """Apply a partial update to an existing resource.

    Only fields the client explicitly sent are written
    (``model_dump(exclude_unset=True)``); omitted fields are left untouched.

    Returns:
        The refreshed ``Resource``, or ``None`` when ``resource_id`` is unknown.
    """
    resource = await get_resource(db, resource_id)
    if not resource:
        return None
    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(resource, field, value)
    await db.commit()
    await db.refresh(resource)
    return resource
async def delete_resource(db: AsyncSession, resource_id: UUID) -> bool:
    """Delete a resource by id.

    Returns:
        ``True`` when a row was deleted, ``False`` when the id was not found.
    """
    resource = await get_resource(db, resource_id)
    if not resource:
        return False
    await db.delete(resource)
    await db.commit()
    return True
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID
# Router for the resource endpoints; every route below is served under /resources.
router = APIRouter(prefix="/resources", tags=["resources"])
@router.get("", response_model=ResourceListResponse)
async def list_resources_endpoint(
    cursor: str | None = Query(None),
    limit: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),  # if auth required
):
    """List resources one page at a time.

    ``limit`` is constrained to 1-100 by the ``Query`` validator. A cursor that
    cannot be decoded is reported as 400 instead of an unhandled server error.
    """
    try:
        items, next_cursor = await list_resources(db, cursor=cursor, limit=limit)
    except ValueError as err:
        # NOTE(review): presumably the only ValueError source in list_resources
        # is cursor decoding; confirm no other ValueError can reach this point.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid cursor"
        ) from err
    return ResourceListResponse(
        data=items,
        next_cursor=next_cursor,
        has_more=next_cursor is not None,
    )
@router.get("/{resource_id}", response_model=ResourceResponse)
async def get_resource_endpoint(
    resource_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return a single resource, or respond 404 when the id is unknown."""
    resource = await get_resource(db, resource_id)
    if not resource:
        raise HTTPException(status_code=404, detail="Resource not found")
    return resource
@router.post("", response_model=ResourceResponse, status_code=status.HTTP_201_CREATED)
async def create_resource_endpoint(
    data: ResourceCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a resource from the request body and return it with status 201."""
    return await create_resource(db, data)
@router.patch("/{resource_id}", response_model=ResourceResponse)
async def update_resource_endpoint(
    resource_id: UUID,
    data: ResourceUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Partially update a resource, or respond 404 when the id is unknown."""
    resource = await update_resource(db, resource_id, data)
    if not resource:
        raise HTTPException(status_code=404, detail="Resource not found")
    return resource
@router.delete("/{resource_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_resource_endpoint(
    resource_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete a resource; respond 204 on success or 404 when the id is unknown."""
    deleted = await delete_resource(db, resource_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Resource not found")
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
async def client():
    """Yield an httpx ``AsyncClient`` wired to the app through ``ASGITransport``."""
    # NOTE(review): under pytest-asyncio's strict mode an async fixture
    # presumably needs @pytest_asyncio.fixture; confirm the project's asyncio_mode.
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as ac:
        yield ac
@pytest.mark.asyncio
async def test_create_resource(client: AsyncClient, auth_headers: dict):
    """POST /resources responds 201 and echoes the name together with an id."""
    response = await client.post(
        "/resources",
        json={"name": "Test Resource"},
        headers=auth_headers,
    )
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "Test Resource"
    assert "id" in data
@pytest.mark.asyncio
async def test_get_resource_not_found(client: AsyncClient, auth_headers: dict):
    """GET with the all-zero UUID responds 404."""
    response = await client.get(
        "/resources/00000000-0000-0000-0000-000000000000",
        headers=auth_headers,
    )
    assert response.status_code == 404
@pytest.mark.asyncio
async def test_list_resources_pagination(client: AsyncClient, auth_headers: dict):
    """With five resources and limit=2, the first page is full and has a cursor."""
    # Create multiple resources first
    for i in range(5):
        await client.post(
            "/resources",
            json={"name": f"Resource {i}"},
            headers=auth_headers,
        )
    response = await client.get("/resources?limit=2", headers=auth_headers)
    assert response.status_code == 200
    data = response.json()
    assert len(data["data"]) == 2
    assert data["has_more"] is True
    assert data["next_cursor"] is not None
@pytest.mark.asyncio
async def test_create_resource_unauthorized(client: AsyncClient):
    """POST without auth headers is rejected with 401 or 403."""
    response = await client.post("/resources", json={"name": "Test"})
    assert response.status_code in (401, 403)
@pytest.mark.asyncio
async def test_update_resource_partial(client: AsyncClient, auth_headers: dict):
    """PATCH with a new name responds 200 and returns the updated name."""
    # Create
    create_resp = await client.post(
        "/resources",
        json={"name": "Original"},
        headers=auth_headers,
    )
    resource_id = create_resp.json()["id"]
    # Partial update
    response = await client.patch(
        f"/resources/{resource_id}",
        json={"name": "Updated"},
        headers=auth_headers,
    )
    assert response.status_code == 200
    assert response.json()["name"] == "Updated"
@pytest.mark.asyncio
async def test_delete_resource(client: AsyncClient, auth_headers: dict):
    """DELETE responds 204 and a follow-up GET for the same id responds 404."""
    create_resp = await client.post(
        "/resources",
        json={"name": "To Delete"},
        headers=auth_headers,
    )
    resource_id = create_resp.json()["id"]
    response = await client.delete(
        f"/resources/{resource_id}", headers=auth_headers
    )
    assert response.status_code == 204
    # Verify deleted
    get_resp = await client.get(
        f"/resources/{resource_id}", headers=auth_headers
    )
    assert get_resp.status_code == 404
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
# Bearer-token scheme used as a dependency by get_current_user; tokenUrl names the token endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the authenticated user from the bearer token.

    Raises:
        HTTPException: 401 when the token payload has no ``sub`` claim or no
            user matches it.
    """
    credentials_error = HTTPException(
        status_code=401,
        detail="Invalid token",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # NOTE(review): decode_jwt is defined elsewhere; if it raises on a bad or
    # expired token, that exception should be mapped to the same 401 here.
    payload = decode_jwt(token)
    try:
        user_id = payload["sub"]
    except KeyError:
        # A payload without a subject is an invalid credential, not a server error.
        raise credentials_error from None
    user = await db.get(User, user_id)
    if not user:
        raise credentials_error
    return user
def require_role(*roles: str):
    """Build a dependency that only admits users holding one of ``roles``.

    Returns:
        An async dependency that returns the current user, or raises a 403
        ``HTTPException`` when ``current_user.role`` is not in ``roles``.
    """

    async def checker(current_user: User = Depends(get_current_user)):
        if current_user.role not in roles:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return current_user

    return checker
import base64
from datetime import datetime
def encode_cursor(dt: datetime) -> str:
    """Encode a timestamp as a URL-safe pagination cursor.

    The cursor is the ISO-8601 text of ``dt`` wrapped in URL-safe base64.
    """
    return base64.urlsafe_b64encode(dt.isoformat().encode()).decode()
def decode_cursor(cursor: str) -> datetime:
    """Decode a URL-safe base64 cursor back into the datetime it encodes.

    Raises:
        ValueError: If ``cursor`` is not valid base64, not valid UTF-8, or does
            not contain an ISO-8601 timestamp.
    """
    try:
        raw = base64.urlsafe_b64decode(cursor)
        return datetime.fromisoformat(raw.decode())
    except ValueError as err:
        # binascii.Error and UnicodeDecodeError both subclass ValueError, so a
        # single handler covers every failure and reports it uniformly.
        raise ValueError("Invalid pagination cursor") from err
Always use FastAPI's HTTPException with consistent detail messages. For validation errors, Pydantic v2 handles them automatically via RequestValidationError (422).
# 404 — the requested resource does not exist
raise HTTPException(status_code=404, detail="Resource not found")
# 409 — conflict (duplicate)
raise HTTPException(status_code=409, detail="Resource with this name already exists")
# 403 — forbidden
raise HTTPException(status_code=403, detail="Not allowed to modify this resource")
model_config = ConfigDict(from_attributes=True) for ORM mode
Weekly Installs
62
Repository
GitHub Stars
24.0K
First Seen
Feb 15, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
opencode 60
github-copilot 60
codex 60
gemini-cli 59
amp 59
kimi-cli 59
lark-cli 共享规则:飞书资源操作指南与权限配置详解
41,800 周安装