convex-cron-jobs by waynesutton/convexskills
npx skills add https://github.com/waynesutton/convexskills --skill convex-cron-jobs在 Convex 应用中为后台任务、清理作业、数据同步和自动化工作流安排定期执行的函数。
在实施之前,请勿假设;请获取最新文档:
Convex cron jobs 允许您安排函数按固定间隔或特定时间运行。主要特性:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 每小时运行一次
crons.interval(
"cleanup expired sessions",
{ hours: 1 },
internal.tasks.cleanupExpiredSessions,
{}
);
// 每天 UTC 时间午夜运行
crons.cron(
"daily report",
"0 0 * * *",
internal.reports.generateDailyReport,
{}
);
export default crons;
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
使用 crons.interval 处理简单的重复任务:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 每 5 分钟
crons.interval(
"sync external data",
{ minutes: 5 },
internal.sync.fetchExternalData,
{}
);
// 每 2 小时
crons.interval(
"cleanup temp files",
{ hours: 2 },
internal.files.cleanupTempFiles,
{}
);
// 每 30 秒(最小间隔)
crons.interval(
"health check",
{ seconds: 30 },
internal.monitoring.healthCheck,
{}
);
export default crons;
使用 crons.cron 通过 cron 表达式进行精确调度:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 每天 UTC 时间上午 9 点
crons.cron(
"morning notifications",
"0 9 * * *",
internal.notifications.sendMorningDigest,
{}
);
// 每周一 UTC 时间上午 8 点
crons.cron(
"weekly summary",
"0 8 * * 1",
internal.reports.generateWeeklySummary,
{}
);
// 每月第一天午夜
crons.cron(
"monthly billing",
"0 0 1 * *",
internal.billing.processMonthlyBilling,
{}
);
// 每 15 分钟
crons.cron(
"frequent sync",
"*/15 * * * *",
internal.sync.syncData,
{}
);
export default crons;
┌───────────── 分钟 (0-59)
│ ┌───────────── 小时 (0-23)
│ │ ┌───────────── 月份中的日期 (1-31)
│ │ │ ┌───────────── 月份 (1-12)
│ │ │ │ ┌───────────── 星期几 (0-6, 周日=0)
│ │ │ │ │
* * * * *
常见模式:
* * * * * - 每分钟0 * * * * - 每小时0 0 * * * - 每天午夜0 0 * * 0 - 每周日午夜0 0 1 * * - 每月第一天*/5 * * * * - 每 5 分钟0 9-17 * * 1-5 - 周一至周五,上午 9 点到下午 5 点每小时出于安全考虑,Cron jobs 应调用内部函数:
// convex/tasks.ts
import { internalMutation, internalQuery } from "./_generated/server";
import { v } from "convex/values";
// 清理过期会话
export const cleanupExpiredSessions = internalMutation({
args: {},
returns: v.number(),
handler: async (ctx) => {
const oneHourAgo = Date.now() - 60 * 60 * 1000;
const expiredSessions = await ctx.db
.query("sessions")
.withIndex("by_lastActive")
.filter((q) => q.lt(q.field("lastActive"), oneHourAgo))
.collect();
for (const session of expiredSessions) {
await ctx.db.delete(session._id);
}
return expiredSessions.length;
},
});
// 处理待处理任务
export const processPendingTasks = internalMutation({
args: {},
returns: v.null(),
handler: async (ctx) => {
const pendingTasks = await ctx.db
.query("tasks")
.withIndex("by_status", (q) => q.eq("status", "pending"))
.take(100);
for (const task of pendingTasks) {
await ctx.db.patch(task._id, {
status: "processing",
startedAt: Date.now(),
});
// 安排实际处理
await ctx.scheduler.runAfter(0, internal.tasks.processTask, {
taskId: task._id,
});
}
return null;
},
});
向 cron jobs 传递静态参数:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 针对不同类型使用不同的清理间隔
crons.interval(
"cleanup temp files",
{ hours: 1 },
internal.cleanup.cleanupByType,
{ fileType: "temp", maxAge: 3600000 }
);
crons.interval(
"cleanup cache files",
{ hours: 24 },
internal.cleanup.cleanupByType,
{ fileType: "cache", maxAge: 86400000 }
);
export default crons;
// convex/cleanup.ts
import { internalMutation } from "./_generated/server";
import { v } from "convex/values";
export const cleanupByType = internalMutation({
args: {
fileType: v.string(),
maxAge: v.number(),
},
returns: v.number(),
handler: async (ctx, args) => {
const cutoff = Date.now() - args.maxAge;
const oldFiles = await ctx.db
.query("files")
.withIndex("by_type_and_created", (q) =>
q.eq("type", args.fileType).lt("createdAt", cutoff)
)
.collect();
for (const file of oldFiles) {
await ctx.storage.delete(file.storageId);
await ctx.db.delete(file._id);
}
return oldFiles.length;
},
});
添加日志记录以跟踪 cron job 执行情况:
// convex/tasks.ts
import { internalMutation } from "./_generated/server";
import { v } from "convex/values";
export const cleanupWithLogging = internalMutation({
args: {},
returns: v.null(),
handler: async (ctx) => {
const startTime = Date.now();
let processedCount = 0;
let errorCount = 0;
try {
const expiredItems = await ctx.db
.query("items")
.withIndex("by_expiresAt")
.filter((q) => q.lt(q.field("expiresAt"), Date.now()))
.collect();
for (const item of expiredItems) {
try {
await ctx.db.delete(item._id);
processedCount++;
} catch (error) {
errorCount++;
console.error(`Failed to delete item ${item._id}:`, error);
}
}
// 记录作业完成情况
await ctx.db.insert("cronLogs", {
jobName: "cleanup",
startTime,
endTime: Date.now(),
duration: Date.now() - startTime,
processedCount,
errorCount,
status: errorCount === 0 ? "success" : "partial",
});
} catch (error) {
// 记录作业失败情况
await ctx.db.insert("cronLogs", {
jobName: "cleanup",
startTime,
endTime: Date.now(),
duration: Date.now() - startTime,
processedCount,
errorCount,
status: "failed",
error: String(error),
});
throw error;
}
return null;
},
});
分批处理大型数据集以避免超时:
// convex/tasks.ts
import { internalMutation } from "./_generated/server";
import { internal } from "./_generated/api";
import { v } from "convex/values";
const BATCH_SIZE = 100;
export const processBatch = internalMutation({
args: {
cursor: v.optional(v.string()),
},
returns: v.null(),
handler: async (ctx, args) => {
const result = await ctx.db
.query("items")
.withIndex("by_status", (q) => q.eq("status", "pending"))
.paginate({ numItems: BATCH_SIZE, cursor: args.cursor ?? null });
for (const item of result.page) {
await ctx.db.patch(item._id, {
status: "processed",
processedAt: Date.now(),
});
}
// 如果还有更多项目,安排下一批处理
if (!result.isDone) {
await ctx.scheduler.runAfter(0, internal.tasks.processBatch, {
cursor: result.continueCursor,
});
}
return null;
},
});
使用 actions 进行外部 API 调用:
// convex/sync.ts
"use node";
import { internalAction } from "./_generated/server";
import { internal } from "./_generated/api";
import { v } from "convex/values";
export const syncExternalData = internalAction({
args: {},
returns: v.null(),
handler: async (ctx) => {
// 从外部 API 获取数据
const response = await fetch("https://api.example.com/data", {
headers: {
Authorization: `Bearer ${process.env.API_KEY}`,
},
});
if (!response.ok) {
throw new Error(`API request failed: ${response.status}`);
}
const data = await response.json();
// 使用 mutation 存储数据
await ctx.runMutation(internal.sync.storeExternalData, {
data,
syncedAt: Date.now(),
});
return null;
},
});
export const storeExternalData = internalMutation({
args: {
data: v.any(),
syncedAt: v.number(),
},
returns: v.null(),
handler: async (ctx, args) => {
await ctx.db.insert("externalData", {
data: args.data,
syncedAt: args.syncedAt,
});
return null;
},
});
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
crons.interval(
"sync external data",
{ minutes: 15 },
internal.sync.syncExternalData,
{}
);
export default crons;
// convex/schema.ts
import { defineSchema, defineTable } from "convex/server";
import { v } from "convex/values";
export default defineSchema({
cronLogs: defineTable({
jobName: v.string(),
startTime: v.number(),
endTime: v.number(),
duration: v.number(),
processedCount: v.number(),
errorCount: v.number(),
status: v.union(
v.literal("success"),
v.literal("partial"),
v.literal("failed")
),
error: v.optional(v.string()),
})
.index("by_job", ["jobName"])
.index("by_status", ["status"])
.index("by_startTime", ["startTime"]),
sessions: defineTable({
userId: v.id("users"),
token: v.string(),
lastActive: v.number(),
expiresAt: v.number(),
})
.index("by_user", ["userId"])
.index("by_lastActive", ["lastActive"])
.index("by_expiresAt", ["expiresAt"]),
tasks: defineTable({
type: v.string(),
status: v.union(
v.literal("pending"),
v.literal("processing"),
v.literal("completed"),
v.literal("failed")
),
data: v.any(),
createdAt: v.number(),
startedAt: v.optional(v.number()),
completedAt: v.optional(v.number()),
})
.index("by_status", ["status"])
.index("by_type_and_status", ["type", "status"]),
});
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 清理作业
crons.interval(
"cleanup expired sessions",
{ hours: 1 },
internal.cleanup.expiredSessions,
{}
);
crons.interval(
"cleanup old logs",
{ hours: 24 },
internal.cleanup.oldLogs,
{ maxAgeDays: 30 }
);
// 同步作业
crons.interval(
"sync user data",
{ minutes: 15 },
internal.sync.userData,
{}
);
// 报告作业
crons.cron(
"daily analytics",
"0 1 * * *",
internal.reports.dailyAnalytics,
{}
);
crons.cron(
"weekly summary",
"0 9 * * 1",
internal.reports.weeklySummary,
{}
);
// 健康检查
crons.interval(
"service health check",
{ minutes: 5 },
internal.monitoring.healthCheck,
{}
);
export default crons;
npx convex deploycrons.interval 或 crons.cron 方法,不要使用已弃用的辅助函数_generated/api 导入 internalcrons.hourly、crons.daily 等每周安装量
1.5K
代码仓库
GitHub Stars
384
首次出现
Jan 24, 2026
安全审计
安装于
codex1.0K
cursor1.0K
claude-code1.0K
opencode982
github-copilot960
gemini-cli812
Schedule recurring functions for background tasks, cleanup jobs, data syncing, and automated workflows in Convex applications.
Before implementing, do not assume; fetch the latest documentation:
Convex cron jobs allow you to schedule functions to run at regular intervals or specific times. Key features:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// Run every hour
crons.interval(
"cleanup expired sessions",
{ hours: 1 },
internal.tasks.cleanupExpiredSessions,
{}
);
// Run every day at midnight UTC
crons.cron(
"daily report",
"0 0 * * *",
internal.reports.generateDailyReport,
{}
);
export default crons;
Use crons.interval for simple recurring tasks:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// Every 5 minutes
crons.interval(
"sync external data",
{ minutes: 5 },
internal.sync.fetchExternalData,
{}
);
// Every 2 hours
crons.interval(
"cleanup temp files",
{ hours: 2 },
internal.files.cleanupTempFiles,
{}
);
// Every 30 seconds (minimum interval)
crons.interval(
"health check",
{ seconds: 30 },
internal.monitoring.healthCheck,
{}
);
export default crons;
Use crons.cron for precise scheduling with cron expressions:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// Every day at 9 AM UTC
crons.cron(
"morning notifications",
"0 9 * * *",
internal.notifications.sendMorningDigest,
{}
);
// Every Monday at 8 AM UTC
crons.cron(
"weekly summary",
"0 8 * * 1",
internal.reports.generateWeeklySummary,
{}
);
// First day of every month at midnight
crons.cron(
"monthly billing",
"0 0 1 * *",
internal.billing.processMonthlyBilling,
{}
);
// Every 15 minutes
crons.cron(
"frequent sync",
"*/15 * * * *",
internal.sync.syncData,
{}
);
export default crons;
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
Common patterns:
* * * * * - Every minute0 * * * * - Every hour0 0 * * * - Every day at midnight0 0 * * 0 - Every Sunday at midnight0 0 1 * * - First day of every month*/5 * * * * - Every 5 minutes0 9-17 * * 1-5 - Every hour from 9 AM to 5 PM, Monday through FridayCron jobs should call internal functions for security:
// convex/tasks.ts
import { internalMutation, internalQuery } from "./_generated/server";
import { v } from "convex/values";
// Cleanup expired sessions
export const cleanupExpiredSessions = internalMutation({
args: {},
returns: v.number(),
handler: async (ctx) => {
const oneHourAgo = Date.now() - 60 * 60 * 1000;
const expiredSessions = await ctx.db
.query("sessions")
.withIndex("by_lastActive")
.filter((q) => q.lt(q.field("lastActive"), oneHourAgo))
.collect();
for (const session of expiredSessions) {
await ctx.db.delete(session._id);
}
return expiredSessions.length;
},
});
// Process pending tasks
export const processPendingTasks = internalMutation({
args: {},
returns: v.null(),
handler: async (ctx) => {
const pendingTasks = await ctx.db
.query("tasks")
.withIndex("by_status", (q) => q.eq("status", "pending"))
.take(100);
for (const task of pendingTasks) {
await ctx.db.patch(task._id, {
status: "processing",
startedAt: Date.now(),
});
// Schedule the actual processing
await ctx.scheduler.runAfter(0, internal.tasks.processTask, {
taskId: task._id,
});
}
return null;
},
});
Pass static arguments to cron jobs:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// Different cleanup intervals for different types
crons.interval(
"cleanup temp files",
{ hours: 1 },
internal.cleanup.cleanupByType,
{ fileType: "temp", maxAge: 3600000 }
);
crons.interval(
"cleanup cache files",
{ hours: 24 },
internal.cleanup.cleanupByType,
{ fileType: "cache", maxAge: 86400000 }
);
export default crons;
// convex/cleanup.ts
import { internalMutation } from "./_generated/server";
import { v } from "convex/values";
export const cleanupByType = internalMutation({
args: {
fileType: v.string(),
maxAge: v.number(),
},
returns: v.number(),
handler: async (ctx, args) => {
const cutoff = Date.now() - args.maxAge;
const oldFiles = await ctx.db
.query("files")
.withIndex("by_type_and_created", (q) =>
q.eq("type", args.fileType).lt("createdAt", cutoff)
)
.collect();
for (const file of oldFiles) {
await ctx.storage.delete(file.storageId);
await ctx.db.delete(file._id);
}
return oldFiles.length;
},
});
Add logging to track cron job execution:
// convex/tasks.ts
import { internalMutation } from "./_generated/server";
import { v } from "convex/values";
export const cleanupWithLogging = internalMutation({
args: {},
returns: v.null(),
handler: async (ctx) => {
const startTime = Date.now();
let processedCount = 0;
let errorCount = 0;
try {
const expiredItems = await ctx.db
.query("items")
.withIndex("by_expiresAt")
.filter((q) => q.lt(q.field("expiresAt"), Date.now()))
.collect();
for (const item of expiredItems) {
try {
await ctx.db.delete(item._id);
processedCount++;
} catch (error) {
errorCount++;
console.error(`Failed to delete item ${item._id}:`, error);
}
}
// Log job completion
await ctx.db.insert("cronLogs", {
jobName: "cleanup",
startTime,
endTime: Date.now(),
duration: Date.now() - startTime,
processedCount,
errorCount,
status: errorCount === 0 ? "success" : "partial",
});
} catch (error) {
// Log job failure
await ctx.db.insert("cronLogs", {
jobName: "cleanup",
startTime,
endTime: Date.now(),
duration: Date.now() - startTime,
processedCount,
errorCount,
status: "failed",
error: String(error),
});
throw error;
}
return null;
},
});
Handle large datasets in batches to avoid timeouts:
// convex/tasks.ts
import { internalMutation } from "./_generated/server";
import { internal } from "./_generated/api";
import { v } from "convex/values";
const BATCH_SIZE = 100;
export const processBatch = internalMutation({
args: {
cursor: v.optional(v.string()),
},
returns: v.null(),
handler: async (ctx, args) => {
const result = await ctx.db
.query("items")
.withIndex("by_status", (q) => q.eq("status", "pending"))
.paginate({ numItems: BATCH_SIZE, cursor: args.cursor ?? null });
for (const item of result.page) {
await ctx.db.patch(item._id, {
status: "processed",
processedAt: Date.now(),
});
}
// Schedule next batch if there are more items
if (!result.isDone) {
await ctx.scheduler.runAfter(0, internal.tasks.processBatch, {
cursor: result.continueCursor,
});
}
return null;
},
});
Use actions for external API calls:
// convex/sync.ts
"use node";
import { internalAction } from "./_generated/server";
import { internal } from "./_generated/api";
import { v } from "convex/values";
export const syncExternalData = internalAction({
args: {},
returns: v.null(),
handler: async (ctx) => {
// Fetch from external API
const response = await fetch("https://api.example.com/data", {
headers: {
Authorization: `Bearer ${process.env.API_KEY}`,
},
});
if (!response.ok) {
throw new Error(`API request failed: ${response.status}`);
}
const data = await response.json();
// Store the data using a mutation
await ctx.runMutation(internal.sync.storeExternalData, {
data,
syncedAt: Date.now(),
});
return null;
},
});
export const storeExternalData = internalMutation({
args: {
data: v.any(),
syncedAt: v.number(),
},
returns: v.null(),
handler: async (ctx, args) => {
await ctx.db.insert("externalData", {
data: args.data,
syncedAt: args.syncedAt,
});
return null;
},
});
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
crons.interval(
"sync external data",
{ minutes: 15 },
internal.sync.syncExternalData,
{}
);
export default crons;
// convex/schema.ts
import { defineSchema, defineTable } from "convex/server";
import { v } from "convex/values";
export default defineSchema({
cronLogs: defineTable({
jobName: v.string(),
startTime: v.number(),
endTime: v.number(),
duration: v.number(),
processedCount: v.number(),
errorCount: v.number(),
status: v.union(
v.literal("success"),
v.literal("partial"),
v.literal("failed")
),
error: v.optional(v.string()),
})
.index("by_job", ["jobName"])
.index("by_status", ["status"])
.index("by_startTime", ["startTime"]),
sessions: defineTable({
userId: v.id("users"),
token: v.string(),
lastActive: v.number(),
expiresAt: v.number(),
})
.index("by_user", ["userId"])
.index("by_lastActive", ["lastActive"])
.index("by_expiresAt", ["expiresAt"]),
tasks: defineTable({
type: v.string(),
status: v.union(
v.literal("pending"),
v.literal("processing"),
v.literal("completed"),
v.literal("failed")
),
data: v.any(),
createdAt: v.number(),
startedAt: v.optional(v.number()),
completedAt: v.optional(v.number()),
})
.index("by_status", ["status"])
.index("by_type_and_status", ["type", "status"]),
});
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// Cleanup jobs
crons.interval(
"cleanup expired sessions",
{ hours: 1 },
internal.cleanup.expiredSessions,
{}
);
crons.interval(
"cleanup old logs",
{ hours: 24 },
internal.cleanup.oldLogs,
{ maxAgeDays: 30 }
);
// Sync jobs
crons.interval(
"sync user data",
{ minutes: 15 },
internal.sync.userData,
{}
);
// Report jobs
crons.cron(
"daily analytics",
"0 1 * * *",
internal.reports.dailyAnalytics,
{}
);
crons.cron(
"weekly summary",
"0 9 * * 1",
internal.reports.weeklySummary,
{}
);
// Health checks
crons.interval(
"service health check",
{ minutes: 5 },
internal.monitoring.healthCheck,
{}
);
export default crons;
npx convex deploy unless explicitly instructedcrons.interval or crons.cron methods, not deprecated helpersinternal from _generated/api even for functions in the same filecrons.hourly, crons.daily, etc.Weekly Installs
1.5K
Repository
GitHub Stars
384
First Seen
Jan 24, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
codex1.0K
cursor1.0K
claude-code1.0K
opencode982
github-copilot960
gemini-cli812
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
102,200 周安装
AI代码审查工具 - 自动化安全漏洞检测与代码质量分析 | 支持多领域检查清单
1,200 周安装
AI智能体长期记忆系统 - 精英级架构,融合6种方法,永不丢失上下文
1,200 周安装
AI新闻播客制作技能:实时新闻转对话式播客脚本与音频生成
1,200 周安装
Word文档处理器:DOCX创建、编辑、分析与修订痕迹处理全指南 | 自动化办公解决方案
1,200 周安装
React Router 框架模式指南:全栈开发、文件路由、数据加载与渲染策略
1,200 周安装
Nano Banana AI 图像生成工具:使用 Gemini 3 Pro 生成与编辑高分辨率图像
1,200 周安装