npx skills add https://github.com/claude-office-skills/skills --skill data-pipeline
Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates.
This skill covers:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   EXTRACT   │───▶│  TRANSFORM  │───▶│    LOAD     │
│             │    │             │    │             │
│ • APIs      │    │ • Clean     │    │ • Database  │
│ • Databases │    │ • Map       │    │ • Warehouse │
│ • Files     │    │ • Aggregate │    │ • Files     │
│ • Webhooks  │    │ • Enrich    │    │ • APIs      │
└─────────────┘    └─────────────┘    └─────────────┘
workflow: "Daily Sales ETL"
schedule: "2am daily"

nodes:
  # EXTRACT
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday

  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday

  # TRANSFORM
  - name: "Merge Data"
    type: merge
    mode: combine_by_key
    key: order_id

  - name: "Transform"
    type: code
    code: |
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        items: item.line_items.length,
        source: item.source_name,
        payment_status: item.payment.status
      }));

  # LOAD
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily

  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"
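
The "Merge Data" node's `combine_by_key` mode can be pictured as a keyed join. The following is a minimal sketch in plain JavaScript, not n8n's own Merge implementation. It assumes each Stripe payment stores the Shopify order ID in `metadata.order_id`, as the field list in the example output suggests. The function name `mergeByOrderId` and the sample records are hypothetical.

```javascript
// Hypothetical helper: left-join orders to payments on the order ID.
function mergeByOrderId(orders, payments) {
  // Index payments by the order ID stored in their metadata (assumed layout).
  const paymentsByOrder = new Map();
  for (const payment of payments) {
    const orderId = payment.metadata?.order_id;
    if (orderId !== undefined) paymentsByOrder.set(String(orderId), payment);
  }
  // Attach the matching payment, or null when none exists, to every order.
  return orders.map(order => ({
    ...order,
    payment: paymentsByOrder.get(String(order.id)) ?? null,
  }));
}

// Made-up sample data for illustration only.
const merged = mergeByOrderId(
  [{ id: 1001, total_price: "49.90" }, { id: 1002, total_price: "15.00" }],
  [{ id: "pi_1", status: "succeeded", metadata: { order_id: "1001" } }]
);
console.log(merged.map(o => [o.id, o.payment?.status ?? "missing"]));
```

In this sketch, orders without a matching payment get `payment: null`, so a downstream transform would need `item.payment?.status` rather than `item.payment.status` to avoid a `TypeError`.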
extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
    - mysql:
        connection: connection_string
        query: custom_sql
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}

  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query

  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
    - excel:
        source: file_path
        sheet: "Sheet1"
    - json:
        source: api/file
        path: "data.items"

  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
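
Several extractors above filter on "yesterday" without saying how that value is computed. One possible reading, sketched below, is the previous UTC calendar day expressed as a half-open time range. The helper name `yesterdayWindowUtc` and the returned field names are hypothetical, and the UTC assumption follows the timezone given in the example output.

```javascript
// Hypothetical helper: the UTC calendar day before `now`, as [start, end).
function yesterdayWindowUtc(now = new Date()) {
  const DAY_MS = 24 * 60 * 60 * 1000;
  // Midnight UTC at the start of the current day.
  const startOfToday = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate());
  const start = new Date(startOfToday - DAY_MS);
  const end = new Date(startOfToday);
  return {
    start: start.toISOString(),            // inclusive lower bound
    end: end.toISOString(),                // exclusive upper bound
    date: start.toISOString().slice(0, 10) // YYYY-MM-DD, usable as a query parameter
  };
}

const win = yesterdayWindowUtc(new Date("2024-03-01T02:00:00Z"));
console.log(win.date);  // "2024-02-29"
console.log(win.start); // "2024-02-29T00:00:00.000Z"
console.log(win.end);   // "2024-03-01T00:00:00.000Z"
```

The `date` value could be bound to `$1` in the PostgreSQL query. Adding the exclusive upper bound as a second condition would keep a 2am run from also picking up rows created after midnight.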
transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema

  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}

  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value

  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields

  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage
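
The cleaning options above are named but not shown in code. As a rough illustration, three of them (`trim_whitespace`, `remove_nulls` in its "drop" variant, and `deduplicate: by_key`) might look like the sketch below. All function names and the sample rows are hypothetical. Keeping the last row per key is one possible deduplication policy, not one the source prescribes.

```javascript
// trim_whitespace: trim every string field of a row.
function trimStrings(row) {
  return Object.fromEntries(
    Object.entries(row).map(([k, v]) => [k, typeof v === "string" ? v.trim() : v])
  );
}

// remove_nulls (drop variant): discard rows missing any required field.
function dropNulls(rows, requiredFields) {
  return rows.filter(row => requiredFields.every(f => row[f] != null));
}

// deduplicate by_key: keep the last row seen for each key value.
function dedupeByKey(rows, key) {
  const byKey = new Map();
  for (const row of rows) byKey.set(row[key], row);
  return [...byKey.values()];
}

// Made-up sample rows for illustration only.
const raw = [
  { id: 1, email: " a@example.com " },
  { id: 1, email: "a@example.com" },
  { id: 2, email: null },
];
const cleaned = dedupeByKey(dropNulls(raw.map(trimStrings), ["id", "email"]), "id");
console.log(cleaned);
```

Trimming before deduplication matters here: the first two rows only become identical once the whitespace is removed.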
// Clean and normalize data.
// statusMap maps status codes to names; it is passed in so the function is
// self-contained (the original relied on an undefined global).
function transform(items, statusMap = {}) {
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),
    // Parse dates (toISOString() throws a RangeError if created_at is unparseable)
    date: new Date(item.created_at).toISOString().split('T')[0],
    // Convert types
    amount: parseFloat(item.amount) || 0,
    // Map values
    status: statusMap[item.status_code] || 'unknown',
    // Calculate fields
    total: item.quantity * item.unit_price,
    // Filter nested
    tags: item.tags?.filter(t => t.active).map(t => t.name),
    // Default values
    source: item.source || 'direct'
  }));
}
// Aggregate data
function aggregate(items) {
  const grouped = {};

  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    grouped[key].total_revenue += item.amount;
    grouped[key].order_count += 1;
  });

  return Object.values(grouped);
}
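
The aggregation config lists `average: order_value`, which `aggregate()` does not compute. A small follow-up step could derive it from the totals, as sketched below. The function name `withAverage`, the output field `avg_order_value`, and the sample groups are hypothetical.

```javascript
// Hypothetical post-processing step: add an average order value to each group.
function withAverage(groups) {
  return groups.map(g => ({
    ...g,
    // Guard against division by zero for empty groups.
    avg_order_value: g.order_count > 0 ? g.total_revenue / g.order_count : 0,
  }));
}

// Sample groups in the shape returned by aggregate().
const groups = [
  { date: "2024-03-01", category: "books", total_revenue: 90, order_count: 3 },
  { date: "2024-03-01", category: "games", total_revenue: 0, order_count: 0 },
];
const averaged = withAverage(groups);
console.log(averaged.map(g => g.avg_order_value)); // [30, 0]
```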
loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
    - redshift:
        cluster: cluster_id
        database: analytics

  databases:
    - postgresql:
        upsert: on_conflict_update
    - mysql:
        batch_insert: 1000_rows

  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
    - google_cloud_storage:
        bucket: data-bucket

  spreadsheets:
    - google_sheets:
        mode: append/overwrite
    - airtable:
        base: base_id
        table: table_name

  apis:
    - webhook:
        url: destination_url
        batch_size: 100
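
Two loader settings above, `batch_insert: 1000_rows` and `batch_size: 100`, imply splitting rows into fixed-size batches, and the S3 `path` uses a `{date}` placeholder. A minimal sketch of both follows. The helper names `chunk` and `datedPath` are hypothetical, and the actual write calls are left out.

```javascript
// Split rows into batches of at most `size` rows each.
function chunk(rows, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer");
  }
  const batches = [];
  for (let i = 0; i < rows.length; i += size) {
    batches.push(rows.slice(i, i + size));
  }
  return batches;
}

// Fill the {date} placeholder of a path template such as "/processed/{date}/".
function datedPath(template, date) {
  return template.replace("{date}", date);
}

const rows = Array.from({ length: 2500 }, (_, i) => ({ id: i }));
const batches = chunk(rows, 1000);
console.log(batches.map(b => b.length));                     // [1000, 1000, 500]
console.log(datedPath("/processed/{date}/", "2024-03-01")); // "/processed/2024-03-01/"
```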
scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: real_time_dashboards
    daily:
      cron: "0 2 * * *"
      use_for: daily_reports
    weekly:
      cron: "0 3 * * 1"
      use_for: weekly_summaries
    on_demand:
      trigger: webhook/manual
      use_for: ad_hoc_analysis

  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete

  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
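
The `retries` block names exponential backoff without giving concrete delays. The sketch below shows one common form, in which the delay doubles after each failed attempt up to a cap. The one-second base, the 60-second cap, and the names `backoffDelayMs`, `withRetries`, and `onFinalFailure` are assumptions for illustration, not values from the source.

```javascript
// Delay before the retry that follows failed attempt `attempt` (1-based):
// base, 2x base, 4x base, ... capped at capMs.
function backoffDelayMs(attempt, baseMs = 1000, capMs = 60000) {
  return Math.min(capMs, baseMs * 2 ** (attempt - 1));
}

// Run `task` up to maxAttempts times, sleeping between failed attempts.
async function withRetries(task, { maxAttempts = 3, baseMs = 1000, onFinalFailure = () => {} } = {}) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task(attempt);
    } catch (err) {
      if (attempt === maxAttempts) {
        onFinalFailure(err); // corresponds to alert_on: final_failure
        throw err;
      }
      await new Promise(resolve => setTimeout(resolve, backoffDelayMs(attempt, baseMs)));
    }
  }
}

console.log([1, 2, 3].map(n => backoffDelayMs(n))); // [1000, 2000, 4000]
```

With `max_attempts: 3`, only the first two delays are ever used, because the third failure is final and triggers the alert instead of another wait.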
monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness

  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 *Pipeline Failed*
        Pipeline: {pipeline_name}
        Stage: {failed_stage}
        Error: {error_message}
        [View Logs]({logs_url})

    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly

    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
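
Two pieces of the monitoring config lend themselves to a short illustration: filling the `{placeholder}` fields of the alert template, and evaluating the `stale_data` trigger. The sketch below assumes simple `{name}` substitution and millisecond timestamps. The function names `renderTemplate` and `isStale` and the sample values are hypothetical.

```javascript
// Replace {name} placeholders with values; unknown placeholders are left as-is.
function renderTemplate(template, vars) {
  return template.replace(/\{(\w+)\}/g, (match, name) =>
    Object.prototype.hasOwnProperty.call(vars, name) ? String(vars[name]) : match
  );
}

// stale_data: true when the last update is older than the threshold.
function isStale(lastUpdate, now, thresholdMs) {
  return now.getTime() - lastUpdate.getTime() > thresholdMs;
}

const message = renderTemplate(
  "Pipeline: {pipeline_name}\nStage: {failed_stage}\nError: {error_message}",
  { pipeline_name: "Daily Sales ETL", failed_stage: "load", error_message: "timeout" }
);
console.log(message);

const TWO_HOURS_MS = 2 * 60 * 60 * 1000;
const stale = isStale(
  new Date("2024-03-01T00:00:00Z"),
  new Date("2024-03-01T03:00:00Z"),
  TWO_HOURS_MS
);
console.log(stale); // true
```

Leaving unknown placeholders untouched makes a missing variable visible in the alert text rather than silently dropping it.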
data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]

  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]

  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid

  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
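
The checks above can be approximated in a few small functions. The sketch below covers the schema validation rules, the null-rate check, and the `within_2_std_of_mean` trend check. It assumes dates arrive as `YYYY-MM-DD` strings and uses the population standard deviation. The names `schema`, `validateRow`, `nullRate`, and `withinStd`, and all sample values, are hypothetical.

```javascript
// Schema mirroring the schema_validation section (date format is an assumption).
const schema = {
  required: ["id", "date", "amount"],
  types: {
    id: v => Number.isInteger(v),
    date: v => typeof v === "string" && /^\d{4}-\d{2}-\d{2}$/.test(v),
    amount: v => typeof v === "number" && Number.isFinite(v),
  },
  allowed: { status: ["active", "pending", "closed"] },
};

// Return a list of problems for one row; an empty list means the row is valid.
function validateRow(row, { required, types, allowed }) {
  const errors = [];
  for (const field of required) {
    if (row[field] == null) errors.push(`missing ${field}`);
  }
  for (const [field, check] of Object.entries(types)) {
    if (row[field] != null && !check(row[field])) errors.push(`bad type for ${field}`);
  }
  for (const [field, values] of Object.entries(allowed)) {
    if (row[field] != null && !values.includes(row[field])) errors.push(`bad value for ${field}`);
  }
  return errors;
}

// Fraction of rows where `field` is null or undefined.
function nullRate(rows, field) {
  if (rows.length === 0) return 0;
  return rows.filter(r => r[field] == null).length / rows.length;
}

// Is `value` within k population standard deviations of the historical mean?
function withinStd(history, value, k = 2) {
  const mean = history.reduce((a, b) => a + b, 0) / history.length;
  const variance = history.reduce((a, b) => a + (b - mean) ** 2, 0) / history.length;
  return Math.abs(value - mean) <= k * Math.sqrt(variance);
}

console.log(validateRow({ id: 1, date: "2024-03-01", amount: 10, status: "active" }, schema)); // []
console.log(validateRow({ id: "1", amount: 10, status: "open" }, schema));
console.log(nullRate([{ a: 1 }, { a: null }, { a: 3 }, {}], "a")); // 0.5
console.log(withinStd([100, 110, 90, 105, 95], 104)); // true
```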
Request: "Create a daily sales data pipeline"

Output:

# Daily Sales Data Pipeline

## Pipeline Overview
Shopify + Stripe → Transform → BigQuery + Sheets

## Schedule
- Runs: 2am daily
- Timezone: UTC
- Retry: 3 attempts

## Extract

### Shopify Orders
```yaml
source: shopify
filter: created_at >= yesterday
fields: [id, email, total_price, line_items, created_at]
```

### Stripe Payments
```yaml
source: stripe
filter: created >= yesterday
fields: [id, amount, status, metadata.order_id]
```

## Transform
```javascript
// Join and clean data: build one output row from an order and its payment
const toRow = (order, payment) => ({
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
});
```

## Load
analytics.sales_daily

## Monitoring
- Slack: #data-alerts
- On failure: @data-team
Data Pipeline Skill - Part of Claude Office Skills