migrating-airflow-2-to-3 by astronomer/agents
npx skills add https://github.com/astronomer/agents --skill migrating-airflow-2-to-3
This skill helps migrate Airflow 2.x DAG code to Airflow 3.x, focusing on code changes (imports, operators, hooks, context, API usage).
**Important**: Before migrating to Airflow 3, we strongly recommend upgrading to Airflow 2.11 first, then to at least Airflow 3.0.11 (ideally directly to 3.1). Other upgrade paths make rollback impossible. See: https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. Additionally, early 3.0 releases have many bugs; 3.1 provides a much better experience.
Config and import changes to watch for:

- Set `AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True` if you need Airflow 2-style cron data intervals.
- `.airflowignore` syntax changed from regexp to glob; set `AIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexp` if you must keep regexp behavior.
- Auth URLs now use the `/auth/` prefix (e.g. `/auth/oauth-authorized/google`).
- Bare imports like `import common` from `dags/common/` no longer work on Astro. Use fully qualified imports: `import dags.common`.

Airflow 3 also changes how components talk to the metadata database:
**Trigger implementation gotcha**: If a trigger calls hooks synchronously inside the asyncio event loop, it may fail or block. Prefer calling hooks via `sync_to_async(...)` (or otherwise ensure hook calls are async-safe).
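A minimal trigger sketch under these assumptions: a hypothetical `MyHook` in `include/hooks.py` with a blocking `get_status()` method, wrapped with `asgiref`'s `sync_to_async` so the call runs in a worker thread instead of blocking the triggerer's event loop:

```python
from asgiref.sync import sync_to_async
from airflow.triggers.base import BaseTrigger, TriggerEvent


class StatusTrigger(BaseTrigger):
    def __init__(self, conn_id: str):
        super().__init__()
        self.conn_id = conn_id

    def serialize(self):
        # (classpath, kwargs) tuple used to recreate the trigger.
        return ("include.triggers.StatusTrigger", {"conn_id": self.conn_id})

    def _check_status(self) -> str:
        from include.hooks import MyHook  # hypothetical hook module

        return MyHook(conn_id=self.conn_id).get_status()

    async def run(self):
        # Blocking hook call moved off the asyncio event loop.
        status = await sync_to_async(self._check_status)()
        yield TriggerEvent({"status": status})
```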
**Key code impact**: Task code can still import ORM sessions/models, but any attempt to use them to talk to the metadata DB will fail with:
```
RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x
```
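For illustration, an Airflow 2-era pattern that now raises this error when executed inside a task (`example_dag` is a hypothetical DAG id):

```python
from airflow.models import DagRun
from airflow.settings import Session

session = Session()
# In Airflow 3 this raises the RuntimeError above instead of querying the DB.
runs = session.query(DagRun).filter(DagRun.dag_id == "example_dag").all()
```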
When scanning DAGs, custom operators, and @task functions, look for:
- `provide_session`, `create_session`, `@provide_session`
- `from airflow.settings import Session`
- `from airflow.settings import engine`
- `session.query(DagModel)...`, `session.query(DagRun)...`

For rich metadata access patterns, the official Python API client is preferred. Add to `requirements.txt`:
```
apache-airflow-client==<your-airflow-runtime-version>
```
Example usage:
```python
import os

import airflow_client.client
from airflow_client.client.api.dag_api import DAGApi
from airflow.sdk import BaseOperator

# Base URL and Deployment API token are read from environment variables.
_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")


class ListDagsOperator(BaseOperator):
    def execute(self, context):
        # Authenticate the generated client against the Airflow REST API.
        config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
        with airflow_client.client.ApiClient(config) as api_client:
            dag_api = DAGApi(api_client)
            dags = dag_api.get_dags(limit=10)
            self.log.info("Found %d DAGs", len(dags.dags))
```
For simple cases, call the REST API directly using requests:
```python
import os

import requests
from airflow.sdk import task

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")


@task
def list_dags_via_api() -> None:
    response = requests.get(
        f"{_HOST}/api/v2/dags",
        headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
        params={"limit": 10},
    )
    response.raise_for_status()
    print(response.json())
```
Use Ruff's Airflow rules to detect and fix many breaking changes automatically.
Commands to run (via uv) against the project root:
```bash
# Auto-fix all detectable Airflow issues (safe + unsafe)
ruff check --preview --select AIR --fix --unsafe-fixes .

# Check remaining Airflow issues without fixing
ruff check --preview --select AIR .
```
For detailed code examples and migration patterns, see:

- `reference/migration-patterns.md` — detailed code examples
- `reference/migration-checklist.md` — manual search checklist
Key import changes:

| Airflow 2.x | Airflow 3 |
|---|---|
| airflow.operators.dummy_operator.DummyOperator | airflow.providers.standard.operators.empty.EmptyOperator |
| airflow.operators.bash.BashOperator | airflow.providers.standard.operators.bash.BashOperator |
| airflow.operators.python.PythonOperator | airflow.providers.standard.operators.python.PythonOperator |
| airflow.decorators.dag | airflow.sdk.dag |
| airflow.decorators.task | airflow.sdk.task |
| airflow.datasets.Dataset | airflow.sdk.Asset |
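A before/after sketch of the renamed imports above:

```python
# Airflow 2:
# from airflow.operators.dummy_operator import DummyOperator
# from airflow.decorators import dag, task
# from airflow.datasets import Dataset

# Airflow 3:
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import Asset, dag, task
```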
Removed task context keys and their replacements:

| Removed Key | Replacement |
|---|---|
| execution_date | context["dag_run"].logical_date |
| tomorrow_ds / yesterday_ds | Use ds with date math: macros.ds_add(ds, 1) / macros.ds_add(ds, -1) |
| prev_ds / next_ds | prev_start_date_success or timetable API |
| triggering_dataset_events | triggering_asset_events |
| templates_dict | context["params"] |
- **Asset-triggered runs**: `logical_date` may be `None`; read `context["dag_run"].logical_date` defensively.
- **Cannot trigger with a future `logical_date`**: use `logical_date=None` and rely on `run_id` instead.
- **Cron note**: for scheduled runs using cron, `logical_date` semantics differ under `CronTriggerTimetable` (which aligns `logical_date` with `run_after`). If you need Airflow 2-style cron data intervals, consider setting `AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True`.
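A minimal sketch of reading `logical_date` defensively inside a task (assuming `get_current_context` is exported from the Airflow 3 task SDK, and using `run_id` as the fallback key):

```python
from airflow.sdk import get_current_context, task


@task
def partition_key() -> str:
    context = get_current_context()
    dag_run = context["dag_run"]
    logical_date = dag_run.logical_date  # replaces the removed execution_date
    if logical_date is None:
        # Asset-triggered run: fall back to the unique run_id.
        return dag_run.run_id
    return logical_date.strftime("%Y-%m-%d")
```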
| Setting | Airflow 2 Default | Airflow 3 Default |
|---|---|---|
| schedule | timedelta(days=1) | None |
| catchup | True | False |
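If a DAG relied on the old defaults, a sketch of pinning them explicitly:

```python
from datetime import datetime, timedelta

from airflow.sdk import dag, task


@dag(
    schedule=timedelta(days=1),  # Airflow 3 default is None (never scheduled)
    catchup=True,                # Airflow 3 default is False
    start_date=datetime(2024, 1, 1),
)
def my_daily_dag():
    @task
    def hello():
        print("hello")

    hello()


my_daily_dag()
```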
- `on_success_callback` no longer runs on skip; use `on_skipped_callback` if needed.
- `@teardown` with `TriggerRule.ALWAYS` is no longer allowed; teardown tasks now execute even if the DAG run terminated early.
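A minimal sketch of reacting to skips explicitly (assuming `on_skipped_callback`, a `BaseOperator` argument since Airflow 2.9, is forwarded by the task decorator):

```python
from airflow.exceptions import AirflowSkipException  # still importable in Airflow 3
from airflow.sdk import task


def notify_skip(context):
    print(f"{context['ti'].task_id} was skipped")


@task(on_skipped_callback=notify_skip)
def maybe_skip(value: int) -> int:
    if value < 0:
        raise AirflowSkipException("negative input, skipping")
    return value
```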