npx skills add https://github.com/bobmatnyc/claude-mpm-skills --skill celery

Celery 是一个用于 Python 的分布式任务队列系统,支持跨多个工作节点异步执行后台作业。它支持调度、重试、任务工作流,并能与 Django、Flask 和 FastAPI 无缝集成。
不适用场景:只需要简单的进程内后台任务时(请改用 asyncio)
# 基本安装
pip install celery
# 使用 Redis 代理
pip install celery[redis]
# 使用 RabbitMQ 代理
pip install celery[amqp]
# 完整功能(推荐)
pip install celery[redis,msgpack,auth,cassandra,elasticsearch,s3,sqs]
# celery_app.py
from celery import Celery
# 使用 Redis 代理创建 Celery 应用
app = Celery(
'myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
# 配置
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
)
# 定义任务
@app.task
def add(x, y):
    """Return the sum of *x* and *y* (demo task)."""
    return x + y
@app.task
def send_email(to, subject, body):
    """Simulate sending an email and return a delivery-status dict."""
    # Simulate email-sending latency
    import time
    time.sleep(2)
    print(f"Email sent to {to}: {subject}")
    return {"status": "sent", "to": to}
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
# 启动工作节点
celery -A celery_app worker --loglevel=info
# 多个工作节点并发
celery -A celery_app worker --concurrency=4 --loglevel=info
# 指定队列的命名工作节点
celery -A celery_app worker -Q emails,reports --loglevel=info
# 异步调用任务
result = add.delay(4, 6)
# 等待结果
print(result.get(timeout=10)) # 10
# 使用选项异步应用
result = send_email.apply_async(
args=['user@example.com', 'Hello', 'Welcome!'],
countdown=60 # 60 秒后执行
)
# 检查任务状态
print(result.status) # PENDING, STARTED, SUCCESS, FAILURE
代理:存储任务的消息队列
工作节点:执行任务的进程
结果后端:任务结果的存储
Beat 调度器:周期性任务调度器
PENDING → STARTED → SUCCESS
→ RETRY → SUCCESS
→ FAILURE
# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
# 使用身份验证
broker_url = 'redis://:password@localhost:6379/0'
# Redis Sentinel(高可用性)
broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380'
broker_transport_options = {
'master_name': 'mymaster',
'sentinel_kwargs': {'password': 'password'},
}
# Redis 连接池设置
broker_pool_limit = 10
broker_connection_retry = True
broker_connection_retry_on_startup = True
broker_connection_max_retries = 10
# 基本 RabbitMQ
broker_url = 'amqp://guest:guest@localhost:5672//'
# 使用虚拟主机
broker_url = 'amqp://user:password@localhost:5672/myvhost'
# 高可用性(多个代理)
broker_url = [
'amqp://user:password@host1:5672//',
'amqp://user:password@host2:5672//',
]
# RabbitMQ 特定设置
broker_heartbeat = 30
broker_pool_limit = 10
# AWS SQS(无服务器)
broker_url = 'sqs://'
broker_transport_options = {
'region': 'us-east-1',
'queue_name_prefix': 'myapp-',
'visibility_timeout': 3600,
'polling_interval': 1,
}
# 使用自定义凭据
import boto3
broker_transport_options = {
'region': 'us-east-1',
'predefined_queues': {
'default': {
'url': 'https://sqs.us-east-1.amazonaws.com/123456789/myapp-default',
}
}
}
from celery import Task, shared_task
from celery_app import app
# 方法 1:装饰器
@app.task
def simple_task(x, y):
    """Add two numbers (decorator-based task definition)."""
    return x + y
# 方法 2:共享任务(与框架无关)
@shared_task
def framework_task(data):
    """Framework-agnostic task; `process` is assumed to be defined elsewhere."""
    return process(data)
# 方法 3:任务类(高级)
class CustomTask(Task):
    """Task base class with lifecycle hooks that log task outcomes."""

    def on_success(self, retval, task_id, args, kwargs):
        # Invoked by the worker after the task returns successfully.
        print(f"Task {task_id} succeeded with {retval}")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Invoked when the task raises and will not be retried.
        print(f"Task {task_id} failed: {exc}")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        # Invoked when the task is scheduled for a retry.
        print(f"Task {task_id} retrying: {exc}")
@app.task(base=CustomTask)
def monitored_task(x):
    """Double *x*; outcome hooks are inherited from CustomTask."""
    return x * 2
@app.task(
    name='custom.task.name',         # Custom task name
    bind=True,                       # Bind the task instance as the first argument
    ignore_result=True,              # Don't store the result (performance)
    max_retries=3,                   # Maximum retry attempts
    default_retry_delay=60,          # Retry delay in seconds
    rate_limit='100/h',              # Rate limiting
    time_limit=300,                  # Hard time limit (kills the task)
    soft_time_limit=240,             # Soft time limit (raises an exception)
    serializer='json',               # Task serializer
    compression='gzip',              # Compress large messages
    priority=5,                      # Task priority (0-9)
    queue='high_priority',           # Target queue
    routing_key='priority.high',     # Routing key
    acks_late=True,                  # Acknowledge after execution
    reject_on_worker_lost=True,      # Reject if the worker dies
)
def advanced_task(self, data):
    """Showcase of common task options; retries with exponential backoff."""
    try:
        return process(data)
    except Exception as exc:
        # Exponential backoff: 1s, 2s, 4s, ... between retries
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@app.task(bind=True)
def context_aware_task(self, x, y):
    """Demonstrate access to task metadata via self.request, with a manual retry."""
    # Access task metadata
    print(f"Task ID: {self.request.id}")
    print(f"Task Name: {self.name}")
    print(f"Args: {self.request.args}")
    print(f"Kwargs: {self.request.kwargs}")
    print(f"Retries: {self.request.retries}")
    print(f"Delivery Info: {self.request.delivery_info}")
    # Manual retry: self.retry() raises, so the return below only runs on success
    try:
        result = risky_operation(x, y)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=3)
    return result
# delay() - 简单的异步执行
result = add.delay(4, 6)
# apply_async() - 完全控制
result = add.apply_async(
args=(4, 6),
kwargs={'extra': 'data'},
# 定时选项
countdown=60, # N 秒后执行
eta=datetime(2025, 12, 1, 10, 0), # 在特定时间执行
expires=3600, # 任务在 N 秒后过期
# 路由选项
queue='math', # 目标特定队列
routing_key='math.add', # 自定义路由键
exchange='tasks', # 目标交换机
priority=9, # 高优先级
# 执行选项
serializer='json', # 消息序列化器
compression='gzip', # 压缩负载
retry=True, # 失败时自动重试
retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
},
# 任务链接
link=log_result.s(), # 成功回调
link_error=handle_error.s(), # 错误回调
)
# 检查结果
if result.ready():
print(result.get()) # 获取结果(阻塞)
print(result.result) # 获取结果(非阻塞)
from celery import signature
# 创建签名(不执行)
sig = add.signature((2, 2), countdown=10)
sig = add.s(2, 2) # 简写
# 部分参数(柯里化)
partial = add.s(2) # 固定一个参数
result = partial.apply_async(args=(4,)) # 添加第二个参数
# 不可变签名(参数无法替换)
immutable = add.si(2, 2)
# 克隆并修改
new_sig = sig.clone(countdown=60)
# 执行签名
result = sig.delay()
result = sig.apply_async()
result = sig() # 同步执行
# 基本结果检索
result = add.delay(4, 6)
value = result.get(timeout=10) # 阻塞直到完成
# 非阻塞结果检查
if result.ready():
print(result.result)
# 结果状态
print(result.status) # PENDING, STARTED, SUCCESS, FAILURE
print(result.successful()) # 如果为 SUCCESS 则返回 True
print(result.failed()) # 如果为 FAILURE 则返回 True
# 结果元数据
print(result.traceback) # 如果失败则为异常回溯
print(result.info) # 任务返回值或异常
# 忘记结果(释放内存)
result.forget()
# 撤销任务(取消)
result.revoke(terminate=True) # 终止正在运行的任务
add.AsyncResult(task_id).revoke() # 按 ID 撤销
# 定义队列
from kombu import Queue, Exchange
app.conf.task_queues = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('high_priority', Exchange('priority'), routing_key='priority.high'),
Queue('low_priority', Exchange('priority'), routing_key='priority.low'),
Queue('emails', Exchange('tasks'), routing_key='tasks.email'),
Queue('reports', Exchange('tasks'), routing_key='tasks.report'),
)
# 默认队列
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_routing_key = 'default'
# 将特定任务路由到队列
app.conf.task_routes = {
'myapp.tasks.send_email': {'queue': 'emails'},
'myapp.tasks.generate_report': {'queue': 'reports', 'priority': 9},
'myapp.tasks.*': {'queue': 'default'},
}
# 基于函数的路由
def route_task(name, args, kwargs, options, task=None, **kw):
    """Route a task to a queue based on substrings of its name.

    'email' wins over 'report' when a name contains both; anything
    else falls through to the default queue.
    """
    route = {'queue': 'default'}
    if 'email' in name:
        route = {'queue': 'emails', 'routing_key': 'email.send'}
    elif 'report' in name:
        route = {'queue': 'reports', 'priority': 5}
    return route
app.conf.task_routes = (route_task,)
# 工作节点消费特定队列
celery -A myapp worker -Q emails,reports --loglevel=info
# 多个工作节点用于不同队列
celery -A myapp worker -Q high_priority -c 4 --loglevel=info
celery -A myapp worker -Q default -c 2 --loglevel=info
celery -A myapp worker -Q low_priority -c 1 --loglevel=info
# 配置优先级支持
app.conf.task_queue_max_priority = 10
app.conf.task_default_priority = 5
# 发送带优先级的任务
high_priority_task.apply_async(args=(), priority=9)
low_priority_task.apply_async(args=(), priority=1)
# 基于优先级的路由
app.conf.task_routes = {
'critical_task': {'queue': 'default', 'priority': 10},
'background_task': {'queue': 'default', 'priority': 1},
}
# celery_config.py
from celery.schedules import crontab, solar
# 周期性任务计划
beat_schedule = {
# 每 30 秒运行一次
'add-every-30-seconds': {
'task': 'myapp.tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
# 每天早上 7:30 运行
'send-daily-report': {
'task': 'myapp.tasks.send_daily_report',
'schedule': crontab(hour=7, minute=30),
},
# 每周一早上运行
'weekly-cleanup': {
'task': 'myapp.tasks.cleanup',
'schedule': crontab(hour=0, minute=0, day_of_week=1),
},
# 在特定日期运行
'monthly-report': {
'task': 'myapp.tasks.monthly_report',
'schedule': crontab(hour=0, minute=0, day_of_month='1'),
'kwargs': {'month_offset': 1}
},
# 太阳时间表(日出/日落)
'wake-up-at-sunrise': {
'task': 'myapp.tasks.morning_routine',
'schedule': solar('sunrise', -37.81, 144.96), # 墨尔本
},
}
app.conf.beat_schedule = beat_schedule
from celery.schedules import crontab
# 每分钟
crontab()
# 每 15 分钟
crontab(minute='*/15')
# 每小时的第 30 分钟
crontab(minute=30)
# 每天午夜
crontab(hour=0, minute=0)
# 每个工作日下午 5 点
crontab(hour=17, minute=0, day_of_week='1-5')
# 每周一、三、五中午
crontab(hour=12, minute=0, day_of_week='mon,wed,fri')
# 每月第一天
crontab(hour=0, minute=0, day_of_month='1')
# 每月最后一天(在任务中使用逻辑,配合 day_of_month='28-31')
crontab(hour=0, minute=0, day_of_month='28-31')
# 每季度(每 3 个月)
crontab(hour=0, minute=0, day_of_month='1', month_of_year='*/3')
# 启动 beat 调度器
celery -A myapp beat --loglevel=info
# 使用自定义调度器的 beat
celery -A myapp beat --scheduler django_celery_beat.schedulers:DatabaseScheduler
# 组合工作节点和 beat(仅限开发)
celery -A myapp worker --beat --loglevel=info
pip install django-celery-beat
# settings.py (Django)
INSTALLED_APPS = [
'django_celery_beat',
]
# 迁移数据库
python manage.py migrate django_celery_beat
# 使用数据库调度器运行 beat
celery -A myapp beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# 通过 Django 管理界面或 ORM 创建周期性任务
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
import json
# 创建间隔计划(每 10 秒)
schedule, created = IntervalSchedule.objects.get_or_create(
every=10,
period=IntervalSchedule.SECONDS,
)
PeriodicTask.objects.create(
interval=schedule,
name='每 10 秒导入一次源',
task='myapp.tasks.import_feed',
args=json.dumps(['https://example.com/feed']),
)
# 创建 crontab 计划
schedule, created = CrontabSchedule.objects.get_or_create(
minute='0',
hour='*/4', # 每 4 小时
day_of_week='*',
day_of_month='*',
month_of_year='*',
)
PeriodicTask.objects.create(
crontab=schedule,
name='每小时清理',
task='myapp.tasks.cleanup',
)
from celery import chain
# 顺序执行
result = chain(add.s(2, 2), add.s(4), add.s(8))()
# 等同于:add(add(add(2, 2), 4), 8)
# 结果:16
# 简写语法
result = (add.s(2, 2) | add.s(4) | add.s(8))()
# 包含不同任务的链
workflow = (
fetch_data.s(url) |
process_data.s() |
save_results.s()
)
result = workflow.apply_async()
from celery import group
# 并行执行
job = group([
add.s(2, 2),
add.s(4, 4),
add.s(8, 8),
])
result = job.apply_async()
# 等待所有结果
results = result.get(timeout=10) # [4, 8, 16]
# 带回调的组
job = group([
process_item.s(item) for item in items
]) | summarize_results.s()
from celery import chord
# 带回调的组
job = chord([
fetch_url.s(url) for url in urls
])(combine_results.s())
# 示例:处理多个文件,然后合并
workflow = chord([
process_file.s(file) for file in files
])(merge_results.s())
result = workflow.apply_async()
from celery import group
# Map:将相同任务应用于参数列表
results = add.map([(2, 2), (4, 4), (8, 8)])
# Starmap:解包参数
results = add.starmap([(2, 2), (4, 4), (8, 8)])
# 等同于:
results = group([add.s(2, 2), add.s(4, 4), add.s(8, 8)])()
from celery import chain, group, chord
# 包含顺序步骤的并行处理
workflow = chain(
# 步骤 1:获取数据
fetch_data.s(source),
# 步骤 2:并行处理
group([
process_chunk.s(chunk_id) for chunk_id in range(10)
]),
# 步骤 3:聚合结果
aggregate.s(),
# 步骤 4:保存到数据库
save_results.s()
)
# 嵌套和弦
workflow = chord([
chord([
subtask.s(item) for item in chunk
])(process_chunk.s())
for chunk in chunks
])(final_callback.s())
# 真实示例:报告生成
generate_report = chain(
fetch_user_data.s(user_id),
chord([
calculate_stats.s(),
fetch_transactions.s(),
fetch_activity.s(),
])(combine_sections.s()),
render_pdf.s(),
send_email.s(user_email)
)
@app.task(
    autoretry_for=(RequestException, IOError),  # Auto-retry on these exceptions
    retry_kwargs={'max_retries': 5},            # At most 5 retries
    retry_backoff=True,                         # Exponential backoff
    retry_backoff_max=600,                      # Cap backoff at 10 minutes
    retry_jitter=True,                          # Add randomness to the backoff
)
def fetch_url(url):
    """Fetch *url* and return the decoded JSON body; HTTP errors raise."""
    response = requests.get(url)
    response.raise_for_status()
    return response.json()
@app.task(bind=True, max_retries=3)
def process_data(self, data):
    """Call an external API, retrying transient and unknown failures."""
    try:
        result = external_api_call(data)
        return result
    except TemporaryError as exc:
        # Transient failure - retry after 60 seconds
        raise self.retry(exc=exc, countdown=60)
    except PermanentError as exc:
        # Not retryable - log and fail
        logger.error(f"Permanent error: {exc}")
        raise
    except Exception as exc:
        # Unknown failure - retry with exponential backoff
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries,
            max_retries=3
        )
@app.task
def on_error(request, exc, traceback):
    """Error callback invoked when a linked task fails."""
    logger.error(f"Task {request.id} failed: {exc}")
    send_alert(f"Task failure: {request.task}", str(exc))
@app.task
def risky_task(data):
    """Run `process` on *data*; failures are handled via a linked error callback."""
    return process(data)
# 链接错误回调
risky_task.apply_async(
args=(data,),
link_error=on_error.s()
)
from celery import Task
class CallbackTask(Task):
    """Task base class that logs and notifies on lifecycle events."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure: log and send a notification."""
        logger.error(f"Task {task_id} failed with {exc}")
        # Notify operators of the failure
        send_notification('Task Failed', str(exc))

    def on_success(self, retval, task_id, args, kwargs):
        """Handle task success."""
        logger.info(f"Task {task_id} succeeded: {retval}")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Handle task retry."""
        logger.warning(f"Task {task_id} retrying: {exc}")
@app.task(base=CallbackTask)
def monitored_task(x):
    """Double *x*; rejects negative input to demonstrate the failure hooks."""
    if x < 0:
        raise ValueError("Negative value")
    return x * 2
@app.task(bind=True)
def robust_task(self, data):
    """Process *data* with a different retry strategy per error class."""
    # Classify exceptions
    try:
        return process(data)
    except NetworkError as exc:
        # Transient error - retry after a fixed delay
        raise self.retry(exc=exc, countdown=60, max_retries=5)
    except ValidationError as exc:
        # Permanent error - do not retry
        logger.error(f"Invalid data: {exc}")
        return {'status': 'failed', 'error': str(exc)}
    except DatabaseError as exc:
        # Critical error - exponential backoff capped at one hour
        backoff = min(2 ** self.request.retries * 60, 3600)
        raise self.retry(exc=exc, countdown=backoff, max_retries=10)
    except Exception as exc:
        # Unknown error - retry a limited number of times
        if self.request.retries < 3:
            raise self.retry(exc=exc, countdown=120)
        else:
            # Retries exhausted - fail loudly and alert
            logger.critical(f"Task failed after retries: {exc}")
            send_alert('Critical Task Failure', str(exc))
            raise
# 启用事件
app.conf.worker_send_task_events = True
app.conf.task_send_sent_event = True
# 事件监听器
from celery import signals
@signals.task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **extra):
    """Log when a task is about to start."""
    print(f"Task {task.name}[{task_id}] starting")

@signals.task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, retval=None, **extra):
    """Log when a task finishes, including its return value."""
    print(f"Task {task.name}[{task_id}] completed: {retval}")

@signals.task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, **extra):
    """Log task failures."""
    print(f"Task {task_id} failed: {exception}")

@signals.task_retry.connect
def task_retry_handler(sender=None, task_id=None, reason=None, **extra):
    """Log task retries."""
    print(f"Task {task_id} retrying: {reason}")
# 安装 Flower
pip install flower
# 启动 Flower
celery -A myapp flower --port=5555
# 访问仪表板
# http://localhost:5555
# Flower 配置
flower_basic_auth = ['admin:password']
flower_persistent = True
flower_db = 'flower.db'
flower_max_tasks = 10000
from celery_app import app
# 获取活动任务
i = app.control.inspect()
print(i.active())
# 获取计划任务
print(i.scheduled())
# 获取保留任务
print(i.reserved())
# 获取工作节点统计信息
print(i.stats())
# 获取已注册任务
print(i.registered())
# 撤销任务
app.control.revoke(task_id, terminate=True)
# 关闭工作节点
app.control.shutdown()
# 重启池
app.control.pool_restart()
# 速率限制
app.control.rate_limit('myapp.tasks.slow_task', '10/m')
# 列出活动任务
celery -A myapp inspect active
# 列出计划任务
celery -A myapp inspect scheduled
# 工作节点统计信息
celery -A myapp inspect stats
# 已注册任务
celery -A myapp inspect registered
# 撤销任务
celery -A myapp control revoke <task_id>
# 关闭工作节点
celery -A myapp control shutdown
# 清除所有任务
celery -A myapp purge
@app.task(bind=True)
def tracked_task(self, data):
    """Process *data* while recording Prometheus task metrics.

    Bug fixed: the original created the Counter/Histogram on every call.
    prometheus_client registers metrics globally, so the second invocation
    in the same worker process raised ``ValueError: Duplicated timeseries``.
    The metrics are now created once and cached on the function object.
    """
    from prometheus_client import Counter, Histogram
    if not hasattr(tracked_task, '_metrics'):
        # Create-once: registering the same metric name twice raises.
        tracked_task._metrics = (
            Counter('celery_tasks_total', 'Total tasks'),
            Histogram('celery_task_duration_seconds', 'Task duration'),
        )
    task_counter, task_duration = tracked_task._metrics
    with task_duration.time():
        result = process(data)
    task_counter.inc()
    return result
# myproject/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
# Django 应用中的任务
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_email_task(subject, message, recipient):
    """Send an email via Django's mail framework; returns a status string."""
    send_mail(subject, message, 'from@example.com', [recipient])
    return f"Email sent to {recipient}"
# 在视图中使用
from myapp.tasks import send_email_task
def my_view(request):
    """Queue the email task and respond immediately without blocking."""
    send_email_task.delay('Hello', 'Welcome!', 'user@example.com')
    return HttpResponse('Email queued')
# celery_app.py
from celery import Celery
celery_app = Celery(
'fastapi_app',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
@celery_app.task
def process_data(data: dict):
    """Simulate a long-running job; returns the processed payload."""
    # Simulate long-running work
    import time
    time.sleep(10)
    return {"processed": data, "status": "complete"}
# main.py
from fastapi import FastAPI, BackgroundTasks
from celery_app import process_data
app = FastAPI()
@app.post("/process")
async def process_endpoint(data: dict):
    """Queue *data* for background processing and return the task id."""
    # Option 1: FastAPI BackgroundTasks (simple, in-process)
    # background_tasks.add_task(process_data, data)
    # Option 2: Celery (distributed, durable)
    task = process_data.delay(data)
    return {"task_id": task.id, "status": "queued"}
@app.get("/status/{task_id}")
async def check_status(task_id: str):
    """Report the status (and result, once ready) of a queued task."""
    from celery.result import AsyncResult
    task = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "status": task.status,
        # Only expose the result after the task has finished
        "result": task.result if task.ready() else None
    }
何时使用 Celery 与 FastAPI BackgroundTasks:
# celery_app.py
from celery import Celery
def make_celery(app):
    """Create a Celery app wired to a Flask app's config and app context."""
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        # Run every task inside the Flask application context so tasks can
        # use extensions (DB, mail, ...) that require it.
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
# app.py
from flask import Flask
from celery_app import make_celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/1'
celery = make_celery(app)
@celery.task
def send_email(to, subject, body):
    """Send an email from inside the Flask application context."""
    with app.app_context():
        # Uses Flask-Mail (or similar) configured on the app
        mail.send(Message(subject, recipients=[to], body=body))
@app.route('/send')
def send_route():
    """Queue a welcome email and respond immediately."""
    send_email.delay('user@example.com', 'Hello', 'Welcome!')
    return 'Email queued'
# conftest.py (pytest)
import pytest
from celery_app import app
@pytest.fixture(scope='session')
def celery_config():
    """Celery settings for tests: in-memory broker, eager (synchronous) tasks."""
    return {
        'broker_url': 'memory://',
        'result_backend': 'cache+memory://',
        'task_always_eager': True,  # Run tasks synchronously in-process
        'task_eager_propagates': True,  # Re-raise task exceptions in the caller
    }
# 测试任务
def test_add_task():
    """With eager mode on, .delay() runs synchronously and returns the sum."""
    result = add.delay(4, 6)
    assert result.get() == 10
def test_task_failure():
    """Eager propagation surfaces the task's exception to the caller."""
    with pytest.raises(ValueError):
        failing_task.delay()
Celery is a distributed task queue system for Python that enables asynchronous execution of background jobs across multiple workers. It supports scheduling, retries, task workflows, and integrates seamlessly with Django, Flask, and FastAPI.
Don't use when: you only need simple in-process background jobs (use asyncio instead).
# Basic installation
pip install celery
# With Redis broker
pip install celery[redis]
# With RabbitMQ broker
pip install celery[amqp]
# Full batteries (recommended)
pip install celery[redis,msgpack,auth,cassandra,elasticsearch,s3,sqs]
# celery_app.py
from celery import Celery
# Create Celery app with Redis broker
app = Celery(
'myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
# Configuration
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
)
# Define a task
@app.task
def add(x, y):
return x + y
@app.task
def send_email(to, subject, body):
# Simulate email sending
import time
time.sleep(2)
print(f"Email sent to {to}: {subject}")
return {"status": "sent", "to": to}
# Start worker
celery -A celery_app worker --loglevel=info
# Multiple workers with concurrency
celery -A celery_app worker --concurrency=4 --loglevel=info
# Named worker for specific queues
celery -A celery_app worker -Q emails,reports --loglevel=info
# Call task asynchronously
result = add.delay(4, 6)
# Wait for result
print(result.get(timeout=10)) # 10
# Apply async with options
result = send_email.apply_async(
args=['user@example.com', 'Hello', 'Welcome!'],
countdown=60 # Execute after 60 seconds
)
# Check task state
print(result.status) # PENDING, STARTED, SUCCESS, FAILURE
Broker : Message queue that stores tasks
Workers : Processes that execute tasks
Result Backend : Storage for task results
Beat Scheduler : Periodic task scheduler
PENDING → STARTED → SUCCESS
→ RETRY → SUCCESS
→ FAILURE
# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
# With authentication
broker_url = 'redis://:password@localhost:6379/0'
# Redis Sentinel (high availability)
broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380'
broker_transport_options = {
'master_name': 'mymaster',
'sentinel_kwargs': {'password': 'password'},
}
# Redis connection pool settings
broker_pool_limit = 10
broker_connection_retry = True
broker_connection_retry_on_startup = True
broker_connection_max_retries = 10
# Basic RabbitMQ
broker_url = 'amqp://guest:guest@localhost:5672//'
# With virtual host
broker_url = 'amqp://user:password@localhost:5672/myvhost'
# High availability (multiple brokers)
broker_url = [
'amqp://user:password@host1:5672//',
'amqp://user:password@host2:5672//',
]
# RabbitMQ-specific settings
broker_heartbeat = 30
broker_pool_limit = 10
# AWS SQS (serverless)
broker_url = 'sqs://'
broker_transport_options = {
'region': 'us-east-1',
'queue_name_prefix': 'myapp-',
'visibility_timeout': 3600,
'polling_interval': 1,
}
# With custom credentials
import boto3
broker_transport_options = {
'region': 'us-east-1',
'predefined_queues': {
'default': {
'url': 'https://sqs.us-east-1.amazonaws.com/123456789/myapp-default',
}
}
}
from celery import Task, shared_task
from celery_app import app
# Method 1: Decorator
@app.task
def simple_task(x, y):
return x + y
# Method 2: Shared task (framework-agnostic)
@shared_task
def framework_task(data):
return process(data)
# Method 3: Task class (advanced)
class CustomTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print(f"Task {task_id} succeeded with {retval}")
def on_failure(self, exc, task_id, args, kwargs, einfo):
print(f"Task {task_id} failed: {exc}")
def on_retry(self, exc, task_id, args, kwargs, einfo):
print(f"Task {task_id} retrying: {exc}")
@app.task(base=CustomTask)
def monitored_task(x):
return x * 2
@app.task(
name='custom.task.name', # Custom task name
bind=True, # Bind task instance as first arg
ignore_result=True, # Don't store result (performance)
max_retries=3, # Max retry attempts
default_retry_delay=60, # Retry delay in seconds
rate_limit='100/h', # Rate limiting
time_limit=300, # Hard time limit (kills task)
soft_time_limit=240, # Soft time limit (raises exception)
serializer='json', # Task serializer
compression='gzip', # Compress large messages
priority=5, # Task priority (0-9)
queue='high_priority', # Target queue
routing_key='priority.high', # Routing key
acks_late=True, # Acknowledge after execution
reject_on_worker_lost=True, # Reject if worker dies
)
def advanced_task(self, data):
try:
return process(data)
except Exception as exc:
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@app.task(bind=True)
def context_aware_task(self, x, y):
# Access task metadata
print(f"Task ID: {self.request.id}")
print(f"Task Name: {self.name}")
print(f"Args: {self.request.args}")
print(f"Kwargs: {self.request.kwargs}")
print(f"Retries: {self.request.retries}")
print(f"Delivery Info: {self.request.delivery_info}")
# Manual retry
try:
result = risky_operation(x, y)
except Exception as exc:
raise self.retry(exc=exc, countdown=60, max_retries=3)
return result
# delay() - Simple async execution
result = add.delay(4, 6)
# apply_async() - Full control
result = add.apply_async(
args=(4, 6),
kwargs={'extra': 'data'},
# Timing options
countdown=60, # Execute after N seconds
eta=datetime(2025, 12, 1, 10, 0), # Execute at specific time
expires=3600, # Task expires after N seconds
# Routing options
queue='math', # Target specific queue
routing_key='math.add', # Custom routing key
exchange='tasks', # Target exchange
priority=9, # High priority
# Execution options
serializer='json', # Message serializer
compression='gzip', # Compress payload
retry=True, # Auto-retry on failure
retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
},
# Task linking
link=log_result.s(), # Success callback
link_error=handle_error.s(), # Error callback
)
# Check result
if result.ready():
print(result.get()) # Get result (blocks)
print(result.result) # Get result (non-blocking)
from celery import signature
# Create signature (doesn't execute)
sig = add.signature((2, 2), countdown=10)
sig = add.s(2, 2) # Shorthand
# Partial arguments (currying)
partial = add.s(2) # One arg fixed
result = partial.apply_async(args=(4,)) # Add second arg
# Immutable signature (args can't be replaced)
immutable = add.si(2, 2)
# Clone and modify
new_sig = sig.clone(countdown=60)
# Execute signature
result = sig.delay()
result = sig.apply_async()
result = sig() # Synchronous execution
# Basic result retrieval
result = add.delay(4, 6)
value = result.get(timeout=10) # Blocks until complete
# Non-blocking result check
if result.ready():
print(result.result)
# Result states
print(result.status) # PENDING, STARTED, SUCCESS, FAILURE
print(result.successful()) # True if SUCCESS
print(result.failed()) # True if FAILURE
# Result metadata
print(result.traceback) # Exception traceback if failed
print(result.info) # Task return value or exception
# Forget result (free memory)
result.forget()
# Revoke task (cancel)
result.revoke(terminate=True) # Kill running task
add.AsyncResult(task_id).revoke() # Revoke by ID
# Define queues
from kombu import Queue, Exchange
app.conf.task_queues = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('high_priority', Exchange('priority'), routing_key='priority.high'),
Queue('low_priority', Exchange('priority'), routing_key='priority.low'),
Queue('emails', Exchange('tasks'), routing_key='tasks.email'),
Queue('reports', Exchange('tasks'), routing_key='tasks.report'),
)
# Default queue
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_routing_key = 'default'
# Route specific tasks to queues
app.conf.task_routes = {
'myapp.tasks.send_email': {'queue': 'emails'},
'myapp.tasks.generate_report': {'queue': 'reports', 'priority': 9},
'myapp.tasks.*': {'queue': 'default'},
}
# Function-based routing
def route_task(name, args, kwargs, options, task=None, **kw):
    """Route a task to a queue based on substrings of its name.

    'email' takes precedence over 'report'; all other names go to
    the default queue.
    """
    route = {'queue': 'default'}
    if 'email' in name:
        route = {'queue': 'emails', 'routing_key': 'email.send'}
    elif 'report' in name:
        route = {'queue': 'reports', 'priority': 5}
    return route
app.conf.task_routes = (route_task,)
# Worker consuming specific queues
celery -A myapp worker -Q emails,reports --loglevel=info
# Multiple workers for different queues
celery -A myapp worker -Q high_priority -c 4 --loglevel=info
celery -A myapp worker -Q default -c 2 --loglevel=info
celery -A myapp worker -Q low_priority -c 1 --loglevel=info
# Configure priority support
app.conf.task_queue_max_priority = 10
app.conf.task_default_priority = 5
# Send task with priority
high_priority_task.apply_async(args=(), priority=9)
low_priority_task.apply_async(args=(), priority=1)
# Priority-based routing
app.conf.task_routes = {
'critical_task': {'queue': 'default', 'priority': 10},
'background_task': {'queue': 'default', 'priority': 1},
}
# celery_config.py
from celery.schedules import crontab, solar
# Periodic task schedule
beat_schedule = {
# Run every 30 seconds
'add-every-30-seconds': {
'task': 'myapp.tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
# Run every morning at 7:30 AM
'send-daily-report': {
'task': 'myapp.tasks.send_daily_report',
'schedule': crontab(hour=7, minute=30),
},
# Run every Monday morning
'weekly-cleanup': {
'task': 'myapp.tasks.cleanup',
'schedule': crontab(hour=0, minute=0, day_of_week=1),
},
# Run on specific days
'monthly-report': {
'task': 'myapp.tasks.monthly_report',
'schedule': crontab(hour=0, minute=0, day_of_month='1'),
'kwargs': {'month_offset': 1}
},
# Solar schedule (sunrise/sunset)
'wake-up-at-sunrise': {
'task': 'myapp.tasks.morning_routine',
'schedule': solar('sunrise', -37.81, 144.96), # Melbourne
},
}
app.conf.beat_schedule = beat_schedule
from celery.schedules import crontab
# Every minute
crontab()
# Every 15 minutes
crontab(minute='*/15')
# Every hour at :30
crontab(minute=30)
# Every day at midnight
crontab(hour=0, minute=0)
# Every weekday at 5 PM
crontab(hour=17, minute=0, day_of_week='1-5')
# Every Monday, Wednesday, Friday at noon
crontab(hour=12, minute=0, day_of_week='mon,wed,fri')
# First day of month
crontab(hour=0, minute=0, day_of_month='1')
# Last day of month (use day_of_month='28-31' with logic in task)
crontab(hour=0, minute=0, day_of_month='28-31')
# Quarterly (every 3 months)
crontab(hour=0, minute=0, day_of_month='1', month_of_year='*/3')
# Start beat scheduler
celery -A myapp beat --loglevel=info
# Beat with custom scheduler
celery -A myapp beat --scheduler django_celery_beat.schedulers:DatabaseScheduler
# Combine worker and beat (development only)
celery -A myapp worker --beat --loglevel=info
pip install django-celery-beat
# settings.py (Django)
INSTALLED_APPS = [
'django_celery_beat',
]
# Migrate database
python manage.py migrate django_celery_beat
# Run beat with database scheduler
celery -A myapp beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# Create periodic task via Django admin or ORM
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
import json
# Create interval schedule (every 10 seconds)
schedule, created = IntervalSchedule.objects.get_or_create(
every=10,
period=IntervalSchedule.SECONDS,
)
PeriodicTask.objects.create(
interval=schedule,
name='Import feed every 10 seconds',
task='myapp.tasks.import_feed',
args=json.dumps(['https://example.com/feed']),
)
# Create crontab schedule
schedule, created = CrontabSchedule.objects.get_or_create(
minute='0',
hour='*/4', # Every 4 hours
day_of_week='*',
day_of_month='*',
month_of_year='*',
)
PeriodicTask.objects.create(
crontab=schedule,
name='Hourly cleanup',
task='myapp.tasks.cleanup',
)
from celery import chain
# Sequential execution
result = chain(add.s(2, 2), add.s(4), add.s(8))()
# Equivalent to: add(add(add(2, 2), 4), 8)
# Result: 16
# Shorthand syntax
result = (add.s(2, 2) | add.s(4) | add.s(8))()
# Chain with different tasks
workflow = (
fetch_data.s(url) |
process_data.s() |
save_results.s()
)
result = workflow.apply_async()
from celery import group
# Parallel execution
job = group([
add.s(2, 2),
add.s(4, 4),
add.s(8, 8),
])
result = job.apply_async()
# Wait for all results
results = result.get(timeout=10) # [4, 8, 16]
# Group with callbacks
job = group([
process_item.s(item) for item in items
]) | summarize_results.s()
from celery import chord
# Group with callback
job = chord([
fetch_url.s(url) for url in urls
])(combine_results.s())
# Example: Process multiple files, then merge
workflow = chord([
process_file.s(file) for file in files
])(merge_results.s())
result = workflow.apply_async()
from celery import group
# Map: Apply same task to list of args
results = add.map([(2, 2), (4, 4), (8, 8)])
# Starmap: Unpack arguments
results = add.starmap([(2, 2), (4, 4), (8, 8)])
# Equivalent to:
results = group([add.s(2, 2), add.s(4, 4), add.s(8, 8)])()
from celery import chain, group, chord
# Parallel processing with sequential steps
workflow = chain(
# Step 1: Fetch data
fetch_data.s(source),
# Step 2: Process in parallel
group([
process_chunk.s(chunk_id) for chunk_id in range(10)
]),
# Step 3: Aggregate results
aggregate.s(),
# Step 4: Save to database
save_results.s()
)
# Nested chords: build the inner chords as signatures with chord(header, body).
# The chord(header)(body) form submits immediately and returns an AsyncResult,
# which cannot be embedded in the outer chord's header.
workflow = chord(
    [
        chord([subtask.s(item) for item in chunk], process_chunk.s())
        for chunk in chunks
    ],
    final_callback.s(),
)
# Real-world example: report generation.  The chord is passed as a *signature*
# (chord(header, body)) so the chain controls when it runs as step 2; the
# chord(header)(body) form would execute it immediately, outside the chain.
generate_report = chain(
    fetch_user_data.s(user_id),
    chord(
        [
            calculate_stats.s(),
            fetch_transactions.s(),
            fetch_activity.s(),
        ],
        combine_sections.s(),
    ),
    render_pdf.s(),
    send_email.s(user_email),
)
@app.task(
autoretry_for=(RequestException, IOError), # Auto-retry these exceptions
retry_kwargs={'max_retries': 5}, # Max 5 retries
retry_backoff=True, # Exponential backoff
retry_backoff_max=600, # Max 10 minutes backoff
retry_jitter=True, # Add randomness to backoff
)
def fetch_url(url):
response = requests.get(url)
response.raise_for_status()
return response.json()
@app.task(bind=True, max_retries=3)
def process_data(self, data):
try:
result = external_api_call(data)
return result
except TemporaryError as exc:
# Retry after 60 seconds
raise self.retry(exc=exc, countdown=60)
except PermanentError as exc:
# Don't retry, log and fail
logger.error(f"Permanent error: {exc}")
raise
except Exception as exc:
# Exponential backoff
raise self.retry(
exc=exc,
countdown=2 ** self.request.retries,
max_retries=3
)
@app.task
def on_error(request, exc, traceback):
"""Called when task fails"""
logger.error(f"Task {request.id} failed: {exc}")
send_alert(f"Task failure: {request.task}", str(exc))
@app.task
def risky_task(data):
return process(data)
# Link error callback
risky_task.apply_async(
args=(data,),
link_error=on_error.s()
)
from celery import Task
class CallbackTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Handle task failure"""
logger.error(f"Task {task_id} failed with {exc}")
# Send notification
send_notification('Task Failed', str(exc))
def on_success(self, retval, task_id, args, kwargs):
"""Handle task success"""
logger.info(f"Task {task_id} succeeded: {retval}")
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""Handle task retry"""
logger.warning(f"Task {task_id} retrying: {exc}")
@app.task(base=CallbackTask)
def monitored_task(x):
if x < 0:
raise ValueError("Negative value")
return x * 2
@app.task(bind=True)
def robust_task(self, data):
# Categorize exceptions
try:
return process(data)
except NetworkError as exc:
# Transient error - retry
raise self.retry(exc=exc, countdown=60, max_retries=5)
except ValidationError as exc:
# Permanent error - don't retry
logger.error(f"Invalid data: {exc}")
return {'status': 'failed', 'error': str(exc)}
except DatabaseError as exc:
# Critical error - retry with exponential backoff
backoff = min(2 ** self.request.retries * 60, 3600)
raise self.retry(exc=exc, countdown=backoff, max_retries=10)
except Exception as exc:
# Unknown error - retry limited times
if self.request.retries < 3:
raise self.retry(exc=exc, countdown=120)
else:
# Max retries exceeded - fail and alert
logger.critical(f"Task failed after retries: {exc}")
send_alert('Critical Task Failure', str(exc))
raise
# Enable events
app.conf.worker_send_task_events = True
app.conf.task_send_sent_event = True
# Event listeners
from celery import signals
@signals.task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **extra):
print(f"Task {task.name}[{task_id}] starting")
@signals.task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, retval=None, **extra):
print(f"Task {task.name}[{task_id}] completed: {retval}")
@signals.task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, **extra):
print(f"Task {task_id} failed: {exception}")
@signals.task_retry.connect
def task_retry_handler(sender=None, task_id=None, reason=None, **extra):
print(f"Task {task_id} retrying: {reason}")
# Install Flower
pip install flower
# Start Flower
celery -A myapp flower --port=5555
# Access dashboard
# http://localhost:5555
# Flower configuration
flower_basic_auth = ['admin:password']
flower_persistent = True
flower_db = 'flower.db'
flower_max_tasks = 10000
from celery_app import app
# Get active tasks
i = app.control.inspect()
print(i.active())
# Get scheduled tasks
print(i.scheduled())
# Get reserved tasks
print(i.reserved())
# Get worker stats
print(i.stats())
# Get registered tasks
print(i.registered())
# Revoke task
app.control.revoke(task_id, terminate=True)
# Shutdown worker
app.control.shutdown()
# Pool restart
app.control.pool_restart()
# Rate limit
app.control.rate_limit('myapp.tasks.slow_task', '10/m')
# List active tasks
celery -A myapp inspect active
# List scheduled tasks
celery -A myapp inspect scheduled
# Worker stats
celery -A myapp inspect stats
# Registered tasks
celery -A myapp inspect registered
# Revoke task
celery -A myapp control revoke <task_id>
# Shutdown workers
celery -A myapp control shutdown
# Purge all tasks
celery -A myapp purge
from prometheus_client import Counter, Histogram

# Metrics must be created once at module level: instantiating a Counter or
# Histogram inside the task body would re-register the same timeseries on
# every call and raise "Duplicated timeseries in CollectorRegistry".
task_counter = Counter('celery_tasks_total', 'Total tasks')
task_duration = Histogram('celery_task_duration_seconds', 'Task duration')

@app.task(bind=True)
def tracked_task(self, data):
    """Process *data* while recording task count and duration in Prometheus."""
    with task_duration.time():
        result = process(data)
    task_counter.inc()
    return result
# myproject/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
# Task in Django app
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_email_task(subject, message, recipient):
send_mail(subject, message, 'from@example.com', [recipient])
return f"Email sent to {recipient}"
# Use in views
from myapp.tasks import send_email_task
def my_view(request):
send_email_task.delay('Hello', 'Welcome!', 'user@example.com')
return HttpResponse('Email queued')
# celery_app.py
from celery import Celery
celery_app = Celery(
'fastapi_app',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
@celery_app.task
def process_data(data: dict):
# Long-running task
import time
time.sleep(10)
return {"processed": data, "status": "complete"}
# main.py
from fastapi import FastAPI, BackgroundTasks
from celery_app import process_data
app = FastAPI()
@app.post("/process")
async def process_endpoint(data: dict):
# Option 1: FastAPI BackgroundTasks (simple, in-process)
# background_tasks.add_task(process_data, data)
# Option 2: Celery (distributed, persistent)
task = process_data.delay(data)
return {"task_id": task.id, "status": "queued"}
@app.get("/status/{task_id}")
async def check_status(task_id: str):
from celery.result import AsyncResult
task = AsyncResult(task_id, app=celery_app)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.ready() else None
}
When to Use Celery vs FastAPI BackgroundTasks :
# celery_app.py
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
# app.py
from flask import Flask
from celery_app import make_celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/1'
celery = make_celery(app)
@celery.task
def send_email(to, subject, body):
with app.app_context():
# Use Flask-Mail or similar
mail.send(Message(subject, recipients=[to], body=body))
@app.route('/send')
def send_route():
send_email.delay('user@example.com', 'Hello', 'Welcome!')
return 'Email queued'
# conftest.py (pytest)
import pytest
from celery_app import app
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'memory://',
'result_backend': 'cache+memory://',
'task_always_eager': True, # Execute tasks synchronously
'task_eager_propagates': True, # Propagate exceptions
}
# Test tasks
def test_add_task():
result = add.delay(4, 6)
assert result.get() == 10
def test_task_failure():
with pytest.raises(ValueError):
failing_task.delay()
# conftest.py
import pytest
from celery_app import app
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'redis://localhost:6379/15', # Test database
'result_backend': 'redis://localhost:6379/15',
}
@pytest.fixture
def celery_worker(celery_app):
"""Start worker for tests"""
with celery_app.Worker() as worker:
yield worker
def test_async_task(celery_worker):
result = async_task.delay(data)
assert result.get(timeout=10) == expected
from unittest.mock import patch, MagicMock
@app.task
def fetch_and_process(url):
response = requests.get(url)
return process(response.json())
def test_fetch_and_process():
with patch('requests.get') as mock_get:
mock_get.return_value.json.return_value = {'data': 'test'}
result = fetch_and_process.delay('http://example.com')
assert result.get() == expected_result
mock_get.assert_called_once_with('http://example.com')
from celery.schedules import crontab
def test_periodic_task_schedule():
from celery_app import app
schedule = app.conf.beat_schedule['daily-report']
assert schedule['task'] == 'myapp.tasks.daily_report'
assert schedule['schedule'] == crontab(hour=0, minute=0)
def test_periodic_task_execution():
# Test task logic directly
result = daily_report()
assert result['status'] == 'complete'
import pytest
from celery_app import app
@pytest.fixture(scope='module')
def celery_app():
app.conf.update(
broker_url='redis://localhost:6379/15',
result_backend='redis://localhost:6379/15',
)
return app
@pytest.fixture(scope='module')
def celery_worker(celery_app):
with celery_app.Worker() as worker:
yield worker
def test_workflow(celery_worker):
from celery import chain
workflow = chain(
fetch_data.s(url),
process_data.s(),
save_results.s()
)
result = workflow.apply_async()
output = result.get(timeout=30)
assert output['status'] == 'saved'
# Production worker with autoscaling
celery -A myapp worker \
--autoscale=10,3 \
--max-tasks-per-child=1000 \
--time-limit=300 \
--soft-time-limit=240 \
--loglevel=info \
--logfile=/var/log/celery/worker.log \
--pidfile=/var/run/celery/worker.pid
# Multiple specialized workers
celery multi start \
worker1 -A myapp -Q high_priority -c 4 --max-tasks-per-child=100 \
worker2 -A myapp -Q default -c 2 --max-tasks-per-child=1000 \
worker3 -A myapp -Q low_priority -c 1 --autoscale=3,1
# Graceful shutdown
celery multi stop worker1 worker2 worker3
celery multi stopwait worker1 worker2 worker3 # Wait for tasks to finish
# production_config.py
import os
# Broker settings
broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
broker_connection_retry_on_startup = True
broker_pool_limit = 50
# Result backend
result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
result_expires = 3600 # 1 hour
# Serialization
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True
# Performance
worker_prefetch_multiplier = 4 # Tasks to prefetch per worker
worker_max_tasks_per_child = 1000 # Restart worker after N tasks (prevent memory leaks)
task_acks_late = True # Acknowledge after task completes
task_reject_on_worker_lost = True # Requeue if worker dies
# Reliability
task_track_started = True # Track when task starts
task_time_limit = 300 # 5 minutes hard limit
task_soft_time_limit = 240 # 4 minutes soft limit
# Logging
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
# /etc/systemd/system/celery.service
[Unit]
Description=Celery Service
# Redis ships a .service unit, not a .target; ordering after a non-existent
# unit is silently ignored, so the worker could start before the broker.
After=network.target redis.service
[Service]
Type=forking
User=celery
Group=celery
WorkingDirectory=/opt/myapp
Environment="PATH=/opt/myapp/venv/bin"
ExecStart=/opt/myapp/venv/bin/celery multi start worker1 \
-A myapp \
--pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log \
--loglevel=INFO
ExecStop=/opt/myapp/venv/bin/celery multi stopwait worker1 \
--pidfile=/var/run/celery/%n.pid
ExecReload=/opt/myapp/venv/bin/celery multi restart worker1 \
-A myapp \
--pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log \
--loglevel=INFO
Restart=always
[Install]
WantedBy=multi-user.target
# /etc/systemd/system/celerybeat.service
[Unit]
Description=Celery Beat Service
# Order after the real Redis unit (redis.service); "redis.target" does not
# exist and the dependency would be silently ignored.
After=network.target redis.service
[Service]
Type=simple
User=celery
Group=celery
WorkingDirectory=/opt/myapp
Environment="PATH=/opt/myapp/venv/bin"
ExecStart=/opt/myapp/venv/bin/celery -A myapp beat \
--loglevel=INFO \
--pidfile=/var/run/celery/beat.pid
Restart=always
[Install]
WantedBy=multi-user.target
# Install
pip install sentry-sdk
# Configuration
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration
sentry_sdk.init(
dsn="https://your-sentry-dsn",
integrations=[CeleryIntegration()],
traces_sample_rate=0.1, # 10% of transactions
)
# Tasks are automatically tracked
@app.task
def my_task(x):
# Exceptions automatically sent to Sentry
return risky_operation(x)
# Global rate limiting
app.conf.task_default_rate_limit = '100/m' # 100 tasks per minute
# Per-task rate limiting
@app.task(rate_limit='10/m')
def rate_limited_task(x):
return expensive_operation(x)
# Dynamic rate limiting
app.control.rate_limit('myapp.tasks.slow_task', '5/m')
# Token bucket rate limiting
@app.task(rate_limit='10/s')
def api_call(endpoint):
return requests.get(endpoint)
# health.py
from celery_app import app
def check_celery_health():
"""Health check endpoint"""
try:
# Ping workers
i = app.control.inspect()
stats = i.stats()
if not stats:
return {'status': 'unhealthy', 'reason': 'No workers available'}
# Check broker connection
result = app.control.ping(timeout=1.0)
if not result:
return {'status': 'unhealthy', 'reason': 'Workers not responding'}
return {'status': 'healthy', 'workers': len(stats)}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
# FastAPI health endpoint
@app.get("/health/celery")
async def celery_health():
return check_celery_health()
# Use ignore_result for fire-and-forget tasks
@app.task(ignore_result=True)
def send_notification(user_id, message):
# Don't need result, save backend overhead
notify(user_id, message)
# Compression for large payloads
@app.task(compression='gzip')
def process_large_data(data):
return analyze(data)
# Serialization choice
@app.task(serializer='msgpack') # Faster than JSON
def fast_task(data):
return process(data)
# Worker concurrency
worker_concurrency = 4 # CPU-bound: num_cores
worker_concurrency = 20 # I/O-bound: higher value
# Prefetch multiplier (how many tasks to prefetch)
worker_prefetch_multiplier = 4 # Balance: 4x concurrency
# Task acknowledgment
task_acks_late = True # Acknowledge after completion (reliability)
task_acks_late = False # Acknowledge on receipt (performance)
# Memory management
worker_max_tasks_per_child = 1000 # Restart worker after N tasks
worker_max_memory_per_child = 200000 # Restart after 200MB
# Use Redis instead of database for results
result_backend = 'redis://localhost:6379/1'
# If using database, optimize
result_backend = 'db+postgresql://user:pass@localhost/celery'
database_engine_options = {
'pool_size': 20,
'pool_recycle': 3600,
}
# Reduce result expiry time
result_expires = 3600 # 1 hour instead of default 24 hours
from celery import group
# Bad: One task per item (overhead)
for item in large_list:
process_item.delay(item)
# Good: Chunk items
def chunks(lst, n):
    """Lazily yield consecutive slices of *lst* of length *n* (last may be shorter)."""
    for start in range(0, len(lst), n):
        yield lst[start:start + n]
@app.task
def process_batch(items):
return [process_item(item) for item in items]
# Process in batches of 100
job = group(process_batch.s(chunk) for chunk in chunks(large_list, 100))
result = job.apply_async()
# Redis connection pool
broker_pool_limit = 50 # Max connections to broker
redis_max_connections = 50
# Database connection pool
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql://user:pass@localhost/db',
poolclass=QueuePool,
pool_size=20,
max_overflow=0,
)
@app.task(bind=True, max_retries=3)
def send_email_task(self, to, subject, body, attachments=None):
try:
msg = EmailMessage(subject, body, 'from@example.com', [to])
if attachments:
for filename, content, mimetype in attachments:
msg.attach(filename, content, mimetype)
msg.send()
return {'status': 'sent', 'to': to}
except SMTPException as exc:
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
# Bulk email with rate limiting
@app.task(rate_limit='100/m')
def send_bulk_email(recipients, subject, template):
for recipient in recipients:
send_email_task.delay(recipient, subject, render_template(template, recipient))
@app.task(bind=True, time_limit=600)
def generate_report(self, report_type, user_id, start_date, end_date):
# Update progress
self.update_state(state='PROGRESS', meta={'current': 0, 'total': 100})
# Fetch data
data = fetch_report_data(report_type, start_date, end_date)
self.update_state(state='PROGRESS', meta={'current': 30, 'total': 100})
# Generate PDF
pdf = render_pdf(data)
self.update_state(state='PROGRESS', meta={'current': 70, 'total': 100})
# Upload to S3
url = upload_to_s3(pdf, f'reports/{user_id}/{report_type}.pdf')
self.update_state(state='PROGRESS', meta={'current': 90, 'total': 100})
# Send notification
send_email_task.delay(
get_user_email(user_id),
'Report Ready',
f'Your report is ready: {url}'
)
return {'status': 'complete', 'url': url}
# Check progress
from celery.result import AsyncResult
task = AsyncResult(task_id)
if task.state == 'PROGRESS':
print(task.info) # {'current': 30, 'total': 100}
from celery import chain, group
@app.task
def fetch_data(source):
return download(source)
@app.task
def clean_data(raw_data):
return clean(raw_data)
@app.task
def transform_data(clean_data):
return transform(clean_data)
@app.task
def load_data(transformed_data):
save_to_database(transformed_data)
return {'status': 'loaded', 'rows': len(transformed_data)}
# ETL pipeline
etl_pipeline = chain(
fetch_data.s('https://api.example.com/data'),
clean_data.s(),
transform_data.s(),
load_data.s()
)
result = etl_pipeline.apply_async()
@app.task(bind=True, autoretry_for=(RequestException,), max_retries=5)
def process_webhook(self, webhook_data):
# Validate signature
if not verify_signature(webhook_data):
raise ValueError("Invalid signature")
# Process event
event_type = webhook_data['type']
if event_type == 'payment.success':
update_order_status(webhook_data['order_id'], 'paid')
send_confirmation_email.delay(webhook_data['customer_email'])
elif event_type == 'payment.failed':
notify_admin.delay('Payment Failed', webhook_data)
return {'status': 'processed', 'event': event_type}
# FastAPI webhook endpoint
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
data = await request.json()
process_webhook.delay(data)
return {"status": "queued"}
from celery import group, chord
@app.task
def resize_image(image_path, size):
from PIL import Image
img = Image.open(image_path)
img.thumbnail(size)
output_path = f"{image_path}_{size[0]}x{size[1]}.jpg"
img.save(output_path)
return output_path
@app.task
def upload_to_cdn(image_paths):
urls = []
for path in image_paths:
url = cdn_upload(path)
urls.append(url)
return urls
# Generate multiple sizes and upload
def process_uploaded_image(image_path):
    """Resize *image_path* into all standard sizes in parallel, then upload to the CDN."""
    sizes = [(800, 600), (400, 300), (200, 150), (100, 100)]
    # Build the chord as a signature (chord(header, body)): the
    # chord(header)(body) form submits immediately and returns an AsyncResult,
    # so the subsequent .apply_async() call would be an error.
    workflow = chord(
        [resize_image.s(image_path, size) for size in sizes],
        upload_to_cdn.s(),
    )
    return workflow.apply_async()
RQ : Simpler Redis-only task queue
When to use RQ :
When to use Celery :
Complex workflows (chains, chords)
Multiple broker options
Advanced routing and priorities
Large scale (>1000 tasks/min)
Periodic tasks
from redis import Redis
from rq import Queue
redis_conn = Redis()
q = Queue(connection=redis_conn)
job = q.enqueue(my_function, arg1, arg2)
result = job.result
Huey : Lightweight task queue with minimal dependencies
When to use Huey :
When to use Celery :
Enterprise-scale applications
Complex task dependencies
Multiple broker/backend options
Advanced monitoring needs
from huey import RedisHuey
huey = RedisHuey('myapp')
@huey.task()
def add(a, b):
    return a + b
result = add(1, 2)
Dramatiq : Modern alternative focusing on reliability
When to use Dramatiq :
When to use Celery :
Mature ecosystem
More broker options
Canvas workflows
Larger community
import dramatiq
@dramatiq.actor
def add(x, y):
    return x + y
add.send(1, 2)
AWS Lambda, Google Cloud Functions, Azure Functions
When to use Cloud Functions :
When to use Celery :
Idempotency : Tasks should be safe to run multiple times
@app.task
def process_order(order_id):
order = Order.objects.get(id=order_id)
if order.status == 'processed':
return # Already processed, skip
order.process()
order.status = 'processed'
order.save()
Small, Focused Tasks : One responsibility per task
# Bad: Monolithic task
@app.task
def process_user(user_id):
send_welcome_email(user_id)
create_profile(user_id)
setup_notifications(user_id)
# Good: Separate tasks
@app.task
def send_welcome_email(user_id):
...
@app.task
def create_profile(user_id):
...
workflow = group([
send_welcome_email.s(user_id),
create_profile.s(user_id),
setup_notifications.s(user_id)
])
Avoid Database Objects in Arguments : Use IDs instead
# Bad
@app.task
def process_user(user): # User object
...
# Good
@app.task
def process_user(user_id):
user = User.objects.get(id=user_id)
...
Set Time Limits : Prevent runaway tasks
ignore_result=True: For tasks that don't need results
Symptoms: Tasks queued but not processing
Diagnosis :
# Check if workers are running
celery -A myapp inspect active
# Check worker stats
celery -A myapp inspect stats
# Check registered tasks
celery -A myapp inspect registered
Solutions :
celery -A myapp worker
Symptoms: Tasks show SUCCESS but don't work
Diagnosis :
# Enable task tracking
app.conf.task_track_started = True
# Check task result and traceback
result = task.delay()
if result.failed():
print(result.traceback)
Solutions :
celery -A myapp worker --loglevel=debug
task_eager_propagates = True in tests
Symptoms: Worker memory grows over time
Solutions :
# Restart workers after N tasks
worker_max_tasks_per_child = 1000
# Restart on memory limit
worker_max_memory_per_child = 200000 # 200MB
Symptoms : Tasks taking longer than expected
Diagnosis :
# Add timing
import time
@app.task(bind=True)
def timed_task(self):
start = time.time()
result = slow_operation()
duration = time.time() - start
logger.info(f"Task {self.request.id} took {duration}s")
return result
Solutions :
Symptoms : Tasks not reaching workers
Diagnosis :
# Test broker connection
python -c "from celery_app import app; print(app.connection().connect())"
Solutions :
redis-cli ping or rabbitmqctl status
broker_connection_retry_on_startup = True
Symptoms: result.get() returns None
Solutions :
Ensure the task does not set ignore_result=True
Check the result expiry setting (result_expires)
Symptoms: Periodic tasks not running
Diagnosis :
# Check beat is running
ps aux | grep celery | grep beat
# Check beat schedule
celery -A myapp inspect scheduled
Solutions :
Check the beat_schedule configuration
Symptoms: Workers die unexpectedly
Solutions :
Set worker_max_tasks_per_child to prevent memory leaks
Symptoms: Tasks accumulating in queue
Solutions :
from celery import Task
class DatabaseTask(Task):
    """Base task that lazily opens a database connection and closes it after each run."""
    _db = None

    @property
    def db(self):
        # Open the connection on first use within the current task run.
        if self._db is None:
            self._db = create_db_connection()
        return self._db

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """Close the connection once the task returns (any outcome)."""
        if self._db is not None:
            self._db.close()
            # Reset the cache: the worker reuses this task instance across
            # runs, so without this the next run would hit a closed connection.
            self._db = None

@app.task(base=DatabaseTask)
def db_task(query):
    """Run *query* through the per-run database connection."""
    return db_task.db.execute(query)
from kombu.serialization import register
def my_encoder(obj):
# Custom encoding logic
return json.dumps(obj)
def my_decoder(data):
# Custom decoding logic
return json.loads(data)
register('myjson', my_encoder, my_decoder,
content_type='application/x-myjson',
content_encoding='utf-8')
app.conf.task_serializer = 'myjson'
class BaseTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
send_alert(f"Task {self.name} failed", str(exc))
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.warning(f"Task {self.name} retrying")
@app.task(base=BaseTask)
def monitored_task():
return perform_work()
End of Celery Skill Documentation
For more information:
When using Celery, these skills enhance your workflow:
[Full documentation available in these skills if deployed in your bundle]
Weekly Installs
89
Repository
GitHub Stars
18
First Seen
Jan 23, 2026
Security Audits
Gen Agent Trust HubFailSocketPassSnykWarn
Installed on
gemini-cli67
opencode67
claude-code65
codex65
github-copilot64
cursor63
通过 LiteLLM 代理让 Claude Code 对接 GitHub Copilot 运行 | 高级变通方案指南
43,100 周安装
@app.task(time_limit=300, soft_time_limit=240)
def bounded_task():
...