cosmos-dbt-core by astronomer/agents
npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-core
Execute steps in order. Prefer the simplest configuration that meets the user's constraints.
Version note: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).
Reference: Latest stable: https://pypi.org/project/astronomer-cosmos/
Before starting, confirm: (1) dbt engine = Core (not Fusion → use cosmos-dbt-fusion), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup vs individual operators, (6) manifest availability.
| Approach | When to use | Required param |
|---|---|---|
| Project path | Files available locally | dbt_project_path |
| Manifest only | dbt_manifest load mode | manifest_path + project_name |
from cosmos import ProjectConfig
_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)
Pick ONE load mode based on constraints:
| Load mode | When to use | Required inputs | Constraints |
|---|---|---|---|
| dbt_manifest | Large projects; containerized execution; fastest | ProjectConfig.manifest_path | Remote manifest needs manifest_conn_id |
| dbt_ls | Complex selectors; need dbt-native selection | dbt installed OR dbt_executable_path | Can also be used with containerized execution |
| dbt_ls_file | dbt_ls selection without running dbt_ls on every parse | RenderConfig.dbt_ls_path | select/exclude won't work |
| automatic (default) | Simple setups; let Cosmos pick | (none) | Fallback order: manifest → dbt_ls → custom |
CRITICAL: With containerized execution (DOCKER/KUBERNETES/etc.), DAG parsing still happens in the Airflow environment, so the chosen load mode's inputs (a manifest, a dbt_ls output file, or a dbt executable) must be available there.
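The automatic fallback order in the table above can be sketched as a small selection function (illustrative only, not Cosmos internals):

```python
# Illustrative sketch of LoadMode.AUTOMATIC's fallback order
# (manifest -> dbt_ls -> custom); not actual Cosmos internals.
def pick_load_mode(has_manifest: bool, dbt_available: bool) -> str:
    if has_manifest:
        return "dbt_manifest"  # fastest: parse the precompiled manifest.json
    if dbt_available:
        return "dbt_ls"        # fall back to running `dbt ls`
    return "custom"            # last resort: Cosmos's own project parser

print(pick_load_mode(has_manifest=False, dbt_available=True))  # dbt_ls
```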
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)
Reference: See reference/cosmos-config.md for detailed configuration examples per mode.
Pick ONE execution mode:
| Execution mode | When to use | Speed | Required setup |
|---|---|---|---|
| WATCHER | Fastest; single dbt build visibility | Fastest | dbt adapter in env OR dbt_executable_path OR dbt Fusion |
| WATCHER_KUBERNETES | Fastest isolated method; single dbt build visibility | Fast | dbt installed in container |
| LOCAL + DBT_RUNNER | dbt + adapter in the same Python installation as Airflow | Fast | dbt 1.5+ in requirements.txt |
| LOCAL + SUBPROCESS | dbt + adapter available in the Airflow deployment, in a separate Python installation | Medium | dbt_executable_path |
| AIRFLOW_ASYNC | BigQuery + long-running transforms | Fast | Airflow ≥2.8; provider deps |
| KUBERNETES | Isolation between Airflow and dbt | Medium | Airflow ≥2.8; provider deps |
| VIRTUALENV | Can't modify image; runtime venv | Slower | py_requirements in operator_args |
| Other containerized approaches | Support Airflow and dbt isolation | Medium | Container config |
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.WATCHER,  # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)
Reference: See reference/cosmos-config.md for detailed ProfileConfig options and all ProfileMapping classes.
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)
CRITICAL: Do not hardcode secrets; use environment variables.
from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)
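When managing profiles.yml manually, dbt's built-in `env_var` function keeps credentials out of the file. A sketch; the profile layout and variable names below are placeholders, and a real Snowflake target needs additional fields (database, warehouse, role):

```yaml
my_profile:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      schema: my_schema
```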
Reference: See reference/cosmos-config.md for detailed testing options.
| TestBehavior | Behavior |
|---|---|
| AFTER_EACH (default) | Tests run immediately after each model |
| BUILD | Combine run + test into a single dbt build |
| AFTER_ALL | Run all tests after all models complete |
| NONE | Skip tests |
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)
Reference: See reference/cosmos-config.md for detailed operator_args options.
_operator_args = {
    # BaseOperator params
    "retries": 3,
    # Cosmos-specific params
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,
    # Runtime dbt vars (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}
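Rather than hand-escaping the `vars` JSON string, it can be built with `json.dumps` (a sketch; the variable name is illustrative):

```python
import json

# Build the dbt --vars payload programmatically; Jinja in the values is
# still rendered by Airflow at runtime.
dbt_vars = {"my_var": "{{ ti.xcom_pull(task_ids='pre_dbt') }}"}
operator_args = {"vars": json.dumps(dbt_vars)}
print(operator_args["vars"])
```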
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)
from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()
import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG

try:
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)

def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
    """Check if a file exists in the given S3 bucket."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
    print(f"Checking if file {s3_key} exists in S3 bucket...")
    hook = S3Hook(aws_conn_id=aws_conn_id)
    return hook.check_for_key(key=s3_key, bucket_name=bucket_name)

with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
        # Upload the dbt target artifacts to S3 so check_s3_file can find them.
        callback=upload_to_aws_s3,
        callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
    )
    check_file_uploaded_task = PythonOperator(
        task_id="check_file_uploaded_task",
        python_callable=check_s3_file,
        op_kwargs={
            "aws_conn_id": "aws_s3_conn",
            "bucket_name": "cosmos-artifacts-upload",
            "file_key": "target/run_results.json",
        },
    )
    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )
    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", str(DBT_ARTIFACT)],
        install_deps=True,
        append_env=True,
    )

    seed_operator >> run_operator >> clone_operator
    seed_operator >> check_file_uploaded_task
from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)
Before finalizing, verify:
| Airflow 3.x | Airflow 2.x |
|---|---|
| from airflow.sdk import dag, task | from airflow.decorators import dag, task |
| from airflow.sdk import chain | from airflow.models.baseoperator import chain |
Cosmos ≤1.9 (Airflow 2 Datasets):
postgres://0.0.0.0:5434/postgres.public.orders
Cosmos ≥1.10 (Airflow 3 Assets):
postgres://0.0.0.0:5434/postgres/public/orders
CRITICAL: Update asset URIs when upgrading to Airflow 3.
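A small helper can migrate the old dot-separated dataset URIs to the new slash-separated asset form (a sketch assuming the db.schema.table layout shown above):

```python
def dataset_uri_to_asset_uri(uri: str) -> str:
    """Convert a Cosmos <=1.9 dataset URI to the >=1.10 asset form."""
    # Split off the scheme://host:port prefix, then turn the dot-separated
    # db.schema.table suffix into slash-separated path segments. Dots in
    # the host part (before the last "/") are left untouched.
    prefix, _, suffix = uri.rpartition("/")
    return f"{prefix}/{suffix.replace('.', '/')}"

print(dataset_uri_to_asset_uri("postgres://0.0.0.0:5434/postgres.public.orders"))
# postgres://0.0.0.0:5434/postgres/public/orders
```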
Cosmos caches artifacts to speed up parsing. Enabled by default.
Reference: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html
AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True
When enabled:
from cosmos.airflow.dag import DbtDag # instead of: from cosmos import DbtDag
AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)
AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
  "my_project": {
    "dir": "s3://bucket/docs/",
    "index": "index.html",
    "conn_id": "aws_default",
    "name": "My Project"
  }
}'
Reference: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html
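Since AIRFLOW__COSMOS__DBT_DOCS_PROJECTS holds a JSON string, it is worth validating before deploying (a sketch mirroring the example above):

```python
import json

raw = """{
  "my_project": {
    "dir": "s3://bucket/docs/",
    "index": "index.html",
    "conn_id": "aws_default",
    "name": "My Project"
  }
}"""

# json.loads raises ValueError on malformed input, catching quoting
# mistakes before they reach the Airflow deployment.
projects = json.loads(raw)
print(sorted(projects["my_project"]))  # ['conn_id', 'dir', 'index', 'name']
```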
Weekly Installs: 302
Repository: GitHub
GitHub Stars: 269
First Seen: Feb 4, 2026
Security Audits: Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Warn
Installed on: github-copilot (239), opencode (238), gemini-cli (232), codex (231), cursor (225), kimi-cli (224)