inngest-steps by inngest/inngest-skills
npx skills add https://github.com/inngest/inngest-skills --skill inngest-steps使用 Inngest 的步骤方法构建健壮、持久的工作流。每个步骤都是一个独立的 HTTP 请求,可以独立重试和监控。
这些技能主要针对 TypeScript。 对于 Python 或 Go,请参考 Inngest 文档 获取特定语言的指导。核心概念适用于所有语言。
🔄 关键点:每个步骤都会从头重新运行你的函数。 将所有非确定性代码(API 调用、数据库查询、随机操作)放在步骤内部,切勿放在外部。
📊 步骤限制: 每个函数最多有 1,000 个步骤,步骤数据总量最多为 4MB。
// ❌ 错误 - 会运行 4 次
export default inngest.createFunction(
{ id: "bad-example", triggers: [{ event: "test" }] },
async ({ step }) => {
console.log("This logs 4 times!"); // 在步骤外部 = 错误
await step.run("a", () => console.log("a"));
await step.run("b", () => console.log("b"));
await step.run("c", () => console.log("c"));
}
);
// ✅ 正确 - 每个日志只记录一次
export default inngest.createFunction(
{ id: "good-example", triggers: [{ event: "test" }] },
async ({ step }) => {
await step.run("log-hello", () => console.log("hello"));
await step.run("a", () => console.log("a"));
await step.run("b", () => console.log("b"));
await step.run("c", () => console.log("c"));
}
);
将可重试的代码作为步骤执行。每个步骤 ID 可以重复使用 - Inngest 会自动处理计数器。
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
// 基本用法
const result = await step.run("fetch-user", async () => {
const user = await db.user.findById(userId);
return user; // 始终返回有用的数据
});
// 同步代码也可以
const transformed = await step.run("transform-data", () => {
return processData(result);
});
// 副作用(无需返回值)
await step.run("send-notification", async () => {
await sendEmail(user.email, "Welcome!");
});
✅ 应该做:
❌ 不应该做:
暂停执行而不占用计算时间。
// 持续时间字符串
await step.sleep("wait-24h", "24h");
await step.sleep("short-delay", "30s");
await step.sleep("weekly-pause", "7d");
// 在工作流中使用
await step.run("send-welcome", () => sendEmail(email));
await step.sleep("wait-for-engagement", "3d");
await step.run("send-followup", () => sendFollowupEmail(email));
休眠直到特定的日期时间。
const reminderDate = new Date("2024-12-25T09:00:00Z");
await step.sleepUntil("wait-for-christmas", reminderDate);
// 从事件数据中获取
const scheduledTime = new Date(event.data.remind_at);
await step.sleepUntil("wait-for-scheduled-time", scheduledTime);
🚨 关键:waitForEvent 只捕获在此步骤执行之后发送的事件。
❌ 在 waitForEvent 运行之前发送的事件 → 将不会被捕获
✅ 在 waitForEvent 运行之后发送的事件 → 将被捕获
始终检查 null 返回值(表示超时,事件从未到达)
// 带超时的基本事件等待 const approval = await step.waitForEvent("wait-for-approval", { event: "app/invoice.approved", timeout: "7d", match: "data.invoiceId" // 简单匹配 });
// 基于表达式的匹配(CEL 语法) const subscription = await step.waitForEvent("wait-for-subscription", { event: "app/subscription.created", timeout: "30d", if: "event.data.userId == async.data.userId && async.data.plan == 'pro'" });
// 处理超时 if (!approval) { await step.run("handle-timeout", () => { // 审批从未到达 return notifyAccountingTeam(); }); }
✅ 应该做:
❌ 不应该做:
在表达式中,event = 原始触发事件,async = 正在匹配的新事件。完整的语法、运算符和模式请参阅 表达式语法参考。
等待唯一的信号(而非事件)。更适合 1:1 匹配。
const taskId = "task-" + crypto.randomUUID();
const signal = await step.waitForSignal("wait-for-task-completion", {
signal: taskId,
timeout: "1h",
onConflict: "replace" // 必需:"replace" 覆盖待处理的信号,"fail" 抛出错误
});
// 通过 Inngest API 或 SDK 在其他地方发送信号
// POST /v1/events 并匹配 taskId 信号
使用时机:
将任务分发到其他函数,而无需等待结果。
// 触发其他函数
await step.sendEvent("notify-systems", {
name: "user/profile.updated",
data: { userId: user.id, changes: profileChanges }
});
// 同时发送多个事件
await step.sendEvent("batch-notifications", [
{ name: "billing/invoice.created", data: { invoiceId } },
{ name: "email/invoice.send", data: { email: user.email, invoiceId } }
]);
使用场景: 你想触发其他函数,但不需要在当前函数中使用它们的结果。
调用其他函数并处理它们的结果。非常适合组合。
const computeSquare = inngest.createFunction(
{ id: "compute-square", triggers: [{ event: "calculate/square" }] },
async ({ event }) => {
return { result: event.data.number * event.data.number };
}
);
// 调用并使用结果
const square = await step.invoke("get-square", {
function: computeSquare,
data: { number: 4 }
});
console.log(square.result); // 16,完全类型化!
// 用于跨应用调用(当无法直接导入函数时):
import { referenceFunction } from "inngest";
const externalFn = referenceFunction({
appId: "other-app",
functionId: "other-fn"
});
const result = await step.invoke("call-external", {
function: externalFn,
data: { key: "value" }
});
警告:v4 重大变更: step.invoke() 中不再支持字符串函数 ID(例如 function: "my-app-other-fn")。对于跨应用调用,请使用导入的函数引用或 referenceFunction()。
非常适合:
重复使用步骤 ID - Inngest 会自动处理计数器。
const allProducts = [];
let cursor = null;
let hasMore = true;
while (hasMore) {
// 重复使用相同的 ID "fetch-page" - 计数器自动处理
const page = await step.run("fetch-page", async () => {
return shopify.products.list({ cursor, limit: 50 });
});
allProducts.push(...page.products);
if (page.products.length < 50) {
hasMore = false;
} else {
cursor = page.products[49].id;
}
}
await step.run("process-products", () => {
return processAllProducts(allProducts);
});
使用 Promise.all 实现并行步骤。在 v4 中,默认优化了并行步骤执行
// 创建步骤但不等待
const sendEmail = step.run("send-email", async () => {
return await sendWelcomeEmail(user.email);
});
const updateCRM = step.run("update-crm", async () => {
return await crmService.addUser(user);
});
const createSubscription = step.run("create-subscription", async () => {
return await subscriptionService.create(user.id);
});
// 并行运行所有步骤
const [emailId, crmRecord, subscription] = await Promise.all([
sendEmail,
updateCRM,
createSubscription
]);
// 在 v4 中,默认优化了并行步骤
export default inngest.createFunction(
{
id: "parallel-heavy-function",
triggers: [{ event: "process/batch" }]
},
async ({ event, step }) => {
const results = await Promise.all(
event.data.items.map((item, i) =>
step.run(`process-item-${i}`, () => processItem(item))
)
);
}
);
// ⚠️ 使用 v4 优化并行性时 Promise.race() 的行为:
// 所有 promise 都会在 race 解析之前完成。使用 group.parallel() 实现真正的竞态:
const winner = await group.parallel(async () => {
return Promise.race([
step.run("fast-service", () => callFastService()),
step.run("slow-service", () => callSlowService())
]);
});
// 如果需要,可以禁用优化并行性:
// 在客户端级别:new Inngest({ id: "app", optimizeParallelism: false })
// 在函数级别:{ id: "fn", optimizeParallelism: false, triggers: [...] }
有关并发和节流选项,请参阅 inngest-flow-control。
非常适合使用并行步骤进行批处理。
export default inngest.createFunction(
{ id: "process-large-dataset", triggers: [{ event: "data/process.large" }] },
async ({ event, step }) => {
const chunks = chunkArray(event.data.items, 10);
// 并行处理块
const results = await Promise.all(
chunks.map((chunk, index) =>
step.run(`process-chunk-${index}`, () => processChunk(chunk))
)
);
// 合并结果
await step.run("combine-results", () => {
return aggregateResults(results);
});
}
);
🔄 函数重新执行: 步骤外部的代码在每次步骤执行时都会运行 ⏰ 事件时序: waitForEvent 只捕获步骤运行之后发送的事件 🔢 步骤限制: 每个函数最多 1,000 个步骤,每个步骤输出最多 4MB,每个函数运行总共最多 32MB 📨 HTTP 请求: 在 v4 中默认启用了检查点,减少了 HTTP 开销。对于无服务器平台,请在客户端配置 maxRuntime 🔁 步骤 ID: 可以在循环中重复使用 - Inngest 处理计数器 ⚡ 并行性: 使用 Promise.all 实现并行步骤(在 v4 中默认优化)。注意 Promise.race() 会等待所有 promise 完成 — 使用 group.parallel() 实现真正的竞态语义
记住:步骤使你的函数具有持久性、可观察性和可调试性。拥抱它们!
每周安装数
274
代码仓库
GitHub 星标数
15
首次出现
2026年2月17日
安全审计
安装于
codex263
opencode260
gemini-cli259
github-copilot259
amp258
kimi-cli258
Build robust, durable workflows with Inngest's step methods. Each step is a separate HTTP request that can be independently retried and monitored.
These skills are focused on TypeScript. For Python or Go, refer to the Inngest documentation for language-specific guidance. Core concepts apply across all languages.
🔄 Critical: Each step re-runs your function from the beginning. Put ALL non-deterministic code (API calls, DB queries, randomness) inside steps, never outside.
📊 Step Limits: Every function has a maximum of 1,000 steps and 4MB total step data.
// ❌ WRONG - will run 4 times
export default inngest.createFunction(
{ id: "bad-example", triggers: [{ event: "test" }] },
async ({ step }) => {
console.log("This logs 4 times!"); // Outside step = bad
await step.run("a", () => console.log("a"));
await step.run("b", () => console.log("b"));
await step.run("c", () => console.log("c"));
}
);
// ✅ CORRECT - logs once each
export default inngest.createFunction(
{ id: "good-example", triggers: [{ event: "test" }] },
async ({ step }) => {
await step.run("log-hello", () => console.log("hello"));
await step.run("a", () => console.log("a"));
await step.run("b", () => console.log("b"));
await step.run("c", () => console.log("c"));
}
);
Execute retriable code as a step. Each step ID can be reused - Inngest automatically handles counters.
// Basic usage
const result = await step.run("fetch-user", async () => {
const user = await db.user.findById(userId);
return user; // Always return useful data
});
// Synchronous code works too
const transformed = await step.run("transform-data", () => {
return processData(result);
});
// Side effects (no return needed)
await step.run("send-notification", async () => {
await sendEmail(user.email, "Welcome!");
});
✅ DO:
❌ DON'T:
Pause execution without using compute time.
// Duration strings
await step.sleep("wait-24h", "24h");
await step.sleep("short-delay", "30s");
await step.sleep("weekly-pause", "7d");
// Use in workflows
await step.run("send-welcome", () => sendEmail(email));
await step.sleep("wait-for-engagement", "3d");
await step.run("send-followup", () => sendFollowupEmail(email));
Sleep until a specific datetime.
const reminderDate = new Date("2024-12-25T09:00:00Z");
await step.sleepUntil("wait-for-christmas", reminderDate);
// From event data
const scheduledTime = new Date(event.data.remind_at);
await step.sleepUntil("wait-for-scheduled-time", scheduledTime);
🚨 CRITICAL: waitForEvent ONLY catches events sent AFTER this step executes.
❌ Event sent before waitForEvent runs → will NOT be caught
✅ Event sent after waitForEvent runs → will be caught
Always check for null return (means timeout, event never arrived)
// Basic event waiting with timeout const approval = await step.waitForEvent("wait-for-approval", { event: "app/invoice.approved", timeout: "7d", match: "data.invoiceId" // Simple matching });
// Expression-based matching (CEL syntax) const subscription = await step.waitForEvent("wait-for-subscription", { event: "app/subscription.created", timeout: "30d", if: "event.data.userId == async.data.userId && async.data.plan == 'pro'" });
// Handle timeout if (!approval) { await step.run("handle-timeout", () => { // Approval never came return notifyAccountingTeam(); }); }
✅ DO:
❌ DON'T:
In expressions, event = the original triggering event, async = the new event being matched. See Expression Syntax Reference for full syntax, operators, and patterns.
Wait for unique signals (not events). Better for 1:1 matching.
const taskId = "task-" + crypto.randomUUID();
const signal = await step.waitForSignal("wait-for-task-completion", {
signal: taskId,
timeout: "1h",
onConflict: "replace" // Required: "replace" overwrites pending signal, "fail" throws an error
});
// Send signal elsewhere via Inngest API or SDK
// POST /v1/events with signal matching taskId
When to use:
Fan out to other functions without waiting for results.
// Trigger other functions
await step.sendEvent("notify-systems", {
name: "user/profile.updated",
data: { userId: user.id, changes: profileChanges }
});
// Multiple events at once
await step.sendEvent("batch-notifications", [
{ name: "billing/invoice.created", data: { invoiceId } },
{ name: "email/invoice.send", data: { email: user.email, invoiceId } }
]);
Use when: You want to trigger other functions but don't need their results in the current function.
Call other functions and handle their results. Perfect for composition.
const computeSquare = inngest.createFunction(
{ id: "compute-square", triggers: [{ event: "calculate/square" }] },
async ({ event }) => {
return { result: event.data.number * event.data.number };
}
);
// Invoke and use result
const square = await step.invoke("get-square", {
function: computeSquare,
data: { number: 4 }
});
console.log(square.result); // 16, fully typed!
// For cross-app invocation (when you can't import the function directly):
import { referenceFunction } from "inngest";
const externalFn = referenceFunction({
appId: "other-app",
functionId: "other-fn"
});
const result = await step.invoke("call-external", {
function: externalFn,
data: { key: "value" }
});
Warning: v4 Breaking Change: String function IDs (e.g., function: "my-app-other-fn") are no longer supported in step.invoke(). Use an imported function reference or referenceFunction() for cross-app calls.
Great for:
Reuse step IDs - Inngest handles counters automatically.
const allProducts = [];
let cursor = null;
let hasMore = true;
while (hasMore) {
// Same ID "fetch-page" reused - counters handled automatically
const page = await step.run("fetch-page", async () => {
return shopify.products.list({ cursor, limit: 50 });
});
allProducts.push(...page.products);
if (page.products.length < 50) {
hasMore = false;
} else {
cursor = page.products[49].id;
}
}
await step.run("process-products", () => {
return processAllProducts(allProducts);
});
Use Promise.all for parallel steps. In v4, parallel step execution is optimized by default
// Create steps without awaiting
const sendEmail = step.run("send-email", async () => {
return await sendWelcomeEmail(user.email);
});
const updateCRM = step.run("update-crm", async () => {
return await crmService.addUser(user);
});
const createSubscription = step.run("create-subscription", async () => {
return await subscriptionService.create(user.id);
});
// Run all in parallel
const [emailId, crmRecord, subscription] = await Promise.all([
sendEmail,
updateCRM,
createSubscription
]);
// Parallel steps are optimized by default in v4
export default inngest.createFunction(
{
id: "parallel-heavy-function",
triggers: [{ event: "process/batch" }]
},
async ({ event, step }) => {
const results = await Promise.all(
event.data.items.map((item, i) =>
step.run(`process-item-${i}`, () => processItem(item))
)
);
}
);
// ⚠️ Promise.race() behavior with v4's optimized parallelism:
// All promises settle before race resolves. Use group.parallel() for true race:
const winner = await group.parallel(async () => {
return Promise.race([
step.run("fast-service", () => callFastService()),
step.run("slow-service", () => callSlowService())
]);
});
// To disable optimized parallelism if needed:
// At the client level: new Inngest({ id: "app", optimizeParallelism: false })
// At the function level: { id: "fn", optimizeParallelism: false, triggers: [...] }
See inngest-flow-control for concurrency and throttling options.
Perfect for batch processing with parallel steps.
export default inngest.createFunction(
{ id: "process-large-dataset", triggers: [{ event: "data/process.large" }] },
async ({ event, step }) => {
const chunks = chunkArray(event.data.items, 10);
// Process chunks in parallel
const results = await Promise.all(
chunks.map((chunk, index) =>
step.run(`process-chunk-${index}`, () => processChunk(chunk))
)
);
// Combine results
await step.run("combine-results", () => {
return aggregateResults(results);
});
}
);
🔄 Function Re-execution: Code outside steps runs on every step execution ⏰ Event Timing: waitForEvent only catches events sent AFTER the step runs 🔢 Step Limits: Max 1,000 steps per function, 4MB per step output, 32MB per function run in total 📨 HTTP Requests: Checkpointing is enabled by default in v4, reducing HTTP overhead. For serverless platforms, configure maxRuntime on the client 🔁 Step IDs: Can be reused in loops - Inngest handles counters ⚡ Parallelism: Use Promise.all for parallel steps (optimized by default in v4). Note that Promise.race() waits for all promises to settle — use group.parallel() for true race semantics
Remember: Steps make your functions durable, observable, and debuggable. Embrace them!
Weekly Installs
274
Repository
GitHub Stars
15
First Seen
Feb 17, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
codex263
opencode260
gemini-cli259
github-copilot259
amp258
kimi-cli258
xdrop 文件传输脚本:Bun 环境下安全上传下载工具,支持加密分享
20,700 周安装