cloudflare-queues by jezweb/claude-skills
npx skills add https://github.com/jezweb/claude-skills --skill cloudflare-queues
Status : Production Ready ✅ Last Updated : 2026-01-09 Dependencies : cloudflare-worker-base (for Worker setup) Latest Versions : wrangler@4.58.0, @cloudflare/workers-types@4.20260109.0
Recent Updates (2025) :
- May 2025: HTTP publishing to queues from any service or language
- August 2025: Event subscriptions from Cloudflare services (R2, KV, Workers AI)
- March 2025: `pause-delivery`, `resume-delivery`, and `purge` commands
# 1. Create queue
npx wrangler queues create my-queue
# 2. Add producer binding to wrangler.jsonc
# { "queues": { "producers": [{ "binding": "MY_QUEUE", "queue": "my-queue" }] } }
# 3. Send message from Worker
await env.MY_QUEUE.send({ userId: '123', action: 'process-order' });
# Or publish via HTTP (May 2025+) from any service
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages" \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"messages": [{"body": {"userId": "123"}}]}'
# 4. Add consumer binding to wrangler.jsonc
# { "queues": { "consumers": [{ "queue": "my-queue", "max_batch_size": 10 }] } }
# 5. Process messages
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
await processMessage(message.body);
message.ack(); // Explicit acknowledgement
}
}
};
# 6. Deploy and test
npx wrangler deploy
npx wrangler tail my-consumer
// Send single message
await env.MY_QUEUE.send({ userId: '123', action: 'send-email' });
// Send with delay (max 12 hours)
await env.MY_QUEUE.send({ action: 'reminder' }, { delaySeconds: 600 });
// Send batch (max 100 messages or 256 KB)
await env.MY_QUEUE.sendBatch([
{ body: { userId: '1' } },
{ body: { userId: '2' } },
]);
Critical Limits:
- Message size: 128 KB (including ~100 bytes of metadata)
- `sendBatch()`: up to 100 messages or 256 KB total per call
- Message delay: up to 12 hours
- Throughput: 5,000 messages/second per queue
New in May 2025 : Publish messages to queues via HTTP from any service or programming language.
Source : Cloudflare Changelog
Authentication : Requires Cloudflare API token with Queues Edit permissions.
# Single message
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages" \
-H "Authorization: Bearer YOUR_API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"messages": [
{"body": {"userId": "123", "action": "process-order"}}
]
}'
# Batch messages
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages" \
-H "Authorization: Bearer YOUR_API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"messages": [
{"body": {"userId": "1"}},
{"body": {"userId": "2"}},
{"body": {"userId": "3"}}
]
}'
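The same endpoint can be called from TypeScript. A minimal sketch that mirrors the curl examples above; the account ID, queue name, and token are placeholders you supply, and the URL shape is taken from the examples, so verify it against the current API reference:

```typescript
interface QueueMessage {
  body: unknown;
  delaySeconds?: number;
}

// Build the request for the HTTP publish endpoint shown above.
function buildPublishRequest(
  accountId: string,
  queueName: string,
  apiToken: string,
  messages: QueueMessage[]
): { url: string; init: { method: string; headers: Record<string, string>; body: string } } {
  return {
    url: `https://api.cloudflare.com/client/v4/accounts/${accountId}/queues/${queueName}/messages`,
    init: {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${apiToken}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ messages }),
    },
  };
}

// Usage (network call not executed here):
// const { url, init } = buildPublishRequest(ACCOUNT_ID, 'my-queue', API_TOKEN, [
//   { body: { userId: '123', action: 'process-order' } },
// ]);
// const res = await fetch(url, init);
```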
Use Cases :
- Publish from any backend service or language without deploying a producer Worker
- Integrate existing systems (cron jobs, webhooks, other clouds) with queue consumers
New in August 2025 : Subscribe to events from Cloudflare services and consume via Queues.
Source : Cloudflare Changelog
Supported Event Sources :
- R2 (e.g. `bucket.created`, `object.uploaded`)
- KV (e.g. `namespace.created`)
- Workers AI (e.g. `inference.completed`)
Create Subscription :
npx wrangler queues subscription create my-queue \
--source r2 \
--events bucket.created,object.uploaded
Event Structure :
interface CloudflareEvent {
type: string; // 'r2.bucket.created', 'kv.namespace.created'
source: string; // 'r2', 'kv', 'ai', etc.
payload: any; // Event-specific data
metadata: {
accountId: string;
timestamp: string;
};
}
Consumer Example :
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
const event = message.body as CloudflareEvent;
switch (event.type) {
case 'r2.bucket.created':
console.log('New R2 bucket:', event.payload.bucketName);
await notifyAdmin(event.payload);
break;
case 'r2.object.uploaded':
console.log('File uploaded:', event.payload.key);
await processNewFile(event.payload.key);
break;
case 'kv.namespace.created':
console.log('New KV namespace:', event.payload.namespaceId);
break;
case 'ai.inference.completed':
console.log('AI inference done:', event.payload.modelId);
break;
}
message.ack();
}
}
};
Use Cases :
- Process files automatically when they are uploaded to R2
- Audit or notify on resource creation (R2 buckets, KV namespaces)
- Trigger follow-up work when AI inference completes
export default {
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise<void> {
for (const message of batch.messages) {
// message.id - unique UUID
// message.timestamp - Date when sent
// message.body - your content
// message.attempts - retry count (starts at 1)
await processMessage(message.body);
message.ack(); // Explicit ack (critical for non-idempotent ops)
}
}
};
// Retry with exponential backoff
message.retry({ delaySeconds: Math.min(60 * Math.pow(2, message.attempts - 1), 3600) });
// Batch methods
batch.ackAll(); // Ack all messages
batch.retryAll(); // Retry all messages
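For reference, the backoff expression above produces this delay schedule, doubling from 60 seconds and capping at one hour:

```typescript
// Delays (in seconds) for attempts 1..8 using the formula above.
const delays = Array.from({ length: 8 }, (_, i) =>
  Math.min(60 * Math.pow(2, i), 3600) // i = attempts - 1
);
console.log(delays); // [60, 120, 240, 480, 960, 1920, 3600, 3600]
```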
Critical:
- `message.ack()` - Marks success; prevents retry even if the handler fails later
- ALWAYS use explicit `ack()` for: database writes, API calls, financial transactions
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
try {
await env.DB.prepare('INSERT INTO orders (id, amount) VALUES (?, ?)')
.bind(message.body.orderId, message.body.amount).run();
message.ack(); // Only ack on success
} catch (error) {
console.error(`Failed ${message.id}:`, error);
// Don't ack - will retry
}
}
}
};
Why? Prevents duplicate writes if one message in batch fails. Failed messages retry independently.
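Even with explicit acks, a message can be redelivered if the Worker crashes after the write but before `ack()`. One way to make the write itself idempotent is to use `message.id` as a dedupe key. A sketch with minimal stand-in types; the `orders` schema, its UNIQUE `message_id` column, and the `Db` interface are assumptions for illustration, not the real D1 API:

```typescript
// Stand-in types so the pattern is self-contained.
interface Msg {
  id: string; // unique message UUID from Queues
  body: { orderId: string; amount: number };
  ack(): void;
}
interface Db {
  run(sql: string, params: unknown[]): Promise<void>;
}

async function processBatch(messages: Msg[], db: Db): Promise<void> {
  for (const message of messages) {
    try {
      // INSERT OR IGNORE plus a UNIQUE message_id column turns a
      // redelivered message into a no-op instead of a duplicate row.
      await db.run(
        'INSERT OR IGNORE INTO orders (message_id, id, amount) VALUES (?, ?, ?)',
        [message.id, message.body.orderId, message.body.amount]
      );
      message.ack(); // only after the write succeeded
    } catch {
      // no ack: the message will be retried
    }
  }
}
```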
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      try {
        const response = await fetch('https://api.example.com/process', {
          method: 'POST',
          body: JSON.stringify(message.body),
        });
        if (response.ok) {
          message.ack();
        } else if (response.status === 429) {
          // fetch resolves on HTTP errors, so check the status explicitly
          const delaySeconds = Math.min(60 * Math.pow(2, message.attempts - 1), 3600);
          message.retry({ delaySeconds });
        } else {
          message.retry();
        }
      } catch (error) {
        // Network failure: retry with the default delay
        message.retry();
      }
    }
  }
};
⚠️ Without DLQ, failed messages are DELETED PERMANENTLY after max_retries
npx wrangler queues create my-dlq
wrangler.jsonc:
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_retries": 3,
"dead_letter_queue": "my-dlq" // Messages go here after 3 failed retries
}]
}
}
DLQ Consumer:
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
console.error('PERMANENTLY FAILED:', message.id, message.body);
await env.DB.prepare('INSERT INTO failed_messages (id, body) VALUES (?, ?)')
.bind(message.id, JSON.stringify(message.body)).run();
message.ack(); // Remove from DLQ
}
}
};
This skill prevents 13 documented issues:
Error : Queue messages sent in one wrangler dev process don't appear in another wrangler dev consumer process Source : GitHub Issue #9795
Why It Happens : The virtual queue used by wrangler is in-process memory. Separate dev processes cannot share the queue state.
Prevention :
# ❌ Don't run producer and consumer as separate processes
# Terminal 1: wrangler dev (producer)
# Terminal 2: wrangler dev (consumer) # Won't receive messages!
# ✅ Option 1: Run both in single dev command
wrangler dev -c producer/wrangler.jsonc -c consumer/wrangler.jsonc
# ✅ Option 2: Use Vite plugin with auxiliaryWorkers
# vite.config.ts:
export default defineConfig({
plugins: [
cloudflare({
auxiliaryWorkers: ['./consumer/wrangler.jsonc']
})
]
})
Error : All routes return 500 Internal Server Error when using wrangler dev --remote with queue bindings Source : GitHub Issue #9642
Why It Happens : Queues are not yet supported in wrangler dev --remote mode. Even routes that don't use the queue binding fail.
Prevention :
// When using remote dev, temporarily comment out queue bindings
{
"queues": {
// "producers": [{ "queue": "my-queue", "binding": "MY_QUEUE" }]
}
}
// Or use local dev instead
// wrangler dev (without --remote)
Error : D1 remote binding stops working when remote: true is set on queue producer binding Source : GitHub Issue #11106
Why It Happens : Binding conflict issue affecting mixed local/remote development.
Prevention :
// ❌ Don't mix D1 remote with queue remote
{
"d1_databases": [{
"binding": "DB",
"database_id": "...",
"remote": true
}],
"queues": {
"producers": [{
"binding": "QUEUE",
"queue": "my-queue",
"remote": true // ❌ Breaks D1 remote
}]
}
}
// ✅ Avoid remote on queues when using D1 remote
{
"d1_databases": [{ "binding": "DB", "remote": true }],
"queues": {
"producers": [{ "binding": "QUEUE", "queue": "my-queue" }]
}
}
Status : No workaround yet. Track issue for updates.
Error : Queue consumer binding does not appear when mixing local queues with remote AI/Vectorize bindings Source : GitHub Issue #9887
Why It Happens : Wrangler doesn't support mixed local/remote bindings in the same worker.
Prevention :
// ❌ Don't mix local queues with remote AI
{
"queues": {
"consumers": [{ "queue": "my-queue" }]
},
"ai": {
"binding": "AI",
"experimental_remote": true // ❌ Breaks queue consumer
}
}
// ✅ Option 1: All local (no remote bindings)
wrangler dev
// ✅ Option 2: Separate workers for queues vs AI
// Worker 1: Queue processing (local)
// Worker 2: AI operations (remote)
Error : Queue consumer with type: "http_pull" doesn't execute in production Source : GitHub Issue #6619
Why It Happens : http_pull is for external HTTP-based consumers, not Worker-based consumers.
Prevention :
// ❌ Don't use type: "http_pull" for Worker consumers
{
"queues": {
"consumers": [{
"queue": "my-queue",
"type": "http_pull", // ❌ Wrong for Workers
"max_batch_size": 10
}]
}
}
// ✅ Omit type field for push-based Worker consumers
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 10
// No "type" field - defaults to Worker consumer
}]
}
}
Warning : The delivery_delay parameter in producer bindings will be removed in a future wrangler version.
Source : GitHub Issue #10286
// ❌ Will be removed - don't use
{
"queues": {
"producers": [{
"binding": "MY_QUEUE",
"queue": "my-queue",
"delivery_delay": 300 // ❌ Don't use this
}]
}
}
Migration : Use per-message delay instead:
// ✅ Correct approach - per-message delay
await env.MY_QUEUE.send({ data }, { delaySeconds: 300 });
Why : Workers should not affect queue-level settings. With multiple producers, the setting from the last-deployed producer wins, causing unpredictable behavior.
Note : These tips come from community discussions and GitHub issues. Verify against your wrangler version.
Source : GitHub Issue #6619 Confidence : MEDIUM Applies to : Local development with wrangler dev
If your queue consumer doesn't execute locally, try removing max_batch_timeout:
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 10
// Remove max_batch_timeout for local dev
}]
}
}
This appears to be version-specific and may not affect all setups.
Source : GitHub Issue #10131 Confidence : HIGH Applies to : Multi-environment deployments (staging, PR previews, tenant-specific queues)
Queue names are only available via batch.queue in consumer handlers, not on producer bindings. This creates issues with environment-specific queue names like email-queue-staging or email-queue-pr-123.
Current Limitation :
// ❌ Can't access queue name from producer binding
const queueName = env.MY_QUEUE.name; // Doesn't exist!
// ❌ Must hardcode or normalize in consumer
switch (batch.queue) {
case 'email-queue': // What about email-queue-staging?
case 'email-queue-staging': // Must handle all variants
case 'email-queue-pr-123': // Dynamic env names break this
}
Workaround :
// In consumer: Normalize queue name
function normalizeQueueName(queueName: string): string {
return queueName.replace(/-staging|-pr-\d+|-dev/g, '');
}
switch (normalizeQueueName(batch.queue)) {
case 'email-queue':
// Handle all email-queue-* variants
}
Status : Feature request tracked internally: MQ-923
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 100, // 1-100 (default: 10)
"max_batch_timeout": 30, // 0-60s (default: 5s)
"max_retries": 5, // 0-100 (default: 3)
"retry_delay": 300, // Seconds (default: 0)
"max_concurrency": 10, // 1-250 (default: auto-scale)
"dead_letter_queue": "my-dlq" // REQUIRED for production
}]
}
}
Critical Settings:
- `dead_letter_queue`: required for production; without it, failed messages are deleted after `max_retries`
- `max_batch_size` / `max_batch_timeout`: trade latency for throughput
- `max_concurrency`: leave unset to auto-scale (up to 250)
npx wrangler queues create my-dlq

# Create queue
npx wrangler queues create my-queue
npx wrangler queues create my-queue --message-retention-period-secs 1209600 # 14 days
# Manage queues
npx wrangler queues list
npx wrangler queues info my-queue
npx wrangler queues delete my-queue # ⚠️ Deletes ALL messages!
# Pause/Purge (March 2025 - NEW)
npx wrangler queues pause-delivery my-queue # Pause processing, keep receiving
npx wrangler queues resume-delivery my-queue
npx wrangler queues purge my-queue # ⚠️ Permanently deletes all messages!
# Consumer management
npx wrangler queues consumer add my-queue my-consumer-worker \
--batch-size 50 --batch-timeout 10 --message-retries 5
npx wrangler queues consumer remove my-queue my-consumer-worker
| Feature | Limit |
|---|---|
| Queues per account | 10,000 |
| Message size | 128 KB (includes ~100 bytes metadata) |
| Message retries | 100 max |
| Batch size | 1-100 messages |
| Batch timeout | 0-60 seconds |
| Messages per sendBatch | 100 (or 256 KB total) |
| Queue throughput | 5,000 messages/second per queue |
| Message retention | 4 days (default), 14 days (max) |
| Queue backlog size | 25 GB per queue |
| Concurrent consumers | 250 (push-based, auto-scale) |
| Consumer duration | 15 minutes (wall clock) |
| Consumer CPU time | 30 seconds (default), 5 minutes (max) |
| Visibility timeout | 12 hours (pull consumers) |
| Message delay | 12 hours (max) |
| API rate limit | 1200 requests / 5 minutes |
Requires Workers Paid plan ($5/month)
Operations Pricing:
- $0.40 per million operations after the first 1,000,000 operations each month (verify current pricing)

What counts as an operation:
- Each 64 KB chunk of a message that is written, read, or deleted

Typical message lifecycle:
- 1 write + 1 read + 1 delete (on ack) = 3 operations for a message under 64 KB

Retries:
- Each retry incurs an additional read operation

Dead Letter Queue:
- Messages moved to a DLQ incur additional write (and later read/delete) operations

Cost examples:
- 1M messages/month ≈ 3M operations ≈ $0.80 after the free allowance
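As a rough sketch, assuming three operations per delivered message, a 1M-operation monthly free allowance, and $0.40 per additional million (check current Cloudflare pricing before relying on these numbers):

```typescript
// Rough monthly cost estimator for Cloudflare Queues.
// All rates below are assumptions stated in the lead-in, not guarantees.
function estimateMonthlyCostUSD(messagesPerMonth: number): number {
  const OPS_PER_MESSAGE = 3; // write + read + delete (ack)
  const FREE_OPS = 1_000_000; // monthly included operations
  const PRICE_PER_MILLION = 0.4; // USD per million billable operations
  const ops = messagesPerMonth * OPS_PER_MESSAGE;
  const billable = Math.max(0, ops - FREE_OPS);
  return (billable / 1_000_000) * PRICE_PER_MILLION;
}

// e.g. 1M messages/month -> 3M ops -> 2M billable -> $0.80
```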
// ❌ Bad: Message >128 KB
await env.MY_QUEUE.send({
data: largeArray, // >128 KB
});
// ✅ Good: Check size before sending
const message = { data: largeArray };
const size = new TextEncoder().encode(JSON.stringify(message)).length;
if (size > 128000) {
// Store in R2, send reference
const key = `messages/${crypto.randomUUID()}.json`;
await env.MY_BUCKET.put(key, JSON.stringify(message));
await env.MY_QUEUE.send({ type: 'large-message', r2Key: key });
} else {
await env.MY_QUEUE.send(message);
}
// ❌ Bad: Exceeding 5000 msg/s per queue
for (let i = 0; i < 10000; i++) {
await env.MY_QUEUE.send({ id: i }); // Too fast!
}
// ✅ Good: Use sendBatch
const messages = Array.from({ length: 10000 }, (_, i) => ({
body: { id: i },
}));
// Send in batches of 100
for (let i = 0; i < messages.length; i += 100) {
await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
}
// ✅ Even better: Rate limit with delay
for (let i = 0; i < messages.length; i += 100) {
await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
if (i + 100 < messages.length) {
await new Promise(resolve => setTimeout(resolve, 100)); // 100ms delay
}
}
// ❌ Bad: Long processing without CPU limit increase
export default {
async queue(batch: MessageBatch): Promise<void> {
for (const message of batch.messages) {
await processForMinutes(message.body); // CPU timeout!
}
},
};
// ✅ Good: Increase CPU limit in wrangler.jsonc
wrangler.jsonc:
{
"limits": {
"cpu_ms": 300000 // 5 minutes (max allowed)
}
}
// Issue: Consumer too slow, backlog growing
// ✅ Solution 1: Increase batch size
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 100 // Process more per invocation
}]
}
}
// ✅ Solution 2: Let concurrency auto-scale (don't set max_concurrency)
// ✅ Solution 3: Optimize consumer code
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
// Process in parallel
await Promise.all(
batch.messages.map(async (message) => {
await process(message.body);
message.ack();
})
);
},
};
Always:
- Configure a dead letter queue (`dead_letter_queue` in consumer config)
- Use explicit `message.ack()` for non-idempotent ops (DB writes, API calls)
- Use `sendBatch()` for multiple messages (more efficient)
- Retry with exponential backoff: `60 * Math.pow(2, message.attempts - 1)`
- Let concurrency auto-scale (don't set `max_concurrency` unless upstream has rate limits)

Never:
- Rely on implicit acknowledgement for non-idempotent operations; call `ack()` explicitly
- Process large batches serially when `Promise.all()` for parallelism is safe

Possible causes:
- No consumer Worker attached to the queue
- Delivery paused on the queue
- Consumer throwing on every batch
Solution:
# Check queue info
npx wrangler queues info my-queue
# Check if delivery paused
npx wrangler queues resume-delivery my-queue
# Check consumer logs
npx wrangler tail my-consumer
Cause: Using implicit acknowledgement with non-idempotent operations
Solution: Use explicit ack()
// ✅ Explicit ack
for (const message of batch.messages) {
try {
await dbWrite(message.body);
message.ack(); // Only ack on success
} catch (error) {
console.error(`Failed: ${message.id}`);
// Don't ack - will retry
}
}
Cause: No Dead Letter Queue configured
Solution:
# Create DLQ
npx wrangler queues create my-dlq
# Add to consumer config
{
"queues": {
"consumers": [{
"queue": "my-queue",
"dead_letter_queue": "my-dlq"
}]
}
}
Possible causes:
- `max_concurrency` set to 1 (caps consumer scaling)
- `max_batch_size` too small for the backlog

Solution:
{
"queues": {
"consumers": [{
"queue": "my-queue",
// Don't set max_concurrency - let it auto-scale
"max_batch_size": 50 // Increase batch size instead
}]
}
}
Last Updated : 2026-01-21 Version : 2.0.0 Changes : Added HTTP Publishing (May 2025), Event Subscriptions (Aug 2025), Known Issues Prevention (13 issues), Breaking Changes section, Community Tips. Error count: 0 → 13. Major feature additions and comprehensive issue documentation. Maintainer : Jeremy Dawes | jeremy@jezweb.net