annotating-task-lineage by astronomer/agents
npx skills add https://github.com/astronomer/agents --skill annotating-task-lineage
This skill guides you through adding manual lineage annotations to Airflow tasks using inlets and outlets.
Reference: See the OpenLineage provider developer guide for the latest supported operators and patterns.
Lineage annotations defined with inlets and outlets are visualized in Astro's enhanced Lineage tab, which provides cross-DAG and cross-deployment lineage views. This means your annotations are immediately visible in the Astro UI, giving you a unified view of data flow across your entire Astro organization.
| Scenario | Use Inlets/Outlets? |
|---|---|
| Operator has OpenLineage methods (`get_openlineage_facets_on_*`) | ❌ Modify the OL method directly |
| Operator has no built-in OpenLineage extractor | ✅ Yes |
| Simple table-level lineage is sufficient | ✅ Yes |
| Quick lineage setup without custom code | ✅ Yes |
| Need column-level lineage | ❌ Use OpenLineage methods or custom extractor |
| Complex extraction logic needed | ❌ Use OpenLineage methods or custom extractor |
Note: Inlets/outlets are the lowest-priority fallback. If an OpenLineage extractor or method exists for the operator, it takes precedence. Use this approach for operators without extractors.
You can use OpenLineage Dataset objects or Airflow Assets for inlets and outlets:
```python
from openlineage.client.event_v2 import Dataset

# Database tables
source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)

# Files
input_file = Dataset(
    namespace="s3://my-bucket",
    name="raw/events/2024-01-01.json",
)
```

```python
from airflow.sdk import Asset

# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")
```

```python
from airflow.datasets import Dataset

# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")
```
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum

# Define your lineage datasets
source_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="raw.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
output_file = Dataset(
    namespace="s3://my-bucket",
    name="exports/orders.parquet",
)

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:
    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],   # What this task reads
        outlets=[target_table],  # What this task writes
    )
    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],  # Reads from previous output
        outlets=[output_file],  # Writes to S3
    )
    transform >> export
```
Tasks often read from multiple sources and write to multiple destinations:
```python
from airflow.operators.python import PythonOperator
from openlineage.client.event_v2 import Dataset

# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,  # defined elsewhere in the DAG file
    inlets=[customers, orders, products],       # All inputs
    outlets=[daily_summary, customer_metrics],  # All outputs
)
```
When building custom operators, you have two options:
Option 1, the preferred approach, is to implement OpenLineage methods on the operator, which gives you full control over lineage extraction:
```python
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

    def get_openlineage_facets_on_complete(self, task_instance):
        """Return lineage after successful execution."""
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )
```
Option 2: for simpler cases, set lineage within the execute method (non-deferrable operators only):
```python
from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # Set lineage dynamically based on operator parameters
        self.inlets = [
            Dataset(namespace="warehouse://db", name=self.source_table)
        ]
        self.outlets = [
            Dataset(namespace="warehouse://db", name=self.target_table)
        ]
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")
```
Use the OpenLineage dataset naming helpers to ensure consistent naming across platforms:
```python
from openlineage.client.event_v2 import Dataset

# Snowflake
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming

naming = SnowflakeDatasetNaming(
    account_identifier="myorg-myaccount",
    database="mydb",
    schema="myschema",
    table="mytable",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "snowflake://myorg-myaccount", name: "mydb.myschema.mytable"

# BigQuery
from openlineage.client.naming.bigquery import BigQueryDatasetNaming

naming = BigQueryDatasetNaming(
    project="my-project",
    dataset="my_dataset",
    table="my_table",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "bigquery", name: "my-project.my_dataset.my_table"

# S3
from openlineage.client.naming.s3 import S3DatasetNaming

naming = S3DatasetNaming(bucket="my-bucket", key="path/to/file.parquet")
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "s3://my-bucket", name: "path/to/file.parquet"

# PostgreSQL
from openlineage.client.naming.postgres import PostgresDatasetNaming

naming = PostgresDatasetNaming(
    host="localhost",
    port=5432,
    database="mydb",
    schema="public",
    table="users",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "postgres://localhost:5432", name: "mydb.public.users"
```
Note: Always use the naming helpers instead of constructing namespaces manually. If a helper is missing for your platform, check the OpenLineage repo or request it.
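If no helper exists yet for your platform, a stopgap while you wait for one is to follow the same convention the helpers produce: a `scheme://authority` namespace and a dot-joined object path as the name. A minimal sketch under that assumption (`make_dataset_ref` is a hypothetical local utility, not an OpenLineage API):

```python
def make_dataset_ref(scheme: str, authority: str, *name_parts: str) -> tuple[str, str]:
    """Build a (namespace, name) pair following the OpenLineage convention:
    namespace is 'scheme://authority', name is the dot-joined object path.
    Hypothetical local helper -- prefer an official naming helper when one exists."""
    namespace = f"{scheme}://{authority}" if authority else scheme
    name = ".".join(name_parts)
    return namespace, name


# Mirrors the PostgreSQL helper output shown above:
namespace, name = make_dataset_ref("postgres", "localhost:5432", "mydb", "public", "users")
assert namespace == "postgres://localhost:5432"
assert name == "mydb.public.users"
```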
OpenLineage uses this precedence for lineage extraction:

1. `get_openlineage_facets_on_*` methods on the operator
2. Hook-level lineage collected via `HookLineageCollector`
3. Manually declared `inlets`/`outlets`

Note: If an extractor or method exists but returns no datasets, OpenLineage will check hook-level lineage, then fall back to inlets/outlets.
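This fallback chain can be modeled in plain Python. The sketch below is a toy model for intuition only, not the provider's actual implementation, and the function name is illustrative:

```python
# Toy model of OpenLineage's lineage-source precedence (illustrative only;
# the real logic lives in the OpenLineage Airflow provider).
def resolve_lineage_source(method_datasets, hook_datasets, inlets_outlets):
    """Return the first lineage source that actually produced datasets."""
    if method_datasets:   # get_openlineage_facets_on_* returned data
        return "openlineage_method"
    if hook_datasets:     # HookLineageCollector gathered hook-level lineage
        return "hook_lineage"
    if inlets_outlets:    # manual annotations are the last resort
        return "inlets_outlets"
    return "no_lineage"


# An operator whose OL method returns datasets wins, even with inlets set:
assert resolve_lineage_source(["raw.orders"], [], ["manual"]) == "openlineage_method"
# A method that exists but returns nothing falls through to hooks, then inlets:
assert resolve_lineage_source([], [], ["manual"]) == "inlets_outlets"
```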
Always use OpenLineage naming helpers for consistent dataset creation:
```python
from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming


def snowflake_dataset(schema: str, table: str) -> Dataset:
    """Create a Snowflake Dataset using the naming helper."""
    naming = SnowflakeDatasetNaming(
        account_identifier="mycompany",
        database="analytics",
        schema=schema,
        table=table,
    )
    return Dataset(namespace=naming.get_namespace(), name=naming.get_name())


# Usage
source = snowflake_dataset("raw", "orders")
target = snowflake_dataset("staging", "orders_clean")
```
Add comments explaining the data flow:
```python
transform = SqlOperator(
    task_id="transform_orders",
    sql="...",
    # Lineage: Reads raw orders, joins with customers, writes to staging
    inlets=[
        snowflake_dataset("raw", "orders"),
        snowflake_dataset("raw", "customers"),
    ],
    outlets=[
        snowflake_dataset("staging", "order_details"),
    ],
)
```
| Limitation | Workaround |
|---|---|
| Table-level only (no column lineage) | Use OpenLineage methods or custom extractor |
| Overridden by extractors/methods | Only use for operators without extractors |
| Static at DAG parse time | Set dynamically in execute() or use OL methods |
| Deferrable operators lose dynamic lineage | Use OL methods instead; attributes set in execute() are lost when deferring |
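The deferrable-operator row deserves a closer look: when a trigger fires, Airflow rebuilds the operator from its serialized constructor arguments, so attributes assigned inside execute() are gone on resume. A toy simulation of that behavior (plain Python, no Airflow required; the class and function names are illustrative):

```python
# Toy simulation of why attributes set in execute() do not survive deferral.
# Airflow re-instantiates a deferrable operator from its serialized __init__
# arguments after the trigger fires; runtime attribute assignments are lost.
class ToyOperator:
    def __init__(self, source_table: str):
        self.source_table = source_table
        self.inlets: list[str] = []

    def execute(self):
        # Dynamic lineage set at runtime -- fine for non-deferrable operators.
        self.inlets = [self.source_table]


def simulate_deferral(op: ToyOperator) -> ToyOperator:
    """Rebuild the operator from its constructor args, as Airflow does on resume."""
    return ToyOperator(source_table=op.source_table)


op = ToyOperator("raw.orders")
op.execute()
assert op.inlets == ["raw.orders"]  # lineage visible before deferring

resumed = simulate_deferral(op)
assert resumed.inlets == []         # runtime-set lineage did not survive
```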