background-job-orchestrator by erichowens/some_claude_skills
npx skills add https://github.com/erichowens/some_claude_skills --skill background-job-orchestrator
Expert in designing and implementing production-grade background job systems that handle long-running tasks without blocking API responses.
✅ Use for:
❌ NOT for:
Does this task:
├── Take >5 seconds? → Background job
├── Need to retry on failure? → Background job
├── Run on a schedule? → Background job (cron pattern)
├── Block user interaction? → Background job
├── Process in batches? → Background job
└── Return immediately? → Keep synchronous
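The decision tree above can be expressed as a small predicate. This is only a sketch: the `TaskProfile` shape and its field names are assumptions made for illustration and are not part of the skill itself.

```typescript
// Hypothetical description of a task; the field names are illustrative assumptions.
interface TaskProfile {
  expectedDurationMs: number;
  needsRetry: boolean;
  runsOnSchedule: boolean;
  blocksUserInteraction: boolean;
  processesInBatches: boolean;
}

// Mirrors the tree: any "yes" answer routes the task to a background job,
// otherwise the task returns immediately and stays synchronous.
function shouldRunInBackground(task: TaskProfile): boolean {
  return (
    task.expectedDurationMs > 5000 ||
    task.needsRetry ||
    task.runsOnSchedule ||
    task.blocksUserInteraction ||
    task.processesInBatches
  );
}

const profileLookupTask: TaskProfile = {
  expectedDurationMs: 20,
  needsRetry: false,
  runsOnSchedule: false,
  blocksUserInteraction: false,
  processesInBatches: false,
};

const pdfReportTask: TaskProfile = {
  expectedDurationMs: 30000,
  needsRetry: true,
  runsOnSchedule: false,
  blocksUserInteraction: true,
  processesInBatches: false,
};

const lookupInBackground = shouldRunInBackground(profileLookupTask);
const reportInBackground = shouldRunInBackground(pdfReportTask);
```

Here the profile lookup stays synchronous, while the 30-second PDF report is routed to a background job.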
When to use:
Why BullMQ over Bull:
When to use:
Alternatives:
When to use:
Novice thinking: "Retry 3 times, then fail silently"
Problem: Failed jobs disappear with no visibility or recovery path.
Correct approach:
// BullMQ: retained failed jobs act as the dead letter queue here
import { Queue } from 'bullmq';

const queue = new Queue('email-queue', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000 // Base delay in milliseconds
    },
    removeOnComplete: 100, // Keep last 100 successful
    removeOnFail: false    // Keep all failed for inspection
  }
});

// Monitor failed jobs
const failedJobs = await queue.getFailed();
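To make the retry settings above concrete, the following sketch computes the wait before each retry. It assumes the delay simply doubles after every failed attempt, starting from the base delay; the exact formula used by a given BullMQ version may differ, so treat `backoffSchedule` as a hypothetical helper rather than the library's implementation.

```typescript
// Delay before each retry, assuming the delay doubles after every failed attempt.
// `attempts` counts the first try, so there are `attempts - 1` retries.
function backoffSchedule(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * Math.pow(2, retry - 1));
  }
  return delays;
}

// attempts: 3 and delay: 2000 give two retries.
const retryDelays = backoffSchedule(3, 2000);
```

Under this assumption the job is retried after 2 seconds and again after 4 seconds before it lands in the failed set.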
Timeline:
Symptom: API endpoint waits for job completion
Problem:
// ❌ WRONG - Blocks API response
app.post('/send-email', async (req, res) => {
  await sendEmail(req.body.to, req.body.subject);
  res.json({ success: true });
});
Why wrong: Requests can time out, users wait on slow work, and server resources stay tied up for the duration
Correct approach:
// ✅ RIGHT - Queue and return immediately
app.post('/send-email', async (req, res) => {
  const job = await emailQueue.add('send', {
    to: req.body.to,
    subject: req.body.subject
  });
  res.json({
    success: true,
    jobId: job.id,
    status: 'queued'
  });
});
// A separate worker process handles the job.
// BullMQ uses a Worker instance; worker.process() belongs to the older Bull API.
// 'email-queue' is assumed here and must match the name emailQueue was created with.
const worker = new Worker('email-queue', async (job) => {
  if (job.name === 'send') {
    await sendEmail(job.data.to, job.data.subject);
  }
}, { connection: redis });
Problem: Job runs twice → duplicate charges, double emails
Why it happens:
Correct approach:
// ✅ Idempotent job with deduplication key
// orderId is assumed to be in scope and to identify the order being charged
await queue.add('charge-payment', {
  userId: 123,
  amount: 50.00
}, {
  jobId: `payment-${orderId}`, // A job whose ID already exists in the queue is not added again
  attempts: 3
});
// In worker: check if already processed
// ('payments' is a placeholder; use the name `queue` was created with)
const worker = new Worker('payments', async (job) => {
  const { userId, amount } = job.data;
  // Check idempotency
  const existing = await db.payments.findOne({
    jobId: job.id
  });
  if (existing) {
    return existing; // Already processed
  }
  // Process payment (charge parameters are elided in the source)
  const result = await stripe.charges.create({...});
  // Store idempotency record
  await db.payments.create({
    jobId: job.id,
    result
  });
  return result;
}, { connection: redis });
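The check-then-record pattern above can be tried out without Redis or a payment provider. The sketch below uses an in-memory map as a stand-in for the idempotency table; `IdempotencyStore` and `runOnce` are hypothetical names, and a production system would keep these records in a database with a unique constraint on the key.

```typescript
// In-memory stand-in for the idempotency table (illustrative only).
class IdempotencyStore<T> {
  private readonly results = new Map<string, T>();

  // Runs `operation` only the first time `key` is seen;
  // later calls with the same key return the stored result.
  runOnce(key: string, operation: () => T): T {
    if (this.results.has(key)) {
      return this.results.get(key) as T;
    }
    const result = operation();
    this.results.set(key, result);
    return result;
  }
}

let chargesMade = 0;
const paymentStore = new IdempotencyStore<{ chargeId: string }>();
const chargeCard = (): { chargeId: string } => {
  chargesMade += 1;
  return { chargeId: `ch_${chargesMade}` };
};

const firstDelivery = paymentStore.runOnce('payment-42', chargeCard);
const redelivery = paymentStore.runOnce('payment-42', chargeCard); // simulated duplicate run
```

The second call returns the stored result instead of charging again, which is the behaviour the worker relies on when a job is delivered twice.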
Problem: Overwhelm third-party APIs or exhaust quotas
Symptom: "Rate limit exceeded" errors from Sendgrid, Stripe, etc.
Correct approach:
// BullMQ rate limiting is configured on the Worker, not the Queue
const worker = new Worker('api-calls', async (job) => {
  await callApi(job.data); // callApi is a placeholder for the rate-limited call
}, {
  connection: redis,
  limiter: {
    max: 100,       // Max 100 jobs
    duration: 60000 // Per 60 seconds
  }
});
// Priorities: a lower number is processed first
await queue.add('send-email', data, {
  priority: user.isPremium ? 1 : 10
  // Rate limits are not a per-job option; for different limits per tier,
  // one approach is a separate queue and Worker limiter for each tier
});
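The idea behind a `max` / `duration` limiter can be sketched independently of any queue library. The class below is a simple fixed-window limiter written for illustration; `FixedWindowLimiter` is a hypothetical name and this is not how BullMQ implements its limiter internally.

```typescript
// Fixed-window limiter: at most `max` acquisitions per `durationMs` window.
// The current time is passed in explicitly so the behaviour is deterministic.
class FixedWindowLimiter {
  private readonly max: number;
  private readonly durationMs: number;
  private windowStart: number | null = null;
  private used = 0;

  constructor(max: number, durationMs: number) {
    this.max = max;
    this.durationMs = durationMs;
  }

  tryAcquire(nowMs: number): boolean {
    if (this.windowStart === null || nowMs - this.windowStart >= this.durationMs) {
      this.windowStart = nowMs; // start a new window
      this.used = 0;
    }
    if (this.used < this.max) {
      this.used += 1;
      return true;
    }
    return false; // over the limit for this window
  }
}

// Allow 2 calls per second and probe at t = 0, 10, 20 and 1000 ms.
const apiLimiter = new FixedWindowLimiter(2, 1000);
const limiterDecisions = [0, 10, 20, 1000].map((t) => apiLimiter.tryAcquire(t));
```

The third call falls inside the first window and is rejected; the fourth opens a new window and is allowed again.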
Problem: Single worker can't keep up with queue depth
Symptom: Queue backs up, jobs delayed hours/days
Correct approach:
// Horizontal scaling with multiple workers
const worker = new Worker('email-queue', async (job) => {
  await processEmail(job.data);
}, {
  connection: redis,
  concurrency: 5 // Process up to 5 jobs concurrently per worker
});
// Run multiple worker processes (PM2, Kubernetes, etc.)
// Total parallelism across the fleet = concurrency * num_workers
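A rough way to size the worker fleet is to estimate how many jobs are in flight on average (arrival rate multiplied by average job duration) and divide by the per-worker concurrency. The helper below is a back-of-the-envelope sketch; `workersNeeded` and its parameters are assumptions for illustration, and real sizing should leave headroom for bursts and retries.

```typescript
// Estimate how many worker processes are needed to keep up with the arrival rate.
// Average jobs in flight = arrival rate x average job duration.
function workersNeeded(
  jobsPerSecond: number,
  avgJobMs: number,
  concurrencyPerWorker: number
): number {
  const jobsInFlight = (jobsPerSecond * avgJobMs) / 1000;
  return Math.max(1, Math.ceil(jobsInFlight / concurrencyPerWorker));
}

// 50 emails per second at 400 ms each, with concurrency 5 per worker.
const emailWorkers = workersNeeded(50, 400, 5);
```

With these numbers about 20 jobs are in flight at any time, so four workers at concurrency 5 are the minimum that keeps the queue from growing.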
Monitoring:
// Set up alerts for queue depth
setInterval(async () => {
  const waiting = await queue.getWaitingCount();
  if (waiting > 1000) {
    // Replace console.warn with your alerting hook (pager, Slack, etc.)
    console.warn('Queue depth exceeds 1000, scale workers!');
  }
}, 60000); // Check once per minute
// Queue setup
import { Queue, Worker } from 'bullmq';

const emailQueue = new Queue('email-campaign', { connection: redis });
// Enqueue batch
async function sendCampaign(userIds: number[], template: string) {
  const jobs = userIds.map(userId => ({
    name: 'send',
    data: { userId, template },
    opts: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 5000 }
    }
  }));
  await emailQueue.addBulk(jobs);
}
// Worker with retry logic
const worker = new Worker('email-campaign', async (job) => {
  const { userId, template } = job.data;
  const user = await db.users.findById(userId);
  const email = renderTemplate(template, user);
  try {
    await sendgrid.send({
      to: user.email,
      subject: email.subject,
      html: email.body
    });
  } catch (error) {
    if (error.code === 'ECONNREFUSED') {
      throw error; // Transient failure: rethrow so BullMQ retries the job
    }
    // Any other error is treated as an invalid email: log it and let the job complete without retrying
    console.error(`Invalid email for user ${userId}`);
  }
}, {
  connection: redis,
  concurrency: 10
});
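The retry decision in the worker above hinges on telling transient failures apart from permanent ones. A minimal sketch of that classification follows; the list of error codes and the `isRetryable` helper are assumptions chosen for illustration, not an exhaustive or official list.

```typescript
// Hypothetical set of transient network error codes worth retrying.
const TRANSIENT_ERROR_CODES = new Set<string>(['ECONNREFUSED', 'ECONNRESET', 'ETIMEDOUT']);

// Returns true when the failure looks temporary and the job should be retried.
function isRetryable(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) {
    return false;
  }
  const code = (error as { code?: unknown }).code;
  return typeof code === 'string' && TRANSIENT_ERROR_CODES.has(code);
}

const connectionRefused = isRetryable({ code: 'ECONNREFUSED' });
const invalidAddress = isRetryable(new Error('Invalid email address'));
```

Inside a processor, a retryable error would be rethrown so the queue schedules another attempt. For permanent failures, BullMQ also exports an `UnrecoverableError` class that marks the job as failed without using up the remaining attempts, which keeps the failure visible instead of silently completing the job.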
// Daily report at 9 AM
await queue.add('daily-report', {
  type: 'sales',
  recipients: ['admin@company.com']
}, {
  repeat: {
    pattern: '0 9 * * *', // Cron syntax: minute 0, hour 9, every day
    tz: 'America/New_York'
  }
});
// Worker generates and emails report
// ('reports' is a placeholder; use the name `queue` was created with)
const reportWorker = new Worker('reports', async (job) => {
  if (job.name !== 'daily-report') return;
  const { type, recipients } = job.data;
  const data = await generateReport(type);
  const pdf = await createPDF(data);
  // Hand delivery off to the email queue so sending gets its own retries
  await emailQueue.add('send', {
    to: recipients,
    subject: `Daily ${type} Report`,
    attachments: [{ filename: 'report.pdf', content: pdf }]
  });
}, { connection: redis });
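// Multi-stage job with progress tracking
await videoQueue.add('transcode', {
  videoId: 123,
  formats: ['720p', '1080p', '4k']
}, {
  attempts: 2
  // BullMQ has no per-job `timeout` option (that was a Bull feature);
  // enforce the 1 hour limit inside the processor if it is needed
});
// ('video-transcode' is a placeholder; use the name videoQueue was created with)
const transcodeWorker = new Worker('video-transcode', async (job) => {
  const { videoId, formats } = job.data;
  for (let i = 0; i < formats.length; i++) {
    const format = formats[i];
    // Report the share of formats finished so far
    await job.updateProgress((i / formats.length) * 100);
    // Transcode
    await ffmpeg.transcode(videoId, format);
  }
  await job.updateProgress(100);
}, { connection: redis });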
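The progress values reported in the loop above are fractions such as 33.33, which are awkward to show to users. A small helper, sketched here under the assumption that whole percentages are enough for display, rounds them; `stageProgress` is a hypothetical name.

```typescript
// Percentage of stages finished, rounded to a whole number for display.
function stageProgress(completedStages: number, totalStages: number): number {
  if (totalStages <= 0) {
    return 100; // nothing to do counts as complete
  }
  return Math.round((completedStages / totalStages) * 100);
}

// Progress reported before each of three formats, then after the last one.
const transcodeProgress = [0, 1, 2, 3].map((done) => stageProgress(done, 3));
```

For three formats this yields 0, 33, 67 and 100, matching the points at which the worker calls `job.updateProgress`.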
// Client polls for progress
app.get('/videos/:jobId/status', async (req, res) => {
  const job = await videoQueue.getJob(req.params.jobId);
  if (!job) {
    return res.status(404).json({ error: 'Job not found' });
  }
  res.json({
    state: await job.getState(),
    progress: job.progress
  });
});
// Queue health dashboard
async function getQueueMetrics() {
  const [waiting, active, completed, failed, delayed] = await Promise.all([
    queue.getWaitingCount(),
    queue.getActiveCount(),
    queue.getCompletedCount(),
    queue.getFailedCount(),
    queue.getDelayedCount()
  ]);
  return {
    waiting,   // Jobs waiting to be processed
    active,    // Jobs currently processing
    completed, // Successfully completed
    failed,    // Failed after retries
    delayed,   // Scheduled for future
    health: waiting < 1000 && failed < 100 ? 'healthy' : 'degraded'
  };
}
// Development: Monitor jobs visually
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues'); // Must match the mount path below
createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(videoQueue)
  ],
  serverAdapter
});
app.use('/admin/queues', serverAdapter.getRouter());
// Visit http://localhost:3000/admin/queues
□ Dead letter queue configured
□ Retry strategy with exponential backoff
□ Job timeout limits set
□ Rate limiting for third-party APIs
□ Idempotency keys for critical operations
□ Worker concurrency tuned (CPU cores * 2)
□ Horizontal scaling configured (multiple workers)
□ Queue depth monitoring with alerts
□ Failed job inspection workflow
□ Job data doesn't contain PII in logs
□ Redis persistence enabled (AOF or RDB)
□ Graceful shutdown handling (SIGTERM)
| Scenario | Use Background Jobs? |
|---|---|
| Send welcome email on signup | ✅ Yes - can take 2-5 seconds |
| Charge credit card | ⚠️ Maybe - depends on payment provider latency |
| Generate PDF report (30 seconds) | ✅ Yes - definitely background |
| Fetch user profile from DB | ❌ No - milliseconds, keep synchronous |
| Process video upload (5 minutes) | ✅ Yes - always background |
| Validate form input | ❌ No - synchronous validation |
| Daily cron job | ✅ Yes - use repeatable jobs |
| Real-time chat message | ❌ No - use WebSockets |
| Feature | BullMQ | Celery | AWS SQS |
|---|---|---|---|
| Language | Node.js | Python | Any (HTTP API) |
| Backend | Redis | Redis/RabbitMQ/SQS | Managed |
| Priorities | ✅ | ✅ | ❌ (use separate queues) |
| Rate Limiting | ✅ | ✅ (task `rate_limit`) | ✅ (via attributes) |
| Repeat/Cron | ✅ | ✅ (celery-beat) | ❌ (use EventBridge) |
| UI Dashboard | Bull Board | Flower | CloudWatch |
| Workflows | ✅ (flows) | ✅ (chains, groups) | ❌ |
| Learning Curve | Medium | Medium | Low |
| Cost | Redis hosting | Redis hosting | $0.40/million requests |

/references/bullmq-patterns.md - Advanced BullMQ patterns and examples
/references/celery-workflows.md - Celery chains, groups, and chords
/references/job-observability.md - Monitoring, alerting, and debugging
scripts/setup_bullmq.sh - Initialize BullMQ with Redis
scripts/queue_health_check.ts - Queue metrics dashboard
scripts/retry_failed_jobs.ts - Bulk retry failed jobs
This skill guides: Background job implementation | Queue architecture | Retry strategies | Worker scaling | Job observability
Weekly Installs: 51
GitHub Stars: 78
First Seen: Jan 24, 2026
Security Audits: Gen Agent Trust Hub (Pass), Socket (Pass), Snyk (Warn)
Installed on: codex 44, gemini-cli 44, cursor 43, opencode 42, github-copilot 40, cline 36