etl-pipelines by claude-dev-suite/claude-dev-suite
npx skills add https://github.com/claude-dev-suite/claude-dev-suite --skill etl-pipelines

import { Transform, Writable, pipeline } from 'stream';
import { promisify } from 'util';

const pipelineAsync = promisify(pipeline);

// Load stage: buffer records and batch-insert them into the destination table.
class BatchWriter extends Writable {
  private batch: any[] = [];

  constructor(private db: Database, private table: string, private opts: { batchSize: number }) {
    super({ objectMode: true });
  }

  async _write(record: any, _: string, callback: (error?: Error | null) => void) {
    this.batch.push(record);
    try {
      if (this.batch.length >= this.opts.batchSize) {
        await this.flush();
      }
      callback();
    } catch (err) {
      callback(err as Error);
    }
  }

  async _final(callback: (error?: Error | null) => void) {
    try {
      if (this.batch.length > 0) await this.flush();
      callback();
    } catch (err) {
      callback(err as Error);
    }
  }

  private async flush() {
    await this.db.batchInsert(this.table, this.batch);
    this.batch = [];
  }
}

// Extract → Transform → Load
await pipelineAsync(
  // Extract: stream rows from the source
  db.query('SELECT * FROM legacy_orders').stream(),
  // Transform: map legacy columns onto the target schema
  new Transform({
    objectMode: true,
    transform(row, _, callback) {
      callback(null, {
        id: row.order_id,
        customer: row.cust_name.trim(),
        amount: parseFloat(row.total_amount),
        date: new Date(row.order_date).toISOString(),
      });
    },
  }),
  // Load: batch insert into the destination
  new BatchWriter(targetDb, 'orders', { batchSize: 1000 }),
);
import polars as pl

# Extract
df = pl.read_csv("data/legacy_orders.csv")
# Or from a database:
# df = pl.read_database_uri("SELECT * FROM orders", connection_uri)

# Transform
transformed = (
    df
    .with_columns([
        pl.col("customer_name").str.strip_chars().alias("customer"),
        pl.col("total_amount").cast(pl.Float64).alias("amount"),
        pl.col("order_date").str.to_datetime().alias("date"),
    ])
    .filter(pl.col("amount") > 0)
    .drop("customer_name", "total_amount", "order_date")
)

# Load
transformed.write_database("orders", connection_uri, if_table_exists="append")
# Or to Parquet for a data lake:
transformed.write_parquet("output/orders.parquet")
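If the source file is too large to hold in memory, the same transformation can run through Polars' lazy API instead of an eager read. A minimal sketch, assuming a recent Polars version where this plan can run through the streaming engine:

import polars as pl

# Lazily scan the CSV so rows are processed in chunks rather than loaded at once
(
    pl.scan_csv("data/legacy_orders.csv")
    .with_columns([
        pl.col("customer_name").str.strip_chars().alias("customer"),
        pl.col("total_amount").cast(pl.Float64).alias("amount"),
        pl.col("order_date").str.to_datetime().alias("date"),
    ])
    .filter(pl.col("amount") > 0)
    .drop("customer_name", "total_amount", "order_date")
    # Stream the result straight to Parquet without collecting the full frame
    .sink_parquet("output/orders.parquet")
)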
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta

default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG('daily_order_sync', default_args=default_args,
         schedule_interval='0 2 * * *', start_date=datetime(2026, 1, 1),
         catchup=False) as dag:

    def extract(**context):
        hook = PostgresHook('source_db')
        df = hook.get_pandas_df("SELECT * FROM orders WHERE date = %(ds)s",
                                parameters={'ds': context['ds']})
        context['ti'].xcom_push(key='row_count', value=len(df))
        df.to_parquet('/tmp/orders.parquet')

    def transform():
        import polars as pl
        df = pl.read_parquet('/tmp/orders.parquet')
        transformed = df.with_columns(pl.col("amount").cast(pl.Float64))
        transformed.write_parquet('/tmp/orders_clean.parquet')

    def load():
        import polars as pl
        hook = PostgresHook('warehouse_db')
        df = pl.read_parquet('/tmp/orders_clean.parquet')
        df.write_database('fact_orders', hook.get_uri(), if_table_exists='append')

    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = PythonOperator(task_id='load', python_callable=load)

    extract_task >> transform_task >> load_task
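Because retries are enabled, the load step should be idempotent so that a rerun for the same execution date does not duplicate rows. One minimal sketch, assuming the target table has an order_date column that matches the run's logical date:

    def load_idempotent(**context):
        import polars as pl
        hook = PostgresHook('warehouse_db')
        # Remove any rows left behind by a previous attempt for this execution date...
        hook.run("DELETE FROM fact_orders WHERE order_date = %(ds)s",
                 parameters={'ds': context['ds']})
        # ...then append the freshly transformed batch
        df = pl.read_parquet('/tmp/orders_clean.parquet')
        df.write_database('fact_orders', hook.get_uri(), if_table_exists='append')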
-- Extract + Load raw data, then transform in warehouse
-- Step 1: Load raw (use COPY or bulk insert)
COPY raw_orders FROM 's3://bucket/orders.csv' CREDENTIALS '...' CSV HEADER;
-- Step 2: Transform in place
INSERT INTO fact_orders (id, customer, amount, order_date)
SELECT order_id, TRIM(customer_name), CAST(total AS DECIMAL(10,2)), TO_DATE(date_str, 'YYYY-MM-DD')
FROM raw_orders
WHERE total > 0 AND NOT EXISTS (SELECT 1 FROM fact_orders WHERE id = raw_orders.order_id);
| Anti-Pattern | Fix |
|---|---|
| Loading all data into memory | Use streaming (Node.js streams, generators) |
| No idempotency (re-runs duplicate data) | Use upserts or dedup before insert |
| No error handling per record | Log bad records, continue processing good ones (see the sketch below) |
| No data validation | Validate schema and types before loading (see the sketch below) |
| Monolithic ETL script | Split into extract, transform, load stages |
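A minimal Python sketch of the last two fixes: validate each record against the expected source schema, route failures to a dead-letter list, and keep processing the good rows. The field names and validation rules here are illustrative assumptions, not part of the skill:

from datetime import datetime

# Assumed source schema for the legacy orders feed
EXPECTED_FIELDS = {"order_id", "cust_name", "total_amount", "order_date"}

def clean(row: dict) -> dict:
    # Raises if the row is malformed; the caller decides what to do with failures
    missing = EXPECTED_FIELDS - row.keys()
    if missing:
        raise ValueError(f"missing fields: {missing}")
    amount = float(row["total_amount"])
    if amount <= 0:
        raise ValueError("amount must be positive")
    return {
        "id": row["order_id"],
        "customer": row["cust_name"].strip(),
        "amount": amount,
        "date": datetime.fromisoformat(row["order_date"]).isoformat(),
    }

def transform_all(rows):
    good, bad = [], []
    for row in rows:
        try:
            good.append(clean(row))
        except (ValueError, KeyError, TypeError) as exc:
            # Log the bad record and keep going instead of failing the whole batch
            bad.append({"row": row, "error": str(exc)})
    return good, bad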