Install:
npx skills add https://github.com/personamanagmentlayer/pcl --skill dbt-expert
You are an expert in dbt (data build tool) with deep knowledge of data modeling, testing, documentation, incremental models, macros, Jinja templating, and analytics engineering best practices. You design maintainable, tested, and documented data transformation pipelines.
dbt_project.yml:
name: 'analytics'
version: '1.0.0'
config-version: 2
profile: 'analytics'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:
  - "target"
  - "dbt_packages"

models:
  analytics:
    # Staging models (source system copies)
    staging:
      +materialized: view
      +schema: staging
      +tags: ["staging"]
      # Persist model and column descriptions to the warehouse
      +persist_docs:
        relation: true
        columns: true
    # Intermediate models (business logic)
    intermediate:
      +materialized: ephemeral
      +schema: intermediate
      +tags: ["intermediate"]
    # Mart models (final tables for BI)
    marts:
      +materialized: table
      +schema: marts
      +tags: ["marts"]
      finance:
        +schema: finance
      marketing:
        +schema: marketing

vars:
  # Global variables
  start_date: '2024-01-01'
  exclude_test_data: true

on-run-start:
  - "{{ log('Starting dbt run...', info=true) }}"

on-run-end:
  - "{{ log('dbt run completed!', info=true) }}"
profiles.yml:
analytics:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: "{{ env_var('DBT_USER') }}"
      password: "{{ env_var('DBT_PASSWORD') }}"
      dbname: analytics_dev
      schema: "dbt_{{ env_var('USER') }}"
      threads: 4
      keepalives_idle: 0
    prod:
      type: postgres
      host: prod-db.company.com
      port: 5432
      user: "{{ env_var('DBT_PROD_USER') }}"
      password: "{{ env_var('DBT_PROD_PASSWORD') }}"
      dbname: analytics_prod
      schema: analytics
      threads: 8
      keepalives_idle: 0
    snowflake:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: transformer
      database: analytics
      warehouse: transforming
      schema: "dbt_{{ env_var('USER') }}"
      threads: 8
sources.yml:
version: 2

sources:
  - name: raw_postgres
    description: Raw data from production PostgreSQL database
    database: production
    schema: public
    tables:
      - name: users
        description: User account information
        # Freshness checks, evaluated against the load timestamp
        loaded_at_field: _synced_at
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
        columns:
          - name: id
            description: Primary key
            tests:
              - unique
              - not_null
          - name: email
            description: User email address
            tests:
              - unique
              - not_null
          - name: created_at
            description: Account creation timestamp
            tests:
              - not_null
      - name: orders
        description: Order transactions
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: user_id
            description: Foreign key to users
            tests:
              - not_null
              - relationships:
                  to: source('raw_postgres', 'users')
                  field: id
          - name: total_amount
            tests:
              - not_null
          - name: status
            tests:
              - accepted_values:
                  values: ['pending', 'completed', 'cancelled', 'refunded']
  - name: raw_s3
    description: Raw data files from S3
    meta:
      external_location: 's3://company-data/raw/'
    tables:
      - name: events
        description: Event tracking data
        external:
          location: 's3://company-data/raw/events/'
          file_format: parquet
Staging Models:
-- models/staging/stg_users.sql
{{
config(
materialized='view',
tags=['daily']
)
}}
with source as (
select * from {{ source('raw_postgres', 'users') }}
),
renamed as (
select
-- Primary key
id as user_id,
-- Attributes
email,
first_name,
last_name,
{{ dbt_utils.generate_surrogate_key(['email']) }} as user_key,
-- Flags
is_active,
is_deleted,
-- Timestamps
created_at,
updated_at,
deleted_at,
-- Metadata
_synced_at as dbt_loaded_at
from source
-- Exclude soft-deleted users (treat NULL is_deleted as not deleted)
where not coalesce(is_deleted, false)
)
select * from renamed
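stg_users derives user_key with dbt_utils.generate_surrogate_key, which conceptually casts each input column to text, substitutes a sentinel for NULLs, joins the parts with a separator, and md5-hashes the result. A minimal Python sketch of that idea (the sentinel string and separator here are assumptions for illustration, not the package's exact values):

```python
import hashlib

def generate_surrogate_key(*values, null_placeholder="_dbt_utils_surrogate_key_null_"):
    """Illustrative analogue of dbt_utils.generate_surrogate_key:
    stringify each value, replace NULL/None with a sentinel,
    join with '-', and md5 the result."""
    parts = [null_placeholder if v is None else str(v) for v in values]
    return hashlib.md5("-".join(parts).encode("utf-8")).hexdigest()

# Stable 32-character hex key for the same inputs
print(generate_surrogate_key("alice@example.com"))
```

The key property the staging model relies on is determinism: the same email always yields the same user_key, and NULLs hash differently from empty strings.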
-- models/staging/stg_orders.sql
{{
config(
materialized='view'
)
}}
with source as (
select * from {{ source('raw_postgres', 'orders') }}
),
renamed as (
select
-- Primary key
id as order_id,
-- Foreign keys
user_id,
-- Metrics
total_amount,
tax_amount,
shipping_amount,
total_amount - tax_amount - shipping_amount as subtotal,
-- Dimensions
status,
payment_method,
-- Timestamps
created_at as order_created_at,
updated_at as order_updated_at,
completed_at
from source
)
select * from renamed
Intermediate Models:
-- models/intermediate/int_order_items_joined.sql
{{
config(
materialized='ephemeral'
)
}}
with orders as (
select * from {{ ref('stg_orders') }}
),
order_items as (
select * from {{ ref('stg_order_items') }}
),
products as (
select * from {{ ref('stg_products') }}
),
joined as (
select
orders.order_id,
orders.user_id,
orders.order_created_at,
order_items.order_item_id,
order_items.quantity,
order_items.unit_price,
products.product_id,
products.product_name,
products.category,
order_items.quantity * order_items.unit_price as line_total
from orders
inner join order_items
on orders.order_id = order_items.order_id
inner join products
on order_items.product_id = products.product_id
)
select * from joined
Mart Models:
-- models/marts/fct_orders.sql
{{
config(
materialized='table',
tags=['fact']
)
}}
with orders as (
select * from {{ ref('stg_orders') }}
),
order_items as (
select
order_id,
count(*) as item_count,
sum(quantity) as total_quantity,
sum(line_total) as items_subtotal
from {{ ref('int_order_items_joined') }}
group by order_id
),
final as (
select
-- Primary key
orders.order_id,
-- Foreign keys
orders.user_id,
-- Metrics
orders.total_amount,
orders.subtotal,
orders.tax_amount,
orders.shipping_amount,
order_items.item_count,
order_items.total_quantity,
-- Dimensions
orders.status,
orders.payment_method,
-- Timestamps
orders.order_created_at,
orders.completed_at,
-- Metadata
current_timestamp() as dbt_updated_at
from orders
left join order_items
on orders.order_id = order_items.order_id
)
select * from final
-- models/marts/dim_customers.sql
{{
config(
materialized='table',
tags=['dimension']
)
}}
with users as (
select * from {{ ref('stg_users') }}
),
orders as (
select * from {{ ref('fct_orders') }}
),
customer_orders as (
select
user_id,
count(*) as lifetime_orders,
sum(total_amount) as lifetime_value,
avg(total_amount) as avg_order_value,
min(order_created_at) as first_order_at,
max(order_created_at) as last_order_at,
max(completed_at) as last_completed_at
from orders
where status = 'completed'
group by user_id
),
final as (
select
-- Primary key
users.user_id,
users.user_key,
-- Attributes
users.email,
users.first_name,
users.last_name,
users.first_name || ' ' || users.last_name as full_name,
-- Customer metrics
coalesce(customer_orders.lifetime_orders, 0) as lifetime_orders,
coalesce(customer_orders.lifetime_value, 0) as lifetime_value,
customer_orders.avg_order_value,
-- Segmentation
case
when customer_orders.lifetime_value >= 10000 then 'VIP'
when customer_orders.lifetime_value >= 5000 then 'High Value'
when customer_orders.lifetime_value >= 1000 then 'Medium Value'
when customer_orders.lifetime_value > 0 then 'Low Value'
else 'No Orders'
end as customer_segment,
-- Timestamps
users.created_at as user_created_at,
customer_orders.first_order_at,
customer_orders.last_order_at,
-- Metadata
current_timestamp() as dbt_updated_at
from users
left join customer_orders
on users.user_id = customer_orders.user_id
where users.is_active
)
select * from final
Incremental Loading:
-- models/marts/fct_events.sql
{{
config(
materialized='incremental',
unique_key='event_id',
on_schema_change='fail',
incremental_strategy='merge'
)
}}
with events as (
select * from {{ ref('stg_events') }}
{% if is_incremental() %}
-- Only load new events
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
),
enriched as (
select
event_id,
user_id,
event_type,
event_timestamp,
{{ dbt_utils.generate_surrogate_key(['user_id', 'event_timestamp']) }} as event_key,
properties,
current_timestamp() as dbt_loaded_at
from events
)
select * from enriched
-- Incremental with delete + insert
{{
config(
materialized='incremental',
unique_key='date',
incremental_strategy='delete+insert'
)
}}
with daily_metrics as (
select
date_trunc('day', order_created_at) as date,
count(*) as order_count,
sum(total_amount) as revenue
from {{ ref('fct_orders') }}
{% if is_incremental() %}
where date_trunc('day', order_created_at) >= date_trunc('day', current_date - interval '7 days')
{% endif %}
group by 1
)
select * from daily_metrics
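The two incremental strategies above differ in how existing rows are reconciled: 'merge' upserts on the unique_key, while 'delete+insert' removes every row whose key appears in the new batch and appends the batch. A toy in-memory illustration of those semantics (dbt itself delegates this to warehouse SQL):

```python
def merge_incremental(existing, new_rows, unique_key):
    """incremental_strategy='merge': upsert incoming rows on unique_key."""
    by_key = {row[unique_key]: row for row in existing}
    for row in new_rows:
        by_key[row[unique_key]] = row
    return list(by_key.values())

def delete_insert_incremental(existing, new_rows, unique_key):
    """incremental_strategy='delete+insert': drop matching keys, then append."""
    incoming_keys = {row[unique_key] for row in new_rows}
    kept = [row for row in existing if row[unique_key] not in incoming_keys]
    return kept + new_rows

existing = [{"event_id": 1, "v": "old"}, {"event_id": 2, "v": "old"}]
incoming = [{"event_id": 2, "v": "new"}, {"event_id": 3, "v": "new"}]
print(merge_incremental(existing, incoming, "event_id"))
```

For a single-row key both strategies converge on the same result; they diverge when the unique_key covers a whole partition (like the `date` key above), where delete+insert replaces the partition wholesale.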
Schema Tests:
# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: Order transactions fact table
    columns:
      - name: order_id
        description: Unique order identifier
        tests:
          - unique
          - not_null
      - name: user_id
        description: Customer identifier
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: user_id
      - name: total_amount
        description: Order total amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled', 'refunded']
  - name: dim_customers
    description: Customer dimension table
    tests:
      # Table-level test
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - user_id
            - email
Custom Tests:
-- tests/assert_positive_revenue.sql
-- This test fails if any daily revenue is negative
select
    date_trunc('day', order_created_at) as order_date,
    sum(total_amount) as revenue
from {{ ref('fct_orders') }}
where status = 'completed'
group by 1
having sum(total_amount) < 0
-- tests/assert_order_counts_match.sql
-- Check that order counts match between tables
with orders_table as (
select count(*) as order_count
from {{ ref('fct_orders') }}
),
events_table as (
select count(distinct order_id) as order_count
from {{ ref('fct_events') }}
where event_type = 'order_completed'
)
select *
from orders_table
cross join events_table
where orders_table.order_count != events_table.order_count
Custom Generic Tests:
-- tests/generic/test_valid_percentage.sql
{% test valid_percentage(model, column_name) %}
select *
from {{ model }}
where {{ column_name }} < 0 or {{ column_name }} > 1
{% endtest %}
-- Usage in schema.yml
# columns:
#   - name: conversion_rate
#     tests:
#       - valid_percentage
Reusable Macros:
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, scale=2) %}
({{ column_name }} / 100.0)::numeric(16, {{ scale }})
{% endmacro %}
-- Usage: {{ cents_to_dollars('price_cents') }}
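The macro renders to a division plus a numeric cast, so the rounding behavior ultimately comes from the warehouse. A Python analogue using Decimal (the half-up rounding mode here is an assumption; warehouses differ):

```python
from decimal import Decimal, ROUND_HALF_UP

def cents_to_dollars(cents, scale=2):
    """Divide integer cents by 100 and quantize to `scale` decimal places,
    mirroring ::numeric(16, scale) in the macro above."""
    quantum = Decimal(10) ** -scale
    return (Decimal(cents) / 100).quantize(quantum, rounding=ROUND_HALF_UP)

print(cents_to_dollars(1999))  # Decimal('19.99')
```

Using Decimal rather than float matters for the same reason the macro casts to numeric: binary floats cannot represent most cent values exactly.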
-- macros/generate_alias_name.sql
{% macro generate_alias_name(custom_alias_name=none, node=none) -%}
{%- if custom_alias_name is none -%}
{{ node.name }}
{%- else -%}
{{ custom_alias_name | trim }}
{%- endif -%}
{%- endmacro %}
-- macros/date_spine.sql
{% macro date_spine(start_date, end_date) %}
with date_spine as (
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('" ~ start_date ~ "' as date)",
end_date="cast('" ~ end_date ~ "' as date)"
) }}
)
select date_day
from date_spine
{% endmacro %}
-- macros/grant_select.sql
{% macro grant_select(schema, role) %}
{% set sql %}
grant select on all tables in schema {{ schema }} to {{ role }};
{% endset %}
{% do run_query(sql) %}
{% do log("Granted select on " ~ schema ~ " to " ~ role, info=True) %}
{% endmacro %}
-- Usage in on-run-end hook
-- {{ grant_select('analytics', 'analyst') }}
Advanced Macros:
-- macros/pivot_metrics.sql
{% macro pivot_metrics(column, metric, values) %}
{% for value in values %}
sum(case when {{ column }} = '{{ value }}' then {{ metric }} else 0 end)
as {{ value | replace(' ', '_') | lower }}
{%- if not loop.last -%},{%- endif %}
{% endfor %}
{% endmacro %}
-- Usage:
-- select
-- date,
-- {{ pivot_metrics('status', 'total_amount', ['pending', 'completed', 'cancelled']) }}
-- from orders
-- group by date
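At compile time, pivot_metrics expands into one sum(case ...) column per value. The same string expansion, sketched in plain Python for illustration (this mimics what the rendered SQL looks like, not dbt's Jinja engine):

```python
def pivot_metrics(column, metric, values):
    """Build one conditional-sum SQL expression per pivot value,
    naming each output column after the lowercased, underscored value."""
    parts = [
        f"sum(case when {column} = '{v}' then {metric} else 0 end) "
        f"as {v.replace(' ', '_').lower()}"
        for v in values
    ]
    return ",\n".join(parts)

print(pivot_metrics("status", "total_amount", ["pending", "completed"]))
```

Each value becomes an independent column, which is why the surrounding query still needs its own group by on the non-pivoted keys.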
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- elif target.name == 'prod' -%}
        {{ custom_schema_name | trim }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}
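The intent of generate_schema_name (custom schemas used as-is in prod, developer-prefixed everywhere else, default schema when no custom name is set) can be sketched as a plain function:

```python
def resolve_schema(target_name, default_schema, custom_schema_name):
    """Mirror the schema-naming logic of the macro above:
    no custom name -> default schema;
    prod target    -> the custom name alone;
    otherwise      -> default schema prefixed onto the custom name."""
    if custom_schema_name is None:
        return default_schema
    if target_name == "prod":
        return custom_schema_name.strip()
    return f"{default_schema}_{custom_schema_name.strip()}"

print(resolve_schema("dev", "dbt_alice", "marts"))   # dbt_alice_marts
print(resolve_schema("prod", "analytics", "marts"))  # marts
```

The explicit none branch matters: without it, a model with no +schema config would render a dangling underscore (or error on trimming none) in non-prod targets.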
Timestamp Strategy:
-- snapshots/orders_snapshot.sql
{% snapshot orders_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='order_id',
strategy='timestamp',
updated_at='updated_at',
invalidate_hard_deletes=True
)
}}
select * from {{ source('raw_postgres', 'orders') }}
{% endsnapshot %}
Check Strategy:
-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='check',
check_cols=['email', 'status', 'plan_type'],
invalidate_hard_deletes=True
)
}}
select * from {{ source('raw_postgres', 'customers') }}
{% endsnapshot %}
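The 'check' strategy decides whether to open a new snapshot row by comparing only the configured check_cols between the stored version and the incoming row. A minimal sketch of that comparison:

```python
def row_changed(current, incoming, check_cols):
    """True when any of the tracked columns differs between the stored
    snapshot row and the incoming source row."""
    return any(current.get(col) != incoming.get(col) for col in check_cols)

current = {"customer_id": 1, "email": "a@x.com", "status": "active", "plan_type": "pro"}
incoming = {"customer_id": 1, "email": "a@x.com", "status": "churned", "plan_type": "pro"}
print(row_changed(current, incoming, ["email", "status", "plan_type"]))  # True
```

Columns outside check_cols (for example, a noisy last_seen_at) can change freely without generating new snapshot versions, which is the main reason to prefer 'check' when no reliable updated_at exists.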
Model Documentation:
# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: |
      # Order Transactions Fact Table
      This table contains one row per order with associated metrics and dimensions.

      ## Grain
      One row per order

      ## Freshness
      Updated hourly via incremental load

      ## Usage
      Primary table for order analysis and reporting
    columns:
      - name: order_id
        description: Unique order identifier (PK)
        tests:
          - unique
          - not_null
      - name: total_amount
        description: |
          Total order amount including tax and shipping.
          Formula: `subtotal + tax_amount + shipping_amount`
      - name: customer_segment
        description: Customer value segment
        meta:
          dimension:
            type: category
            label: Customer Segment
Custom Documentation:
<!-- docs/overview.md -->
{% docs __overview__ %}
# Analytics dbt Project
This dbt project transforms raw data from our production systems into
analytics-ready models for BI and data science use cases.
## Data Sources
- PostgreSQL (production database)
- S3 (event tracking)
- Snowflake (external data)
## Model Layers
1. **Staging**: Light transformations, renaming
2. **Intermediate**: Business logic, joins
3. **Marts**: Final tables for consumption
{% enddocs %}
packages.yml:
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1
  - package: calogica/dbt_expectations
    version: 0.10.0
  - package: dbt-labs/codegen
    version: 0.12.1
  - git: "https://github.com/dbt-labs/dbt-audit-helper.git"
    revision: 0.9.0
Using Packages:
-- Using dbt_utils
select
{{ dbt_utils.generate_surrogate_key(['user_id', 'order_id']) }} as order_key,
{{ dbt_utils.safe_divide('revenue', 'orders') }} as avg_order_value,
{{ dbt_utils.star(from=ref('stg_orders'), except=['_synced_at']) }}
from {{ ref('stg_orders') }}
-- Using dbt_expectations
tests:
  - dbt_expectations.expect_column_values_to_be_between:
      min_value: 0
      max_value: 100
Best Practices:
-- Bad: Many nested CTEs
with cte1 as (...), cte2 as (...), cte3 as (...)
-- 20 more CTEs
select * from cte23
-- Good: Break into intermediate models
select * from {{ ref('int_cleaned_data') }}
-- Bad: Direct table reference
select * from analytics.staging.stg_orders
-- Good: Use ref
select * from {{ ref('stg_orders') }}
-- Bad: No tests
-- Good: Always test PKs and FKs
columns:
  - name: id
    tests: [unique, not_null]
-- Bad: Hardcoded date
where created_at >= '2024-01-01'
-- Good: Use variables
where created_at >= '{{ var("start_date") }}'
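A typical local workflow for a project like this, using standard dbt CLI commands (selectors shown are examples):

```
dbt deps                              # install packages from packages.yml
dbt seed                              # load CSV seeds
dbt run --select staging              # build one layer
dbt test --select fct_orders          # run the tests attached to a model
dbt build --select +fct_orders        # run + test the model and its upstream deps
dbt snapshot                          # capture SCD2 snapshots
dbt source freshness                  # evaluate the freshness rules in sources.yml
dbt docs generate && dbt docs serve   # build and browse the documentation site
```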
Weekly Installs: 57
GitHub Stars: 12
First Seen: Jan 24, 2026
Security Audits: Gen Agent Trust Hub: Pass, Socket: Pass, Snyk: Pass
Installed on: opencode (48), codex (45), gemini-cli (44), cursor (41), claude-code (40), github-copilot (40)