inngest-durable-functions by inngest/inngest-skills
npx skills add https://github.com/inngest/inngest-skills --skill inngest-durable-functions
掌握 Inngest 的持久化执行模型,用于构建容错、长时间运行的工作流。本技能涵盖从触发器到错误处理的完整生命周期。
这些技能主要针对 TypeScript。 对于 Python 或 Go,请参考 Inngest 文档 获取特定语言的指导。核心概念适用于所有语言。
// ❌ BAD: non-deterministic work sits outside of any step.
async ({ event, step }) => {
  // Not wrapped in a step, so this is evaluated again whenever the handler re-runs.
  const timestamp = Date.now();
  const result = await step.run("process-data", () => processData(event.data));
};

// ✅ GOOD: every non-deterministic value is produced inside a step.
async ({ event, step }) => {
  const result = await step.run("process-with-timestamp", () => {
    // Captured inside the step, so it is only evaluated once.
    const capturedAt = Date.now();
    return processData(event.data, capturedAt);
  });
};
每个 Inngest 函数都有以下硬性限制:
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
如果遇到这些限制,请将函数拆分为更小的函数,并通过 step.invoke() 或 step.sendEvent() 连接。
始终用 step.run() 包装:
切勿用 step.run() 包装:
// Basic function definition: configuration object first, handler second.
const processOrder = inngest.createFunction(
  {
    // Unique identifier for the function; never change it.
    id: "process-order",
    triggers: [{ event: "order/created" }],
    // Retries per step (4 is the default).
    retries: 4,
    // Maximum number of concurrent executions.
    concurrency: 10,
  },
  async ({ event, step }) => {
    // Your durable workflow goes here.
  },
);
// The same step ID may be used more than once; Inngest handles the counters
// automatically, so each call below is a different execution.
const data = await step.run("fetch-data", () => {
  return fetchUserData();
});
const more = await step.run("fetch-data", () => {
  return fetchOrderData();
});

// Descriptive IDs make each stage of the workflow easy to identify.
await step.run("validate-payment", () => {
  return validatePayment(event.data.paymentId);
});
await step.run("charge-customer", () => {
  return chargeCustomer(event.data);
});
await step.run("send-confirmation", () => {
  return sendEmail(event.data.email);
});
触发器在 createFunction 的第一个参数的 triggers 数组中定义:
// Single event trigger.
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [{ event: "user/signup" }],
  },
  async ({ event }) => {
    /* ... */
  },
);

// Event trigger narrowed by a conditional `if` expression.
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [
      {
        event: "user/action",
        if: 'event.data.action == "purchase" && event.data.amount > 100',
      },
    ],
  },
  async ({ event }) => {
    /* ... */
  },
);

// Several triggers on one function (up to 10).
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [
      { event: "user/signup" },
      { event: "user/login", if: "event.data.firstLogin == true" },
      // Cron trigger: daily at 9 AM.
      { cron: "0 9 * * *" },
    ],
  },
  async ({ event }) => {
    /* ... */
  },
);
// Basic cron schedule: every 6 hours.
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [{ cron: "0 */6 * * *" }],
  },
  async ({ step }) => {
    /* ... */
  },
);

// Cron schedule with a timezone prefix: Fridays at noon, Paris time.
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [{ cron: "TZ=Europe/Paris 0 12 * * 5" }],
  },
  async ({ step }) => {
    /* ... */
  },
);

// Cron combined with an event trigger on the same function.
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [
      { event: "manual/report.requested" },
      // Weekly on Sunday.
      { cron: "0 0 * * 0" },
    ],
  },
  async ({ event, step }) => {
    /* ... */
  },
);
// Run another function as a step and keep what it returns.
const result = await step.invoke("generate-report", {
  function: generateReportFunction,
  data: { userId: event.data.userId },
});

// Feed the returned data into a follow-up step.
await step.run("process-report", () => processReport(result));
// A custom event ID prevents duplicate events (24-hour deduplication).
await inngest.send({
  id: `checkout-completed-${cartId}`,
  name: "cart/checkout.completed",
  data: {
    cartId,
    email: "user@example.com",
  },
});
// Function-level idempotency keyed on the cart.
const sendEmail = inngest.createFunction(
  {
    id: "send-checkout-email",
    triggers: [{ event: "cart/checkout.completed" }],
    // Runs only once per cartId per 24 hours.
    idempotency: "event.data.cartId",
  },
  async ({ event, step }) => {
    // This handler will not run twice for the same cartId.
  },
);
// Idempotency key composed from more than one event field.
const processUserAction = inngest.createFunction(
  {
    id: "process-user-action",
    triggers: [{ event: "user/action.performed" }],
    // Unique per user + organization combination.
    idempotency: 'event.data.userId + "-" + event.data.organizationId',
  },
  async ({ event, step }) => {
    /* ... */
  },
);
在表达式中,event = 原始触发事件,async = 正在匹配的新事件。完整细节请参阅 表达式语法参考。
// Cancel an in-flight run when a matching cancellation event arrives.
const processOrder = inngest.createFunction(
  {
    id: "process-order",
    triggers: [{ event: "order/created" }],
    cancelOn: [
      {
        event: "order/cancelled",
        // `event` is the original triggering event, `async` the new event being matched.
        if: "event.data.orderId == async.data.orderId",
      },
    ],
  },
  async ({ event, step }) => {
    await step.sleepUntil("wait-for-payment", event.data.paymentDue);

    // The run is cancelled if an order/cancelled event is received.
    await step.run("charge-payment", () => {
      return processPayment(event.data);
    });
  },
);
// Cancel runs that take too long to start or to finish.
const processWithTimeout = inngest.createFunction(
  {
    id: "process-with-timeout",
    triggers: [{ event: "long/process.requested" }],
    timeouts: {
      // Cancel if the run has not started within 5 minutes.
      start: "5m",
      // Cancel if the run has not finished within 30 minutes.
      finish: "30m",
    },
  },
  async ({ event, step }) => {
    /* ... */
  },
);
// Listen for cancellation events and clean up after cancelled order runs.
const cleanupCancelled = inngest.createFunction(
  {
    id: "cleanup-cancelled-process",
    triggers: [{ event: "inngest/function.cancelled" }],
  },
  async ({ event, step }) => {
    // Cleanup only applies when the cancelled function is "process-order".
    if (event.data.function_id !== "process-order") {
      return;
    }

    await step.run("cleanup-resources", () => cleanupOrderResources(event.data.run_id));
  },
);
// Raise the retry budget and branch on the current attempt number.
const reliableFunction = inngest.createFunction(
  {
    id: "reliable-function",
    triggers: [{ event: "critical/task" }],
    // Up to 10 retries per step.
    retries: 10,
  },
  async ({ event, step, attempt }) => {
    // `attempt` is the zero-indexed attempt counter handed to the handler.
    // It tracks retries of the currently executing step, not of the overall function.
    const isLaterAttempt = attempt > 5;
    if (isLaterAttempt) {
      // Use different logic for later attempts of the current step.
    }
  },
);
对于重试也不会成功的代码,防止重试。
import { NonRetriableError } from "inngest";
// Stop retrying when the failure cannot be fixed by trying again.
const processUser = inngest.createFunction(
  {
    id: "process-user",
    triggers: [{ event: "user/process.requested" }],
  },
  async ({ event, step }) => {
    const user = await step.run("fetch-user", async () => {
      const found = await db.users.findOne(event.data.userId);
      if (found) {
        return found;
      }
      // The user does not exist, so a retry would not help.
      throw new NonRetriableError("User not found, stopping execution");
    });
    // Continue processing...
  },
);
import { RetryAfterError } from "inngest";
// Honour a rate-limited API's Retry-After header when scheduling the next retry.
const respectRateLimit = inngest.createFunction(
  { id: "api-call", triggers: [{ event: "api/call.requested" }] },
  async ({ event, step }) => {
    await step.run("call-api", async () => {
      const response = await externalAPI.call(event.data);
      if (response.status !== 429) {
        return response.data;
      }

      // Retry-After is optional and comes in two forms: delay-seconds ("120")
      // or an HTTP-date. Interpolating it blindly would produce an invalid
      // duration such as "undefineds" when the header is missing.
      const rawRetryAfter = response.headers["retry-after"];
      const retryAfter = rawRetryAfter == null ? "" : String(rawRetryAfter).trim();

      if (/^\d+$/.test(retryAfter)) {
        // delay-seconds form: retry after the time specified by the API.
        throw new RetryAfterError("Rate limited", `${retryAfter}s`);
      }

      const retryAt = new Date(retryAfter);
      if (retryAfter !== "" && !Number.isNaN(retryAt.getTime())) {
        // HTTP-date form: retry at the absolute time given by the API.
        throw new RetryAfterError("Rate limited", retryAt);
      }

      // Header missing or unparseable: throw a plain error rather than a
      // RetryAfterError carrying a bogus delay.
      throw new Error("Rate limited");
    });
  },
);
import winston from "winston";
// Configure the winston logger: JSON-formatted "info" output on the console.
const logger = winston.createLogger({
  level: "info",
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});

// Hand the logger to the Inngest client.
const inngest = new Inngest({
  id: "my-app",
  logger: logger,
});
// 或者使用内置的 ConsoleLogger 进行简单的日志级别控制
import { ConsoleLogger, Inngest } from "inngest";
const inngest = new Inngest({
  id: "my-app",
  // Level is one of "debug" | "info" | "warn" | "error".
  logger: new ConsoleLogger({ level: "debug" }),
});
⚠️ v4 重大变更: logLevel 选项已被移除。请改用带有 ConsoleLogger 或自定义日志记录器的 logger 选项。
// Logging from within steps so that messages are not duplicated.
const processData = inngest.createFunction(
  {
    id: "process-data",
    triggers: [{ event: "data/process.requested" }],
  },
  async ({ event, step, logger }) => {
    // ✅ GOOD: log inside steps to avoid duplicates.
    const result = await step.run("fetch-data", async () => {
      logger.info("Fetching data for user", { userId: event.data.userId });
      const fetched = await fetchUserData(event.data.userId);
      return fetched;
    });

    // ❌ AVOID: a log call placed here, outside any step, could run multiple times.
    await step.run("log-completion", async () => {
      logger.info("Processing complete", { resultCount: result.length });
    });
  },
);
检查点在 v4 中默认启用。它允许函数在执行期间定期持久化状态,减少步骤之间的延迟。
// Checkpointing is enabled by default in v4.
// On serverless platforms, configure maxRuntime to 60-80% of the platform timeout.
const realTimeFunction = inngest.createFunction(
  {
    id: "real-time-function",
    triggers: [{ event: "realtime/process" }],
    checkpointing: {
      // Suits a serverless environment with a 60s timeout.
      maxRuntime: "50s",
    },
  },
  async ({ event, step }) => {
    // Steps execute immediately, with periodic checkpointing.
    const first = await step.run("step-1", () => {
      return process1(event.data);
    });
    const second = await step.run("step-2", () => {
      return process2(first);
    });
    return { result2: second };
  },
);
// Checkpointing can be switched off per function if needed.
const legacyFunction = inngest.createFunction(
  {
    id: "legacy-function",
    triggers: [{ event: "legacy/process" }],
    checkpointing: false,
  },
  async ({ event, step }) => {
    /* ... */
  },
);
// Run some steps only when a condition on earlier step output holds.
const conditionalProcess = inngest.createFunction(
  {
    id: "conditional-process",
    triggers: [{ event: "process/conditional" }],
  },
  async ({ event, step }) => {
    const userData = await step.run("fetch-user", () => getUserData(event.data.userId));

    // This step is executed only for premium users.
    if (userData.isPremium) {
      await step.run("premium-processing", () => processPremiumFeatures(userData));
    }

    // This step always runs.
    await step.run("standard-processing", () => processStandardFeatures(userData));
  },
);
// Try the primary service first and fall back to the secondary one on failure.
const robustProcess = inngest.createFunction(
  {
    id: "robust-process",
    triggers: [{ event: "process/robust" }],
  },
  async ({ event, step }) => {
    let outcome;
    try {
      outcome = await step.run("primary-service", () => callPrimaryService(event.data));
    } catch (error) {
      // The primary step threw: fall back to the secondary service.
      outcome = await step.run("fallback-service", () => callSecondaryService(event.data));
    }
    return { result: outcome };
  },
);
本技能涵盖 Inngest 的持久化函数模式。关于事件发送和 Webhook 处理,请参阅 inngest-events 技能。
每周安装次数
273
代码仓库
GitHub 星标数
15
首次出现
2026 年 2 月 17 日
安全审计
已安装于
codex: 262
opencode: 259
gemini-cli: 258
github-copilot: 258
amp: 257
kimi-cli: 257
Master Inngest's durable execution model for building fault-tolerant, long-running workflows. This skill covers the complete lifecycle from triggers to error handling.
These skills are focused on TypeScript. For Python or Go, refer to the Inngest documentation for language-specific guidance. Core concepts apply across all languages.
// ❌ BAD: non-deterministic work sits outside of any step.
async ({ event, step }) => {
  // Not wrapped in a step, so this is evaluated again whenever the handler re-runs.
  const timestamp = Date.now();
  const result = await step.run("process-data", () => processData(event.data));
};

// ✅ GOOD: every non-deterministic value is produced inside a step.
async ({ event, step }) => {
  const result = await step.run("process-with-timestamp", () => {
    // Captured inside the step, so it is only evaluated once.
    const capturedAt = Date.now();
    return processData(event.data, capturedAt);
  });
};
Every Inngest function has these hard limits:
If you're hitting these limits, break your function into smaller functions connected via step.invoke() or step.sendEvent().
Always wrap in step.run():
Never wrap in step.run():
// Basic function definition: configuration object first, handler second.
const processOrder = inngest.createFunction(
  {
    // Unique identifier for the function; never change it.
    id: "process-order",
    triggers: [{ event: "order/created" }],
    // Retries per step (4 is the default).
    retries: 4,
    // Maximum number of concurrent executions.
    concurrency: 10,
  },
  async ({ event, step }) => {
    // Your durable workflow goes here.
  },
);
// The same step ID may be used more than once; Inngest handles the counters
// automatically, so each call below is a different execution.
const data = await step.run("fetch-data", () => {
  return fetchUserData();
});
const more = await step.run("fetch-data", () => {
  return fetchOrderData();
});

// Descriptive IDs make each stage of the workflow easy to identify.
await step.run("validate-payment", () => {
  return validatePayment(event.data.paymentId);
});
await step.run("charge-customer", () => {
  return chargeCustomer(event.data);
});
await step.run("send-confirmation", () => {
  return sendEmail(event.data.email);
});
Triggers are defined in the triggers array in the first argument of createFunction:
// Single event trigger.
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [{ event: "user/signup" }],
  },
  async ({ event }) => {
    /* ... */
  },
);

// Event trigger narrowed by a conditional `if` expression.
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [
      {
        event: "user/action",
        if: 'event.data.action == "purchase" && event.data.amount > 100',
      },
    ],
  },
  async ({ event }) => {
    /* ... */
  },
);

// Several triggers on one function (up to 10).
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [
      { event: "user/signup" },
      { event: "user/login", if: "event.data.firstLogin == true" },
      // Cron trigger: daily at 9 AM.
      { cron: "0 9 * * *" },
    ],
  },
  async ({ event }) => {
    /* ... */
  },
);
// Basic cron schedule: every 6 hours.
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [{ cron: "0 */6 * * *" }],
  },
  async ({ step }) => {
    /* ... */
  },
);

// Cron schedule with a timezone prefix: Fridays at noon, Paris time.
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [{ cron: "TZ=Europe/Paris 0 12 * * 5" }],
  },
  async ({ step }) => {
    /* ... */
  },
);

// Cron combined with an event trigger on the same function.
inngest.createFunction(
  {
    id: "my-fn",
    triggers: [
      { event: "manual/report.requested" },
      // Weekly on Sunday.
      { cron: "0 0 * * 0" },
    ],
  },
  async ({ event, step }) => {
    /* ... */
  },
);
// Run another function as a step and keep what it returns.
const result = await step.invoke("generate-report", {
  function: generateReportFunction,
  data: { userId: event.data.userId },
});

// Feed the returned data into a follow-up step.
await step.run("process-report", () => processReport(result));
// A custom event ID prevents duplicate events (24-hour deduplication).
await inngest.send({
  id: `checkout-completed-${cartId}`,
  name: "cart/checkout.completed",
  data: {
    cartId,
    email: "user@example.com",
  },
});
// Function-level idempotency keyed on the cart.
const sendEmail = inngest.createFunction(
  {
    id: "send-checkout-email",
    triggers: [{ event: "cart/checkout.completed" }],
    // Runs only once per cartId per 24 hours.
    idempotency: "event.data.cartId",
  },
  async ({ event, step }) => {
    // This handler will not run twice for the same cartId.
  },
);
// Idempotency key composed from more than one event field.
const processUserAction = inngest.createFunction(
  {
    id: "process-user-action",
    triggers: [{ event: "user/action.performed" }],
    // Unique per user + organization combination.
    idempotency: 'event.data.userId + "-" + event.data.organizationId',
  },
  async ({ event, step }) => {
    /* ... */
  },
);
In expressions, event = the original triggering event, async = the new event being matched. See Expression Syntax Reference for full details.
// Cancel an in-flight run when a matching cancellation event arrives.
const processOrder = inngest.createFunction(
  {
    id: "process-order",
    triggers: [{ event: "order/created" }],
    cancelOn: [
      {
        event: "order/cancelled",
        // `event` is the original triggering event, `async` the new event being matched.
        if: "event.data.orderId == async.data.orderId",
      },
    ],
  },
  async ({ event, step }) => {
    await step.sleepUntil("wait-for-payment", event.data.paymentDue);

    // The run is cancelled if an order/cancelled event is received.
    await step.run("charge-payment", () => {
      return processPayment(event.data);
    });
  },
);
// Cancel runs that take too long to start or to finish.
const processWithTimeout = inngest.createFunction(
  {
    id: "process-with-timeout",
    triggers: [{ event: "long/process.requested" }],
    timeouts: {
      // Cancel if the run has not started within 5 minutes.
      start: "5m",
      // Cancel if the run has not finished within 30 minutes.
      finish: "30m",
    },
  },
  async ({ event, step }) => {
    /* ... */
  },
);
// Listen for cancellation events and clean up after cancelled order runs.
const cleanupCancelled = inngest.createFunction(
  {
    id: "cleanup-cancelled-process",
    triggers: [{ event: "inngest/function.cancelled" }],
  },
  async ({ event, step }) => {
    // Cleanup only applies when the cancelled function is "process-order".
    if (event.data.function_id !== "process-order") {
      return;
    }

    await step.run("cleanup-resources", () => cleanupOrderResources(event.data.run_id));
  },
);
// Raise the retry budget and branch on the current attempt number.
const reliableFunction = inngest.createFunction(
  {
    id: "reliable-function",
    triggers: [{ event: "critical/task" }],
    // Up to 10 retries per step.
    retries: 10,
  },
  async ({ event, step, attempt }) => {
    // `attempt` is the zero-indexed attempt counter handed to the handler.
    // It tracks retries of the currently executing step, not of the overall function.
    const isLaterAttempt = attempt > 5;
    if (isLaterAttempt) {
      // Use different logic for later attempts of the current step.
    }
  },
);
Prevent retries for code that won't succeed upon retry.
import { NonRetriableError } from "inngest";
// Stop retrying when the failure cannot be fixed by trying again.
const processUser = inngest.createFunction(
  {
    id: "process-user",
    triggers: [{ event: "user/process.requested" }],
  },
  async ({ event, step }) => {
    const user = await step.run("fetch-user", async () => {
      const found = await db.users.findOne(event.data.userId);
      if (found) {
        return found;
      }
      // The user does not exist, so a retry would not help.
      throw new NonRetriableError("User not found, stopping execution");
    });
    // Continue processing...
  },
);
import { RetryAfterError } from "inngest";
// Honour a rate-limited API's Retry-After header when scheduling the next retry.
const respectRateLimit = inngest.createFunction(
  { id: "api-call", triggers: [{ event: "api/call.requested" }] },
  async ({ event, step }) => {
    await step.run("call-api", async () => {
      const response = await externalAPI.call(event.data);
      if (response.status !== 429) {
        return response.data;
      }

      // Retry-After is optional and comes in two forms: delay-seconds ("120")
      // or an HTTP-date. Interpolating it blindly would produce an invalid
      // duration such as "undefineds" when the header is missing.
      const rawRetryAfter = response.headers["retry-after"];
      const retryAfter = rawRetryAfter == null ? "" : String(rawRetryAfter).trim();

      if (/^\d+$/.test(retryAfter)) {
        // delay-seconds form: retry after the time specified by the API.
        throw new RetryAfterError("Rate limited", `${retryAfter}s`);
      }

      const retryAt = new Date(retryAfter);
      if (retryAfter !== "" && !Number.isNaN(retryAt.getTime())) {
        // HTTP-date form: retry at the absolute time given by the API.
        throw new RetryAfterError("Rate limited", retryAt);
      }

      // Header missing or unparseable: throw a plain error rather than a
      // RetryAfterError carrying a bogus delay.
      throw new Error("Rate limited");
    });
  },
);
import winston from "winston";
// Configure the winston logger: JSON-formatted "info" output on the console.
const logger = winston.createLogger({
  level: "info",
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});

// Hand the logger to the Inngest client.
const inngest = new Inngest({
  id: "my-app",
  logger: logger,
});
// Or use the built-in ConsoleLogger for simple log level control
import { ConsoleLogger, Inngest } from "inngest";
const inngest = new Inngest({
  id: "my-app",
  // Level is one of "debug" | "info" | "warn" | "error".
  logger: new ConsoleLogger({ level: "debug" }),
});
⚠️ v4 Breaking Change: The logLevel option has been removed. Use the logger option with ConsoleLogger or a custom logger instead.
// Logging from within steps so that messages are not duplicated.
const processData = inngest.createFunction(
  {
    id: "process-data",
    triggers: [{ event: "data/process.requested" }],
  },
  async ({ event, step, logger }) => {
    // ✅ GOOD: log inside steps to avoid duplicates.
    const result = await step.run("fetch-data", async () => {
      logger.info("Fetching data for user", { userId: event.data.userId });
      const fetched = await fetchUserData(event.data.userId);
      return fetched;
    });

    // ❌ AVOID: a log call placed here, outside any step, could run multiple times.
    await step.run("log-completion", async () => {
      logger.info("Processing complete", { resultCount: result.length });
    });
  },
);
Checkpointing is enabled by default in v4. It allows functions to persist state periodically during execution, reducing latency between steps.
// Checkpointing is enabled by default in v4.
// On serverless platforms, configure maxRuntime to 60-80% of the platform timeout.
const realTimeFunction = inngest.createFunction(
  {
    id: "real-time-function",
    triggers: [{ event: "realtime/process" }],
    checkpointing: {
      // Suits a serverless environment with a 60s timeout.
      maxRuntime: "50s",
    },
  },
  async ({ event, step }) => {
    // Steps execute immediately, with periodic checkpointing.
    const first = await step.run("step-1", () => {
      return process1(event.data);
    });
    const second = await step.run("step-2", () => {
      return process2(first);
    });
    return { result2: second };
  },
);
// Checkpointing can be switched off per function if needed.
const legacyFunction = inngest.createFunction(
  {
    id: "legacy-function",
    triggers: [{ event: "legacy/process" }],
    checkpointing: false,
  },
  async ({ event, step }) => {
    /* ... */
  },
);
// Run some steps only when a condition on earlier step output holds.
const conditionalProcess = inngest.createFunction(
  {
    id: "conditional-process",
    triggers: [{ event: "process/conditional" }],
  },
  async ({ event, step }) => {
    const userData = await step.run("fetch-user", () => getUserData(event.data.userId));

    // This step is executed only for premium users.
    if (userData.isPremium) {
      await step.run("premium-processing", () => processPremiumFeatures(userData));
    }

    // This step always runs.
    await step.run("standard-processing", () => processStandardFeatures(userData));
  },
);
// Try the primary service first and fall back to the secondary one on failure.
const robustProcess = inngest.createFunction(
  {
    id: "robust-process",
    triggers: [{ event: "process/robust" }],
  },
  async ({ event, step }) => {
    let outcome;
    try {
      outcome = await step.run("primary-service", () => callPrimaryService(event.data));
    } catch (error) {
      // The primary step threw: fall back to the secondary service.
      outcome = await step.run("fallback-service", () => callSecondaryService(event.data));
    }
    return { result: outcome };
  },
);
This skill covers Inngest's durable function patterns. For event sending and webhook handling, see the inngest-events skill.
Weekly Installs
273
Repository
GitHub Stars
15
First Seen
Feb 17, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
codex: 262
opencode: 259
gemini-cli: 258
github-copilot: 258
amp: 257
kimi-cli: 257
xdrop 文件传输脚本:Bun 环境下安全上传下载工具,支持加密分享
20,700 周安装