inngest-flow-control by inngest/inngest-skills
npx skills add https://github.com/inngest/inngest-skills --skill inngest-flow-control掌握 Inngest 流控机制,有效管理资源、防止系统过载并确保应用可靠性。本技能涵盖所有流控选项,并提供关于何时及如何使用每种机制的规范性指导。
这些技能主要针对 TypeScript。 对于 Python 或 Go,请参考 Inngest 文档 获取特定语言的指导。核心概念在所有语言中均适用。
使用时机: 限制正在执行的步骤(而非函数运行)数量,以管理计算资源并防止系统不堪重负。
关键洞察: 并发限制的是活动代码执行,而非函数运行。等待 step.sleep() 或 step.waitForEvent() 的函数不计入限制。
inngest.createFunction(
{
id: "process-images",
concurrency: 5,
triggers: [{ event: "media/image.uploaded" }]
},
async ({ event, step }) => {
// 最多只有 5 个步骤可以同时执行
await step.run("resize", () => resizeImage(event.data.imageUrl));
}
);
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
使用 key 参数为键的每个唯一值应用限制。
inngest.createFunction(
{
id: "user-sync",
concurrency: [
{
key: "event.data.user_id",
limit: 1
}
],
triggers: [{ event: "user/profile.updated" }]
},
async ({ event, step }) => {
// 每个用户一次只能执行一个步骤
// 防止特定于用户的操作出现竞态条件
}
);
inngest.createFunction(
{
id: "ai-summary",
concurrency: [
{
scope: "account",
key: `"openai"`,
limit: 60
}
],
triggers: [{ event: "ai/summary.requested" }]
},
async ({ event, step }) => {
// 在所有函数间共享 60 个并发的 OpenAI 调用
}
);
每种方式的适用场景:
使用时机: 控制函数启动的速率,以应对 API 速率限制或平滑流量峰值。
与并发控制的关键区别: 节流限制函数运行的启动;并发控制限制步骤的执行。
inngest.createFunction(
{
id: "sync-crm-data",
throttle: {
limit: 10, // 10 次函数启动
period: "60s", // 每分钟
burst: 5, // 外加 5 次立即突发
key: "event.data.customer_id" // 每个客户
},
triggers: [{ event: "crm/contact.updated" }]
},
async ({ event, step }) => {
// 遵守 CRM API 速率限制:每个客户每分钟 10 次调用
await step.run("sync", () => crmApi.updateContact(event.data));
}
);
配置项:
limit:每个时间段内可以启动的函数数量period:时间窗口(1 秒到 7 天)burst:允许的额外立即启动次数key:为每个唯一的键值应用限制使用时机: 硬性限制,用于防止滥用或跳过过多的重复事件。
与节流的关键区别: 速率限制会丢弃事件;节流会延迟它们。
inngest.createFunction(
{
id: "webhook-processor",
rateLimit: {
limit: 1,
period: "4h",
key: "event.data.webhook_id"
},
triggers: [{ event: "webhook/data.received" }]
},
async ({ event, step }) => {
// 每个 webhook 每 4 小时只处理一次
// 防止重复的 webhook 垃圾信息
}
);
使用场景:
使用时机: 等待一系列事件停止到达后,再处理最新的一个。
inngest.createFunction(
{
id: "save-document",
debounce: {
period: "5m", // 最后一次编辑后等待 5 分钟
key: "event.data.document_id",
timeout: "30m" // 最多 30 分钟后强制保存
},
triggers: [{ event: "document/content.changed" }]
},
async ({ event, step }) => {
// 仅在用户停止编辑后保存文档
// 使用接收到的最后一个事件
await step.run("save", () => saveDocument(event.data));
}
);
完美适用于:
使用时机: 根据动态数据,让某些函数运行优先于其他函数执行。
inngest.createFunction(
{
id: "process-order",
priority: {
// VIP 用户获得优先权,最多提前 120 秒
run: "event.data.user_tier == 'vip' ? 120 : 0"
},
triggers: [{ event: "order/placed" }]
},
async ({ event, step }) => {
// VIP 订单在队列中优先处理
}
);
高级示例:
inngest.createFunction(
{
id: "support-ticket",
priority: {
run: `
event.data.severity == 'critical' ? 300 :
event.data.severity == 'high' ? 120 :
event.data.user_plan == 'enterprise' ? 60 : 0
`
},
triggers: [{ event: "support/ticket.created" }]
},
async ({ event, step }) => {
// 关键工单获得最高优先级(提前 300 秒)
// 高严重性:提前 120 秒
// 企业用户:提前 60 秒
// 其他所有人:正常优先级
}
);
使用时机: 确保一次只运行一个函数实例。
inngest.createFunction(
{
id: "data-backup",
singleton: {
key: "event.data.database_id",
mode: "skip"
},
triggers: [{ event: "backup/requested" }]
},
async ({ event, step }) => {
// 如果此数据库的备份已在运行,则跳过新的备份
await step.run("backup", () => performBackup(event.data.database_id));
}
);
inngest.createFunction(
{
id: "realtime-sync",
singleton: {
key: "event.data.user_id",
mode: "cancel"
},
triggers: [{ event: "user/data.changed" }]
},
async ({ event, step }) => {
// 取消之前的同步,并使用最新数据开始
await step.run("sync", () => syncUserData(event.data));
}
);
使用时机: 将多个事件一起处理以提高效率。
inngest.createFunction(
{
id: "bulk-email-send",
batchEvents: {
maxSize: 100, // 最多 100 个事件
timeout: "30s", // 或 30 秒,以先到者为准
// `key` 将事件按唯一值分组到不同的批次中
// 这与过滤事件的表达式 `if` 不同
key: "event.data.campaign_id" // 按营销活动分批
},
triggers: [{ event: "email/send.queued" }]
},
async ({ events, step }) => {
// 一起处理事件数组
const emails = events.map((evt) => ({
to: evt.data.email,
subject: evt.data.subject,
body: evt.data.body
}));
await step.run("send-batch", () => emailService.sendBulk(emails));
}
);
inngest.createFunction(
{
id: "ai-image-processing",
// 针对 API 限制的全局节流
throttle: {
limit: 50,
period: "60s",
key: `"gpu-cluster"`
},
// 每个用户的并发控制以确保公平性
concurrency: [
{
key: "event.data.user_id",
limit: 3
}
],
// VIP 用户获得优先级
priority: {
run: "event.data.plan == 'pro' ? 60 : 0"
},
triggers: [{ event: "ai/image.generate" }]
},
async ({ event, step }) => {
// 结合多种流控机制以实现最佳资源利用
}
);
专业提示: 大多数生产函数通过结合 1-3 种流控机制,可以获得最佳的可靠性和性能。
每周安装量
280
代码仓库
GitHub 星标数
15
首次出现
2026年2月17日
安全审计
安装于
codex269
opencode266
gemini-cli265
github-copilot265
amp264
kimi-cli264
Master Inngest flow control mechanisms to manage resources, prevent overloading systems, and ensure application reliability. This skill covers all flow control options with prescriptive guidance on when and how to use each.
These skills are focused on TypeScript. For Python or Go, refer to the Inngest documentation for language-specific guidance. Core concepts apply across all languages.
When to use: Limit the number of executing steps (not function runs) to manage computing resources and prevent system overwhelm.
Key insight: Concurrency limits active code execution, not function runs. A function waiting on step.sleep() or step.waitForEvent() doesn't count against the limit.
inngest.createFunction(
{
id: "process-images",
concurrency: 5,
triggers: [{ event: "media/image.uploaded" }]
},
async ({ event, step }) => {
// Only 5 steps can execute simultaneously
await step.run("resize", () => resizeImage(event.data.imageUrl));
}
);
Use key parameter to apply limit per unique value of the key.
inngest.createFunction(
{
id: "user-sync",
concurrency: [
{
key: "event.data.user_id",
limit: 1
}
],
triggers: [{ event: "user/profile.updated" }]
},
async ({ event, step }) => {
// Only 1 step per user can execute at once
// Prevents race conditions in user-specific operations
}
);
inngest.createFunction(
{
id: "ai-summary",
concurrency: [
{
scope: "account",
key: `"openai"`,
limit: 60
}
],
triggers: [{ event: "ai/summary.requested" }]
},
async ({ event, step }) => {
// Share 60 concurrent OpenAI calls across all functions
}
);
When to use each:
When to use: Control the rate of function starts over time to work around API rate limits or smooth traffic spikes.
Key difference from concurrency: Throttling limits function run starts; concurrency limits step execution.
inngest.createFunction(
{
id: "sync-crm-data",
throttle: {
limit: 10, // 10 function starts
period: "60s", // per minute
burst: 5, // plus 5 immediate bursts
key: "event.data.customer_id" // per customer
},
triggers: [{ event: "crm/contact.updated" }]
},
async ({ event, step }) => {
// Respects CRM API rate limits: 10 calls/min per customer
await step.run("sync", () => crmApi.updateContact(event.data));
}
);
Configuration:
limit: Functions that can start per periodperiod: Time window (1s to 7d)burst: Extra immediate starts allowedkey: Apply limits per unique key valueWhen to use: Hard limit to prevent abuse or skip excessive duplicate events.
Key difference from throttling: Rate limiting discards events; throttling delays them.
inngest.createFunction(
{
id: "webhook-processor",
rateLimit: {
limit: 1,
period: "4h",
key: "event.data.webhook_id"
},
triggers: [{ event: "webhook/data.received" }]
},
async ({ event, step }) => {
// Process each webhook only once per 4 hours
// Prevents duplicate webhook spam
}
);
Use cases:
When to use: Wait for a series of events to stop arriving before processing the latest one.
inngest.createFunction(
{
id: "save-document",
debounce: {
period: "5m", // Wait 5min after last edit
key: "event.data.document_id",
timeout: "30m" // Force save after 30min max
},
triggers: [{ event: "document/content.changed" }]
},
async ({ event, step }) => {
// Saves document only after user stops editing
// Uses the LAST event received
await step.run("save", () => saveDocument(event.data));
}
);
Perfect for:
When to use: Execute some function runs ahead of others based on dynamic data.
inngest.createFunction(
{
id: "process-order",
priority: {
// VIP users get priority up to 120 seconds ahead
run: "event.data.user_tier == 'vip' ? 120 : 0"
},
triggers: [{ event: "order/placed" }]
},
async ({ event, step }) => {
// VIP orders jump ahead in the queue
}
);
Advanced example:
inngest.createFunction(
{
id: "support-ticket",
priority: {
run: `
event.data.severity == 'critical' ? 300 :
event.data.severity == 'high' ? 120 :
event.data.user_plan == 'enterprise' ? 60 : 0
`
},
triggers: [{ event: "support/ticket.created" }]
},
async ({ event, step }) => {
// Critical tickets get highest priority (300s ahead)
// High severity: 120s ahead
// Enterprise users: 60s ahead
// Everyone else: normal priority
}
);
When to use: Ensure only one instance of a function runs at a time.
inngest.createFunction(
{
id: "data-backup",
singleton: {
key: "event.data.database_id",
mode: "skip"
},
triggers: [{ event: "backup/requested" }]
},
async ({ event, step }) => {
// Skip new backups if one is already running for this database
await step.run("backup", () => performBackup(event.data.database_id));
}
);
inngest.createFunction(
{
id: "realtime-sync",
singleton: {
key: "event.data.user_id",
mode: "cancel"
},
triggers: [{ event: "user/data.changed" }]
},
async ({ event, step }) => {
// Cancel previous sync and start with latest data
await step.run("sync", () => syncUserData(event.data));
}
);
When to use: Process multiple events together for efficiency.
inngest.createFunction(
{
id: "bulk-email-send",
batchEvents: {
maxSize: 100, // Up to 100 events
timeout: "30s", // Or 30 seconds, whichever first
// `key` groups events into separate batches per unique value
// This is different from expressions `if` which filters events
key: "event.data.campaign_id" // Batch per campaign
},
triggers: [{ event: "email/send.queued" }]
},
async ({ events, step }) => {
// Process array of events together
const emails = events.map((evt) => ({
to: evt.data.email,
subject: evt.data.subject,
body: evt.data.body
}));
await step.run("send-batch", () => emailService.sendBulk(emails));
}
);
inngest.createFunction(
{
id: "ai-image-processing",
// Global throttling for API limits
throttle: {
limit: 50,
period: "60s",
key: `"gpu-cluster"`
},
// Per-user concurrency for fairness
concurrency: [
{
key: "event.data.user_id",
limit: 3
}
],
// VIP users get priority
priority: {
run: "event.data.plan == 'pro' ? 60 : 0"
},
triggers: [{ event: "ai/image.generate" }]
},
async ({ event, step }) => {
// Combines multiple flow controls for optimal resource usage
}
);
Pro tip: Most production functions benefit from combining 1-3 flow control mechanisms for optimal reliability and performance.
Weekly Installs
280
Repository
GitHub Stars
15
First Seen
Feb 17, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykWarn
Installed on
codex269
opencode266
gemini-cli265
github-copilot265
amp264
kimi-cli264
xdrop 文件传输脚本:Bun 环境下安全上传下载工具,支持加密分享
20,700 周安装