create-custom-dagster-component by c00ldudenoonan/economic-data-project
npx skills add https://github.com/c00ldudenoonan/economic-data-project --skill create-custom-dagster-component
This skill automates the creation and validation of a new custom Dagster component using the dg CLI tool with uv as a package manager. It incorporates demo mode functionality for creating realistic demonstrations that can run locally without external dependencies. The documentation for creating good components can be found here https://docs.dagster.io/guides/build/components/creating-new-components/creating-and-registering-a-component and here https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-dbt/dagster_dbt/components/dbt_project/component.py for a complex example of a component.
When invoked, this skill will:
- Scaffold a new Dagster component with `dg scaffold component ComponentName`
- Fill in the `build_defs()` function with both real and demo mode implementations
- Add a `demo_mode` boolean flag in the component YAML for toggling between real and local demo implementations
- Populate a component instance with the `dg scaffold defs my_module.components.ComponentName my_component` command
- Validate with `dg check defs` and `dg list defs` to ensure that the expected component instances are all loaded

Before running this skill, ensure:

- `uv` is installed (check with `uv --version`)

Ask the user for:
- A component name, e.g. `MyDagsterComponent`. Validate that it is a valid Python class name (PascalCase, no spaces).
- Whether they want a custom scaffolder for the component instances (used in a later step).
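Name validation can be a quick programmatic check. A stdlib-only sketch (the exact rules `dg` enforces are an assumption here; adjust as needed):

```python
import keyword


def is_valid_component_name(name: str) -> bool:
    """Check for a PascalCase Python identifier, e.g. MyDagsterComponent.

    Note: these rules are an assumption, not dg's documented validation.
    """
    return name.isidentifier() and name[:1].isupper() and not keyword.iskeyword(name)


print(is_valid_component_name("MyDagsterComponent"))  # True
print(is_valid_component_name("my component"))        # False
```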
Use dg to create the component
```shell
uv run dg scaffold component <ComponentName>
```
This will:
- Scaffold a new Dagster component in `defs/components`
- Create a `component_name.py` file

Fill in the `build_defs()` function in the component file. The component should:

- Include a `demo_mode` parameter in the component params (default: `False`)
- Configure any resources in a `resources.py` file in the `defs/` folder, using `dg scaffold defs dagster.resource resources.py`
- Provide an `assets` field in the YAML that describes what assets are used in the underlying component. See https://dagster.io/blog/dsls-to-the-rescue for best practices in how to design a good DSL. Refer to https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-dbt/dagster_dbt/components/dbt_project/component.py and https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-fivetran/dagster_fivetran/components/workspace_component/component.py for two reference architectures for good component design with multi-assets.
- Use the `kinds` argument to indicate the technologies in use

Example asset structure:
CRITICAL: When creating a custom component, consider what will consume your component's assets. The asset keys you generate should align with downstream component expectations to avoid requiring per-asset configuration.
Your component (upstream) should generate asset keys in a structure that downstream components naturally reference. This eliminates the need for meta.dagster.asset_key or complex translation configuration.
If dbt will consume your assets:
- Use keys of the form `["<source_name>", "<table_name>"]`
- Examples: `["fivetran_raw", "customers"]` or `["api_raw", "users"]`
- dbt can then reference them as `source('fivetran_raw', 'customers')`

If custom Dagster assets will consume them:
- Match the key structure those assets expect in their `deps`
- Keep keys short and consistent (e.g. `["category", "name"]`)
- Avoid deep nesting like `["system", "subsystem", "type", "name"]` unless necessary

If another integration component will consume them:
If your assets are intermediate and consumed by your own component:
- Chain a consistent structure through stages: `["raw", "table"]` → `["processed", "table"]` → `["enriched", "table"]`

```python
import dagster as dg


class APIIngestionComponent(dg.Component, dg.Model, dg.Resolvable):
    """Ingests data from REST APIs."""

    api_endpoint: str
    tables: list[str]
    demo_mode: bool = False

    def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
        # Design keys for dbt consumption: ["api_raw", "table_name"]
        # NOT: ["api", "ingestion", "raw", "table_name"]
        return dg.Definitions(assets=[self._build_asset(t) for t in self.tables])

    def _build_asset(self, table: str):
        # A helper method binds `table` per call; defining the asset directly
        # inside a loop would leave every closure reading the final loop value.
        @dg.asset(
            key=dg.AssetKey(["api_raw", table]),  # ← Flattened for easy downstream reference
            kinds={"api", "python"},
        )
        def ingest_table(context: dg.AssetExecutionContext):
            if self.demo_mode:
                context.log.info(f"Demo mode: Mocking API call for {table}")
                return {"status": "demo", "rows": 100}
            else:
                # Real API call
                pass

        return ingest_table
```
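One Python pitfall when generating assets in a loop: closures are late-binding, so every function defined in the loop sees the loop variable's final value unless it is bound per iteration (via a factory function or a default argument). In plain Python:

```python
# Late binding: all three lambdas share the same `t`, read at call time.
funcs = [lambda: t for t in ["a", "b", "c"]]
print([f() for f in funcs])  # ['c', 'c', 'c']

# Binding per iteration (default argument) captures each value:
funcs = [lambda t=t: t for t in ["a", "b", "c"]]
print([f() for f in funcs])  # ['a', 'b', 'c']
```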
Result: dbt can reference these assets naturally:
```yaml
# sources.yml
sources:
  - name: api_raw
    tables:
      - name: customers  # Matches ["api_raw", "customers"]
```
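The positional correspondence between a two-part asset key and a dbt `source()` reference can be sketched in plain Python (an illustrative helper, not part of the skill or of Dagster):

```python
def dbt_source_ref(asset_key: list[str]) -> str:
    """Render the dbt source() reference matching a ["source", "table"] key."""
    if len(asset_key) != 2:
        raise ValueError(f"expected [source_name, table_name], got {asset_key}")
    source_name, table_name = asset_key
    return f"source('{source_name}', '{table_name}')"


print(dbt_source_ref(["api_raw", "customers"]))  # source('api_raw', 'customers')
```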
Always verify asset keys align with downstream dependencies:
```shell
# Check asset keys and their dependencies
uv run dg list defs --json | uv run python -c "
import sys, json
assets = json.load(sys.stdin)['assets']
print('\\n'.join([f\"{a['key']}: deps={a.get('deps', [])}\" for a in assets]))
"
```
What to verify:
- Downstream components list your assets in their `deps` array
- Keys follow a short, consistent structure (e.g. `["category", "name"]`)

❌ Too deeply nested: `["company", "team", "project", "environment", "table"]`
❌ Inconsistent structure: Some assets with 2 levels, others with 4
❌ Generic names: ["data", "table1"], ["output", "result"]
✅ Good patterns:
- `["source_system", "entity"]`: `["fivetran_raw", "customers"]`
- `["integration", "object"]`: `["salesforce", "accounts"]`
- `["stage", "table"]`: `["staging", "orders"]`

IMPORTANT: Asset keys should be exactly the same whether `demo_mode` is True or False. Only the asset implementation (the function body) should differ between modes.
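The naming rules above (consistent depth, no over-nesting, no generic names) can be checked mechanically, e.g. in CI. A sketch where the depth threshold and banned names are illustrative choices, not part of the skill:

```python
GENERIC_NAMES = {"data", "table1", "output", "result"}


def check_keys(keys: list[list[str]]) -> list[str]:
    """Flag asset keys that break the conventions: inconsistent depth,
    over-nesting, or generic names."""
    problems = []
    depths = {len(k) for k in keys}
    if len(depths) > 1:
        problems.append(f"inconsistent key depths: {sorted(depths)}")
    for key in keys:
        if len(key) > 3:  # illustrative threshold
            problems.append(f"too deeply nested: {key}")
        if key[0] in GENERIC_NAMES or key[-1] in GENERIC_NAMES:
            problems.append(f"generic name: {key}")
    return problems


print(check_keys([["fivetran_raw", "customers"], ["staging", "orders"]]))  # []
```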
Why this matters:
Example - CORRECT approach:
```python
def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
    @dg.asset(
        key=dg.AssetKey(["fivetran_raw", "customers"]),  # ← Same key in both modes
        kinds={"fivetran"},
    )
    def customers_sync(context: dg.AssetExecutionContext):
        if self.demo_mode:
            # Demo implementation - mock data
            context.log.info("Demo mode: Creating empty table")
            # ... create mock table
        else:
            # Production implementation - real Fivetran sync
            context.log.info("Production: Syncing from Fivetran")
            # ... call Fivetran API

    return dg.Definitions(assets=[customers_sync])
```
Example - INCORRECT approach:
```python
def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
    if self.demo_mode:
        @dg.asset(
            key=dg.AssetKey(["demo", "customers"]),  # ❌ Different key!
        )
        def demo_customers():
            pass

        return dg.Definitions(assets=[demo_customers])
    else:
        @dg.asset(
            key=dg.AssetKey(["fivetran_raw", "customers"]),  # ❌ Different key!
        )
        def prod_customers():
            pass

        return dg.Definitions(assets=[prod_customers])
```
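A lightweight guard against the incorrect pattern is to assert that the set of keys is identical in both modes. A stdlib-only sketch, with a stand-in `build_keys` in place of a real `build_defs`:

```python
def build_keys(demo_mode: bool) -> set[tuple[str, ...]]:
    """Stand-in for build_defs(): the keys produced must not depend on the mode."""
    tables = ["customers", "orders"]
    # Same keys either way; only the (omitted) implementation would differ.
    return {("fivetran_raw", t) for t in tables}


assert build_keys(demo_mode=True) == build_keys(demo_mode=False)
print("asset keys identical across modes")
```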
Reference Documentation:
Cross-reference https://docs.dagster.io/llms.txt for up-to-date titles and descriptions
Use https://docs.dagster.io/llms-full.txt for full API details
Check available integrations with:
```shell
uv run dg docs integrations --json
```
When creating assets in your component, ALWAYS add the `kinds` parameter to properly categorize assets by their technology/integration type. This helps with categorizing and filtering assets in the Dagster UI.
Common integration kinds:
- `kinds={"fivetran"}` for Fivetran assets
- `kinds={"dbt"}` for dbt assets
- `kinds={"census"}` for Census assets
- `kinds={"sling"}` for Sling assets
- `kinds={"powerbi"}` for PowerBI assets
- `kinds={"looker"}` for Looker assets
- `kinds={"airbyte"}` for Airbyte assets
- `kinds={"python"}` for custom Python processing
- `kinds={"snowflake"}` for Snowflake assets

You can verify kinds are showing correctly by running:
```shell
uv run dg list defs
```
The "Kinds" column should show the integration type for each asset.
Example Component Structure:
```python
import dagster as dg


class MyComponent(dg.Component, dg.Model, dg.Resolvable):
    demo_mode: bool = False
    # ... other params

    def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
        @dg.asset(
            kinds={"fivetran"},  # ← REQUIRED: Add the integration kind
        )
        def raw_data(context: dg.AssetExecutionContext):
            if self.demo_mode:
                # Demo implementation - local/mocked data
                context.log.info("Running in demo mode with local data")
            else:
                # Real implementation - connect to actual systems
                context.log.info("Running with real data source")

        @dg.asset(
            deps=[raw_data],
            kinds={"dbt"},  # ← REQUIRED: Add the integration kind
        )
        def processed_data(context: dg.AssetExecutionContext):
            if self.demo_mode:
                context.log.info("Processing demo data")
            else:
                context.log.info("Processing real data")

        # ... more assets
        return dg.Definitions(assets=[raw_data, processed_data])
```
Use dg scaffold defs to create the component instance:
```shell
uv run dg scaffold defs my_module.components.ComponentName my_component
```
This creates a YAML file that should include the demo_mode parameter:
```yaml
type: my_module.components.ComponentName
attributes:
  demo_mode: true  # Set to true for local demos, false for real deployments
  # ... other params
```
If the user requested a custom scaffolder in Step 1, follow the directions here: https://docs.dagster.io/guides/build/components/creating-new-components/component-customization#customizing-scaffolding-behavior
Customize the scaffolder to provide a better developer experience for creating instances of this component.
Run these commands to ensure everything works:
```shell
# Check that definitions load without errors
uv run dg check defs

# List all assets to verify they were created
uv run dg list defs
```
Verify that:
- The `demo_mode` flag toggles between implementations correctly
- The `demo_mode: false` implementation uses realistic resources and is a production implementation

CRITICAL: Verify Asset Key Alignment
Check that asset dependencies are correct by running:
```shell
uv run dg list defs --json | uv run python -c "
import sys, json
data = json.load(sys.stdin)
assets = data.get('assets', [])
print('Asset Dependencies:\n')
for asset in assets:
    key = asset.get('key', 'unknown')
    deps = asset.get('deps', [])
    if deps:
        print(f'{key}')
        for dep in deps:
            print(f'  ← {dep}')
    else:
        print(f'{key} (no dependencies)')
    print()
"
```
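The same alignment check can be automated against a saved dump (`dg list defs --json > defs.json`). A stdlib sketch over a sample payload (the exact JSON shape of the CLI output is an assumption; adapt the field names to what your `dg` version emits):

```python
import json


def missing_upstreams(payload: dict) -> list[str]:
    """Return dep references that do not match any listed asset key."""
    keys = {a["key"] for a in payload.get("assets", [])}
    return [dep
            for a in payload.get("assets", [])
            for dep in a.get("deps", [])
            if dep not in keys]


sample = json.loads('{"assets": ['
                    '{"key": "api_raw/customers", "deps": []},'
                    '{"key": "staging/customers", "deps": ["api_raw/customers"]}]}')
print(missing_upstreams(sample))  # []
```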
What to verify:
- Downstream assets list their upstream assets in the `deps` array
- Keys follow a short, consistent structure (e.g. `["category", "name"]`)

Key Principle: Asset keys should be identical between demo mode and production mode. Only the asset implementation (the function body) should differ. This ensures:
If demo mode was implemented:
- Set `demo_mode: true` in the component YAML
- Run `dg check defs` to verify it works locally

The component is complete when:

- `build_defs()` is implemented with proper asset logic
- Assets include `kinds` metadata
- `dg check defs` passes without errors
- `dg list defs` shows all expected assets

After completion, inform the user:
Weekly Installs: 1
Repository: https://github.com/c00ldudenoonan/economic-data-project
GitHub Stars: 38
First Seen: Today
Security Audits: Gen Agent Trust Hub (Pass), Socket (Pass), Snyk (Pass)
Installed on: zencoder (1), amp (1), cline (1), openclaw (1), opencode (1), cursor (1)