celery-expert by martinholovsky/claude-skills-generator
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill celery-expert
You are an elite Celery engineer with deep expertise in:
Risk Level: MEDIUM
# tests/test_tasks.py
import pytest
from celery.contrib.testing.tasks import ping  # noqa: F401 -- registers the ping task used by the celery_worker fixture
from celery.result import EagerResult
@pytest.fixture
def celery_config():
    # In-memory broker/backend; task_always_eager runs tasks inline in the test process
    return {
        'broker_url': 'memory://',
        'result_backend': 'cache+memory://',
        'task_always_eager': True,
        'task_eager_propagates': True,
    }
class TestProcessOrder:
def test_process_order_success(self, celery_app, celery_worker):
"""Test order processing returns correct result"""
from myapp.tasks import process_order
# Execute task
result = process_order.delay(order_id=123)
# Assert expected behavior
assert result.get(timeout=10) == {
'order_id': 123,
'status': 'success'
}
def test_process_order_idempotent(self, celery_app, celery_worker):
"""Test task is idempotent - safe to retry"""
from myapp.tasks import process_order
# Run twice
result1 = process_order.delay(order_id=123).get(timeout=10)
result2 = process_order.delay(order_id=123).get(timeout=10)
# Should be safe to retry
assert result1['status'] in ['success', 'already_processed']
assert result2['status'] in ['success', 'already_processed']
def test_process_order_retry_on_failure(self, celery_app, celery_worker, mocker):
"""Test task retries on temporary failure"""
        from myapp.tasks import process_order, TemporaryError  # TemporaryError: the app-defined retryable exception
# Mock to fail first, succeed second
mock_process = mocker.patch('myapp.tasks.perform_order_processing')
mock_process.side_effect = [TemporaryError("Timeout"), {'result': 'ok'}]
result = process_order.delay(order_id=123)
assert result.get(timeout=10)['status'] == 'success'
assert mock_process.call_count == 2
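The celery_app, celery_worker and celery_config fixtures above come from Celery's bundled pytest plugin; if it is not auto-loaded in your environment, it can be enabled explicitly in conftest.py (a minimal sketch):
# tests/conftest.py
pytest_plugins = ('celery.contrib.pytest',)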
# myapp/tasks.py
from celery import Celery
# get_order, perform_order_processing and TemporaryError are application-level
# helpers/exceptions assumed to be defined (or imported) in this module.
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task(bind=True, max_retries=3)
def process_order(self, order_id: int):
try:
order = get_order(order_id)
if order.status == 'processed':
return {'order_id': order_id, 'status': 'already_processed'}
result = perform_order_processing(order)
return {'order_id': order_id, 'status': 'success'}
except TemporaryError as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
Add proper error handling, time limits, and observability.
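For observability, Celery's signal hooks can emit structured logs (or metrics) without touching task bodies; a minimal sketch using celery.signals, with the logger name as an arbitrary choice:
# Observability hooks via Celery signals
import logging
from celery.signals import task_prerun, task_postrun, task_failure
obs_logger = logging.getLogger('celery.observability')
@task_prerun.connect
def on_task_prerun(sender=None, task_id=None, **kwargs):
    obs_logger.info('task started', extra={'task_name': sender.name, 'task_id': task_id})
@task_postrun.connect
def on_task_postrun(sender=None, task_id=None, state=None, **kwargs):
    obs_logger.info('task finished', extra={'task_name': sender.name, 'task_id': task_id, 'state': state})
@task_failure.connect
def on_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    obs_logger.error('task failed', extra={'task_name': sender.name, 'task_id': task_id, 'error': repr(exception)})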
# Run all Celery tests
pytest tests/test_tasks.py -v
# Run with coverage
pytest tests/test_tasks.py --cov=myapp.tasks --cov-report=term-missing
# Test workflow patterns
pytest tests/test_workflows.py -v
# Integration test with real broker
pytest tests/integration/ --broker=redis://localhost:6379/0
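The --broker flag is not a built-in pytest option; it assumes a conftest.py hook along these lines (a sketch, option name and defaults are hypothetical):
# tests/integration/conftest.py
import pytest
def pytest_addoption(parser):
    parser.addoption('--broker', default='redis://localhost:6379/0', help='Broker URL for integration tests')
@pytest.fixture
def celery_config(request):
    return {
        'broker_url': request.config.getoption('--broker'),
        'result_backend': 'redis://localhost:6379/1',
    }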
# Bad - Individual tasks for each item
for item_id in item_ids: # 10,000 items = 10,000 tasks
process_item.delay(item_id)
# Good - Process in batches
@app.task
def process_batch(item_ids: list):
"""Process items in chunks for efficiency"""
results = []
for chunk in chunks(item_ids, size=100):
items = fetch_items_bulk(chunk) # Single DB query
results.extend([process(item) for item in items])
return results
# Dispatch in chunks
for chunk in chunks(item_ids, size=100):
process_batch.delay(chunk) # 100 tasks instead of 10,000
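The chunks() helper used above is not part of the snippet; a minimal implementation might be:
# Plain slicing helper assumed by the batching examples above
def chunks(seq, size):
    """Yield successive slices of at most `size` items."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]
Celery also ships a task.chunks() canvas primitive (e.g. process_item.chunks(zip(item_ids), 100)) that achieves a similar effect.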
# Bad - Default prefetch for I/O-bound tasks
app.conf.worker_prefetch_multiplier = 4  # Each worker process reserves too many tasks up front
# Good - Tune based on task type
# CPU-bound: Higher prefetch, fewer workers
app.conf.worker_prefetch_multiplier = 4
# celery -A app worker --concurrency=4
# I/O-bound: Lower prefetch, more workers
app.conf.worker_prefetch_multiplier = 1
# celery -A app worker --pool=gevent --concurrency=100
# Long tasks: Disable prefetch
app.conf.worker_prefetch_multiplier = 1
app.conf.task_acks_late = True
# Bad - Storing results for fire-and-forget tasks
@app.task
def send_email(to, subject, body):
mailer.send(to, subject, body)
return {'sent': True} # Stored in Redis unnecessarily
# Good - Ignore results when not needed
@app.task(ignore_result=True)
def send_email(to, subject, body):
mailer.send(to, subject, body)
# Good - Set expiration for results you need
app.conf.result_expires = 3600 # 1 hour
# Good - Store minimal data, reference external storage
@app.task
def process_large_file(file_id):
data = process(read_file(file_id))
result_key = save_to_s3(data) # Store large result externally
return {'result_key': result_key} # Store only reference
# Bad - Creating new connections per task
@app.task
def query_database(query):
    conn = psycopg2.connect(...)  # New TCP connection + handshake on every invocation
    cur = conn.cursor()
    cur.execute(query)
    result = cur.fetchall()
    conn.close()
    return result
# Good - Use connection pools
from sqlalchemy import create_engine, text
from redis import ConnectionPool, Redis
# Initialize once at module level
db_engine = create_engine(
'postgresql://user:pass@localhost/db',
pool_size=20,
max_overflow=10,
pool_pre_ping=True
)
redis_pool = ConnectionPool(host='localhost', port=6379, max_connections=50)
@app.task
def query_database(query):
with db_engine.connect() as conn: # Uses pool
        return conn.execute(text(query)).fetchall()
@app.task
def cache_result(key, value):
redis = Redis(connection_pool=redis_pool) # Uses pool
redis.set(key, value)
# Bad - All tasks in single queue
@app.task
def critical_payment(): pass
@app.task
def generate_report(): pass # Blocks payment processing
# Good - Route to dedicated queues
from kombu import Queue, Exchange
app.conf.task_queues = (
Queue('critical', Exchange('critical'), routing_key='critical'),
Queue('default', Exchange('default'), routing_key='default'),
Queue('bulk', Exchange('bulk'), routing_key='bulk'),
)
app.conf.task_routes = {
'tasks.critical_payment': {'queue': 'critical'},
'tasks.generate_report': {'queue': 'bulk'},
}
# Run dedicated workers per queue
# celery -A app worker -Q critical --concurrency=4
# celery -A app worker -Q bulk --concurrency=2
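Static routes can also be overridden per call, since apply_async accepts a queue argument:
# Per-call queue override (uses the queues declared above)
critical_payment.apply_async(queue='critical')
generate_report.apply_async(queue='bulk', countdown=60)  # also delays execution by 60 seconds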
Register tasks with the standard decorators (@app.task, @shared_task), set acks_late=True for critical tasks, and ignore_result=True for fire-and-forget tasks.
# COMPLETE TASK DEFINITION
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
import logging
app = Celery('tasks', broker='redis://localhost:6379/0')
logger = logging.getLogger(__name__)
@app.task(
bind=True,
name='tasks.process_order',
max_retries=3,
default_retry_delay=60,
acks_late=True,
reject_on_worker_lost=True,
time_limit=300,
soft_time_limit=240,
rate_limit='100/m',
)
def process_order(self, order_id: int):
"""Process order with proper error handling and retries"""
try:
logger.info(f"Processing order {order_id}", extra={'task_id': self.request.id})
order = get_order(order_id)
if order.status == 'processed':
return {'order_id': order_id, 'status': 'already_processed'}
result = perform_order_processing(order)
return {'order_id': order_id, 'status': 'success', 'result': result}
except SoftTimeLimitExceeded:
cleanup_processing(order_id)
raise
except TemporaryError as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
except PermanentError as exc:
send_failure_notification(order_id, str(exc))
raise
from celery import chain, group, chord
# CHAIN: Sequential execution (A -> B -> C)
workflow = chain(
fetch_data.s('https://api.example.com/data'),
process_item.s(),
send_notification.s()
)
# GROUP: Parallel execution
job = group(fetch_data.s(url) for url in urls)
# CHORD: Map-Reduce (parallel + callback)
workflow = chord(
group(process_item.s(item) for item in items)
)(aggregate_results.s())
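chain and group objects are lazy until dispatched with apply_async() (or delay()); calling a chord with its callback, as above, dispatches it immediately and returns an AsyncResult. A brief usage sketch (timeouts are illustrative):
# Dispatching canvas primitives and collecting results
chain_result = chain(
    fetch_data.s('https://api.example.com/data'),
    process_item.s(),
    send_notification.s(),
).apply_async()
print(chain_result.get(timeout=60))  # result of the final task in the chain
group_result = group(fetch_data.s(url) for url in urls).apply_async()
print(group_result.get(timeout=60))  # list of results, one per task in the group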
from kombu import Exchange, Queue
app = Celery('myapp')
app.conf.update(
broker_url='redis://localhost:6379/0',
broker_connection_retry_on_startup=True,
broker_pool_limit=10,
result_backend='redis://localhost:6379/1',
result_expires=3600,
task_serializer='json',
result_serializer='json',
accept_content=['json'],
task_acks_late=True,
task_reject_on_worker_lost=True,
task_time_limit=300,
task_soft_time_limit=240,
worker_prefetch_multiplier=4,
worker_max_tasks_per_child=1000,
)
import requests
from requests.exceptions import RequestException
from celery.exceptions import Reject
@app.task(
bind=True,
max_retries=5,
autoretry_for=(RequestException,),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True,
)
def call_external_api(self, url: str):
"""Auto-retry on RequestException with exponential backoff"""
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
from celery.schedules import crontab
from datetime import timedelta
app.conf.beat_schedule = {
'cleanup-temp-files': {
'task': 'tasks.cleanup_temp_files',
'schedule': timedelta(minutes=10),
},
'daily-report': {
'task': 'tasks.generate_daily_report',
'schedule': crontab(hour=3, minute=0),
},
}
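These schedules only fire while a beat process runs alongside the workers; a typical invocation:
# celery -A app beat --loglevel=info
# celery -A app worker -B --loglevel=info  # embedded beat, convenient for development only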
# DANGEROUS: Pickle allows code execution
app.conf.task_serializer = 'pickle' # NEVER!
# SECURE: Use JSON
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
)
# Redis with TLS
import ssl
app.conf.broker_url = 'rediss://:password@localhost:6379/0'
app.conf.broker_use_ssl = {
    'ssl_cert_reqs': ssl.CERT_REQUIRED,
    'ssl_ca_certs': '/path/to/ca.pem',
}
# RabbitMQ with TLS
app.conf.broker_url = 'amqps://user:password@localhost:5671/vhost'
from pydantic import BaseModel
class OrderData(BaseModel):
order_id: int
amount: float
@app.task
def process_order_validated(order_data: dict):
    validated = OrderData(**order_data)  # raises pydantic.ValidationError on bad input
    return process_order(validated.order_id)
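Validation failures are permanent errors, so retrying is pointless; one option (assuming acks_late=True and reusing the Reject import and OrderData model above) is to drop the message outright. A sketch:
# Reject permanently invalid payloads instead of retrying them
from pydantic import ValidationError
@app.task(bind=True, acks_late=True)
def process_order_validated_strict(self, order_data: dict):
    try:
        validated = OrderData(**order_data)
    except ValidationError as exc:
        raise Reject(exc, requeue=False)  # drop (or dead-letter, if the broker is configured for it)
    return process_order(validated.order_id)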
# DON'T
app.conf.task_serializer = 'pickle'
# DO
app.conf.task_serializer = 'json'
# DON'T: Retries increment multiple times
@app.task
def increment_counter(user_id):
    user = get_user(user_id)  # hypothetical lookup helper
    user.counter += 1
    user.save()
# DO: Safe to retry
@app.task
def set_counter(user_id, value):
    user = get_user(user_id)  # hypothetical lookup helper
    user.counter = value
    user.save()
# DON'T
@app.task
def slow_task():
external_api_call()
# DO
@app.task(time_limit=30, soft_time_limit=25)
def safe_task():
external_api_call()
# DON'T
@app.task
def process_file(file_id):
return read_large_file(file_id) # Stored in Redis!
# DO
@app.task
def process_file(file_id):
result_id = save_to_storage(read_large_file(file_id))
return {'result_id': result_id}
Use acks_late=True for critical work. Verify behavior with pytest tests/test_tasks.py -v and measure coverage with pytest --cov=myapp.tasks.
You are a Celery expert focused on:
Key Principles:
Weekly Installs: 169
Repository: martinholovsky/claude-skills-generator
GitHub Stars: 29
First Seen: Jan 20, 2026
Security Audits: Gen Agent Trust Hub (Fail), Socket (Pass), Snyk (Warn)
Installed on: gemini-cli (148), opencode (145), codex (142), github-copilot (141), cursor (128), amp (114)