analytics-engineer by borghei/claude-skills
npx skills add https://github.com/borghei/claude-skills --skill analytics-engineer面向可扩展数据转换的专业级数据分析工程。
SOURCES → INGESTION → WAREHOUSE → TRANSFORMATION → SEMANTIC → BI
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
APIs Fivetran Snowflake dbt Looker Tableau
DBs Airbyte BigQuery Dataform Transform PBI
Files Stitch Redshift Spark SQL dbt ML Metabase
analytics/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── staging/ # 原始数据 → 清洗后数据
│ │ ├── stg_*.sql
│ │ └── _stg_*.yml
│ ├── intermediate/ # 业务逻辑
│ │ ├── int_*.sql
│ │ └── _int_*.yml
│ └── marts/ # 最终模型
│ ├── core/
│ │ ├── dim_*.sql
│ │ └── fct_*.sql
│ ├── marketing/
│ └── finance/
├── macros/
├── tests/
├── seeds/
├── snapshots/
└── analyses/
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
星型模式:
┌──────────────┐
│ dim_date │
└──────┬───────┘
│
┌──────────────┐ ┌──────┴───────┐ ┌──────────────┐
│ dim_customer │────│ fct_orders │────│ dim_product │
└──────────────┘ └──────┬───────┘ └──────────────┘
│
┌──────┴───────┐
│ dim_store │
└──────────────┘
维度表模式:
-- models/marts/core/dim_customer.sql
WITH customers AS (
SELECT * FROM {{ ref('stg_crm__customers') }}
),
addresses AS (
SELECT * FROM {{ ref('stg_crm__addresses') }}
),
customer_orders AS (
SELECT
customer_id,
MIN(order_date) AS first_order_date,
MAX(order_date) AS most_recent_order_date,
COUNT(*) AS lifetime_orders,
SUM(order_amount) AS lifetime_value
FROM {{ ref('stg_orders__orders') }}
GROUP BY customer_id
),
final AS (
SELECT
customers.customer_id,
customers.customer_name,
customers.email,
customers.created_at,
addresses.city,
addresses.state,
addresses.country,
customer_orders.first_order_date,
customer_orders.most_recent_order_date,
customer_orders.lifetime_orders,
customer_orders.lifetime_value,
CASE
WHEN customer_orders.lifetime_value >= 10000 THEN 'platinum'
WHEN customer_orders.lifetime_value >= 5000 THEN 'gold'
WHEN customer_orders.lifetime_value >= 1000 THEN 'silver'
ELSE 'bronze'
END AS customer_tier
FROM customers
LEFT JOIN addresses
ON customers.address_id = addresses.address_id
LEFT JOIN customer_orders
ON customers.customer_id = customer_orders.customer_id
)
SELECT * FROM final
事实表模式:
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
partition_by={'field': 'order_date', 'data_type': 'date'},
cluster_by=['customer_id', 'product_id']
)
}}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders__orders') }}
{% if is_incremental() %}
WHERE order_date >= (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
),
order_items AS (
SELECT * FROM {{ ref('stg_orders__order_items') }}
),
final AS (
SELECT
orders.order_id,
orders.order_date,
orders.customer_id,
order_items.product_id,
orders.store_id,
order_items.quantity,
order_items.unit_price,
order_items.quantity * order_items.unit_price AS line_total,
orders.discount_amount,
orders.tax_amount,
orders.shipping_amount,
orders.total_amount
FROM orders
INNER JOIN order_items
ON orders.order_id = order_items.order_id
)
SELECT * FROM final
-- models/staging/crm/stg_crm__customers.sql
WITH source AS (
SELECT * FROM {{ source('crm', 'customers') }}
),
renamed AS (
SELECT
-- 主键
id AS customer_id,
-- 字符串
TRIM(LOWER(name)) AS customer_name,
TRIM(LOWER(email)) AS email,
-- 日期
created_at::timestamp AS created_at,
updated_at::timestamp AS updated_at,
-- 布尔值
is_active::boolean AS is_active,
-- 元数据
_fivetran_synced AS _loaded_at
FROM source
WHERE _fivetran_deleted = false
)
SELECT * FROM renamed
# models/staging/crm/_crm__sources.yml
version: 2
sources:
- name: crm
description: 客户关系管理系统
database: raw
schema: crm
loader: fivetran
loaded_at_field: _fivetran_synced
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
tables:
- name: customers
description: 客户主数据
columns:
- name: id
description: 主键
tests:
- unique
- not_null
- name: email
tests:
- unique
# models/marts/core/_core__models.yml
version: 2
models:
- name: dim_customer
description: 客户维度表
columns:
- name: customer_id
description: 主键
tests:
- unique
- not_null
- name: email
tests:
- unique
- not_null
- name: customer_tier
tests:
- accepted_values:
values: ['platinum', 'gold', 'silver', 'bronze']
- name: lifetime_value
tests:
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: fct_orders
description: 订单事实表
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- order_id
- product_id
columns:
- name: customer_id
tests:
- relationships:
to: ref('dim_customer')
field: customer_id
-- tests/assert_positive_amounts.sql
{% test positive_amount(model, column_name) %}
SELECT
{{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} < 0
{% endtest %}
-- tests/generic/assert_row_count_equal.sql
{% test row_count_equal(model, compare_model) %}
WITH source_count AS (
SELECT COUNT(*) AS cnt FROM {{ model }}
),
compare_count AS (
SELECT COUNT(*) AS cnt FROM {{ ref(compare_model) }}
)
SELECT *
FROM source_count
CROSS JOIN compare_count
WHERE source_count.cnt != compare_count.cnt
{% endtest %}
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
({{ column_name }} / 100.0)::decimal(18,2)
{% endmacro %}
-- macros/pivot_values.sql
{% macro pivot_values(column_name, values, alias_prefix='') %}
{% for value in values %}
SUM(CASE WHEN {{ column_name }} = '{{ value }}' THEN 1 ELSE 0 END)
AS {{ alias_prefix }}{{ value | lower | replace(' ', '_') }}
{% if not loop.last %},{% endif %}
{% endfor %}
{% endmacro %}
-- macros/incremental_filter.sql
{% macro get_incremental_filter(column_name, lookback_days=3) %}
{% if is_incremental() %}
WHERE {{ column_name }} >= (
SELECT DATEADD(day, -{{ lookback_days }}, MAX({{ column_name }}))
FROM {{ this }}
)
{% endif %}
{% endmacro %}
# models/marts/core/_core__metrics.yml
version: 2
metrics:
- name: revenue
label: 总收入
model: ref('fct_orders')
description: 所有订单金额的总和
calculation_method: sum
expression: total_amount
timestamp: order_date
time_grains: [day, week, month, quarter, year]
dimensions:
- customer_tier
- product_category
- store_region
filters:
- field: is_cancelled
operator: '='
value: 'false'
- name: average_order_value
label: 平均订单价值
model: ref('fct_orders')
description: 平均订单金额
calculation_method: average
expression: total_amount
timestamp: order_date
time_grains: [day, week, month]
- name: customer_count
label: 客户数量
model: ref('dim_customer')
calculation_method: count_distinct
expression: customer_id
# models/exposures.yml
version: 2
exposures:
- name: executive_dashboard
type: dashboard
maturity: high
url: https://tableau.company.com/views/executive
description: 执行层 KPI 仪表板
depends_on:
- ref('fct_orders')
- ref('dim_customer')
- ref('dim_product')
owner:
name: 分析团队
email: analytics@company.com
- name: marketing_report
type: notebook
maturity: medium
url: https://databricks.company.com/notebooks/marketing
depends_on:
- ref('fct_marketing_events')
- ref('dim_campaign')
owner:
name: 营销分析
email: marketing-analytics@company.com
| 层级 | 物化方式 | 原因 |
|---|---|---|
| 暂存层 | 视图 | 原始数据,无聚合 |
| 中间层 | 临时表/视图 | 业务逻辑,被多次引用 |
| 集市层(小) | 表 | 最终模型,查询性能 |
| 集市层(大) | 增量表 | 大型事实表,效率 |
-- 之前:在整个表上使用昂贵的窗口函数
SELECT
order_id,
customer_id,
order_date,
SUM(amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
) AS running_total
FROM orders;
-- 之后:先预聚合再连接
WITH daily_totals AS (
SELECT
customer_id,
order_date,
SUM(amount) AS daily_amount
FROM orders
GROUP BY customer_id, order_date
),
running_totals AS (
SELECT
customer_id,
order_date,
SUM(daily_amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
) AS running_total
FROM daily_totals
)
SELECT
o.order_id,
o.customer_id,
o.order_date,
rt.running_total
FROM orders o
JOIN running_totals rt
ON o.customer_id = rt.customer_id
AND o.order_date = rt.order_date;
{{
config(
materialized='incremental',
unique_key='event_id',
partition_by={
'field': 'event_date',
'data_type': 'date',
'granularity': 'day'
},
cluster_by=['user_id', 'event_type']
)
}}
# .github/workflows/dbt.yml
name: dbt CI/CD
on:
pull_request:
branches: [main]
push:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: pip install dbt-snowflake
- name: dbt deps
run: dbt deps
- name: dbt compile
run: dbt compile --target ci
- name: dbt test
run: dbt test --target ci
deploy:
needs: test
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: dbt run
run: dbt run --target prod
- name: dbt test
run: dbt test --target prod
# 仅运行修改的模型及其下游模型
dbt run --select state:modified+ --defer --state ./target-base
dbt test --select state:modified+ --defer --state ./target-base
# models/marts/core/_core__models.yml
version: 2
models:
- name: fct_orders
description: |
订单事实表,每行代表一个订单行项目。
## 业务逻辑
- 状态为 'cancelled' 的订单被排除
- 金额单位为美元
- 税费在订单生成时计算
## 使用示例
```sql
SELECT * FROM {{ ref('fct_orders') }}
WHERE order_date >= '2024-01-01'
```
## 依赖项
- stg_orders__orders
- stg_orders__order_items
# 生成并提供文档服务
dbt docs generate
dbt docs serve --port 8080
references/modeling_patterns.md - 数据建模最佳实践references/dbt_style_guide.md - SQL 和 dbt 规范references/testing_guide.md - 测试策略references/optimization.md - 性能调优# 模型影响分析器
python scripts/impact_analyzer.py --model dim_customer
# 模式变更检测器
python scripts/schema_diff.py --source prod --target dev
# 文档生成器
python scripts/doc_generator.py --format markdown
# 数据质量评分器
python scripts/quality_scorer.py --model fct_orders
每周安装次数
78
代码仓库
GitHub 星标数
32
首次出现
2026年1月24日
安全审计
安装于
claude-code60
opencode55
gemini-cli49
codex46
github-copilot45
cursor45
Expert-level analytics engineering for scalable data transformation.
SOURCES → INGESTION → WAREHOUSE → TRANSFORMATION → SEMANTIC → BI
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
APIs Fivetran Snowflake dbt Looker Tableau
DBs Airbyte BigQuery Dataform Transform PBI
Files Stitch Redshift Spark SQL dbt ML Metabase
analytics/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── staging/ # Raw → Cleaned
│ │ ├── stg_*.sql
│ │ └── _stg_*.yml
│ ├── intermediate/ # Business logic
│ │ ├── int_*.sql
│ │ └── _int_*.yml
│ └── marts/ # Final models
│ ├── core/
│ │ ├── dim_*.sql
│ │ └── fct_*.sql
│ ├── marketing/
│ └── finance/
├── macros/
├── tests/
├── seeds/
├── snapshots/
└── analyses/
Star Schema:
┌──────────────┐
│ dim_date │
└──────┬───────┘
│
┌──────────────┐ ┌──────┴───────┐ ┌──────────────┐
│ dim_customer │────│ fct_orders │────│ dim_product │
└──────────────┘ └──────┬───────┘ └──────────────┘
│
┌──────┴───────┐
│ dim_store │
└──────────────┘
Dimension Table Pattern:
-- models/marts/core/dim_customer.sql
WITH customers AS (
SELECT * FROM {{ ref('stg_crm__customers') }}
),
addresses AS (
SELECT * FROM {{ ref('stg_crm__addresses') }}
),
customer_orders AS (
SELECT
customer_id,
MIN(order_date) AS first_order_date,
MAX(order_date) AS most_recent_order_date,
COUNT(*) AS lifetime_orders,
SUM(order_amount) AS lifetime_value
FROM {{ ref('stg_orders__orders') }}
GROUP BY customer_id
),
final AS (
SELECT
customers.customer_id,
customers.customer_name,
customers.email,
customers.created_at,
addresses.city,
addresses.state,
addresses.country,
customer_orders.first_order_date,
customer_orders.most_recent_order_date,
customer_orders.lifetime_orders,
customer_orders.lifetime_value,
CASE
WHEN customer_orders.lifetime_value >= 10000 THEN 'platinum'
WHEN customer_orders.lifetime_value >= 5000 THEN 'gold'
WHEN customer_orders.lifetime_value >= 1000 THEN 'silver'
ELSE 'bronze'
END AS customer_tier
FROM customers
LEFT JOIN addresses
ON customers.address_id = addresses.address_id
LEFT JOIN customer_orders
ON customers.customer_id = customer_orders.customer_id
)
SELECT * FROM final
Fact Table Pattern:
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
partition_by={'field': 'order_date', 'data_type': 'date'},
cluster_by=['customer_id', 'product_id']
)
}}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders__orders') }}
{% if is_incremental() %}
WHERE order_date >= (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
),
order_items AS (
SELECT * FROM {{ ref('stg_orders__order_items') }}
),
final AS (
SELECT
orders.order_id,
orders.order_date,
orders.customer_id,
order_items.product_id,
orders.store_id,
order_items.quantity,
order_items.unit_price,
order_items.quantity * order_items.unit_price AS line_total,
orders.discount_amount,
orders.tax_amount,
orders.shipping_amount,
orders.total_amount
FROM orders
INNER JOIN order_items
ON orders.order_id = order_items.order_id
)
SELECT * FROM final
-- models/staging/crm/stg_crm__customers.sql
WITH source AS (
SELECT * FROM {{ source('crm', 'customers') }}
),
renamed AS (
SELECT
-- Primary key
id AS customer_id,
-- Strings
TRIM(LOWER(name)) AS customer_name,
TRIM(LOWER(email)) AS email,
-- Dates
created_at::timestamp AS created_at,
updated_at::timestamp AS updated_at,
-- Booleans
is_active::boolean AS is_active,
-- Metadata
_fivetran_synced AS _loaded_at
FROM source
WHERE _fivetran_deleted = false
)
SELECT * FROM renamed
# models/staging/crm/_crm__sources.yml
version: 2
sources:
- name: crm
description: Customer relationship management system
database: raw
schema: crm
loader: fivetran
loaded_at_field: _fivetran_synced
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
tables:
- name: customers
description: Customer master data
columns:
- name: id
description: Primary key
tests:
- unique
- not_null
- name: email
tests:
- unique
# models/marts/core/_core__models.yml
version: 2
models:
- name: dim_customer
description: Customer dimension table
columns:
- name: customer_id
description: Primary key
tests:
- unique
- not_null
- name: email
tests:
- unique
- not_null
- name: customer_tier
tests:
- accepted_values:
values: ['platinum', 'gold', 'silver', 'bronze']
- name: lifetime_value
tests:
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: fct_orders
description: Order fact table
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- order_id
- product_id
columns:
- name: customer_id
tests:
- relationships:
to: ref('dim_customer')
field: customer_id
-- tests/assert_positive_amounts.sql
{% test positive_amount(model, column_name) %}
SELECT
{{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} < 0
{% endtest %}
-- tests/generic/assert_row_count_equal.sql
{% test row_count_equal(model, compare_model) %}
WITH source_count AS (
SELECT COUNT(*) AS cnt FROM {{ model }}
),
compare_count AS (
SELECT COUNT(*) AS cnt FROM {{ ref(compare_model) }}
)
SELECT *
FROM source_count
CROSS JOIN compare_count
WHERE source_count.cnt != compare_count.cnt
{% endtest %}
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
({{ column_name }} / 100.0)::decimal(18,2)
{% endmacro %}
-- macros/pivot_values.sql
{% macro pivot_values(column_name, values, alias_prefix='') %}
{% for value in values %}
SUM(CASE WHEN {{ column_name }} = '{{ value }}' THEN 1 ELSE 0 END)
AS {{ alias_prefix }}{{ value | lower | replace(' ', '_') }}
{% if not loop.last %},{% endif %}
{% endfor %}
{% endmacro %}
-- macros/incremental_filter.sql
{% macro get_incremental_filter(column_name, lookback_days=3) %}
{% if is_incremental() %}
WHERE {{ column_name }} >= (
SELECT DATEADD(day, -{{ lookback_days }}, MAX({{ column_name }}))
FROM {{ this }}
)
{% endif %}
{% endmacro %}
# models/marts/core/_core__metrics.yml
version: 2
metrics:
- name: revenue
label: Total Revenue
model: ref('fct_orders')
description: Sum of all order amounts
calculation_method: sum
expression: total_amount
timestamp: order_date
time_grains: [day, week, month, quarter, year]
dimensions:
- customer_tier
- product_category
- store_region
filters:
- field: is_cancelled
operator: '='
value: 'false'
- name: average_order_value
label: Average Order Value
model: ref('fct_orders')
description: Average order amount
calculation_method: average
expression: total_amount
timestamp: order_date
time_grains: [day, week, month]
- name: customer_count
label: Customer Count
model: ref('dim_customer')
calculation_method: count_distinct
expression: customer_id
# models/exposures.yml
version: 2
exposures:
- name: executive_dashboard
type: dashboard
maturity: high
url: https://tableau.company.com/views/executive
description: Executive KPI dashboard
depends_on:
- ref('fct_orders')
- ref('dim_customer')
- ref('dim_product')
owner:
name: Analytics Team
email: analytics@company.com
- name: marketing_report
type: notebook
maturity: medium
url: https://databricks.company.com/notebooks/marketing
depends_on:
- ref('fct_marketing_events')
- ref('dim_campaign')
owner:
name: Marketing Analytics
email: marketing-analytics@company.com
| Layer | Materialization | Reason |
|---|---|---|
| Staging | View | Raw data, no aggregation |
| Intermediate | Ephemeral/View | Business logic, referenced multiple times |
| Marts (small) | Table | Final models, query performance |
| Marts (large) | Incremental | Large fact tables, efficiency |
-- Before: Expensive window function on full table
SELECT
order_id,
customer_id,
order_date,
SUM(amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
) AS running_total
FROM orders;
-- After: Pre-aggregate then join
WITH daily_totals AS (
SELECT
customer_id,
order_date,
SUM(amount) AS daily_amount
FROM orders
GROUP BY customer_id, order_date
),
running_totals AS (
SELECT
customer_id,
order_date,
SUM(daily_amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
) AS running_total
FROM daily_totals
)
SELECT
o.order_id,
o.customer_id,
o.order_date,
rt.running_total
FROM orders o
JOIN running_totals rt
ON o.customer_id = rt.customer_id
AND o.order_date = rt.order_date;
{{
config(
materialized='incremental',
unique_key='event_id',
partition_by={
'field': 'event_date',
'data_type': 'date',
'granularity': 'day'
},
cluster_by=['user_id', 'event_type']
)
}}
# .github/workflows/dbt.yml
name: dbt CI/CD
on:
pull_request:
branches: [main]
push:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: pip install dbt-snowflake
- name: dbt deps
run: dbt deps
- name: dbt compile
run: dbt compile --target ci
- name: dbt test
run: dbt test --target ci
deploy:
needs: test
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: dbt run
run: dbt run --target prod
- name: dbt test
run: dbt test --target prod
# Only run modified models and downstream
dbt run --select state:modified+ --defer --state ./target-base
dbt test --select state:modified+ --defer --state ./target-base
# models/marts/core/_core__models.yml
version: 2
models:
- name: fct_orders
description: |
Order fact table containing one row per order line item.
## Business Logic
- Orders with status 'cancelled' are excluded
- Amounts are in USD
- Tax is calculated at time of order
## Usage
```sql
SELECT * FROM {{ ref('fct_orders') }}
WHERE order_date >= '2024-01-01'
```
## Dependencies
- stg_orders__orders
- stg_orders__order_items
# Generate and serve documentation
dbt docs generate
dbt docs serve --port 8080
references/modeling_patterns.md - Data modeling best practicesreferences/dbt_style_guide.md - SQL and dbt conventionsreferences/testing_guide.md - Testing strategiesreferences/optimization.md - Performance tuning# Model impact analyzer
python scripts/impact_analyzer.py --model dim_customer
# Schema change detector
python scripts/schema_diff.py --source prod --target dev
# Documentation generator
python scripts/doc_generator.py --format markdown
# Data quality scorer
python scripts/quality_scorer.py --model fct_orders
Weekly Installs
78
Repository
GitHub Stars
32
First Seen
Jan 24, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
claude-code60
opencode55
gemini-cli49
codex46
github-copilot45
cursor45
Excel财务建模规范与xlsx文件处理指南:专业格式、零错误公式与数据分析
46,700 周安装
TanStack Query (React Query) 教程:React 异步状态管理与数据获取库
239 周安装
科学问题选择技能:基于Cell论文的科研项目决策框架,提升研究影响力
251 周安装
MongoDB与PostgreSQL数据库指南:选择、查询、优化与部署实战
236 周安装
资深架构师AI助手 - Claude高级架构设计技能,提升系统架构与代码质量
235 周安装
Polymarket交易员分析工具:追踪链上交易活动与持仓数据
237 周安装
Next.js开发技能:构建现代全栈Web应用指南(App Router/服务器组件/SEO优化)
70 周安装