npx skills add https://github.com/astronomer/agents --skill airflow使用 af 命令查询、管理和排查 Airflow 工作流问题。
Astro CLI 是在本地运行 Airflow 并部署到生产环境的推荐方式。它提供了一个开箱即用的容器化 Airflow 环境:
# 初始化新项目
astro dev init
# 启动本地 Airflow (Web 服务器位于 http://localhost:8080)
astro dev start
# 解析 DAG 以快速捕获错误(无需启动 Airflow)
astro dev parse
# 对 DAG 运行 pytest
astro dev pytest
# 部署到生产环境
astro deploy # 完整部署(镜像 + DAG)
astro deploy --dags # 仅 DAG 部署(快速,无需构建镜像)
更多详情:
使用 uvx 运行所有 af 命令(无需安装):
uvx --from astro-airflow-mcp af <command>
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
在本文档中,af 是 uvx --from astro-airflow-mcp af 的简写。
使用持久化配置管理多个 Airflow 实例:
# 添加新实例
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin
# 列出和切换实例
af instance list # 在表格中显示所有实例
af instance use prod # 切换到 prod 实例
af instance current # 显示当前实例
af instance delete old-instance
# 自动发现实例(先使用 --dry-run 预览)
af instance discover --dry-run # 预览所有可发现的实例
af instance discover # 从所有后端(astro, local)发现
af instance discover astro # 仅发现 Astro 部署
af instance discover astro --all-workspaces # 包含所有可访问的工作区
af instance discover local # 扫描常见的本地 Airflow 端口
af instance discover local --scan # 深度扫描所有 1024-65535 端口
# 重要:务必先使用 --dry-run 运行,并在未使用它的情况下运行 discover 之前征得用户同意。
# 非 dry-run 模式会在 Astro Cloud 中创建 API 令牌,这是一个敏感操作,需要明确批准。
# 为单个命令覆盖实例
af --instance staging dags list
配置文件:~/.af/config.yaml(可使用 --config 或 AF_CONFIG 环境变量覆盖)
配置中的令牌可以使用 ${VAR} 语法引用环境变量:
instances:
- name: prod
url: https://airflow.example.com
auth:
token: ${AIRFLOW_API_TOKEN}
或者直接使用环境变量(无需配置文件):
export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# 或用户名/密码:
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin
或者使用 CLI 标志:af --airflow-url http://localhost:8080 --token "$TOKEN" <command>
| 命令 | 描述 |
|---|---|
af health | 系统健康检查 |
af dags list | 列出所有 DAG |
af dags get <dag_id> | 获取 DAG 详情 |
af dags explore <dag_id> | 完整的 DAG 调查 |
af dags source <dag_id> | 获取 DAG 源代码 |
af dags pause <dag_id> | 暂停 DAG 调度 |
af dags unpause <dag_id> | 恢复 DAG 调度 |
af dags errors | 列出导入错误 |
af dags warnings | 列出 DAG 警告 |
af dags stats | DAG 运行统计 |
af runs list | 列出 DAG 运行 |
af runs get <dag_id> <run_id> | 获取运行详情 |
af runs trigger <dag_id> | 触发 DAG 运行 |
af runs trigger-wait <dag_id> | 触发并等待完成 |
af runs delete <dag_id> <run_id> | 永久删除 DAG 运行 |
af runs clear <dag_id> <run_id> | 清除运行以便重新执行 |
af runs diagnose <dag_id> <run_id> | 诊断失败的运行 |
af tasks list <dag_id> | 列出 DAG 中的任务 |
af tasks get <dag_id> <task_id> | 获取任务定义 |
af tasks instance <dag_id> <run_id> <task_id> | 获取任务实例 |
af tasks logs <dag_id> <run_id> <task_id> | 获取任务日志 |
af config version | Airflow 版本 |
af config show | 完整配置 |
af config connections | 列出连接 |
af config variables | 列出变量 |
af config variable <key> | 获取特定变量 |
af config pools | 列出池 |
af config pool <name> | 获取池详情 |
af config plugins | 列出插件 |
af config providers | 列出提供者 |
af config assets | 列出资产/数据集 |
af api <endpoint> | 直接 REST API 访问 |
af api ls | 列出可用的 API 端点 |
af api ls --filter X | 列出匹配模式的端点 |
af dags listaf dags explore <dag_id>af dags get <dag_id>af dags source <dag_id>af dags pause <dag_id>af dags unpause <dag_id>af dags errorsaf runs listaf runs trigger <dag_id>af runs trigger-wait <dag_id>af runs diagnose <dag_id> <run_id>af runs delete <dag_id> <run_id>af runs clear <dag_id> <run_id>af tasks list <dag_id>af tasks logs <dag_id> <run_id> <task_id>af config versionaf config connectionsaf config poolsaf healthaf api lsaf api ls --filter variableaf api xcom-entries -F dag_id=X -F task_id=Yaf api event-logs -F dag_id=Xaf api connections -X POST --body '{...}'af api variables -X POST -F key=name -f value=val如果您使用 Astro CLI,可以在没有运行 Airflow 实例的情况下验证 DAG:
# 解析 DAG 以捕获导入错误和语法问题
astro dev parse
# 运行单元测试
astro dev pytest
否则,针对正在运行的实例进行验证:
af dags errors # 检查解析/导入错误
af dags warnings # 检查弃用警告
# 1. 列出最近的运行以找到失败
af runs list --dag-id my_dag
# 2. 诊断特定的运行
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00
# 3. 获取失败任务的日志(从诊断输出中)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data
# 4. 修复后,清除运行以重试所有任务
af runs clear my_dag manual__2024-01-15T10:00:00+00:00
# 1. 整体系统健康
af health
# 2. 检查损坏的 DAG
af dags errors
# 3. 检查池利用率
af config pools
# 获取全面概览(元数据 + 任务 + 源代码)
af dags explore my_dag
# 检查是否暂停
af dags get my_dag
# 检查导入错误
af dags errors
# 检查最近的运行
af runs list --dag-id my_dag
# 选项 1:触发并等待(阻塞)
af runs trigger-wait my_dag --timeout 1800
# 选项 2:触发并稍后检查
af runs trigger my_dag
af runs get my_dag <run_id>
所有命令都输出 JSON(除了使用人类可读表格的 instance 命令):
af dags list
# {
# "total_dags": 5,
# "returned_count": 5,
# "dags": [...]
# }
使用 jq 进行过滤:
# 查找失败的运行
af runs list | jq '.dag_runs[] | select(.state == "failed")'
# 仅获取 DAG ID
af dags list | jq '.dags[].dag_id'
# 查找暂停的 DAG
af dags list | jq '[.dags[] | select(.is_paused == true)]'
# 获取特定重试尝试的日志
af tasks logs my_dag run_id task_id --try 2
# 获取映射任务索引的日志
af tasks logs my_dag run_id task_id --map-index 5
af api 直接访问 API使用 af api 访问高级命令未涵盖的端点(XCom、event-logs、回填等)。
# 发现可用端点
af api ls
af api ls --filter variable
# 基本用法
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE
字段语法:-F key=value 自动转换类型,-f key=value 保持为字符串。
完整参考:查看 api-reference.md 了解所有选项、常见端点(XCom、event-logs、回填)和示例。
| 技能 | 使用场景 |
|---|---|
| authoring-dags | 使用最佳实践创建或编辑 DAG 文件 |
| testing-dags | 迭代测试 -> 调试 -> 修复 -> 重测循环 |
| debugging-dags | 深入的根源分析和故障诊断 |
| checking-freshness | 检查数据是否最新或已过时 |
| tracing-upstream-lineage | 查找数据来源 |
| tracing-downstream-lineage | 影响分析——如果某些内容更改,什么会中断 |
| deploying-airflow | 将 DAG 部署到生产环境(Astro、Docker Compose、Kubernetes) |
| migrating-airflow-2-to-3 | 将 DAG 从 Airflow 2.x 升级到 3.x |
| managing-astro-local-env | 启动、停止或排查本地 Airflow 问题 |
| setting-up-astro-project | 初始化新的 Astro/Airflow 项目 |
每周安装次数
442
代码仓库
GitHub 星标数
269
首次出现
2026 年 1 月 23 日
安全审计
安装于
opencode324
codex319
github-copilot313
cursor307
gemini-cli299
claude-code283
Use af commands to query, manage, and troubleshoot Airflow workflows.
The Astro CLI is the recommended way to run Airflow locally and deploy to production. It provides a containerized Airflow environment that works out of the box:
# Initialize a new project
astro dev init
# Start local Airflow (webserver at http://localhost:8080)
astro dev start
# Parse DAGs to catch errors quickly (no need to start Airflow)
astro dev parse
# Run pytest against your DAGs
astro dev pytest
# Deploy to production
astro deploy # Full deploy (image + DAGs)
astro deploy --dags # DAG-only deploy (fast, no image build)
For more details:
Run all af commands using uvx (no installation required):
uvx --from astro-airflow-mcp af <command>
Throughout this document, af is shorthand for uvx --from astro-airflow-mcp af.
Manage multiple Airflow instances with persistent configuration:
# Add a new instance
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin
# List and switch instances
af instance list # Shows all instances in a table
af instance use prod # Switch to prod instance
af instance current # Show current instance
af instance delete old-instance
# Auto-discover instances (use --dry-run to preview first)
af instance discover --dry-run # Preview all discoverable instances
af instance discover # Discover from all backends (astro, local)
af instance discover astro # Discover Astro deployments only
af instance discover astro --all-workspaces # Include all accessible workspaces
af instance discover local # Scan common local Airflow ports
af instance discover local --scan # Deep scan all ports 1024-65535
# IMPORTANT: Always run with --dry-run first and ask for user consent before
# running discover without it. The non-dry-run mode creates API tokens in
# Astro Cloud, which is a sensitive action that requires explicit approval.
# Override instance for a single command
af --instance staging dags list
Config file: ~/.af/config.yaml (override with --config or AF_CONFIG env var)
Tokens in config can reference environment variables using ${VAR} syntax:
instances:
- name: prod
url: https://airflow.example.com
auth:
token: ${AIRFLOW_API_TOKEN}
Or use environment variables directly (no config file needed):
export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# Or username/password:
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin
Or CLI flags: af --airflow-url http://localhost:8080 --token "$TOKEN" <command>
| Command | Description |
|---|---|
af health | System health check |
af dags list | List all DAGs |
af dags get <dag_id> | Get DAG details |
af dags explore <dag_id> | Full DAG investigation |
af dags source <dag_id> | Get DAG source code |
af dags pause <dag_id> | Pause DAG scheduling |
af dags listaf dags explore <dag_id>af dags get <dag_id>af dags source <dag_id>af dags pause <dag_id>af dags unpause <dag_id>af dags errorsaf runs listaf runs trigger <dag_id>af runs trigger-wait <dag_id>af runs diagnose <dag_id> <run_id>af runs delete <dag_id> <run_id>af runs clear <dag_id> <run_id>af tasks list <dag_id>af tasks logs <dag_id> <run_id> <task_id>af config versionaf config connectionsaf config poolsaf healthaf api lsaf api ls --filter variableaf api xcom-entries -F dag_id=X -F task_id=Yaf api event-logs -F dag_id=Xaf api connections -X POST --body '{...}'af api variables -X POST -F key=name -f value=valIf you're using the Astro CLI, you can validate DAGs without a running Airflow instance:
# Parse DAGs to catch import errors and syntax issues
astro dev parse
# Run unit tests
astro dev pytest
Otherwise, validate against a running instance:
af dags errors # Check for parse/import errors
af dags warnings # Check for deprecation warnings
# 1. List recent runs to find failure
af runs list --dag-id my_dag
# 2. Diagnose the specific run
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00
# 3. Get logs for failed task (from diagnose output)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data
# 4. After fixing, clear the run to retry all tasks
af runs clear my_dag manual__2024-01-15T10:00:00+00:00
# 1. Overall system health
af health
# 2. Check for broken DAGs
af dags errors
# 3. Check pool utilization
af config pools
# Get comprehensive overview (metadata + tasks + source)
af dags explore my_dag
# Check if paused
af dags get my_dag
# Check for import errors
af dags errors
# Check recent runs
af runs list --dag-id my_dag
# Option 1: Trigger and wait (blocking)
af runs trigger-wait my_dag --timeout 1800
# Option 2: Trigger and check later
af runs trigger my_dag
af runs get my_dag <run_id>
All commands output JSON (except instance commands which use human-readable tables):
af dags list
# {
# "total_dags": 5,
# "returned_count": 5,
# "dags": [...]
# }
Use jq for filtering:
# Find failed runs
af runs list | jq '.dag_runs[] | select(.state == "failed")'
# Get DAG IDs only
af dags list | jq '.dags[].dag_id'
# Find paused DAGs
af dags list | jq '[.dags[] | select(.is_paused == true)]'
# Get logs for specific retry attempt
af tasks logs my_dag run_id task_id --try 2
# Get logs for mapped task index
af tasks logs my_dag run_id task_id --map-index 5
af apiUse af api for endpoints not covered by high-level commands (XCom, event-logs, backfills, etc).
# Discover available endpoints
af api ls
af api ls --filter variable
# Basic usage
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE
Field syntax : -F key=value auto-converts types, -f key=value keeps as string.
Full reference : See api-reference.md for all options, common endpoints (XCom, event-logs, backfills), and examples.
| Skill | Use when... |
|---|---|
| authoring-dags | Creating or editing DAG files with best practices |
| testing-dags | Iterative test -> debug -> fix -> retest cycles |
| debugging-dags | Deep root cause analysis and failure diagnosis |
| checking-freshness | Checking if data is up to date or stale |
| tracing-upstream-lineage | Finding where data comes from |
| tracing-downstream-lineage | Impact analysis -- what breaks if something changes |
| deploying-airflow | Deploying DAGs to production (Astro, Docker Compose, Kubernetes) |
| migrating-airflow-2-to-3 | Upgrading DAGs from Airflow 2.x to 3.x |
| managing-astro-local-env | Starting, stopping, or troubleshooting local Airflow |
Weekly Installs
442
Repository
GitHub Stars
269
First Seen
Jan 23, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykFail
Installed on
opencode324
codex319
github-copilot313
cursor307
gemini-cli299
claude-code283
FastMCP Python框架:快速构建MCP服务器,为Claude等大模型提供工具与资源
405 周安装
Nginx配置完全指南:生产级反向代理、负载均衡、SSL终止与性能优化
406 周安装
iOS HIG 设计指南:原生应用界面开发框架与 SwiftUI/UIKit 最佳实践
406 周安装
Aceternity UI - Next.js 13+ 动画React组件库 | Tailwind CSS & Framer Motion
406 周安装
RAG工程师技能详解:检索增强生成系统架构与最佳实践
406 周安装
survey技能:跨平台问题空间扫描与调研工具 | 智能体开发研究
406 周安装
af dags unpause <dag_id> | Resume DAG scheduling |
af dags errors | List import errors |
af dags warnings | List DAG warnings |
af dags stats | DAG run statistics |
af runs list | List DAG runs |
af runs get <dag_id> <run_id> | Get run details |
af runs trigger <dag_id> | Trigger a DAG run |
af runs trigger-wait <dag_id> | Trigger and wait for completion |
af runs delete <dag_id> <run_id> | Permanently delete a DAG run |
af runs clear <dag_id> <run_id> | Clear a run for re-execution |
af runs diagnose <dag_id> <run_id> | Diagnose failed run |
af tasks list <dag_id> | List tasks in DAG |
af tasks get <dag_id> <task_id> | Get task definition |
af tasks instance <dag_id> <run_id> <task_id> | Get task instance |
af tasks logs <dag_id> <run_id> <task_id> | Get task logs |
af config version | Airflow version |
af config show | Full configuration |
af config connections | List connections |
af config variables | List variables |
af config variable <key> | Get specific variable |
af config pools | List pools |
af config pool <name> | Get pool details |
af config plugins | List plugins |
af config providers | List providers |
af config assets | List assets/datasets |
af api <endpoint> | Direct REST API access |
af api ls | List available API endpoints |
af api ls --filter X | List endpoints matching pattern |
| Initializing a new Astro/Airflow project |