async-jobs by yonatangross/orchestkit
npx skills add https://github.com/yonatangross/orchestkit --skill async-jobs使用 Celery、ARQ 和 Redis 进行后台任务处理的模式。涵盖任务队列、画布工作流、调度、重试策略、速率限制和生产监控。每个类别在 references/ 目录下都有按需加载的独立规则文件。
| 类别 | 规则 | 影响 | 使用时机 |
|---|---|---|---|
| 配置 | celery-config | 高 | Celery 应用设置、代理、序列化、工作进程调优 |
| 任务路由 | task-routing | 高 | 优先级队列、多队列工作进程、动态路由 |
| 画布工作流 | canvas-workflows | 高 | 链、组、和弦、嵌套工作流 |
| 重试策略 | retry-strategies | 高 | 指数退避、幂等性、死信队列 |
| 调度 | scheduled-tasks | 中 | Celery Beat、crontab、基于数据库的调度 |
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
| 监控 | monitoring-health | 中 | Flower、自定义事件、健康检查、指标 |
| 结果后端 | result-backends | 中 | Redis 结果、自定义状态、进度跟踪 |
| ARQ 模式 | arq-patterns | 中 | 用于 FastAPI 的异步 Redis 队列、轻量级作业 |
| Temporal 工作流 | temporal-workflows | 高 | 持久化工作流定义、Saga、信号、查询 |
| Temporal 活动 | temporal-activities | 高 | 活动模式、工作进程、心跳、测试 |
总计:9 个类别共 10 条规则
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_payment(self, order_id: str):
try:
return gateway.charge(order_id)
except TransientError as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
加载更多示例:Read("${CLAUDE_SKILL_DIR}/references/quick-start-examples.md") 以获取 Celery 重试任务和 ARQ/FastAPI 集成模式。
具有安全默认值和工作者调优的生产环境 Celery 应用配置。
task_serializer="json" 进行 JSON 序列化 以确保安全task_acks_late=True 进行 延迟确认 以防止崩溃时任务丢失task_time_limit(硬限制)和 task_soft_time_limit(软限制)设置 时间限制worker_prefetch_multiplier=1 实现 公平分发task_reject_on_worker_lost=True 实现 丢失时拒绝| 决策 | 推荐 |
|---|---|
| 序列化器 | JSON(永不使用 pickle) |
| 确认模式 | 延迟确认(task_acks_late=True) |
| 预取 | 1 用于公平,4-8 用于吞吐量 |
| 时间限制 | 软限制 < 硬限制(例如,540/600) |
| 时区 | 始终使用 UTC |
具有多队列工作进程和动态路由的优先级队列配置。
queue_order_strategy: "priority" 和 0-9 级别实现 Redis 优先级| 决策 | 推荐 |
|---|---|
| 队列数量 | 3-5(关键/高/默认/低/批量) |
| 优先级级别 | 0-9,配合 Redis x-max-priority |
| 工作进程分配 | 每个队列专用工作进程 |
| 预取 | 1 用于关键队列,4-8 用于批量队列 |
| 路由 | 使用路由器类处理 5 条以上的路由规则 |
用于顺序、并行和扇入/扇出工作流的 Celery 画布原语。
si())处理忽略输入的步骤| 决策 | 推荐 |
|---|---|
| 顺序执行 | 使用 s() 的链 |
| 并行执行 | 用于独立任务的组 |
| 扇入 | 使用和弦(所有任务必须成功才能回调) |
| 忽略输入 | 使用 si() 不可变签名 |
| 链中错误 | 拒绝停止链,重试继续 |
| 部分失败 | 在和弦任务中返回错误字典 |
具有指数退避、幂等性和死信队列的重试模式。
retry_backoff=True 和 retry_backoff_max 进行 指数退避retry_jitter=True 添加 抖动 以防止惊群效应| 决策 | 推荐 |
|---|---|
| 重试延迟 | 带抖动的指数退避 |
| 最大重试次数 | 3-5 用于瞬态错误,0 用于永久错误 |
| 幂等性 | 带 TTL 的 Redis 键 |
| 失败任务 | 使用死信队列进行手动审查 |
| 单例任务 | 带 TTL 的 Redis 锁 |
使用 crontab、基于数据库的调度和重叠预防的 Celery Beat 周期性任务配置。
django-celery-beat 实现 数据库调度器 以支持动态调度| 决策 | 推荐 |
|---|---|
| 调度类型 | Crontab 用于基于时间,间隔用于频率 |
| 动态调度 | 数据库调度器(django-celery-beat) |
| 重叠 | 带超时的 Redis 锁 |
| Beat 进程 | 独立进程(非嵌入式) |
| 时区 | 始终使用 UTC |
使用 Flower、自定义信号、健康检查和 Prometheus 指标的生产环境监控。
task_prerun、task_postrun、task_failure)收集指标| 决策 | 推荐 |
|---|---|
| 仪表板 | 带持久化存储的 Flower |
| 指标 | 通过 Celery 信号接入 Prometheus |
| 健康检查 | 代理 + 工作进程 + 队列深度 |
| 告警 | 基于 task_failure 信号 |
| 自动扩缩容 | 队列深度 > 阈值时触发 |
任务结果存储、自定义状态和进度跟踪模式。
update_state() 进行实时进度报告| 决策 | 推荐 |
|---|---|
| 状态存储 | Redis 结果后端 |
| 大结果 | S3 或数据库(永不使用 Redis) |
| 进度 | 使用 update_state() 的自定义状态 |
| 结果查询 | 带状态检查的 AsyncResult |
用于 FastAPI 和简单后台任务的轻量级异步 Redis 队列。
arq 实现 原生 async/await 以集成 FastAPIstartup/shutdown 钩子实现 工作进程生命周期 管理资源enqueue_job() 从 FastAPI 路由 作业入队Job.status() 和 Job.result() 进行 作业状态 跟踪_delay=timedelta() 实现 延迟任务 以推迟执行| 决策 | 推荐 |
|---|---|
| 简单异步 | ARQ(原生异步) |
| 复杂工作流 | Celery(链、和弦) |
| 进程内快速任务 | FastAPI BackgroundTasks |
| LLM 工作流 | LangGraph(非 Celery) |
加载:Read("${CLAUDE_SKILL_DIR}/references/quick-start-examples.md") 以获取完整的工具比较表(ARQ、Celery、RQ、Dramatiq、FastAPI BackgroundTasks)。
加载详情:Read("${CLAUDE_SKILL_DIR}/references/anti-patterns.md") 以获取完整列表。
关键规则:切勿在请求处理程序中运行长任务,切勿在任务内阻塞等待结果,切勿在 Redis 中存储大结果,重试任务始终使用幂等性。
使用 Temporal.io 为可靠的分布式应用程序提供持久化执行引擎。
@workflow.defn 和确定性代码定义 工作流定义workflow.wait_condition() 设置 计时器 以支持人工介入asyncio.gather 实现 并行活动| 决策 | 推荐 |
|---|---|
| 工作流 ID | 具有业务意义、幂等 |
| 确定性 | 使用 workflow.random()、workflow.now() |
| I/O 操作 | 始终通过活动,永不直接执行 |
用于 Temporal.io I/O 操作的活动和工作进程模式。
@activity.defn 定义 活动定义 以处理所有 I/OApplicationError(non_retryable=True) 对业务错误进行 错误分类WorkflowEnvironment.start_local() 进行 测试| 决策 | 推荐 |
|---|---|
| 活动超时 | 大多数情况下使用 start_to_close |
| 错误处理 | 业务错误标记为不可重试 |
| 测试 | 使用 WorkflowEnvironment 进行集成测试 |
ork:python-backend - FastAPI、asyncio、SQLAlchemy 模式ork:langgraph - LangGraph 工作流模式(用于 LLM 工作流,非 Celery)ork:distributed-systems - 弹性模式、熔断器ork:monitoring-observability - 指标和告警加载详情:Read("${CLAUDE_SKILL_DIR}/references/capability-details.md") 以获取所有 8 项能力的完整关键词索引和问题-解决方案映射。
每周安装数
78
代码仓库
GitHub 星标数
132
首次出现
2026年2月14日
安全审计
安装于
opencode76
codex75
gemini-cli75
github-copilot74
cursor74
kimi-cli70
Patterns for background task processing with Celery, ARQ, and Redis. Covers task queues, canvas workflows, scheduling, retry strategies, rate limiting, and production monitoring. Each category has individual rule files in references/ loaded on-demand.
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Configuration | celery-config | HIGH | Celery app setup, broker, serialization, worker tuning |
| Task Routing | task-routing | HIGH | Priority queues, multi-queue workers, dynamic routing |
| Canvas Workflows | canvas-workflows | HIGH | Chain, group, chord, nested workflows |
| Retry Strategies | retry-strategies | HIGH | Exponential backoff, idempotency, dead letter queues |
| Scheduling | scheduled-tasks | MEDIUM | Celery Beat, crontab, database-backed schedules |
| Monitoring | monitoring-health | MEDIUM | Flower, custom events, health checks, metrics |
| Result Backends | result-backends | MEDIUM | Redis results, custom states, progress tracking |
| ARQ Patterns | arq-patterns | MEDIUM | Async Redis Queue for FastAPI, lightweight jobs |
| Temporal Workflows | temporal-workflows | HIGH | Durable workflow definitions, sagas, signals, queries |
| Temporal Activities | temporal-activities | HIGH | Activity patterns, workers, heartbeats, testing |
Total: 10 rules across 9 categories
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_payment(self, order_id: str):
try:
return gateway.charge(order_id)
except TransientError as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
Load more examples: Read("${CLAUDE_SKILL_DIR}/references/quick-start-examples.md") for Celery retry task and ARQ/FastAPI integration patterns.
Production Celery app configuration with secure defaults and worker tuning.
task_serializer="json" for safetytask_acks_late=True to prevent task loss on crashtask_time_limit (hard) and task_soft_time_limit (soft)worker_prefetch_multiplier=1task_reject_on_worker_lost=True| Decision | Recommendation |
|---|---|
| Serializer | JSON (never pickle) |
| Ack mode | Late ack (task_acks_late=True) |
| Prefetch | 1 for fair, 4-8 for throughput |
| Time limit | soft < hard (e.g., 540/600) |
| Timezone | UTC always |
Priority queue configuration with multi-queue workers and dynamic routing.
queue_order_strategy: "priority" and 0-9 levels| Decision | Recommendation |
|---|---|
| Queue count | 3-5 (critical/high/default/low/bulk) |
| Priority levels | 0-9 with Redis x-max-priority |
| Worker assignment | Dedicated workers per queue |
| Prefetch | 1 for critical, 4-8 for bulk |
| Routing | Router class for 5+ routing rules |
Celery canvas primitives for sequential, parallel, and fan-in/fan-out workflows.
si()) for steps that ignore input| Decision | Recommendation |
|---|---|
| Sequential | Chain with s() |
| Parallel | Group for independent tasks |
| Fan-in | Chord (all must succeed for callback) |
| Ignore input | Use si() immutable signature |
| Error in chain | Reject stops chain, retry continues |
| Partial failures | Return error dict in chord tasks |
Retry patterns with exponential backoff, idempotency, and dead letter queues.
retry_backoff=True and retry_backoff_maxretry_jitter=True to prevent thundering herd| Decision | Recommendation |
|---|---|
| Retry delay | Exponential backoff with jitter |
| Max retries | 3-5 for transient, 0 for permanent |
| Idempotency | Redis key with TTL |
| Failed tasks | DLQ for manual review |
| Singleton | Redis lock with TTL |
Celery Beat periodic task configuration with crontab, database-backed schedules, and overlap prevention.
django-celery-beat for dynamic schedules| Decision | Recommendation |
|---|---|
| Schedule type | Crontab for time-based, interval for frequency |
| Dynamic | Database scheduler (django-celery-beat) |
| Overlap | Redis lock with timeout |
| Beat process | Separate process (not embedded) |
| Timezone | UTC always |
Production monitoring with Flower, custom signals, health checks, and Prometheus metrics.
task_prerun, task_postrun, task_failure) for metrics| Decision | Recommendation |
|---|---|
| Dashboard | Flower with persistent storage |
| Metrics | Prometheus via celery signals |
| Health | Broker + worker + queue depth |
| Alerting | Signal on task_failure |
| Autoscale | Queue depth > threshold |
Task result storage, custom states, and progress tracking patterns.
update_state() for real-time progress reporting| Decision | Recommendation |
|---|---|
| Status storage | Redis result backend |
| Large results | S3 or database (never Redis) |
| Progress | Custom states with update_state() |
| Result query | AsyncResult with state checks |
Lightweight async Redis Queue for FastAPI and simple background tasks.
arq for FastAPI integrationstartup/shutdown hooks for resource managementenqueue_job()Job.status() and Job.result()_delay=timedelta() for deferred execution| Decision | Recommendation |
|---|---|
| Simple async | ARQ (native async) |
| Complex workflows | Celery (chains, chords) |
| In-process quick | FastAPI BackgroundTasks |
| LLM workflows | LangGraph (not Celery) |
Load: Read("${CLAUDE_SKILL_DIR}/references/quick-start-examples.md") for the full tool comparison table (ARQ, Celery, RQ, Dramatiq, FastAPI BackgroundTasks).
Load details: Read("${CLAUDE_SKILL_DIR}/references/anti-patterns.md") for full list.
Key rules: never run long tasks in request handlers, never block on results inside tasks, never store large results in Redis, always use idempotency for retried tasks.
Durable execution engine for reliable distributed applications with Temporal.io.
@workflow.defn and deterministic codeworkflow.wait_condition() for human-in-the-loopasyncio.gather inside workflows| Decision | Recommendation |
|---|---|
| Workflow ID | Business-meaningful, idempotent |
| Determinism | Use workflow.random(), workflow.now() |
| I/O | Always via activities, never directly |
Activity and worker patterns for Temporal.io I/O operations.
@activity.defn for all I/OApplicationError(non_retryable=True) for business errorsWorkflowEnvironment.start_local()| Decision | Recommendation |
|---|---|
| Activity timeout | start_to_close for most cases |
| Error handling | Non-retryable for business errors |
| Testing | WorkflowEnvironment for integration tests |
ork:python-backend - FastAPI, asyncio, SQLAlchemy patternsork:langgraph - LangGraph workflow patterns (use for LLM workflows, not Celery)ork:distributed-systems - Resilience patterns, circuit breakersork:monitoring-observability - Metrics and alertingLoad details: Read("${CLAUDE_SKILL_DIR}/references/capability-details.md") for full keyword index and problem-solution mapping across all 8 capabilities.
Weekly Installs
78
Repository
GitHub Stars
132
First Seen
Feb 14, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
opencode76
codex75
gemini-cli75
github-copilot74
cursor74
kimi-cli70
Azure 升级评估与自动化工具 - 轻松迁移 Functions 计划、托管层级和 SKU
104,900 周安装