Important prerequisite
Installing AI Skills requires a working proxy/VPN connection with TUN mode enabled; this directly determines whether the installation completes successfully. See the full installation guide for details.
etl-sync-job-builder by patricio0312rev/skills
npx skills add https://github.com/patricio0312rev/skills --skill etl-sync-job-builder
Build reliable, incremental data synchronization pipelines.
// jobs/sync-users.ts
import { PrismaClient } from "@prisma/client";
const prisma = new PrismaClient();
interface SyncJob {
name: string;
source: "database" | "api" | "file";
destination: "database" | "warehouse" | "s3";
schedule: string;
}
export class ETLJob {
constructor(private name: string, private watermarkKey: string) {}
async run() {
console.log(`🔄 Starting ${this.name}...`);
try {
// 1. Get last watermark
const lastSync = await this.getWatermark();
console.log(` Last sync: ${lastSync}`);
// 2. Extract data
const data = await this.extract(lastSync);
console.log(` Extracted ${data.length} records`);
// 3. Transform data
const transformed = await this.transform(data);
// 4. Load data
await this.load(transformed);
// 5. Update watermark (using the max extracted updated_at, as in syncOrdersIncremental below, avoids missing rows updated mid-run)
await this.updateWatermark(new Date());
console.log(`✅ ${this.name} complete`);
} catch (error) {
console.error(`❌ ${this.name} failed:`, error);
throw error;
}
}
protected async extract(since: Date): Promise<any[]> {
// Extract logic (overridden by concrete jobs such as OrdersETLJob below)
return [];
}
protected async transform(data: any[]): Promise<any[]> {
// Transform logic (overridden by concrete jobs)
return data;
}
protected async load(data: any[]): Promise<void> {
// Load logic (overridden by concrete jobs)
}
private async getWatermark(): Promise<Date> {
const watermark = await prisma.syncWatermark.findUnique({
where: { key: this.watermarkKey },
});
return watermark?.lastSync || new Date(0);
}
private async updateWatermark(timestamp: Date) {
await prisma.syncWatermark.upsert({
where: { key: this.watermarkKey },
create: { key: this.watermarkKey, lastSync: timestamp },
update: { lastSync: timestamp },
});
}
}
// Track sync progress
model SyncWatermark {
key String @id
lastSync DateTime
metadata Json?
@@index([lastSync])
}
// Incremental sync using watermark
async function syncOrdersIncremental() {
// Get last sync time
const watermark = await prisma.syncWatermark.findUnique({
where: { key: "orders_sync" },
});
const lastSync = watermark?.lastSync || new Date(0);
// Fetch only new/updated records
const newOrders = await sourceDb.order.findMany({
where: {
updated_at: { gt: lastSync },
},
orderBy: { updated_at: "asc" },
});
console.log(`📦 Syncing ${newOrders.length} orders...`);
// Process in batches
for (let i = 0; i < newOrders.length; i += 100) {
const batch = newOrders.slice(i, i + 100);
await destinationDb.order.createMany({
data: batch,
skipDuplicates: true, // Idempotency
});
}
// Update watermark to latest record
if (newOrders.length > 0) {
const latestTimestamp = newOrders[newOrders.length - 1].updated_at;
await prisma.syncWatermark.upsert({
where: { key: "orders_sync" },
create: { key: "orders_sync", lastSync: latestTimestamp },
update: { lastSync: latestTimestamp },
});
}
console.log(`✅ Sync complete`);
}
// Idempotent sync - safe to run multiple times
async function syncUsersIdempotent(users: User[]) {
for (const user of users) {
await prisma.user.upsert({
where: { id: user.id },
create: user,
update: {
email: user.email,
name: user.name,
updated_at: user.updated_at,
},
});
}
}
// Batch upsert for better performance
async function syncUsersBatch(users: User[]) {
// PostgreSQL: Use ON CONFLICT
await prisma.$executeRaw`
INSERT INTO users (id, email, name, updated_at)
SELECT * FROM UNNEST(
${users.map((u) => u.id)}::bigint[],
${users.map((u) => u.email)}::text[],
${users.map((u) => u.name)}::text[],
${users.map((u) => u.updated_at)}::timestamp[]
)
ON CONFLICT (id) DO UPDATE SET
email = EXCLUDED.email,
name = EXCLUDED.name,
updated_at = EXCLUDED.updated_at
WHERE users.updated_at < EXCLUDED.updated_at
`;
}
// Small sleep helper used for exponential backoff between retries
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
async function syncWithRetry<T>(
operation: () => Promise<T>,
maxRetries: number = 3,
baseDelay: number = 1000
): Promise<T> {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
if (attempt === maxRetries) throw error;
const delay = baseDelay * Math.pow(2, attempt);
console.log(` Retry ${attempt + 1}/${maxRetries} after ${delay}ms`);
await sleep(delay);
}
}
throw new Error("Max retries exceeded");
}
// Usage
await syncWithRetry(
async () => {
return await syncOrders();
},
3,
1000
);
// Listen to database changes
import { PrismaClient } from "@prisma/client";
import { Client } from "pg";
const prisma = new PrismaClient();
// PostgreSQL: publish the table for logical replication consumers
async function setupCDC() {
  await prisma.$executeRaw`
    CREATE PUBLICATION orders_publication FOR TABLE orders;
  `;
  // Subscribe to NOTIFY events emitted by a trigger on the orders table (see the sketch below)
  const client = new Client();
  await client.connect();
  await client.query("LISTEN orders_changed;");
  client.on("notification", async (msg) => {
    if (!msg.payload) return;
    const change = JSON.parse(msg.payload);
    if (change.operation === "INSERT" || change.operation === "UPDATE") {
      await syncOrder(change.data);
    }
  });
}
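The LISTEN above only receives events if something emits the orders_changed notification. A minimal one-time setup sketch for such a trigger is shown below; the function and trigger names are illustrative, and the payload shape (operation plus data) is an assumption matching the handler above.
// Hypothetical setup: a trigger that NOTIFYs on every insert/update of orders
async function setupOrdersChangedTrigger() {
  await prisma.$executeRawUnsafe(`
    CREATE OR REPLACE FUNCTION notify_orders_changed() RETURNS trigger AS $$
    BEGIN
      -- TG_OP is 'INSERT' or 'UPDATE'; the payload must stay under the ~8 KB NOTIFY limit
      PERFORM pg_notify('orders_changed', json_build_object(
        'operation', TG_OP,
        'data', row_to_json(NEW)
      )::text);
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;
  `);
  await prisma.$executeRawUnsafe(`
    CREATE TRIGGER orders_changed_trigger
    AFTER INSERT OR UPDATE ON orders
    FOR EACH ROW EXECUTE FUNCTION notify_orders_changed();
  `);
}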
interface ConflictResolution {
strategy: "source-wins" | "dest-wins" | "latest-wins" | "merge";
}
async function syncWithConflictResolution(
sourceRecord: any,
destRecord: any,
strategy: ConflictResolution["strategy"]
) {
if (strategy === "source-wins") {
return sourceRecord;
}
if (strategy === "dest-wins") {
return destRecord;
}
if (strategy === "latest-wins") {
return sourceRecord.updated_at > destRecord.updated_at
? sourceRecord
: destRecord;
}
if (strategy === "merge") {
// Merge non-null fields
return {
...destRecord,
...Object.fromEntries(
Object.entries(sourceRecord).filter(([_, v]) => v != null)
),
};
}
}
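A hypothetical usage sketch, assuming the destinationDb client exposes a product model (both the model and the function name below are illustrative, not part of the skill):
// Hypothetical: resolve each conflict before upserting into the destination
async function syncProductsWithResolution(sourceRecords: any[]) {
  for (const source of sourceRecords) {
    const dest = await destinationDb.product.findUnique({ where: { id: source.id } });
    const resolved = dest
      ? await syncWithConflictResolution(source, dest, "latest-wins")
      : source;
    await destinationDb.product.upsert({
      where: { id: source.id },
      create: resolved,
      update: resolved,
    });
  }
}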
// Track sync job metrics
interface SyncMetrics {
jobName: string;
startTime: Date;
endTime: Date;
recordsProcessed: number;
recordsInserted: number;
recordsUpdated: number;
recordsSkipped: number;
errors: number;
durationMs: number;
}
async function logSyncMetrics(metrics: SyncMetrics) {
await prisma.syncMetric.create({
data: metrics,
});
console.log(`
📊 Sync Metrics
Job: ${metrics.jobName}
Records: ${metrics.recordsProcessed}
Inserted: ${metrics.recordsInserted}
Updated: ${metrics.recordsUpdated}
Errors: ${metrics.errors}
Duration: ${metrics.durationMs}ms
`);
}
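One way to populate these metrics is a small wrapper that times the job and forwards the counters it returns; runWithMetrics below is an illustrative helper, not part of the skill:
// Hypothetical wrapper: time a job and record its counters via logSyncMetrics
type JobCounters = {
  processed: number;
  inserted: number;
  updated: number;
  skipped: number;
  errors: number;
};
async function runWithMetrics(jobName: string, job: () => Promise<JobCounters>) {
  const startTime = new Date();
  const counters = await job();
  const endTime = new Date();
  await logSyncMetrics({
    jobName,
    startTime,
    endTime,
    recordsProcessed: counters.processed,
    recordsInserted: counters.inserted,
    recordsUpdated: counters.updated,
    recordsSkipped: counters.skipped,
    errors: counters.errors,
    durationMs: endTime.getTime() - startTime.getTime(),
  });
}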
// jobs/sync-orders-to-warehouse.ts
import { Prisma } from "@prisma/client";
export class OrdersETLJob extends ETLJob {
constructor() {
super("orders-etl", "orders_warehouse_sync");
}
async extract(since: Date): Promise<Order[]> {
return prisma.order.findMany({
where: {
updated_at: { gt: since },
},
include: {
items: true,
user: true,
},
orderBy: { updated_at: "asc" },
});
}
async transform(orders: Order[]): Promise<WarehouseOrder[]> {
return orders.map((order) => ({
order_id: order.id,
user_email: order.user.email,
total_amount: order.total,
item_count: order.items.length,
status: order.status,
order_date: order.created_at,
synced_at: new Date(),
}));
}
  async load(data: WarehouseOrder[]): Promise<void> {
    const batchSize = 100;
    for (let i = 0; i < data.length; i += batchSize) {
      const batch = data.slice(i, i + batchSize);
      // Build parameterized rows with Prisma.sql/Prisma.join so values are
      // bound safely instead of being concatenated into the SQL string
      const rows = Prisma.join(
        batch.map(
          (o) =>
            Prisma.sql`(${o.order_id}, ${o.user_email}, ${o.total_amount},
              ${o.item_count}, ${o.status}, ${o.order_date}, ${o.synced_at})`
        )
      );
      await warehouseDb.$executeRaw`
        INSERT INTO orders_fact (
          order_id, user_email, total_amount, item_count,
          status, order_date, synced_at
        )
        VALUES ${rows}
        ON CONFLICT (order_id) DO UPDATE SET
          total_amount = EXCLUDED.total_amount,
          status = EXCLUDED.status,
          synced_at = EXCLUDED.synced_at
      `;
    }
  }
}
// Run job
new OrdersETLJob().run();
// Schedule ETL jobs
import cron from "node-cron";
// Run every hour
cron.schedule("0 * * * *", async () => {
await new OrdersETLJob().run();
});
// Run every 15 minutes
cron.schedule("*/15 * * * *", async () => {
await syncUsersIncremental();
});
// Run nightly at 2 AM
cron.schedule("0 2 * * *", async () => {
await fullDataSync();
});
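The schedules above reference syncUsersIncremental and fullDataSync, which are not defined in the earlier snippets; syncUsersIncremental would follow the same watermark pattern as syncOrdersIncremental, and a minimal fullDataSync sketch (assuming the sourceDb client exposes a user model and reusing the idempotent upsert helper) might look like:
// Hypothetical nightly full reload: re-sync every source user, relying on upserts for idempotency
async function fullDataSync() {
  const allUsers = await sourceDb.user.findMany({ orderBy: { id: "asc" } });
  await syncUsersIdempotent(allUsers);
  console.log(`✅ Full sync complete (${allUsers.length} users)`);
}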
async function syncWithErrorHandling() {
const checkpoint = await getCheckpoint();
let processedRecords = 0;
try {
const records = await fetchRecords(checkpoint);
for (const record of records) {
try {
await processRecord(record);
processedRecords++;
// Save checkpoint every 100 records
if (processedRecords % 100 === 0) {
await saveCheckpoint(record.id);
}
} catch (error) {
// Log error but continue
console.error(`Failed to process record ${record.id}:`, error);
await logFailedRecord(record.id, error);
}
}
await saveCheckpoint("completed");
} catch (error) {
// Critical failure - job will retry from checkpoint
console.error("Job failed:", error);
throw error;
}
}
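getCheckpoint, saveCheckpoint, fetchRecords, processRecord, and logFailedRecord are assumed helpers. One possible shape for the checkpoint pair, reusing the SyncWatermark model's metadata column (the "checkpointed_sync" key is illustrative):
// Hypothetical checkpoint helpers stored in SyncWatermark.metadata
async function getCheckpoint(): Promise<string | null> {
  const row = await prisma.syncWatermark.findUnique({
    where: { key: "checkpointed_sync" },
  });
  return (row?.metadata as { checkpoint?: string } | null)?.checkpoint ?? null;
}
async function saveCheckpoint(checkpoint: string | number): Promise<void> {
  await prisma.syncWatermark.upsert({
    where: { key: "checkpointed_sync" },
    create: { key: "checkpointed_sync", lastSync: new Date(), metadata: { checkpoint: String(checkpoint) } },
    update: { lastSync: new Date(), metadata: { checkpoint: String(checkpoint) } },
  });
}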
Weekly Installs: 48
Repository: github.com/patricio0312rev/skills
GitHub Stars: 21
First Seen: Jan 24, 2026
Security Audits: Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on: codex (39), opencode (38), gemini-cli (37), github-copilot (35), cursor (35), claude-code (32)