npx skills add https://github.com/astronomer/agents --skill airflow-hitl
Implement human approval gates, form inputs, and human-driven branching in Airflow DAGs using the HITL operators. These deferrable operators pause workflow execution until a human responds via the Airflow UI or REST API.
Execute steps in order. Prefer deferrable HITL operators over custom sensors/polling loops.
CRITICAL: Requires Airflow 3.1+. Not available in Airflow 2.x.
Deferrable: All HITL operators are deferrable; they release their worker slot while waiting for human input.
UI location: View pending actions at Browse → Required Actions in the Airflow UI. Respond via the Required Actions tab on the task instance page or through the REST API.
Cross-reference: For AI/LLM calls, see the airflow-ai skill.
| Operator | Human action | Outcome |
|---|---|---|
| `ApprovalOperator` | Approve or Reject | Reject causes downstream tasks to be skipped (approval task itself succeeds) |
| `HITLOperator` | Select option(s) + form | Returns selections |
| `HITLBranchOperator` | Select downstream task(s) | Runs selected, skips others |
| `HITLEntryOperator` | Submit form | Returns form data |
```python
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",  # Optional: auto-selected on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)


approval_example()
```
Required parameters: `subject` and `options`.
```python
from airflow.providers.standard.operators.hitl import HITLOperator
from airflow.sdk import dag, task, chain, Param
from datetime import timedelta
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def hitl_example():
    hitl = HITLOperator(
        task_id="select_option",
        subject="Select Payment Method",
        body="Choose how to process payment",
        options=["ACH", "Wire", "Check"],  # REQUIRED
        defaults=["ACH"],
        multiple=False,
        execution_timeout=timedelta(hours=4),
        params={"amount": Param(1000, type="number")},
    )

    @task
    def process(result):
        print(f"Selected: {result['chosen_options']}")
        print(f"Amount: {result['params_input']['amount']}")

    process(hitl.output)


hitl_example()
```
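The `process` task above reads two keys from the operator's output: `chosen_options` and `params_input`. The following is a minimal plain-Python sketch of handling such a payload, assuming the result is a dictionary shaped like the one the example indexes into. The function name `summarize_hitl_result` and the sample values are hypothetical and not part of Airflow.

```python
from typing import Any


def summarize_hitl_result(result: dict[str, Any]) -> str:
    """Build a one-line summary from a HITL result dictionary.

    Assumes the shape used in the example above: a list under
    "chosen_options" and a dict of form values under "params_input".
    """
    chosen = result.get("chosen_options") or []
    params = result.get("params_input") or {}
    # With multiple=False the list holds one entry; join handles both cases.
    selection = ", ".join(chosen) if chosen else "(nothing selected)"
    form = ", ".join(f"{key}={value}" for key, value in sorted(params.items()))
    return f"Selected: {selection}; form: {form or '(empty)'}"


# Hypothetical payload mirroring the payment example.
sample = {"chosen_options": ["ACH"], "params_input": {"amount": 1500}}
print(summarize_hitl_result(sample))
```

Reading the keys with `.get` keeps a downstream task from raising `KeyError` if a field is absent, which may be preferable to direct indexing when the form is optional.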
IMPORTANT: Options can either:
- Directly match downstream task IDs (the simpler approach)
- Use `options_mapping` for human-friendly labels that map to task IDs
```python
from airflow.providers.standard.operators.hitl import HITLBranchOperator
from airflow.sdk import dag, task, chain
from pendulum import datetime

DEPTS = ["marketing", "engineering", "sales"]


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def branch_example():
    branch = HITLBranchOperator(
        task_id="select_dept",
        subject="Select Departments",
        options=[f"Fund {d}" for d in DEPTS],
        options_mapping={f"Fund {d}": d for d in DEPTS},
        multiple=True,
    )

    for dept in DEPTS:

        @task(task_id=dept)
        def handle(dept_name: str = dept):
            # Bind the loop variable at definition time to avoid late-binding bugs
            print(f"Processing {dept_name}")

        chain(branch, handle())


branch_example()
```
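The relationship between option labels and task IDs can be sketched in plain Python. This only illustrates the mapping idea under the assumption that an unmapped label is used as the task ID itself; it is not the operator's internal implementation, and `resolve_branch_targets` is a hypothetical helper.

```python
def resolve_branch_targets(chosen, options_mapping=None):
    """Translate chosen option labels into downstream task IDs.

    Without a mapping, each label is treated as the task ID itself.
    """
    mapping = options_mapping or {}
    return [mapping.get(label, label) for label in chosen]


DEPTS = ["marketing", "engineering", "sales"]
mapping = {f"Fund {d}": d for d in DEPTS}

# Labels picked by the reviewer are translated to task IDs.
print(resolve_branch_targets(["Fund marketing", "Fund sales"], mapping))
# Without a mapping, the labels must already be task IDs.
print(resolve_branch_targets(["engineering"]))
```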
```python
from airflow.providers.standard.operators.hitl import HITLEntryOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def entry_example():
    entry = HITLEntryOperator(
        task_id="get_input",
        subject="Enter Details",
        body="Provide response",
        params={
            "response": Param("", type="string"),
            "priority": Param("p3", type="string"),
        },
    )

    @task
    def process(result):
        print(f"Response: {result['params_input']['response']}")

    process(entry.output)


entry_example()
```
```python
from airflow.sdk import BaseNotifier, Context
from airflow.providers.standard.operators.hitl import HITLOperator


class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message=""):
        self.message = message

    def notify(self, context: Context):
        if context["ti"].state == "running":
            # Link to the UI page where a human can respond
            url = HITLOperator.generate_link_to_ui_from_context(
                context, base_url="https://airflow.example.com"
            )
            self.log.info(f"Action needed: {url}")


hitl = HITLOperator(..., notifiers=[MyNotifier("{{ task.subject }}")])
```
The format of `respondents` depends on your auth manager:
| Auth Manager | Format | Example |
|---|---|---|
| SimpleAuthManager | Username | `["admin", "manager"]` |
| FabAuthManager | Email | `["manager@example.com"]` |
| Astro | Astro ID | `["cl1a2b3cd456789ef1gh2ijkl3"]` |

Astro users: Find the Astro ID at Organization → Access Management.
```python
hitl = HITLOperator(..., respondents=["manager@example.com"])  # FabAuthManager
```
- With `defaults`: on timeout, the task succeeds with the default option(s) selected
- Without `defaults`: the task fails on timeout
```python
hitl = HITLOperator(
    ...,
    options=["Option A", "Option B"],
    defaults=["Option A"],  # Auto-selected on timeout
    execution_timeout=timedelta(hours=4),
)
```
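The timeout rule can be modeled as a small plain-Python function. This is a sketch of the behavior described above, not Airflow code; `timeout_outcome` and the shape of the returned dictionary are hypothetical.

```python
def timeout_outcome(defaults=None):
    """Model what happens when nobody responds before the timeout.

    Accepts a single option or a list of options, since the examples
    pass `defaults` in both forms.
    """
    if isinstance(defaults, str):
        defaults = [defaults]
    if defaults:
        # Defaults are configured: the task succeeds with them selected.
        return {"state": "success", "chosen_options": list(defaults)}
    # No defaults: the task fails.
    return {"state": "failed", "chosen_options": []}


print(timeout_outcome(["Option A"]))
print(timeout_outcome())
```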
The `body` parameter supports Markdown formatting and Jinja templating:
```python
hitl = HITLOperator(
    ...,
    body="""**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}
| Category | Amount |
|----------|--------|
| Marketing | $1M |
""",
)
```
All HITL operators support standard Airflow callbacks:
```python
def on_hitl_failure(context):
    print(f"HITL task failed: {context['task_instance'].task_id}")


def on_hitl_success(context):
    print(f"HITL task succeeded with: {context['task_instance'].xcom_pull()}")


hitl = HITLOperator(
    task_id="approval_required",
    subject="Review needed",
    options=["Approve", "Reject"],
    on_failure_callback=on_hitl_failure,
    on_success_callback=on_hitl_success,
)
```
For external responders (Slack, custom app):
```python
import os

import requests

HOST = os.getenv("AIRFLOW_HOST")
TOKEN = os.getenv("AIRFLOW_API_TOKEN")

# Placeholder identifiers for the task instance awaiting a response
dag_id, run_id, task_id = "hitl_example", "<run_id>", "select_option"

# Get pending actions
r = requests.get(
    f"{HOST}/api/v2/hitlDetails/?state=pending",
    headers={"Authorization": f"Bearer {TOKEN}"},
)

# Respond
requests.patch(
    f"{HOST}/api/v2/hitlDetails/{dag_id}/{run_id}/{task_id}",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"chosen_options": ["ACH"], "params_input": {"amount": 1500}},
)
```
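When an external app builds the response request itself, the run ID needs care because it often contains characters such as `:` and `+`. The sketch below assembles the URL and JSON body without sending anything. It copies the path layout from the example above, which should be checked against the REST API reference for your Airflow version. `build_hitl_response` and the sample run ID are hypothetical.

```python
from urllib.parse import quote


def build_hitl_response(host, dag_id, run_id, task_id, chosen_options, params_input=None):
    """Return the URL and JSON body for a HITL response request.

    The path layout is an assumption taken from the example above.
    """
    # Percent-encode every path segment so ":" and "+" in run IDs survive.
    path = "/".join(quote(part, safe="") for part in (dag_id, run_id, task_id))
    url = f"{host.rstrip('/')}/api/v2/hitlDetails/{path}"
    body = {
        "chosen_options": list(chosen_options),
        "params_input": dict(params_input or {}),
    }
    return url, body


url, body = build_hitl_response(
    "https://airflow.example.com",
    "hitl_example",
    "scheduled__2025-01-01T00:00:00+00:00",  # hypothetical run ID
    "select_option",
    ["ACH"],
    {"amount": 1500},
)
print(url)
print(body)
```

The returned pair can be passed to `requests.patch(url, json=body, headers=...)` with the same bearer-token header used above.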
Before finalizing, verify:
- `HITLBranchOperator`: options map to downstream task IDs
- `defaults` values are in the `options` list
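These two checks can be run mechanically before deploying a DAG. The following plain-Python sketch takes the operator arguments as ordinary values and returns a list of problems. `check_hitl_config` is a hypothetical helper, not an Airflow API, and it assumes that an option without a mapping entry is itself the task ID.

```python
def check_hitl_config(options, defaults=None, options_mapping=None, downstream_task_ids=None):
    """Return a list of problems found in a HITL operator configuration."""
    problems = []
    if isinstance(defaults, str):
        defaults = [defaults]
    # Check 1: every default must be one of the offered options.
    for value in defaults or []:
        if value not in options:
            problems.append(f"default {value!r} is not in options")
    # Check 2 (branching only): every option must resolve to a downstream task ID.
    if downstream_task_ids is not None:
        mapping = options_mapping or {}
        for option in options:
            target = mapping.get(option, option)
            if target not in downstream_task_ids:
                problems.append(f"option {option!r} maps to unknown task {target!r}")
    return problems


ok = check_hitl_config(
    ["Fund marketing", "Fund sales"],
    options_mapping={"Fund marketing": "marketing", "Fund sales": "sales"},
    downstream_task_ids={"marketing", "sales"},
)
bad = check_hitl_config(["Option A", "Option B"], defaults=["Option C"])
print(ok)
print(bad)
```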