inngest-events by inngest/inngest-skills
npx skills add https://github.com/inngest/inngest-skills --skill inngest-events
Master Inngest event design and delivery patterns. Events are the foundation of Inngest - learn to design robust event schemas, implement idempotency, leverage fan-out patterns, and handle system events effectively.
These skills are focused on TypeScript. For Python or Go, refer to the Inngest documentation for language-specific guidance. Core concepts apply across all languages.
Every Inngest event is a JSON object with required and optional properties:
type Event = {
name: string; // Event type (triggers functions)
data: object; // Payload data (any nested JSON)
};
type EventPayload = {
name: string; // Required: event type
data: Record<string, any>; // Required: event data
id?: string; // Optional: deduplication ID
ts?: number; // Optional: timestamp (Unix ms)
v?: string; // Optional: schema version
};
await inngest.send({
name: "billing/invoice.paid",
data: {
customerId: "cus_NffrFeUfNV2Hib",
invoiceId: "in_1J5g2n2eZvKYlo2C0Z1Z2Z3Z",
userId: "user_03028hf09j2d02",
amount: 1000,
metadata: {
accountId: "acct_1J5g2n2eZvKYlo2C0Z1Z2Z3Z",
accountName: "Acme.ai"
}
}
});
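When event payloads are assembled from untrusted input (a webhook body, for example), it may help to check their shape before sending. The guard below is a minimal sketch derived from the EventPayload type above; `isEventPayload` and `isPlainObject` are hypothetical helpers, not part of the Inngest SDK.

```typescript
// Hypothetical helpers (not part of the Inngest SDK): a runtime check that a
// value matches the EventPayload shape described above.
type EventPayload = {
  name: string;
  data: Record<string, unknown>;
  id?: string;
  ts?: number;
  v?: string;
};

function isPlainObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

function isEventPayload(value: unknown): value is EventPayload {
  if (!isPlainObject(value)) return false;
  const { name, data, id, ts, v } = value;
  // Required fields
  if (typeof name !== "string" || name.length === 0) return false;
  if (!isPlainObject(data)) return false;
  // Optional fields must have the right type when present
  if (id !== undefined && typeof id !== "string") return false;
  if (ts !== undefined && (typeof ts !== "number" || !isFinite(ts))) return false;
  if (v !== undefined && typeof v !== "string") return false;
  return true;
}
```

The guard only checks types; it does not enforce naming conventions or payload size.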
Use the Object-Action pattern: domain/noun.verb
// ✅ Good: Clear object-action pattern
"billing/invoice.paid";
"user/profile.updated";
"order/item.shipped";
"ai/summary.completed";
// ✅ Good: Domain prefixes for organization
"stripe/customer.created";
"intercom/conversation.assigned";
"slack/message.posted";
// ❌ Avoid: Unclear or inconsistent
"payment"; // What happened?
"user_update"; // Use dots, not underscores
"invoiceWasPaid"; // Too verbose
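One way to keep names consistent is a small check in tests or at the call site. The pattern below is a sketch of the domain/noun.verb convention as shown in the examples above; `EVENT_NAME_PATTERN` and `isConventionalEventName` are hypothetical, and the exact character set allowed is an assumption.

```typescript
// Hypothetical convention check: lowercase "domain/noun.verb", with one or
// more dot-separated segments after the slash. It cannot tell whether the
// final segment is a past-tense verb.
const EVENT_NAME_PATTERN = /^[a-z][a-z0-9-]*\/[a-z][a-z0-9-]*(\.[a-z][a-z0-9-]*)+$/;

function isConventionalEventName(name: string): boolean {
  return EVENT_NAME_PATTERN.test(name);
}
```

With this pattern, "billing/invoice.paid" passes, while "payment", "user_update", and "invoiceWasPaid" do not.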
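Naming guidelines:
- Past-tense verbs that describe what happened (created, updated, failed)
- Dot notation for hierarchy (billing/invoice.paid)
- Prefixes that group related events (api/user.created, webhook/stripe.received)
When to use IDs: Prevent duplicate processing when events might be sent multiple times.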
await inngest.send({
id: "cart-checkout-completed-ed12c8bde", // Unique per event type
name: "storefront/cart.checkout.completed",
data: {
cartId: "ed12c8bde",
items: ["item1", "item2"]
}
});
// ✅ Good: Specific to event type and instance
id: `invoice-paid-${invoiceId}`;
id: `user-signup-${userId}-${timestamp}`;
id: `order-shipped-${orderId}-${trackingNumber}`;
// ❌ Bad: Generic IDs shared across event types
id: invoiceId; // Could conflict with other events
id: "user-action"; // Too generic
id: customerId; // Same customer, different events
Deduplication window: 24 hours from first event reception
See inngest-durable-functions for idempotency configuration.
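The ID guidance above can be sketched as a small builder that always combines the event type with the identifiers of the specific occurrence. `buildEventId` is a hypothetical helper, and the hyphen-joined format is an assumption rather than an Inngest requirement.

```typescript
// Hypothetical helper: derive a deduplication ID from the event name plus the
// identifiers that make this occurrence unique. The same inputs always give
// the same ID, which is what lets a resend be recognised as a duplicate.
function buildEventId(eventName: string, ...parts: Array<string | number>): string {
  if (parts.length === 0) {
    throw new Error("buildEventId needs at least one instance identifier");
  }
  // Turn "billing/invoice.paid" into "billing-invoice-paid"
  const prefix = eventName.replace(/[^a-zA-Z0-9]+/g, "-").replace(/^-+|-+$/g, "");
  return [prefix, ...parts.map(String)].join("-");
}
```

For example, `buildEventId("billing/invoice.paid", invoiceId)` and `buildEventId("billing/invoice.refunded", invoiceId)` produce different IDs for the same invoice, so the two event types cannot collide.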
ts Parameter for Delayed Delivery
When to use: Schedule events for future processing or maintain event ordering.
const oneHourFromNow = Date.now() + 60 * 60 * 1000;
await inngest.send({
name: "trial/reminder.send",
ts: oneHourFromNow, // Deliver in 1 hour
data: {
userId: "user_123",
trialExpiresAt: "2024-02-15T12:00:00Z"
}
});
// Events with timestamps are processed in chronological order
const events = [
{
name: "user/action.performed",
ts: 1640995200000, // Earlier
data: { action: "login" }
},
{
name: "user/action.performed",
ts: 1640995260000, // Later
data: { action: "purchase" }
}
];
await inngest.send(events);
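Because ts is expressed in Unix milliseconds, a common slip is passing seconds instead. The sketch below normalises a Date or a delay into milliseconds and flags values that look like seconds. `toEventTs`, `looksLikeSeconds`, and the threshold are hypothetical and not part of the Inngest SDK.

```typescript
// Hypothetical helpers for building the ts field (Unix milliseconds).

// Millisecond timestamps below this value fall in early 1973, so a smaller
// positive number is almost certainly a timestamp expressed in seconds.
const MS_THRESHOLD = 1e11;

function toEventTs(when: Date | { delayMs: number }, now: number = Date.now()): number {
  if (when instanceof Date) {
    const ms = when.getTime();
    if (isNaN(ms)) throw new Error("Invalid Date passed to toEventTs");
    return ms;
  }
  if (!isFinite(when.delayMs) || when.delayMs < 0) {
    throw new Error("delayMs must be a non-negative finite number");
  }
  return now + when.delayMs;
}

function looksLikeSeconds(ts: number): boolean {
  return ts > 0 && ts < MS_THRESHOLD;
}
```

Usage would look like `ts: toEventTs({ delayMs: 60 * 60 * 1000 })` for delivery in one hour, or `ts: toEventTs(new Date("2024-02-15T12:00:00Z"))` for a fixed time.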
Use case: One event triggers multiple independent functions for reliability and parallel processing.
// Send single event
await inngest.send({
name: "user/signup.completed",
data: {
userId: "user_123",
email: "user@example.com",
plan: "pro"
}
});
// Multiple functions respond to same event
const sendWelcomeEmail = inngest.createFunction(
{ id: "send-welcome-email", triggers: [{ event: "user/signup.completed" }] },
async ({ event, step }) => {
await step.run("send-email", async () => {
return sendEmail({
to: event.data.email,
template: "welcome"
});
});
}
);
const createTrialSubscription = inngest.createFunction(
{ id: "create-trial", triggers: [{ event: "user/signup.completed" }] },
async ({ event, step }) => {
await step.run("create-subscription", async () => {
return stripe.subscriptions.create({
// Assumes the event payload includes stripeCustomerId (the sample event above does not send it)
customer: event.data.stripeCustomerId,
trial_period_days: 14
});
});
}
);
const addToCrm = inngest.createFunction(
{ id: "add-to-crm", triggers: [{ event: "user/signup.completed" }] },
async ({ event, step }) => {
await step.run("crm-sync", async () => {
return crm.contacts.create({
email: event.data.email,
plan: event.data.plan
});
});
}
);
Advanced Fan-Out with waitForEvent
In expressions, event = the original triggering event, async = the new event being matched. See Expression Syntax Reference for full details.
const orchestrateOnboarding = inngest.createFunction(
{ id: "orchestrate-onboarding", triggers: [{ event: "user/signup.completed" }] },
async ({ event, step }) => {
// Fan out to multiple services
await step.sendEvent("fan-out", [
{ name: "email/welcome.send", data: event.data },
{ name: "subscription/trial.create", data: event.data },
{ name: "crm/contact.add", data: event.data }
]);
// Wait for all to complete (each result is null if its wait times out)
const [emailResult, subResult, crmResult] = await Promise.all([
step.waitForEvent("email-sent", {
event: "email/welcome.sent",
timeout: "5m",
if: `event.data.userId == async.data.userId`
}),
step.waitForEvent("subscription-created", {
event: "subscription/trial.created",
timeout: "5m",
if: `event.data.userId == async.data.userId`
}),
step.waitForEvent("crm-synced", {
event: "crm/contact.added",
timeout: "5m",
if: `event.data.userId == async.data.userId`
})
]);
// Complete onboarding
await step.run("complete-onboarding", async () => {
return completeUserOnboarding(event.data.userId);
});
}
);
See inngest-steps for additional patterns including step.invoke.
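In the TypeScript SDK, step.waitForEvent resolves to the matched event, or to null when the timeout elapses, so the orchestration above may want to check its results before completing onboarding. The helper below is a sketch of that check; `WaitResult` and `findTimedOut` are hypothetical names.

```typescript
// Hypothetical helper: given named wait results (a matched event, or null on
// timeout), return the names of the waits that timed out.
type WaitResult = { name: string; data: Record<string, unknown> } | null;

function findTimedOut(results: Record<string, WaitResult>): string[] {
  return Object.keys(results).filter((key) => results[key] === null);
}
```

In the orchestrator, this could be called as `findTimedOut({ email: emailResult, subscription: subResult, crm: crmResult })`, with onboarding completed only when the returned list is empty.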
Inngest emits system events for function lifecycle monitoring:
// Function execution events
"inngest/function.failed"; // Function failed after retries
"inngest/function.finished"; // Function finished - completed or failed
"inngest/function.cancelled"; // Function cancelled before completion
const handleFailures = inngest.createFunction(
{ id: "handle-failed-functions", triggers: [{ event: "inngest/function.failed" }] },
async ({ event, step }) => {
const { function_id, run_id, error } = event.data;
await step.run("log-failure", async () => {
logger.error("Function failed", {
functionId: function_id,
runId: run_id,
error: error.message,
stack: error.stack
});
});
// Alert on critical function failures
if (function_id.includes("critical")) {
await step.run("send-alert", async () => {
return alerting.sendAlert({
title: `Critical function failed: ${function_id}`,
severity: "high",
runId: run_id
});
});
}
// Auto-retry certain failures
if (error.code === "RATE_LIMIT_EXCEEDED") {
// Inside a function, use step.sendEvent() rather than inngest.send()
await step.sendEvent("schedule-retry", {
name: "retry/function.requested",
ts: Date.now() + 5 * 60 * 1000, // Retry in 5 minutes
data: { originalRunId: run_id }
});
}
}
);
// inngest/client.ts
import { Inngest } from "inngest";
export const inngest = new Inngest({
id: "my-app"
});
// In production, you must set the INNGEST_EVENT_KEY environment variable
const result = await inngest.send({
name: "order/placed",
data: {
orderId: "ord_123",
customerId: "cus_456",
amount: 2500,
items: [
{ id: "item_1", quantity: 2 },
{ id: "item_2", quantity: 1 }
]
}
});
// Returns event IDs for tracking
console.log(result.ids); // ["01HQ8PTAESBZPBDS8JTRZZYY3S"]
const orderItems = await getOrderItems(orderId);
// Convert to events
const events = orderItems.map((item) => ({
name: "inventory/item.reserved",
data: {
itemId: item.id,
orderId: orderId,
quantity: item.quantity,
warehouseId: item.warehouseId
}
}));
// Send all at once (up to 512 KB)
await inngest.send(events);
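When a batch might exceed the request size limit mentioned above, the events can be split into several sends. The sketch below groups events by their estimated JSON size. `chunkEventsBySize` and `utf8ByteLength` are hypothetical helpers, and the right value for `maxBytes` is an assumption that should be checked against the limits for your account.

```typescript
// Hypothetical helpers: split events into batches whose JSON encoding stays
// within a byte budget.

// Count UTF-8 bytes without relying on platform-specific encoders.
function utf8ByteLength(text: string): number {
  let bytes = 0;
  for (let i = 0; i < text.length; i++) {
    const code = text.charCodeAt(i);
    if (code <= 0x7f) bytes += 1;
    else if (code <= 0x7ff) bytes += 2;
    else if (code >= 0xd800 && code <= 0xdbff) {
      bytes += 4; // surrogate pair encodes one 4-byte code point
      i++; // skip the low surrogate
    } else bytes += 3;
  }
  return bytes;
}

function chunkEventsBySize<T>(events: T[], maxBytes: number): T[][] {
  const batches: T[][] = [];
  let current: T[] = [];
  let currentBytes = 2; // the enclosing "[" and "]"
  for (const event of events) {
    const size = utf8ByteLength(JSON.stringify(event));
    if (size + 2 > maxBytes) {
      throw new Error("A single event exceeds the size budget");
    }
    const separator = current.length > 0 ? 1 : 0; // comma between elements
    if (currentBytes + separator + size > maxBytes) {
      // Current batch is full: close it and start a new one with this event
      batches.push(current);
      current = [event];
      currentBytes = 2 + size;
    } else {
      current.push(event);
      currentBytes += separator + size;
    }
  }
  if (current.length > 0) batches.push(current);
  return batches;
}
```

Each batch can then be passed to its own `inngest.send(batch)` call, for example in a `for...of` loop over `chunkEventsBySize(events, 512 * 1024)`.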
inngest.createFunction(
{ id: "process-order", triggers: [{ event: "order/placed" }] },
async ({ event, step }) => {
// Use step.sendEvent() instead of inngest.send() in functions
// for reliability and deduplication
await step.sendEvent("trigger-fulfillment", {
name: "fulfillment/order.received",
data: {
orderId: event.data.orderId,
priority: event.data.customerTier === "premium" ? "high" : "normal"
}
});
}
);
// Use version field to track schema changes
await inngest.send({
name: "user/profile.updated",
v: "2024-01-15.1", // Schema version
data: {
userId: "user_123",
changes: {
email: "new@example.com",
preferences: { theme: "dark" }
},
// Field added in this schema version
auditInfo: {
changedBy: "user_456",
reason: "user_requested"
}
}
});
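On the consuming side, a function may receive events sent before a field was introduced. The sketch below treats auditInfo as optional and falls back when it is absent. `ProfileUpdatedEvent` and `describeChange` are hypothetical names, and the type is an assumed shape based on the example above.

```typescript
// Hypothetical consumer-side handling for older and newer versions of
// "user/profile.updated". The type is an assumed shape, not an SDK type.
type ProfileUpdatedEvent = {
  name: "user/profile.updated";
  v?: string;
  data: {
    userId: string;
    changes: Record<string, unknown>;
    auditInfo?: { changedBy: string; reason: string };
  };
};

function describeChange(event: ProfileUpdatedEvent): string {
  const audit = event.data.auditInfo;
  if (audit === undefined) {
    // Older events were sent before auditInfo existed
    const version = event.v === undefined ? "unversioned" : event.v;
    return `${event.data.userId} updated (no audit info, schema ${version})`;
  }
  return `${event.data.userId} updated by ${audit.changedBy} (${audit.reason})`;
}
```

Checking for the field itself, rather than comparing version strings, keeps the consumer working even when the v field is missing.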
// Include enough context for all consumers
await inngest.send({
name: "payment/charge.succeeded",
data: {
// Primary identifiers
chargeId: "ch_123",
customerId: "cus_456",
// Amount details
amount: 2500,
currency: "usd",
// Context for different consumers
subscription: {
id: "sub_789",
plan: "pro_monthly"
},
invoice: {
id: "inv_012",
number: "INV-2024-001"
},
// Metadata for debugging
paymentMethod: {
type: "card",
last4: "4242",
brand: "visa"
},
metadata: {
source: "stripe_webhook",
environment: "production"
}
}
});
Event design principles:
First Seen
Feb 17, 2026