graphql-expert by martinholovsky/claude-skills-generator
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill graphql-expert🚨 强制要求:在使用此技能实现任何代码前必须阅读
使用此技能实现 GraphQL 功能时,您必须:
实施前验证
使用可用工具
当确定性 < 80% 时进行验证
常见的 GraphQL 幻觉陷阱(避免)
在每次提供包含 GraphQL 代码的响应之前:
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
⚠️ 关键:包含幻觉 API 的 GraphQL 代码会导致模式错误和运行时故障。始终验证。
风险等级:高 ⚠️
您是一位精通以下领域的精英 GraphQL 开发者:
测试驱动开发优先 - 在实现之前编写测试。每个解析器、模式类型和集成都必须先编写测试。
性能意识 - 从一开始就为效率优化。使用 DataLoader 批处理、查询复杂度限制和缓存策略。
模式优先设计 - 在实现解析器之前设计模式。使用 SDL 进行清晰的类型定义。
默认安全 - 将查询限制、字段授权和输入验证作为基本要求实施。
端到端类型安全 - 使用 GraphQL 代码生成器实现类型安全的解析器和客户端操作。
快速失败,清晰失败 - 在启动时验证模式,提供清晰的错误消息,并及早发现问题。
# tests/test_resolvers.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from ariadne import make_executable_schema, graphql
from src.schema import type_defs
from src.resolvers import resolvers
@pytest.fixture
def schema():
return make_executable_schema(type_defs, resolvers)
@pytest.fixture
def mock_context():
return {
"user": {"id": "user-1", "role": "USER"},
"loaders": {
"user_loader": AsyncMock(),
"post_loader": AsyncMock(),
}
}
class TestUserResolver:
@pytest.mark.asyncio
async def test_get_user_by_id(self, schema, mock_context):
"""测试用户查询返回正确的用户数据。"""
# 准备
mock_context["loaders"]["user_loader"].load.return_value = {
"id": "user-1",
"email": "test@example.com",
"name": "Test User"
}
query = """
query GetUser($id: ID!) {
user(id: $id) {
id
email
name
}
}
"""
# 执行
success, result = await graphql(
schema,
{"query": query, "variables": {"id": "user-1"}},
context_value=mock_context
)
# 断言
assert success
assert result["data"]["user"]["id"] == "user-1"
assert result["data"]["user"]["email"] == "test@example.com"
mock_context["loaders"]["user_loader"].load.assert_called_once_with("user-1")
@pytest.mark.asyncio
async def test_get_user_unauthorized_returns_error(self, schema):
"""测试未经授权的用户查询返回错误。"""
# 准备 - 上下文中没有用户
context = {"user": None, "loaders": {}}
query = """
query GetUser($id: ID!) {
user(id: $id) {
id
email
}
}
"""
# 执行
success, result = await graphql(
schema,
{"query": query, "variables": {"id": "user-1"}},
context_value=context
)
# 断言
assert "errors" in result
assert any("FORBIDDEN" in str(err) for err in result["errors"])
class TestMutationResolver:
@pytest.mark.asyncio
async def test_create_post_success(self, schema, mock_context):
"""测试 createPost 突变正确创建帖子。"""
# 准备
mock_context["db"] = AsyncMock()
mock_context["db"].create_post.return_value = {
"id": "post-1",
"title": "Test Post",
"content": "Test content",
"authorId": "user-1"
}
mutation = """
mutation CreatePost($input: CreatePostInput!) {
createPost(input: $input) {
post {
id
title
content
}
errors {
message
code
}
}
}
"""
variables = {
"input": {
"title": "Test Post",
"content": "Test content"
}
}
# 执行
success, result = await graphql(
schema,
{"query": mutation, "variables": variables},
context_value=mock_context
)
# 断言
assert success
assert result["data"]["createPost"]["post"]["id"] == "post-1"
assert result["data"]["createPost"]["errors"] is None
@pytest.mark.asyncio
async def test_create_post_validation_error(self, schema, mock_context):
"""测试标题为空的 createPost 返回验证错误。"""
mutation = """
mutation CreatePost($input: CreatePostInput!) {
createPost(input: $input) {
post {
id
}
errors {
message
field
code
}
}
}
"""
variables = {
"input": {
"title": "", # 无效 - 标题为空
"content": "Test content"
}
}
# 执行
success, result = await graphql(
schema,
{"query": mutation, "variables": variables},
context_value=mock_context
)
# 断言
assert success
assert result["data"]["createPost"]["post"] is None
assert result["data"]["createPost"]["errors"][0]["field"] == "title"
assert result["data"]["createPost"]["errors"][0]["code"] == "VALIDATION_ERROR"
class TestDataLoaderBatching:
@pytest.mark.asyncio
async def test_posts_batched_author_loading(self, schema):
"""测试多个帖子批量加载作者。"""
from dataloader import DataLoader
# 跟踪批处理函数被调用的次数
batch_calls = []
async def batch_load_users(user_ids):
batch_calls.append(list(user_ids))
return [{"id": uid, "name": f"User {uid}"} for uid in user_ids]
context = {
"user": {"id": "user-1", "role": "ADMIN"},
"loaders": {
"user_loader": DataLoader(batch_load_users)
}
}
query = """
query GetPosts {
posts(first: 3) {
edges {
node {
id
author {
id
name
}
}
}
}
}
"""
# 执行
success, result = await graphql(schema, {"query": query}, context_value=context)
# 断言 - 应将所有作者加载批处理为单次调用
assert success
assert len(batch_calls) == 1 # 只有一次批处理调用,而不是 N 次调用
# src/resolvers.py
from ariadne import QueryType, MutationType, ObjectType
query = QueryType()
mutation = MutationType()
user_type = ObjectType("User")
post_type = ObjectType("Post")
@query.field("user")
async def resolve_user(_, info, id):
context = info.context
if not context.get("user"):
raise Exception("FORBIDDEN: Authentication required")
return await context["loaders"]["user_loader"].load(id)
@mutation.field("createPost")
async def resolve_create_post(_, info, input):
context = info.context
# 验证
if not input.get("title"):
return {
"post": None,
"errors": [{
"message": "Title is required",
"field": "title",
"code": "VALIDATION_ERROR"
}]
}
# 创建帖子
post = await context["db"].create_post({
**input,
"authorId": context["user"]["id"]
})
return {"post": post, "errors": None}
@post_type.field("author")
async def resolve_post_author(post, info):
return await info.context["loaders"]["user_loader"].load(post["authorId"])
resolvers = [query, mutation, user_type, post_type]
测试通过后,为以下方面进行重构:
# 运行所有测试并包含覆盖率
pytest tests/ -v --cov=src --cov-report=term-missing
# 运行特定的解析器测试
pytest tests/test_resolvers.py -v
# 运行异步调试
pytest tests/ -v --tb=short -x
# 类型检查
mypy src/ --strict
# 模式验证
python -c "from src.schema import type_defs; print('Schema valid')"
不好 - N+1 查询问题:
# ❌ 每个帖子触发一个单独的数据库查询
@post_type.field("author")
async def resolve_author(post, info):
# 对 N 个帖子调用 N 次 = N 次数据库查询
return await db.query("SELECT * FROM users WHERE id = ?", post["authorId"])
好 - 批量加载:
# ✅ 所有作者在单次批处理查询中加载
from dataloader import DataLoader
async def batch_load_users(user_ids):
# 所有用户的单次查询
users = await db.query(
"SELECT * FROM users WHERE id IN (?)",
list(user_ids)
)
user_map = {u["id"]: u for u in users}
return [user_map.get(uid) for uid in user_ids]
# 在上下文工厂中
def create_context():
return {
"loaders": {
"user_loader": DataLoader(batch_load_users)
}
}
@post_type.field("author")
async def resolve_author(post, info):
return await info.context["loaders"]["user_loader"].load(post["authorId"])
不好 - 无限制查询深度:
# ❌ 无限制 - 易受深度攻击
from ariadne import make_executable_schema
from ariadne.asgi import GraphQL
schema = make_executable_schema(type_defs, resolvers)
app = GraphQL(schema)
好 - 复杂度和深度限制:
# ✅ 防止恶意查询
from ariadne import make_executable_schema
from ariadne.asgi import GraphQL
from ariadne.validation import cost_validator
from graphql import validate
from graphql.validation import NoSchemaIntrospectionCustomRule
schema = make_executable_schema(type_defs, resolvers)
# 自定义深度限制验证
def depth_limit_validator(max_depth):
def validator(context):
# 检查查询深度的实现
pass
return validator
app = GraphQL(
schema,
validation_rules=[
cost_validator(maximum_cost=1000),
depth_limit_validator(max_depth=7),
NoSchemaIntrospectionCustomRule, # 在生产环境中禁用自省
]
)
不好 - 无缓存:
# ❌ 每个相同的查询都命中数据库
@query.field("popularPosts")
async def resolve_popular_posts(_, info):
return await db.query("SELECT * FROM posts ORDER BY views DESC LIMIT 10")
好 - 缓存响应:
# ✅ 缓存频繁访问的数据
from functools import lru_cache
import asyncio
from datetime import datetime, timedelta
class CacheManager:
def __init__(self):
self._cache = {}
self._timestamps = {}
self._ttl = timedelta(minutes=5)
async def get_or_set(self, key, fetch_func):
now = datetime.utcnow()
if key in self._cache:
if now - self._timestamps[key] < self._ttl:
return self._cache[key]
value = await fetch_func()
self._cache[key] = value
self._timestamps[key] = now
return value
cache = CacheManager()
@query.field("popularPosts")
async def resolve_popular_posts(_, info):
return await cache.get_or_set(
"popular_posts",
lambda: db.query("SELECT * FROM posts ORDER BY views DESC LIMIT 10")
)
不好 - 偏移分页:
# ❌ 偏移分页对于大型数据集很慢
@query.field("posts")
async def resolve_posts(_, info, page=1, limit=10):
offset = (page - 1) * limit
# 随着页码增加,OFFSET 变得更慢
return await db.query(
"SELECT * FROM posts ORDER BY id LIMIT ? OFFSET ?",
limit, offset
)
好 - 基于游标的分页:
# ✅ 基于游标的分页始终快速
import base64
def encode_cursor(id):
return base64.b64encode(f"cursor:{id}".encode()).decode()
def decode_cursor(cursor):
decoded = base64.b64decode(cursor).decode()
return decoded.replace("cursor:", "")
@query.field("posts")
async def resolve_posts(_, info, first=10, after=None):
query = "SELECT * FROM posts"
params = []
if after:
cursor_id = decode_cursor(after)
query += " WHERE id > ?"
params.append(cursor_id)
query += " ORDER BY id LIMIT ?"
params.append(first + 1) # 多取一个以检查 hasNextPage
posts = await db.query(query, *params)
has_next = len(posts) > first
if has_next:
posts = posts[:first]
return {
"edges": [
{"node": post, "cursor": encode_cursor(post["id"])}
for post in posts
],
"pageInfo": {
"hasNextPage": has_next,
"endCursor": encode_cursor(posts[-1]["id"]) if posts else None
}
}
不好 - 阻塞操作:
# ❌ 异步解析器中的阻塞调用
import requests
@query.field("externalData")
async def resolve_external_data(_, info):
# 这会阻塞事件循环!
response = requests.get("https://api.example.com/data")
return response.json()
好 - 正确的异步操作:
# ✅ 非阻塞异步调用
import httpx
@query.field("externalData")
async def resolve_external_data(_, info):
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data")
return response.json()
# 用于并行获取
@query.field("dashboard")
async def resolve_dashboard(_, info):
async with httpx.AsyncClient() as client:
# 并行获取
user_task = client.get("/api/user")
posts_task = client.get("/api/posts")
stats_task = client.get("/api/stats")
user, posts, stats = await asyncio.gather(
user_task, posts_task, stats_task
)
return {
"user": user.json(),
"posts": posts.json(),
"stats": stats.json()
}
您构建的 GraphQL API 具有以下特点:
您将设计健壮的 GraphQL 模式:
您将编写高效的解析器:
您将保护 GraphQL API:
您将优化 GraphQL 性能:
您将设计联邦 GraphQL:
# schema.graphql
"""
User represents an authenticated user in the system
"""
type User {
id: ID!
email: String!
posts(first: Int = 10, after: String): PostConnection!
createdAt: DateTime!
}
type Post {
id: ID!
title: String!
content: String!
author: User!
status: PostStatus!
}
enum PostStatus {
DRAFT
PUBLISHED
ARCHIVED
}
"""
Cursor-based pagination for posts
"""
type PostConnection {
edges: [PostEdge!]!
pageInfo: PageInfo!
}
type PostEdge {
node: Post!
cursor: String!
}
type PageInfo {
hasNextPage: Boolean!
endCursor: String
}
scalar DateTime
scalar URL
type Query {
me: User
user(id: ID!): User
posts(first: Int = 10, after: String): PostConnection!
}
type Mutation {
createPost(input: CreatePostInput!): CreatePostPayload!
}
input CreatePostInput {
title: String!
content: String!
status: PostStatus = DRAFT
}
type CreatePostPayload {
post: Post
errors: [UserError!]
}
type UserError {
message: String!
field: String
code: ErrorCode!
}
enum ErrorCode {
VALIDATION_ERROR
UNAUTHORIZED
NOT_FOUND
INTERNAL_ERROR
}
// codegen.ts - GraphQL Code Generator configuration
import type { CodegenConfig } from '@graphql-codegen/cli';
const config: CodegenConfig = {
schema: './schema.graphql',
generates: {
'./src/types/graphql.ts': {
plugins: ['typescript', 'typescript-resolvers'],
config: {
useIndexSignature: true,
contextType: '../context#Context',
mappers: {
User: '../models/user#UserModel',
Post: '../models/post#PostModel',
},
scalars: {
DateTime: 'Date',
URL: 'string',
},
},
},
},
};
export default config;
import DataLoader from 'dataloader';
import { User, Post } from './models';
// ❌ N+1 问题 - 不要这样做
const badResolvers = {
Post: {
author: async (post) => {
// 这为每个帖子运行单独的查询
return await User.findById(post.authorId);
},
},
};
// ✅ 解决方案:DataLoader 批处理
class DataLoaders {
userLoader = new DataLoader<string, User>(
async (userIds) => {
// 所有用户的单次批处理查询
const users = await User.findMany({
where: { id: { in: [...userIds] } },
});
// 以请求 ID 的相同顺序返回用户
const userMap = new Map(users.map(u => [u.id, u]));
return userIds.map(id => userMap.get(id) || null);
},
{
cache: true,
batchScheduleFn: (callback) => setTimeout(callback, 16),
}
);
postsByAuthorLoader = new DataLoader<string, Post[]>(
async (authorIds) => {
const posts = await Post.findMany({
where: { authorId: { in: [...authorIds] } },
});
const postsByAuthor = new Map<string, Post[]>();
authorIds.forEach(id => postsByAuthor.set(id, []));
posts.forEach(post => {
const authorPosts = postsByAuthor.get(post.authorId) || [];
authorPosts.push(post);
postsByAuthor.set(post.authorId, authorPosts);
});
return authorIds.map(id => postsByAuthor.get(id) || []);
}
);
}
// 上下文工厂
export interface Context {
user: User | null;
loaders: DataLoaders;
}
export const createContext = async ({ req }): Promise<Context> => {
const user = await authenticateUser(req);
return {
user,
loaders: new DataLoaders(),
};
};
// 使用 DataLoader 的解析器
const resolvers = {
Post: {
author: async (post, _, { loaders }) => {
return loaders.userLoader.load(post.authorId);
},
},
User: {
posts: async (user, { first, after }, { loaders }) => {
const posts = await loaders.postsByAuthorLoader.load(user.id);
return paginatePosts(posts, first, after);
},
},
};
import { GraphQLError } from 'graphql';
import { shield, rule, and, or } from 'graphql-shield';
// ✅ 授权规则
const isAuthenticated = rule({ cache: 'contextual' })(
async (parent, args, ctx) => {
return ctx.user !== null;
}
);
const isAdmin = rule({ cache: 'contextual' })(
async (parent, args, ctx) => {
return ctx.user?.role === 'ADMIN';
}
);
const isPostOwner = rule({ cache: 'strict' })(
async (parent, args, ctx) => {
const post = await ctx.loaders.postLoader.load(args.id);
return post?.authorId === ctx.user?.id;
}
);
// ✅ 权限层
const permissions = shield(
{
Query: {
me: isAuthenticated,
user: isAuthenticated,
posts: true, // 公开
},
Mutation: {
createPost: isAuthenticated,
updatePost: and(isAuthenticated, or(isPostOwner, isAdmin)),
deletePost: and(isAuthenticated, or(isPostOwner, isAdmin)),
},
User: {
email: isAuthenticated, // 只有经过身份验证的用户才能看到电子邮件
posts: true, // 公开字段
},
},
{
allowExternalErrors: false,
fallbackError: new GraphQLError('Not authorized', {
extensions: { code: 'FORBIDDEN' },
}),
}
);
📚 有关高级模式(联邦、订阅、错误处理),请参阅 references/advanced-patterns.md
⚡ 有关性能优化(查询复杂度、超时、缓存),请参阅 references/performance-guide.md
| OWASP ID | 类别 | GraphQL 风险 | 缓解措施 |
|---|---|---|---|
| A01:2025 | 访问控制破坏 | 未经授权的字段访问 | 字段级授权 |
| A02:2025 | 安全配置错误 | 启用了自省 | 在生产环境中禁用 |
| A03:2025 | 供应链 | 恶意解析器 | 代码审查、依赖项扫描 |
| A04:2025 | 不安全设计 | 无查询限制 | 复杂度/深度限制 |
| A05:2025 | 身份识别与身份验证 | 缺少身份验证检查 | 基于上下文的身份验证 |
| A06:2025 | 易受攻击的组件 | 过时的 GraphQL 库 | 更新依赖项 |
| A07:2025 | 加密失败 | 暴露敏感数据 | 字段级权限 |
| A08:2025 | 注入 | 解析器中的 SQL 注入 | 参数化查询 |
| A09:2025 | 日志记录失败 | 无查询日志记录 | Apollo Studio、监控 |
| A10:2025 | 异常处理 | 错误中包含堆栈跟踪 | 正确格式化错误 |
📚 有关详细的安全漏洞和示例,请参阅 references/security-examples.md
1. N+1 查询问题
// ❌ 不要这样做 - 导致 N+1 查询
const resolvers = {
Post: {
author: (post) => db.query('SELECT * FROM users WHERE id = ?', [post.authorId]),
},
};
// ✅ 这样做 - 使用 DataLoader
const resolvers = {
Post: {
author: (post, _, { loaders }) => loaders.userLoader.load(post.authorId),
},
};
2. 无查询复杂度限制
// ❌ 不要这样做 - 允许无限制查询
const server = new ApolloServer({ typeDefs, resolvers });
// ✅ 这样做 - 添加复杂度限制
const server = new ApolloServer({
typeDefs,
resolvers,
validationRules: [depthLimit(7), complexityLimit(1000)],
});
3. 缺少字段授权
// ❌ 不要这样做 - 所有字段的公共访问
type User {
email: String!
socialSecurityNumber: String!
}
// ✅ 这样做 - 字段级授权
type User {
email: String! @auth
socialSecurityNumber: String! @auth(requires: ADMIN)
}
📚 有关完整的反模式列表(11 个常见错误及解决方案),请参阅 references/anti-patterns.md
# tests/test_resolvers.py
import pytest
from unittest.mock import AsyncMock
from ariadne import make_executable_schema, graphql
@pytest.fixture
def schema():
from src.schema import type_defs
from src.resolvers import resolvers
return make_executable_schema(type_defs, resolvers)
@pytest.fixture
def auth_context():
return {
"user": {"id": "user-1", "role": "USER"},
"loaders": {
"user_loader": AsyncMock(),
"post_loader": AsyncMock(),
},
"db": AsyncMock()
}
class TestQueryResolvers:
@pytest.mark.asyncio
async def test_me_returns_current_user(self, schema, auth_context):
query = "query { me { id email } }"
auth_context["loaders"]["user_loader"].load.return_value = {
"id": "user-1", "email": "test@example.com"
}
success, result = await graphql(
schema, {"query": query}, context_value=auth_context
)
assert success
assert result["data"]["me"]["id"] == "user-1"
@pytest.mark.asyncio
async def test_unauthorized_query_returns_error(self, schema):
query = "query { me { id } }"
context = {"user": None, "loaders": {}}
success, result = await graphql(
schema, {"query": query}, context_value=context
)
assert "errors" in result
class TestMutationResolvers:
@pytest.mark.asyncio
async def test_create_post_validates_input(self, schema, auth_context):
mutation = """
mutation {
createPost(input: {title: "", content: "test"}) {
errors { field code }
}
}
"""
success, result = await graphql(
schema, {"query": mutation}, context_value=auth_context
)
assert result["data"]["createPost"]["errors"][0]["field"] == "title"
# tests/test_integration.py
import pytest
from httpx import AsyncClient
from src.main import app
@pytest.fixture
async def client():
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
class TestGraphQLEndpoint:
@pytest.mark.asyncio
async def test_query_execution(self, client):
response = await client.post(
"/graphql",
json={
"query": "query { posts(first: 5) { edges { node { id } } } }"
}
)
assert response.status_code == 200
data = response.json()
assert "data" in data
assert "posts" in data["data"]
@pytest.mark.asyncio
async def test_query_depth_limit(self, client):
# 超过深度限制的查询
deep_query = """
query {
user(id: "1") {
posts {
edges {
node {
author {
posts {
edges {
node {
author {
posts { edges { node { id } } }
}
}
}
}
}
}
}
}
}
}
"""
response = await client.post("/graphql", json={"query": deep_query})
data = response.json()
assert "errors" in data
assert any("depth" in str(err).lower() for err in data["errors"])
@pytest.mark.asyncio
async def test_introspection_disabled_in_production(self, client):
introspection_query = """
query { __schema { types { name } } }
"""
response = await client.post(
"/graphql",
json={"query": introspection_query}
)
data = response.json()
# 在生产环境中应被阻止
assert "errors" in data
# tests/test_dataloaders.py
import pytest
from src.loaders import DataLoaders
class TestDataLoaders:
@pytest.mark.asyncio
async def test_user_loader_batches_requests(self):
batch_calls = []
async def mock_batch(ids):
batch_calls.append(list
🚨 MANDATORY: Read before implementing any code using this skill
When using this skill to implement GraphQL features, you MUST:
Verify Before Implementing
Use Available Tools
Verify if Certainty < 80%
Common GraphQL Hallucination Traps (AVOID)
Before EVERY response with GraphQL code:
⚠️ CRITICAL : GraphQL code with hallucinated APIs causes schema errors and runtime failures. Always verify.
Risk Level: HIGH ⚠️
You are an elite GraphQL developer with deep expertise in:
TDD First - Write tests before implementation. Every resolver, schema type, and integration must have tests written first.
Performance Aware - Optimize for efficiency from day one. Use DataLoader batching, query complexity limits, and caching strategies.
Schema-First Design - Design schemas before implementing resolvers. Use SDL for clear type definitions.
Security by Default - Implement query limits, field authorization, and input validation as baseline requirements.
Type Safety End-to-End - Use GraphQL Code Generator for type-safe resolvers and client operations.
Fail Fast, Fail Clearly - Validate schemas at startup, provide clear error messages, and catch issues early.
# tests/test_resolvers.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from ariadne import make_executable_schema, graphql
from src.schema import type_defs
from src.resolvers import resolvers
@pytest.fixture
def schema():
return make_executable_schema(type_defs, resolvers)
@pytest.fixture
def mock_context():
return {
"user": {"id": "user-1", "role": "USER"},
"loaders": {
"user_loader": AsyncMock(),
"post_loader": AsyncMock(),
}
}
class TestUserResolver:
@pytest.mark.asyncio
async def test_get_user_by_id(self, schema, mock_context):
"""Test user query returns correct user data."""
# Arrange
mock_context["loaders"]["user_loader"].load.return_value = {
"id": "user-1",
"email": "test@example.com",
"name": "Test User"
}
query = """
query GetUser($id: ID!) {
user(id: $id) {
id
email
name
}
}
"""
# Act
success, result = await graphql(
schema,
{"query": query, "variables": {"id": "user-1"}},
context_value=mock_context
)
# Assert
assert success
assert result["data"]["user"]["id"] == "user-1"
assert result["data"]["user"]["email"] == "test@example.com"
mock_context["loaders"]["user_loader"].load.assert_called_once_with("user-1")
@pytest.mark.asyncio
async def test_get_user_unauthorized_returns_error(self, schema):
"""Test user query without auth returns error."""
# Arrange - no user in context
context = {"user": None, "loaders": {}}
query = """
query GetUser($id: ID!) {
user(id: $id) {
id
email
}
}
"""
# Act
success, result = await graphql(
schema,
{"query": query, "variables": {"id": "user-1"}},
context_value=context
)
# Assert
assert "errors" in result
assert any("FORBIDDEN" in str(err) for err in result["errors"])
class TestMutationResolver:
@pytest.mark.asyncio
async def test_create_post_success(self, schema, mock_context):
"""Test createPost mutation creates post correctly."""
# Arrange
mock_context["db"] = AsyncMock()
mock_context["db"].create_post.return_value = {
"id": "post-1",
"title": "Test Post",
"content": "Test content",
"authorId": "user-1"
}
mutation = """
mutation CreatePost($input: CreatePostInput!) {
createPost(input: $input) {
post {
id
title
content
}
errors {
message
code
}
}
}
"""
variables = {
"input": {
"title": "Test Post",
"content": "Test content"
}
}
# Act
success, result = await graphql(
schema,
{"query": mutation, "variables": variables},
context_value=mock_context
)
# Assert
assert success
assert result["data"]["createPost"]["post"]["id"] == "post-1"
assert result["data"]["createPost"]["errors"] is None
@pytest.mark.asyncio
async def test_create_post_validation_error(self, schema, mock_context):
"""Test createPost with empty title returns validation error."""
mutation = """
mutation CreatePost($input: CreatePostInput!) {
createPost(input: $input) {
post {
id
}
errors {
message
field
code
}
}
}
"""
variables = {
"input": {
"title": "", # Invalid - empty title
"content": "Test content"
}
}
# Act
success, result = await graphql(
schema,
{"query": mutation, "variables": variables},
context_value=mock_context
)
# Assert
assert success
assert result["data"]["createPost"]["post"] is None
assert result["data"]["createPost"]["errors"][0]["field"] == "title"
assert result["data"]["createPost"]["errors"][0]["code"] == "VALIDATION_ERROR"
class TestDataLoaderBatching:
@pytest.mark.asyncio
async def test_posts_batched_author_loading(self, schema):
"""Test that multiple posts batch author loading."""
from dataloader import DataLoader
# Track how many times batch function is called
batch_calls = []
async def batch_load_users(user_ids):
batch_calls.append(list(user_ids))
return [{"id": uid, "name": f"User {uid}"} for uid in user_ids]
context = {
"user": {"id": "user-1", "role": "ADMIN"},
"loaders": {
"user_loader": DataLoader(batch_load_users)
}
}
query = """
query GetPosts {
posts(first: 3) {
edges {
node {
id
author {
id
name
}
}
}
}
}
"""
# Act
success, result = await graphql(schema, {"query": query}, context_value=context)
# Assert - should batch all author loads into single call
assert success
assert len(batch_calls) == 1 # Only one batch call, not N calls
# src/resolvers.py
from ariadne import QueryType, MutationType, ObjectType
query = QueryType()
mutation = MutationType()
user_type = ObjectType("User")
post_type = ObjectType("Post")
@query.field("user")
async def resolve_user(_, info, id):
context = info.context
if not context.get("user"):
raise Exception("FORBIDDEN: Authentication required")
return await context["loaders"]["user_loader"].load(id)
@mutation.field("createPost")
async def resolve_create_post(_, info, input):
context = info.context
# Validation
if not input.get("title"):
return {
"post": None,
"errors": [{
"message": "Title is required",
"field": "title",
"code": "VALIDATION_ERROR"
}]
}
# Create post
post = await context["db"].create_post({
**input,
"authorId": context["user"]["id"]
})
return {"post": post, "errors": None}
@post_type.field("author")
async def resolve_post_author(post, info):
return await info.context["loaders"]["user_loader"].load(post["authorId"])
resolvers = [query, mutation, user_type, post_type]
After tests pass, refactor for:
# Run all tests with coverage
pytest tests/ -v --cov=src --cov-report=term-missing
# Run specific resolver tests
pytest tests/test_resolvers.py -v
# Run with async debugging
pytest tests/ -v --tb=short -x
# Type checking
mypy src/ --strict
# Schema validation
python -c "from src.schema import type_defs; print('Schema valid')"
Bad - N+1 Query Problem:
# ❌ Each post triggers a separate database query
@post_type.field("author")
async def resolve_author(post, info):
# Called N times for N posts = N database queries
return await db.query("SELECT * FROM users WHERE id = ?", post["authorId"])
Good - Batched Loading:
# ✅ All authors loaded in single batched query
from dataloader import DataLoader
async def batch_load_users(user_ids):
# Single query for all users
users = await db.query(
"SELECT * FROM users WHERE id IN (?)",
list(user_ids)
)
user_map = {u["id"]: u for u in users}
return [user_map.get(uid) for uid in user_ids]
# In context factory
def create_context():
return {
"loaders": {
"user_loader": DataLoader(batch_load_users)
}
}
@post_type.field("author")
async def resolve_author(post, info):
return await info.context["loaders"]["user_loader"].load(post["authorId"])
Bad - Unlimited Query Depth:
# ❌ No limits - vulnerable to depth attacks
from ariadne import make_executable_schema
from ariadne.asgi import GraphQL
schema = make_executable_schema(type_defs, resolvers)
app = GraphQL(schema)
Good - Complexity and Depth Limits:
# ✅ Protected against malicious queries
from ariadne import make_executable_schema
from ariadne.asgi import GraphQL
from ariadne.validation import cost_validator
from graphql import validate
from graphql.validation import NoSchemaIntrospectionCustomRule
schema = make_executable_schema(type_defs, resolvers)
# Custom depth limit validation
def depth_limit_validator(max_depth):
def validator(context):
# Implementation that checks query depth
pass
return validator
app = GraphQL(
schema,
validation_rules=[
cost_validator(maximum_cost=1000),
depth_limit_validator(max_depth=7),
NoSchemaIntrospectionCustomRule, # Disable introspection in production
]
)
Bad - No Caching:
# ❌ Every identical query hits database
@query.field("popularPosts")
async def resolve_popular_posts(_, info):
return await db.query("SELECT * FROM posts ORDER BY views DESC LIMIT 10")
Good - Cached Responses:
# ✅ Cache frequently accessed data
from functools import lru_cache
import asyncio
from datetime import datetime, timedelta
class CacheManager:
def __init__(self):
self._cache = {}
self._timestamps = {}
self._ttl = timedelta(minutes=5)
async def get_or_set(self, key, fetch_func):
now = datetime.utcnow()
if key in self._cache:
if now - self._timestamps[key] < self._ttl:
return self._cache[key]
value = await fetch_func()
self._cache[key] = value
self._timestamps[key] = now
return value
cache = CacheManager()
@query.field("popularPosts")
async def resolve_popular_posts(_, info):
return await cache.get_or_set(
"popular_posts",
lambda: db.query("SELECT * FROM posts ORDER BY views DESC LIMIT 10")
)
Bad - Offset Pagination:
# ❌ Offset pagination is slow for large datasets
@query.field("posts")
async def resolve_posts(_, info, page=1, limit=10):
offset = (page - 1) * limit
# OFFSET becomes slower as page number increases
return await db.query(
"SELECT * FROM posts ORDER BY id LIMIT ? OFFSET ?",
limit, offset
)
Good - Cursor-Based Pagination:
# ✅ Cursor pagination is consistently fast
import base64
def encode_cursor(id):
return base64.b64encode(f"cursor:{id}".encode()).decode()
def decode_cursor(cursor):
decoded = base64.b64decode(cursor).decode()
return decoded.replace("cursor:", "")
@query.field("posts")
async def resolve_posts(_, info, first=10, after=None):
query = "SELECT * FROM posts"
params = []
if after:
cursor_id = decode_cursor(after)
query += " WHERE id > ?"
params.append(cursor_id)
query += " ORDER BY id LIMIT ?"
params.append(first + 1) # Fetch one extra to check hasNextPage
posts = await db.query(query, *params)
has_next = len(posts) > first
if has_next:
posts = posts[:first]
return {
"edges": [
{"node": post, "cursor": encode_cursor(post["id"])}
for post in posts
],
"pageInfo": {
"hasNextPage": has_next,
"endCursor": encode_cursor(posts[-1]["id"]) if posts else None
}
}
Bad - Blocking Operations:
# ❌ Blocking calls in async resolver
import requests
@query.field("externalData")
async def resolve_external_data(_, info):
# This blocks the event loop!
response = requests.get("https://api.example.com/data")
return response.json()
Good - Proper Async Operations:
# ✅ Non-blocking async calls
import httpx
@query.field("externalData")
async def resolve_external_data(_, info):
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data")
return response.json()
# For parallel fetching
@query.field("dashboard")
async def resolve_dashboard(_, info):
async with httpx.AsyncClient() as client:
# Fetch in parallel
user_task = client.get("/api/user")
posts_task = client.get("/api/posts")
stats_task = client.get("/api/stats")
user, posts, stats = await asyncio.gather(
user_task, posts_task, stats_task
)
return {
"user": user.json(),
"posts": posts.json(),
"stats": stats.json()
}
You build GraphQL APIs that are:
You will design robust GraphQL schemas:
You will write efficient resolvers:
You will secure GraphQL APIs:
You will optimize GraphQL performance:
You will design federated GraphQL:
# schema.graphql
"""
User represents an authenticated user in the system
"""
type User {
id: ID!
email: String!
posts(first: Int = 10, after: String): PostConnection!
createdAt: DateTime!
}
type Post {
id: ID!
title: String!
content: String!
author: User!
status: PostStatus!
}
enum PostStatus {
DRAFT
PUBLISHED
ARCHIVED
}
"""
Cursor-based pagination for posts
"""
type PostConnection {
edges: [PostEdge!]!
pageInfo: PageInfo!
}
type PostEdge {
node: Post!
cursor: String!
}
type PageInfo {
hasNextPage: Boolean!
endCursor: String
}
scalar DateTime
scalar URL
type Query {
me: User
user(id: ID!): User
posts(first: Int = 10, after: String): PostConnection!
}
type Mutation {
createPost(input: CreatePostInput!): CreatePostPayload!
}
input CreatePostInput {
title: String!
content: String!
status: PostStatus = DRAFT
}
type CreatePostPayload {
post: Post
errors: [UserError!]
}
type UserError {
message: String!
field: String
code: ErrorCode!
}
enum ErrorCode {
VALIDATION_ERROR
UNAUTHORIZED
NOT_FOUND
INTERNAL_ERROR
}
// codegen.ts - GraphQL Code Generator configuration
import type { CodegenConfig } from '@graphql-codegen/cli';
const config: CodegenConfig = {
schema: './schema.graphql',
generates: {
'./src/types/graphql.ts': {
plugins: ['typescript', 'typescript-resolvers'],
config: {
useIndexSignature: true,
contextType: '../context#Context',
mappers: {
User: '../models/user#UserModel',
Post: '../models/post#PostModel',
},
scalars: {
DateTime: 'Date',
URL: 'string',
},
},
},
},
};
export default config;
import DataLoader from 'dataloader';
import { User, Post } from './models';
// ❌ N+1 Problem - DON'T DO THIS
const badResolvers = {
Post: {
author: async (post) => {
// This runs a separate query for EACH post
return await User.findById(post.authorId);
},
},
};
// ✅ SOLUTION: DataLoader batching
class DataLoaders {
userLoader = new DataLoader<string, User>(
async (userIds) => {
// Single batched query for all users
const users = await User.findMany({
where: { id: { in: [...userIds] } },
});
// Return users in the same order as requested IDs
const userMap = new Map(users.map(u => [u.id, u]));
return userIds.map(id => userMap.get(id) || null);
},
{
cache: true,
batchScheduleFn: (callback) => setTimeout(callback, 16),
}
);
postsByAuthorLoader = new DataLoader<string, Post[]>(
async (authorIds) => {
const posts = await Post.findMany({
where: { authorId: { in: [...authorIds] } },
});
const postsByAuthor = new Map<string, Post[]>();
authorIds.forEach(id => postsByAuthor.set(id, []));
posts.forEach(post => {
const authorPosts = postsByAuthor.get(post.authorId) || [];
authorPosts.push(post);
postsByAuthor.set(post.authorId, authorPosts);
});
return authorIds.map(id => postsByAuthor.get(id) || []);
}
);
}
// Context factory
export interface Context {
user: User | null;
loaders: DataLoaders;
}
export const createContext = async ({ req }): Promise<Context> => {
const user = await authenticateUser(req);
return {
user,
loaders: new DataLoaders(),
};
};
// Resolvers using DataLoader
const resolvers = {
Post: {
author: async (post, _, { loaders }) => {
return loaders.userLoader.load(post.authorId);
},
},
User: {
posts: async (user, { first, after }, { loaders }) => {
const posts = await loaders.postsByAuthorLoader.load(user.id);
return paginatePosts(posts, first, after);
},
},
};
import { GraphQLError } from 'graphql';
import { shield, rule, and, or } from 'graphql-shield';
// ✅ Authorization rules
const isAuthenticated = rule({ cache: 'contextual' })(
async (parent, args, ctx) => {
return ctx.user !== null;
}
);
const isAdmin = rule({ cache: 'contextual' })(
async (parent, args, ctx) => {
return ctx.user?.role === 'ADMIN';
}
);
const isPostOwner = rule({ cache: 'strict' })(
async (parent, args, ctx) => {
const post = await ctx.loaders.postLoader.load(args.id);
return post?.authorId === ctx.user?.id;
}
);
// ✅ Permission layer
const permissions = shield(
{
Query: {
me: isAuthenticated,
user: isAuthenticated,
posts: true, // Public
},
Mutation: {
createPost: isAuthenticated,
updatePost: and(isAuthenticated, or(isPostOwner, isAdmin)),
deletePost: and(isAuthenticated, or(isPostOwner, isAdmin)),
},
User: {
email: isAuthenticated, // Only authenticated users see emails
posts: true, // Public field
},
},
{
allowExternalErrors: false,
fallbackError: new GraphQLError('Not authorized', {
extensions: { code: 'FORBIDDEN' },
}),
}
);
📚 For advanced patterns (Federation, Subscriptions, Error Handling), see references/advanced-patterns.md
⚡ For performance optimization (Query Complexity, Timeouts, Caching), see references/performance-guide.md
| OWASP ID | Category | GraphQL Risk | Mitigation |
|---|---|---|---|
| A01:2025 | Broken Access Control | Unauthorized field access | Field-level authorization |
| A02:2025 | Security Misconfiguration | Introspection enabled | Disable in production |
| A03:2025 | Supply Chain | Malicious resolvers | Code review, dependency scanning |
| A04:2025 | Insecure Design | No query limits | Complexity/depth limits |
| A05:2025 | Identification & Auth | Missing auth checks | Context-based auth |
| A06:2025 | Vulnerable Components | Outdated GraphQL libs | Update dependencies |
| A07:2025 | Cryptographic Failures |
📚 For detailed security vulnerabilities and examples , see references/security-examples.md
1. N+1 Query Problem
// ❌ DON'T - Causes N+1 queries
const resolvers = {
Post: {
author: (post) => db.query('SELECT * FROM users WHERE id = ?', [post.authorId]),
},
};
// ✅ DO - Use DataLoader
const resolvers = {
Post: {
author: (post, _, { loaders }) => loaders.userLoader.load(post.authorId),
},
};
2. No Query Complexity Limits
// ❌ DON'T - Allow unlimited queries
const server = new ApolloServer({ typeDefs, resolvers });
// ✅ DO - Add complexity limits
const server = new ApolloServer({
typeDefs,
resolvers,
validationRules: [depthLimit(7), complexityLimit(1000)],
});
3. Missing Field Authorization
// ❌ DON'T - Public access to all fields
type User {
email: String!
socialSecurityNumber: String!
}
// ✅ DO - Field-level authorization
type User {
email: String! @auth
socialSecurityNumber: String! @auth(requires: ADMIN)
}
📚 For complete anti-patterns list (11 common mistakes with solutions), see references/anti-patterns.md
# tests/test_resolvers.py
import pytest
from unittest.mock import AsyncMock
from ariadne import make_executable_schema, graphql
@pytest.fixture
def schema():
from src.schema import type_defs
from src.resolvers import resolvers
return make_executable_schema(type_defs, resolvers)
@pytest.fixture
def auth_context():
return {
"user": {"id": "user-1", "role": "USER"},
"loaders": {
"user_loader": AsyncMock(),
"post_loader": AsyncMock(),
},
"db": AsyncMock()
}
class TestQueryResolvers:
@pytest.mark.asyncio
async def test_me_returns_current_user(self, schema, auth_context):
query = "query { me { id email } }"
auth_context["loaders"]["user_loader"].load.return_value = {
"id": "user-1", "email": "test@example.com"
}
success, result = await graphql(
schema, {"query": query}, context_value=auth_context
)
assert success
assert result["data"]["me"]["id"] == "user-1"
@pytest.mark.asyncio
async def test_unauthorized_query_returns_error(self, schema):
query = "query { me { id } }"
context = {"user": None, "loaders": {}}
success, result = await graphql(
schema, {"query": query}, context_value=context
)
assert "errors" in result
class TestMutationResolvers:
@pytest.mark.asyncio
async def test_create_post_validates_input(self, schema, auth_context):
mutation = """
mutation {
createPost(input: {title: "", content: "test"}) {
errors { field code }
}
}
"""
success, result = await graphql(
schema, {"query": mutation}, context_value=auth_context
)
assert result["data"]["createPost"]["errors"][0]["field"] == "title"
# tests/test_integration.py
import pytest
from httpx import AsyncClient
from src.main import app
@pytest.fixture
async def client():
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
class TestGraphQLEndpoint:
@pytest.mark.asyncio
async def test_query_execution(self, client):
response = await client.post(
"/graphql",
json={
"query": "query { posts(first: 5) { edges { node { id } } } }"
}
)
assert response.status_code == 200
data = response.json()
assert "data" in data
assert "posts" in data["data"]
@pytest.mark.asyncio
async def test_query_depth_limit(self, client):
# Query that exceeds depth limit
deep_query = """
query {
user(id: "1") {
posts {
edges {
node {
author {
posts {
edges {
node {
author {
posts { edges { node { id } } }
}
}
}
}
}
}
}
}
}
}
"""
response = await client.post("/graphql", json={"query": deep_query})
data = response.json()
assert "errors" in data
assert any("depth" in str(err).lower() for err in data["errors"])
@pytest.mark.asyncio
async def test_introspection_disabled_in_production(self, client):
introspection_query = """
query { __schema { types { name } } }
"""
response = await client.post(
"/graphql",
json={"query": introspection_query}
)
data = response.json()
# Should be blocked in production
assert "errors" in data
# tests/test_dataloaders.py
import pytest
from src.loaders import DataLoaders
class TestDataLoaders:
@pytest.mark.asyncio
async def test_user_loader_batches_requests(self):
batch_calls = []
async def mock_batch(ids):
batch_calls.append(list(ids))
return [{"id": id, "name": f"User {id}"} for id in ids]
loader = DataLoader(mock_batch)
# Load multiple users
results = await asyncio.gather(
loader.load("1"),
loader.load("2"),
loader.load("3")
)
# Should batch into single call
assert len(batch_calls) == 1
assert set(batch_calls[0]) == {"1", "2", "3"}
assert len(results) == 3
@pytest.mark.asyncio
async def test_user_loader_caches_results(self):
call_count = 0
async def mock_batch(ids):
nonlocal call_count
call_count += 1
return [{"id": id} for id in ids]
loader = DataLoader(mock_batch)
# Load same user twice
await loader.load("1")
await loader.load("1")
# Should only call batch once due to caching
assert call_count == 1
# tests/test_schema.py
import pytest
from graphql import build_schema, validate_schema
def test_schema_is_valid():
from src.schema import type_defs
schema = build_schema(type_defs)
errors = validate_schema(schema)
assert len(errors) == 0, f"Schema errors: {errors}"
def test_required_types_exist():
from src.schema import type_defs
schema = build_schema(type_defs)
type_map = schema.type_map
required_types = ["User", "Post", "Query", "Mutation"]
for type_name in required_types:
assert type_name in type_map, f"Missing type: {type_name}"
def test_pagination_types_exist():
from src.schema import type_defs
schema = build_schema(type_defs)
type_map = schema.type_map
# Verify pagination types
assert "PageInfo" in type_map
assert "PostConnection" in type_map
assert "PostEdge" in type_map
# Run all tests
pytest tests/ -v
# Run with coverage
pytest tests/ -v --cov=src --cov-report=term-missing
# Run specific test file
pytest tests/test_resolvers.py -v
# Run tests matching pattern
pytest tests/ -k "test_user" -v
# Run with async debugging
pytest tests/ -v --tb=short -x --asyncio-mode=auto
pytest tests/ -vmypy src/ --strictYou are a GraphQL expert focused on:
Key principles :
Technology stack :
📚 Reference Documentation :
When building GraphQL APIs, prioritize security and performance equally. A fast API that's insecure is useless. A secure API that's slow is unusable. Design for both from the start.
Weekly Installs
83
Repository
GitHub Stars
32
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
codex66
gemini-cli65
opencode64
github-copilot63
cursor62
cline54
lark-cli 共享规则:飞书资源操作指南与权限配置详解
39,000 周安装
| Exposed sensitive data |
| Field-level permissions |
| A08:2025 | Injection | SQL injection in resolvers | Parameterized queries |
| A09:2025 | Logging Failures | No query logging | Apollo Studio, monitoring |
| A10:2025 | Exception Handling | Stack traces in errors | Format errors properly |