creating-openlineage-extractors by astronomer/agents
npx skills add https://github.com/astronomer/agents --skill creating-openlineage-extractors
This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.
Reference: See the OpenLineage provider developer guide for the latest patterns and list of supported operators/hooks.
| Scenario | Approach |
|---|---|
| Operator you own/maintain | OpenLineage Methods (recommended, simplest) |
| Third-party operator you can't modify | Custom Extractor |
| Need column-level lineage | OpenLineage Methods or Custom Extractor |
| Complex extraction logic | OpenLineage Methods or Custom Extractor |
| Simple table-level lineage | Inlets/Outlets (simplest, but lowest priority) |
Important: Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, more likely to drift out of sync with the operator when its behavior changes, and harder to debug.
Astro includes built-in OpenLineage integration — no additional transport configuration is needed. Lineage events are automatically collected and displayed in the Astro UI's Lineage tab. Custom extractors deployed to an Astro project are automatically picked up, so you only need to register them in airflow.cfg or via environment variable and deploy.
OpenLineage Methods (recommended): Use when you can add methods directly to your custom operator. This is the go-to solution for operators you own.
Custom Extractors: Use when you need lineage from third-party or provider operators that you cannot modify.
When you own the operator, add OpenLineage methods directly:
```python
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def _process_data(self) -> int:
        """Placeholder for the operator's real work; returns the row count."""
        raise NotImplementedError

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional - for partial lineage."""
        return None
```
| Method | When Called | Required |
|---|---|---|
| `get_openlineage_facets_on_start()` | Task enters RUNNING | No |
| `get_openlineage_facets_on_complete(ti)` | Task succeeds | No |
| `get_openlineage_facets_on_failure(ti)` | Task fails | No |
Implement only the methods you need. Unimplemented methods fall through to Hook-Level Lineage or inlets/outlets.
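When debugging why an operator falls through to a lower-priority source, it can help to see which of the three methods it actually defines. The following is a minimal sketch using only the standard library; `implemented_openlineage_methods` is a hypothetical helper name, not part of Airflow or OpenLineage, and it only inspects attributes rather than calling anything.

```python
# Method names taken from the table above.
OPENLINEAGE_METHODS = (
    "get_openlineage_facets_on_start",
    "get_openlineage_facets_on_complete",
    "get_openlineage_facets_on_failure",
)


def implemented_openlineage_methods(operator) -> list[str]:
    """Return the OpenLineage method names that `operator` defines as callables."""
    return [
        name
        for name in OPENLINEAGE_METHODS
        if callable(getattr(operator, name, None))
    ]


class OnlyStart:
    """Stand-in for an operator that implements just the on-start method."""

    def get_openlineage_facets_on_start(self):
        return None


print(implemented_openlineage_methods(OnlyStart()))
```

Note that a `MagicMock` reports every attribute as callable, so this check is only meaningful on real operator instances or plain stand-in classes.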
Use a custom extractor only when you cannot modify the operator (e.g., third-party or provider operators).
```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution
        # Useful for operators that determine outputs at runtime
        return None
```
```python
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job

lineage = OperatorLineage(
    inputs=[Dataset(namespace="...", name="...")],   # Input datasets
    outputs=[Dataset(namespace="...", name="...")],  # Output datasets
    run_facets={},  # Run metadata (facets describing this particular run)
    job_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")},  # Job metadata (SQLJobFacet is a job facet)
)
```
| Method | When Called | Use For |
|---|---|---|
| `_execute_extraction()` | Before operator runs | Static/known lineage |
| `extract_on_complete(task_instance)` | After success | Runtime-determined lineage |
| `extract_on_failure(task_instance)` | After failure | Partial lineage on errors |
Option 1: Configuration file (airflow.cfg)

```ini
[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor
```

Option 2: Environment variable

```bash
AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'
```

Important: The path must be importable from the Airflow worker. Place extractors in your DAGs folder or in an installed package.
```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job


class MySqlOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["MySqlOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        sql = self.operator.sql
        conn_id = self.operator.conn_id

        # Parse SQL to find tables (simplified example)
        # In practice, use a SQL parser like sqlglot
        inputs, outputs = self._parse_sql(sql)

        namespace = f"postgres://{conn_id}"

        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
            outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
            job_facets={
                "sql": sql_job.SQLJobFacet(query=sql)
            },
        )

    def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
        """Parse SQL to extract table names. Use sqlglot for real parsing."""
        # Simplified example - use proper SQL parser in production
        inputs = []
        outputs = []
        # ... parsing logic ...
        return inputs, outputs
```
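The `_parse_sql` placeholder leaves the parsing logic out. To illustrate the shape of the result it should return (a pair of input and output table lists), here is a deliberately naive regex sketch using only the standard library. `naive_parse_tables` is a hypothetical name, it only recognizes `INSERT INTO`, `FROM`, and `JOIN`, and it will misread CTEs, subqueries, comments, and quoted identifiers, so it is not a substitute for a real parser such as sqlglot.

```python
import re

# Hypothetical, intentionally simplistic patterns; assumes unquoted table names.
_OUTPUT_RE = re.compile(r"\bINSERT\s+INTO\s+([\w.]+)", re.IGNORECASE)
_INPUT_RE = re.compile(r"\b(?:FROM|JOIN)\s+([\w.]+)", re.IGNORECASE)


def naive_parse_tables(sql: str) -> tuple[list[str], list[str]]:
    """Return (inputs, outputs) table names found in a simple SQL statement."""
    # dict.fromkeys removes duplicates while keeping first-seen order.
    outputs = list(dict.fromkeys(_OUTPUT_RE.findall(sql)))
    inputs = [
        table
        for table in dict.fromkeys(_INPUT_RE.findall(sql))
        if table not in outputs
    ]
    return inputs, outputs


inputs, outputs = naive_parse_tables(
    "INSERT INTO analytics.daily "
    "SELECT * FROM raw.events e JOIN raw.users u ON e.uid = u.id"
)
print(inputs, outputs)
```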
```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class S3ToSnowflakeExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["S3ToSnowflakeOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        s3_bucket = self.operator.s3_bucket
        s3_key = self.operator.s3_key
        table = self.operator.table
        schema = self.operator.schema

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace=f"s3://{s3_bucket}",
                    name=s3_key,
                )
            ],
            outputs=[
                Dataset(
                    namespace="snowflake://myaccount.snowflakecomputing.com",
                    name=f"{schema}.{table}",
                )
            ],
        )
```
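The S3 dataset in this example uses the bucket as the namespace and the object key as the name. Some operators expose a single URI instead of separate bucket and key attributes; that is an assumption about your operator, not something stated here. In that case a small helper along these lines could produce the same namespace/name split. `split_s3_uri` is a hypothetical name and the sketch uses only the standard library.

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split 's3://bucket/key' into a (namespace, name) pair.

    The namespace is 's3://<bucket>' and the name is the object key,
    matching the convention used in the extractor example.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    namespace = f"s3://{parsed.netloc}"
    name = parsed.path.lstrip("/")
    return namespace, name


print(split_s3_uri("s3://my-bucket/raw/2024/events.csv"))
```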
```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class DynamicOutputExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["DynamicOutputOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        # Only inputs known before execution
        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        # Outputs determined during execution
        # Access via operator properties set in execute()
        outputs = self.operator.created_tables  # Set during execute()

        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
            outputs=[Dataset(namespace="...", name=t) for t in outputs],
        )
```
Problem: Importing Airflow modules at the top level can cause circular imports.

```python
# ❌ BAD - can cause circular import issues
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset

class MyExtractor(BaseExtractor):
    ...

# ✅ GOOD - import inside methods
class MyExtractor(BaseExtractor):
    def _execute_extraction(self):
        from openlineage.client.event_v2 import Dataset
        # ...
```
Problem: Extractor path doesn't match actual module location.

```bash
# ❌ Wrong - path doesn't exist
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'

# ✅ Correct - full importable path
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'
```
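One way to catch a mismatched path before deploying is to try importing each configured class in the same environment the worker uses. The sketch below is a debugging aid only; it does not reproduce how Airflow itself loads extractors, and `check_extractor_paths` is a hypothetical helper name. It assumes the semicolon-separated format shown in the registration examples.

```python
import importlib


def check_extractor_paths(setting: str) -> dict[str, str]:
    """Map each semicolon-separated class path to 'ok' or an error description."""
    results: dict[str, str] = {}
    for path in filter(None, (part.strip() for part in setting.split(";"))):
        # Split 'package.module.ClassName' into module path and class name.
        module_name, _, class_name = path.rpartition(".")
        try:
            module = importlib.import_module(module_name)
            getattr(module, class_name)
            results[path] = "ok"
        except (ImportError, AttributeError, ValueError) as exc:
            results[path] = f"{type(exc).__name__}: {exc}"
    return results


# Standard-library paths stand in for real extractor paths here.
for path, status in check_extractor_paths(
    "collections.OrderedDict;no_such_pkg_xyz.MyExtractor"
).items():
    print(path, "->", status)
```

In practice you would pass the value of `AIRFLOW__OPENLINEAGE__EXTRACTORS` (for example via `os.environ`) instead of the stand-in string.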
Problem: Extraction fails when operator properties are None.

```python
# ✅ Handle optional properties
def _execute_extraction(self) -> OperatorLineage | None:
    if not self.operator.source_table:
        return None  # Skip extraction
    return OperatorLineage(...)
```
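When an extractor depends on several operator properties, the same guard can be factored into one helper. This is a minimal sketch under the assumption that a missing, `None`, or empty-string value should skip extraction; `required_attrs` is a hypothetical name, and `SimpleNamespace` stands in for a real operator.

```python
from types import SimpleNamespace
from typing import Any, Optional


def required_attrs(operator: Any, *names: str) -> Optional[dict[str, Any]]:
    """Return the named attributes as a dict, or None if any is missing or empty."""
    values = {name: getattr(operator, name, None) for name in names}
    if any(value in (None, "") for value in values.values()):
        return None  # Caller should skip extraction
    return values


complete = SimpleNamespace(source_table="orders", target_table="orders_clean")
partial = SimpleNamespace(source_table="orders", target_table=None)

print(required_attrs(complete, "source_table", "target_table"))
print(required_attrs(partial, "source_table", "target_table"))
```

Inside `_execute_extraction`, the extractor could call this with `self.operator` and return `None` when the helper does.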
```python
import pytest
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor


def test_extractor():
    # Mock the operator
    operator = MagicMock()
    operator.source_table = "input_table"
    operator.target_table = "output_table"

    # Create extractor
    extractor = MyOperatorExtractor(operator)

    # Test extraction
    lineage = extractor._execute_extraction()

    # MyOperatorExtractor prefixes table names with the "public" schema
    assert len(lineage.inputs) == 1
    assert lineage.inputs[0].name == "public.input_table"
    assert len(lineage.outputs) == 1
    assert lineage.outputs[0].name == "public.output_table"
```
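OpenLineage checks for lineage in this order:

1. Custom extractors (highest priority)
2. OpenLineage methods on the operator
3. Hook-level lineage (from `HookLineageCollector`)
4. Inlets/outlets (lowest priority)

If a custom extractor exists, it overrides built-in extraction and inlets/outlets.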
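The precedence can be pictured as a first-match lookup: custom extractors win, inlets/outlets are the last resort, and OpenLineage methods and hook-level lineage sit in between. The sketch below models only that ordering with plain strings; the source labels and `pick_lineage_source` are hypothetical names for illustration and do not correspond to Airflow internals.

```python
from typing import Optional

# Highest priority first; labels are illustrative, not Airflow identifiers.
PRECEDENCE = (
    "custom_extractor",
    "openlineage_methods",
    "hook_lineage",
    "inlets_outlets",
)


def pick_lineage_source(available: set[str]) -> Optional[str]:
    """Return the highest-priority source present in `available`, if any."""
    for source in PRECEDENCE:
        if source in available:
            return source
    return None


# An operator with both a registered extractor and inlets/outlets:
print(pick_lineage_source({"inlets_outlets", "custom_extractor"}))
# An operator that only declares inlets/outlets:
print(pick_lineage_source({"inlets_outlets"}))
```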