flowstudio-power-automate-mcp by github/awesome-copilot
npx skills add https://github.com/github/awesome-copilot --skill flowstudio-power-automate-mcp
This skill lets AI agents read, monitor, and operate Microsoft Power Automate cloud flows programmatically through a FlowStudio MCP server — no browser, no UI, no manual steps.
Requires: A FlowStudio MCP subscription (or a compatible Power Automate MCP server). You will need:
- MCP endpoint: `https://mcp.flowstudio.app/mcp` (same for all subscribers)
- API key / JWT token (sent in the `x-api-key` header, NOT as a Bearer token)
- Power Platform environment name (e.g. `Default-<tenant-guid>`)
| Priority | Source | Covers |
|---|---|---|
| 1 | Real API response | Always trust what the server actually returns |
| 2 | tools/list | Tool names, parameter names, types, required flags |
| 3 | SKILL docs & reference files | Response shapes, behavioral notes, workflow recipes |
Start every new session with `tools/list`. It returns the authoritative, up-to-date schema for every tool: parameter names, types, and required flags. The SKILL docs cover what `tools/list` cannot tell you: response shapes, non-obvious behaviors, and end-to-end workflow patterns.

If any documentation disagrees with `tools/list` or a real API response, the API wins.
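Since `tools/list` is the authority on parameter names and required flags, a planned call can be checked against it before it is sent. The sketch below assumes each tool entry carries a JSON Schema under `inputSchema` (as the MCP specification describes); `SAMPLE_TOOL` and `check_args` are hypothetical names, and the schema content is illustrative rather than copied from the real server.

```python
# Illustrative entry shaped like one item of raw["result"]["tools"].
# The schema below is an assumption, not the server's real schema.
SAMPLE_TOOL = {
    "name": "get_live_flow",
    "inputSchema": {
        "type": "object",
        "properties": {
            "environmentName": {"type": "string"},
            "flowName": {"type": "string"},
        },
        "required": ["environmentName", "flowName"],
    },
}


def check_args(tool, args):
    """Return (missing, unknown) parameter names for a planned tool call."""
    schema = tool.get("inputSchema", {})
    required = schema.get("required", [])
    known = schema.get("properties", {})
    missing = [name for name in required if name not in args]
    unknown = [name for name in args if name not in known]
    return missing, unknown


missing, unknown = check_args(
    SAMPLE_TOOL, {"environmentName": "Default-<tenant-guid>", "flow": "x"}
)
print("missing:", missing)
print("unknown:", unknown)
```

A non-empty `missing` or `unknown` list is a cue to re-read the live schema rather than the docs.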
All examples in this skill and the companion build / debug skills use Python with `urllib.request` (stdlib, no pip install needed). Node.js is an equally valid choice: `fetch` is built in from Node 18+, JSON handling is native, and the async/await model maps cleanly onto the request-response pattern of MCP tool calls, making it a natural fit for teams already working in a JavaScript/TypeScript stack.
| Language | Verdict | Notes |
|---|---|---|
| Python | ✅ Recommended | Clean JSON handling, no escaping issues, all skill examples use it |
| Node.js (≥ 18) | ✅ Recommended | Native fetch + JSON.stringify/JSON.parse; async/await fits MCP call patterns well; no extra packages needed |
| PowerShell | ⚠️ Avoid for flow operations | ConvertTo-Json -Depth silently truncates nested definitions; quoting and escaping break complex payloads. Acceptable for a quick tools/list discovery call but not for building or updating flows. |
| cURL / Bash | ⚠️ Possible but fragile | Shell-escaping nested JSON is error-prone; no native JSON parser |
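To illustrate the "clean JSON handling" point for Python, here is a minimal sketch that serializes a deeply nested structure and reads it back unchanged. The `definition` dict is a made-up stand-in for a flow definition, not a real one.

```python
import json

# Hypothetical, deeply nested fragment standing in for a flow definition.
definition = {"actions": {"Scope": {"actions": {"Apply_to_each": {"actions": {
    "Compose": {"inputs": "@coalesce(item()?['Name'], '')"}}}}}}}

# json.dumps has no depth limit to configure and handles quoting itself.
payload = json.dumps({"definition": definition})
restored = json.loads(payload)["definition"]

print(restored == definition)
```

The expression string survives the round trip without any manual escaping, which is the property the table's warnings about PowerShell and shell quoting are about.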
TL;DR — use the Core MCP Helper (Python or Node.js) below. Both handle JSON-RPC framing, auth, and response parsing in a single reusable function.
FlowStudio MCP has two access tiers. FlowStudio for Teams subscribers get both the fast Azure-table store (cached snapshot data + governance metadata) and full live Power Automate API access. MCP-only subscribers get the live tools — more than enough to build, debug, and operate flows.
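| Tool | What it does |
|---|---|
| `list_live_flows` | List flows in an environment directly from the PA API (always current) |
| `list_live_environments` | List all Power Platform environments visible to the service account |
| `list_live_connections` | List all connections in an environment from the PA API |
| `get_live_flow` | Fetch the complete flow definition (triggers, actions, parameters) |
| `get_live_flow_http_schema` | Inspect the JSON body schema and response schemas of an HTTP-triggered flow |
| `get_live_flow_trigger_url` | Get the current signed callback URL for an HTTP-triggered flow |
| `trigger_live_flow` | POST to an HTTP-triggered flow's callback URL (AAD auth handled automatically) |
| `update_live_flow` | Create a new flow or patch an existing definition in one call |
| `add_live_flow_to_solution` | Migrate a non-solution flow into a solution |
| `get_live_flow_runs` | List recent run history with status, start/end times, and errors |
| `get_live_flow_run_error` | Get structured error details (per-action) for a failed run |
| `get_live_flow_run_action_outputs` | Inspect inputs/outputs of any action (or every foreach iteration) in a run |
| `resubmit_live_flow_run` | Re-run a failed or cancelled run using its original trigger payload |
| `cancel_live_flow_run` | Cancel a currently running flow execution |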
These tools read from (and write to) the FlowStudio Azure table — a monitored snapshot of your tenant's flows enriched with governance metadata and run statistics.
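| Tool | What it does |
|---|---|
| `list_store_flows` | Search flows from the cache with governance flags, run failure rates, and owner metadata |
| `get_store_flow` | Get full cached details for a single flow including run stats and governance fields |
| `get_store_flow_trigger_url` | Get the trigger URL from the cache (instant, no PA API call) |
| `get_store_flow_runs` | Cached run history for the last N days with duration and remediation hints |
| `get_store_flow_errors` | Cached failed-only runs with failed action names and remediation hints |
| `get_store_flow_summary` | Aggregated stats: success rate, failure count, avg/max duration |
| `set_store_flow_state` | Start or stop a flow via the PA API and sync the result back to the store |
| `update_store_flow` | Update governance metadata (description, tags, monitor flag, notification rules, business impact) |
| `list_store_environments` | List all environments from the cache |
| `list_store_makers` | List all makers (citizen developers) from the cache |
| `get_store_maker` | Get a maker's flow/app counts and account status |
| `list_store_power_apps` | List all Power Apps canvas apps from the cache |
| `list_store_connections` | List all Power Platform connections from the cache |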
| Task | Tool | Notes |
|---|---|---|
| List flows | list_live_flows | Always current — calls PA API directly |
| Read a definition | get_live_flow | Always fetched live — not cached |
| Debug a failure | get_live_flow_runs → get_live_flow_run_error | Use live run data |
⚠️ `list_live_flows` returns a wrapper object with a `flows` array; access it via `result["flows"]`.

Store tools (`list_store_flows`, `get_store_flow`, etc.) are available only to FlowStudio for Teams subscribers and provide cached governance metadata. Use live tools when in doubt, since they work for all subscription tiers.
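One way to act on the tier guidance is to choose a tool by checking the names `tools/list` returned. This is a sketch only: `resolve_tool` is a hypothetical helper, and it assumes the server omits store tools from `tools/list` for MCP-only subscriptions, which the text does not state explicitly.

```python
def resolve_tool(available, wanted, fallback):
    """Use the wanted tool if the server exposes it, otherwise the live fallback."""
    if wanted in available:
        return wanted
    if fallback in available:
        return fallback
    raise LookupError(f"neither {wanted} nor {fallback} is available")


# Tool-name sets as they might be collected from tools/list (illustrative).
mcp_only = {"list_live_flows", "get_live_flow", "get_live_flow_runs"}
teams = mcp_only | {"list_store_flows", "get_store_flow"}

print(resolve_tool(mcp_only, "list_store_flows", "list_live_flows"))
print(resolve_tool(teams, "list_store_flows", "list_live_flows"))
```

Keep in mind that a store tool and its live counterpart may return different response shapes, so the caller still needs to branch on which one was chosen.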
Always start by calling tools/list to confirm the server is reachable and see exactly which tool names are available (names may vary by server version):
import json
import urllib.error
import urllib.request

TOKEN = "<YOUR_JWT_TOKEN>"
MCP = "https://mcp.flowstudio.app/mcp"

def mcp_raw(method, params=None, cid=1):
    # Send a bare JSON-RPC request and return the decoded response envelope.
    payload = {"jsonrpc": "2.0", "method": method, "id": cid}
    if params:
        payload["params"] = params
    req = urllib.request.Request(MCP, data=json.dumps(payload).encode(),
        headers={"x-api-key": TOKEN, "Content-Type": "application/json",
                 "User-Agent": "FlowStudio-MCP/1.0"})
    try:
        resp = urllib.request.urlopen(req, timeout=30)
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"MCP HTTP {e.code}: check token and endpoint") from e
    return json.loads(resp.read())

raw = mcp_raw("tools/list")
if "error" in raw:
    print("ERROR:", raw["error"]); raise SystemExit(1)
for t in raw["result"]["tools"]:
    # description is sliced for display; fall back to "" if a tool has none
    print(t["name"], "-", t.get("description", "")[:60])
Use this helper throughout all subsequent operations:
import json
import urllib.error
import urllib.request

TOKEN = "<YOUR_JWT_TOKEN>"
MCP = "https://mcp.flowstudio.app/mcp"

def mcp(tool, args, cid=1):
    # Wrap the tool call in a JSON-RPC "tools/call" request.
    payload = {"jsonrpc": "2.0", "method": "tools/call", "id": cid,
               "params": {"name": tool, "arguments": args}}
    req = urllib.request.Request(MCP, data=json.dumps(payload).encode(),
        headers={"x-api-key": TOKEN, "Content-Type": "application/json",
                 "User-Agent": "FlowStudio-MCP/1.0"})
    try:
        resp = urllib.request.urlopen(req, timeout=120)
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"MCP HTTP {e.code}: {body[:200]}") from e
    raw = json.loads(resp.read())
    if "error" in raw:
        raise RuntimeError(f"MCP error: {json.dumps(raw['error'])}")
    # The tool result arrives as a JSON string inside the first content item.
    text = raw["result"]["content"][0]["text"]
    return json.loads(text)
Common auth errors:
- HTTP 401/403 → token is missing, expired, or malformed. Get a fresh JWT from mcp.flowstudio.app.
- HTTP 400 → malformed JSON-RPC payload. Check `Content-Type: application/json` and body structure.
- `MCP error: {"code": -32602, ...}` → wrong or missing tool arguments.
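The error cases listed above can be folded into one small lookup so that a failing call yields an actionable hint. This is a sketch under the assumption that the caller has either the HTTP status or the JSON-RPC `error` object at hand; `explain_failure` is a hypothetical helper, not part of the skill.

```python
def explain_failure(http_status=None, rpc_error=None):
    """Map an HTTP status or a JSON-RPC error object to a troubleshooting hint."""
    if http_status in (401, 403):
        return "token missing, expired, or malformed: get a fresh JWT"
    if http_status == 400:
        return "malformed JSON-RPC payload: check Content-Type and body structure"
    # -32602 is the JSON-RPC 2.0 code for "Invalid params".
    if rpc_error and rpc_error.get("code") == -32602:
        return "wrong or missing tool arguments: compare with tools/list"
    return "unrecognized failure: inspect the raw response"


print(explain_failure(http_status=401))
print(explain_failure(rpc_error={"code": -32602, "message": "Invalid params"}))
```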
Equivalent helper for Node.js 18+ (built-in fetch — no packages required):
const TOKEN = "<YOUR_JWT_TOKEN>";
const MCP = "https://mcp.flowstudio.app/mcp";

async function mcp(tool, args, cid = 1) {
  // Wrap the tool call in a JSON-RPC "tools/call" request.
  const payload = {
    jsonrpc: "2.0",
    method: "tools/call",
    id: cid,
    params: { name: tool, arguments: args },
  };
  const res = await fetch(MCP, {
    method: "POST",
    headers: {
      "x-api-key": TOKEN,
      "Content-Type": "application/json",
      "User-Agent": "FlowStudio-MCP/1.0",
    },
    body: JSON.stringify(payload),
  });
  if (!res.ok) {
    const body = await res.text();
    throw new Error(`MCP HTTP ${res.status}: ${body.slice(0, 200)}`);
  }
  const raw = await res.json();
  if (raw.error) throw new Error(`MCP error: ${JSON.stringify(raw.error)}`);
  // The tool result arrives as a JSON string inside the first content item.
  return JSON.parse(raw.result.content[0].text);
}
Requires Node.js 18+. For older Node, replace `fetch` with `https.request` from the stdlib or install `node-fetch`.
ENV = "Default-<tenant-guid>"
result = mcp("list_live_flows", {"environmentName": ENV})
# Returns wrapper object:
# {"mode": "owner", "flows": [{"id": "0757041a-...", "displayName": "My Flow",
# "state": "Started", "triggerType": "Request", ...}], "totalCount": 42, "error": null}
for f in result["flows"]:
    FLOW_ID = f["id"]   # plain UUID: use directly as flowName
    print(FLOW_ID, "|", f["displayName"], "|", f["state"])
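Because `list_live_flows` returns a wrapper rather than a bare list, filtering is easiest once the wrapper is unpacked in one place. The sketch below works on a hand-written sample shaped like the response shown above; the second flow, the `"Stopped"` state, and the `"Recurrence"` trigger type are illustrative assumptions, and `started_http_flows` is a hypothetical helper.

```python
# Hand-written sample in the shape of a list_live_flows response.
sample = {
    "mode": "owner",
    "flows": [
        {"id": "flow-a", "displayName": "My Flow",
         "state": "Started", "triggerType": "Request"},
        {"id": "flow-b", "displayName": "Nightly Sync",
         "state": "Stopped", "triggerType": "Recurrence"},
    ],
    "totalCount": 2,
    "error": None,
}


def started_http_flows(result):
    """Return the running flows that have an HTTP (Request) trigger."""
    if result.get("error") is not None:
        raise RuntimeError(f"list_live_flows failed: {result['error']}")
    return [
        f for f in result.get("flows", [])
        if f.get("state") == "Started" and f.get("triggerType") == "Request"
    ]


for f in started_http_flows(sample):
    print(f["id"], "|", f["displayName"])
```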
FLOW = "<flow-uuid>"
flow = mcp("get_live_flow", {"environmentName": ENV, "flowName": FLOW})
# Display name and state
print(flow["properties"]["displayName"])
print(flow["properties"]["state"])
# List all action names
actions = flow["properties"]["definition"]["actions"]
print("Actions:", list(actions.keys()))
# Inspect one action's expression
print(actions["Compose_Filter"]["inputs"])
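`list(actions.keys())` only shows top-level actions; anything inside a scope, loop, or condition is nested one level down. The following sketch walks the tree recursively. It assumes the definition follows the Logic Apps workflow definition layout (children under `actions`, the false branch of a condition under `else.actions`, switch branches under `cases` and `default`); the sample definition and `walk_actions` are made up for illustration.

```python
def walk_actions(actions, path=()):
    """Yield (path, action) for every action, descending into containers."""
    for name, action in actions.items():
        here = path + (name,)
        yield here, action
        # Scope, Foreach, Until and the true branch of If nest under "actions".
        yield from walk_actions(action.get("actions") or {}, here)
        # The false branch of If nests under else.actions.
        else_branch = action.get("else") or {}
        yield from walk_actions(else_branch.get("actions") or {}, here + ("else",))
        # Switch nests under cases.<name>.actions and default.actions.
        for case_name, case in (action.get("cases") or {}).items():
            yield from walk_actions(case.get("actions") or {}, here + (case_name,))
        default = action.get("default") or {}
        yield from walk_actions(default.get("actions") or {}, here + ("default",))


# Made-up definition fragment for demonstration.
definition = {"actions": {
    "Compose_Filter": {"type": "Compose", "inputs": "x"},
    "Scope_Main": {"type": "Scope", "actions": {
        "Condition": {
            "type": "If",
            "actions": {"Send_Mail": {"type": "ApiConnection"}},
            "else": {"actions": {"Terminate": {"type": "Terminate"}}},
        },
    }},
}}

names = ["/".join(p) for p, _ in walk_actions(definition["actions"])]
print(names)
```

The path tuples make it straightforward to locate a failing action that does not appear among the top-level keys.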
# Most recent runs (newest first)
runs = mcp("get_live_flow_runs", {"environmentName": ENV, "flowName": FLOW, "top": 5})
# Returns direct array:
# [{"name": "08584296068667933411438594643CU15",
# "status": "Failed",
# "startTime": "2026-02-25T06:13:38.6910688Z",
# "endTime": "2026-02-25T06:15:24.1995008Z",
# "triggerName": "manual",
# "error": {"code": "ActionFailed", "message": "An action failed..."}},
# {"name": "08584296028664130474944675379CU26",
# "status": "Succeeded", "error": null, ...}]
for r in runs:
    print(r["name"], r["status"])
# Get the name of the first failed run (None if no run failed)
run_id = next((r["name"] for r in runs if r["status"] == "Failed"), None)
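The run records carry `startTime` and `endTime` with seven fractional digits, which `datetime.fromisoformat` rejects on older Python versions. A minimal sketch of computing a run's duration follows, assuming timestamps always end in `Z` as in the sample above; `parse_ts` and `run_seconds` are hypothetical helpers.

```python
import re
from datetime import datetime, timezone


def parse_ts(ts):
    """Parse a UTC timestamp like 2026-02-25T06:13:38.6910688Z."""
    m = re.fullmatch(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?Z", ts)
    if not m:
        raise ValueError(f"unexpected timestamp: {ts}")
    base, frac = m.groups()
    # datetime stores microseconds, so keep at most six fractional digits.
    micros = int((frac or "0")[:6].ljust(6, "0"))
    dt = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S")
    return dt.replace(microsecond=micros, tzinfo=timezone.utc)


def run_seconds(run):
    """Duration of a run record in seconds."""
    return (parse_ts(run["endTime"]) - parse_ts(run["startTime"])).total_seconds()


run = {"startTime": "2026-02-25T06:13:38.6910688Z",
       "endTime": "2026-02-25T06:15:24.1995008Z"}
print(round(run_seconds(run), 3))  # → 105.508
```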
# To inspect an action's output, pick the run to examine (here, the most recent one)
run_id = runs[0]["name"]
out = mcp("get_live_flow_run_action_outputs", {
    "environmentName": ENV,
    "flowName": FLOW,
    "runName": run_id,
    "actionName": "Get_Customer_Record"   # exact action name from the definition
})
print(json.dumps(out, indent=2))
err = mcp("get_live_flow_run_error", {
    "environmentName": ENV,
    "flowName": FLOW,
    "runName": run_id
})
# Returns:
# {"runName": "08584296068...",
# "failedActions": [
# {"actionName": "HTTP_find_AD_User_by_Name", "status": "Failed",
# "code": "NotSpecified", "startTime": "...", "endTime": "..."},
# {"actionName": "Scope_prepare_workers", "status": "Failed",
# "error": {"code": "ActionFailed", "message": "An action failed..."}}
# ],
# "allActions": [
# {"actionName": "Apply_to_each", "status": "Skipped"},
# {"actionName": "Compose_WeekEnd", "status": "Succeeded"},
# ...
# ]}
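# The ROOT cause is usually the deepest entry in failedActions:
root = err["failedActions"][-1]
# Per the sample above, the code may sit at the top level or under "error"
code = root.get("code") or (root.get("error") or {}).get("code")
print(f"Root failure: {root['actionName']} → {code}")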
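Alongside the root failure, the `allActions` list gives a quick picture of how far the run got. The sketch below tallies statuses on a hand-written sample in the shape of the response shown above; the sample rows are illustrative and `status_counts` is a hypothetical helper.

```python
from collections import Counter

# Hand-written sample in the shape of a get_live_flow_run_error response.
err_sample = {
    "runName": "example-run",
    "failedActions": [
        {"actionName": "HTTP_find_AD_User_by_Name", "status": "Failed"},
        {"actionName": "Scope_prepare_workers", "status": "Failed"},
    ],
    "allActions": [
        {"actionName": "Apply_to_each", "status": "Skipped"},
        {"actionName": "Compose_WeekEnd", "status": "Succeeded"},
        {"actionName": "HTTP_find_AD_User_by_Name", "status": "Failed"},
        {"actionName": "Scope_prepare_workers", "status": "Failed"},
    ],
}


def status_counts(err):
    """Count actions per status so skipped and failed steps stand out."""
    return Counter(a["status"] for a in err.get("allActions", []))


counts = status_counts(err_sample)
print(dict(counts))
print("failed:", [a["actionName"] for a in err_sample["failedActions"]])
```

Many `Skipped` entries after a single `Failed` one usually mean the failure happened early and everything downstream never ran.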
# Resubmit a failed or cancelled run with its original trigger payload
result = mcp("resubmit_live_flow_run", {
    "environmentName": ENV,
    "flowName": FLOW,
    "runName": run_id
})
print(result)   # {"resubmitted": true, "triggerName": "..."}
# Cancel a run that is still in progress
mcp("cancel_live_flow_run", {
    "environmentName": ENV,
    "flowName": FLOW,
    "runName": run_id
})
⚠️ Do NOT cancel a run that shows `Running` because it is waiting for an adaptive card response. That status is normal: the flow is paused waiting for a human to respond in Teams. Cancelling it will discard the pending card.
# ── 1. Find the flow ─────────────────────────────────────────────────────
result = mcp("list_live_flows", {"environmentName": ENV})
target = next(f for f in result["flows"] if "My Flow Name" in f["displayName"])
FLOW_ID = target["id"]
# ── 2. Get the most recent failed run ────────────────────────────────────
runs = mcp("get_live_flow_runs", {"environmentName": ENV, "flowName": FLOW_ID, "top": 5})
# [{"name": "08584296068...", "status": "Failed", ...}, ...]
RUN_ID = next(r["name"] for r in runs if r["status"] == "Failed")
# ── 3. Get per-action failure breakdown ──────────────────────────────────
err = mcp("get_live_flow_run_error", {"environmentName": ENV, "flowName": FLOW_ID, "runName": RUN_ID})
# {"failedActions": [{"actionName": "HTTP_find_AD_User_by_Name", "code": "NotSpecified",...}], ...}
root_action = err["failedActions"][-1]["actionName"]
print(f"Root failure: {root_action}")
# ── 4. Read the definition and inspect the failing action's expression ───
defn = mcp("get_live_flow", {"environmentName": ENV, "flowName": FLOW_ID})
acts = defn["properties"]["definition"]["actions"]
print("Failing action inputs:", acts[root_action]["inputs"])
# ── 5. Inspect the prior action's output to find the null ────────────────
out = mcp("get_live_flow_run_action_outputs", {
    "environmentName": ENV, "flowName": FLOW_ID,
    "runName": RUN_ID, "actionName": "Compose_Names"
})
nulls = [x for x in out.get("body", []) if x.get("Name") is None]
print(f"{len(nulls)} records with null Name")
# ── 6. Apply the fix ─────────────────────────────────────────────────────
acts[root_action]["inputs"]["parameters"]["searchName"] = \
    "@coalesce(item()?['Name'], '')"
conn_refs = defn["properties"]["connectionReferences"]
result = mcp("update_live_flow", {
    "environmentName": ENV, "flowName": FLOW_ID,
    "definition": defn["properties"]["definition"],
    "connectionReferences": conn_refs
})
assert result.get("error") is None, f"Deploy failed: {result['error']}"
# ⚠️ error key is always present — only fail if it is NOT None
# ── 7. Resubmit and verify ───────────────────────────────────────────────
mcp("resubmit_live_flow_run", {"environmentName": ENV, "flowName": FLOW_ID, "runName": RUN_ID})
import time; time.sleep(30)
new_runs = mcp("get_live_flow_runs", {"environmentName": ENV, "flowName": FLOW_ID, "top": 1})
print(new_runs[0]["status"]) # Succeeded = done
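The fixed `time.sleep(30)` in step 7 can be too short for slow flows and wastefully long for fast ones. A polling loop with a deadline is one alternative. This is a sketch: `wait_for_run` is a hypothetical helper, and the set of terminal statuses is an assumption, since only `Running`, `Succeeded`, and `Failed` appear in the examples here.

```python
import time

# Assumed set of final statuses; verify against real run data.
TERMINAL = {"Succeeded", "Failed", "Cancelled"}


def wait_for_run(fetch_latest, timeout=120.0, interval=5.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_latest() until the run reaches a terminal status or times out."""
    deadline = clock() + timeout
    while True:
        run = fetch_latest()
        if run["status"] in TERMINAL:
            return run
        if clock() >= deadline:
            raise TimeoutError(f"run still {run['status']} after {timeout}s")
        sleep(interval)


# Demonstration with a fake fetcher instead of a real MCP call.
statuses = iter(["Running", "Running", "Succeeded"])
final = wait_for_run(lambda: {"status": next(statuses)}, sleep=lambda s: None)
print(final["status"])
```

In real use, `fetch_latest` would wrap the `get_live_flow_runs` call with `"top": 1` and return its first element. Note that the newest run may briefly still be the old one right after a resubmit, so comparing run names before trusting the status is a sensible extra check.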
| Field | Value |
|---|---|
| Auth header | x-api-key: <JWT> — not Authorization: Bearer |
| Token format | Plain JWT — do not strip, alter, or prefix it |
| Timeout | Use ≥ 120 s for get_live_flow_run_action_outputs (large outputs) |
| Environment name | Default-<tenant-guid> (find it via list_live_environments or list_live_flows response) |
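The header rules in the table can be enforced in code so that a mis-prefixed token fails fast instead of producing a 401. A minimal sketch, with `auth_headers` as a hypothetical helper; it validates the token but deliberately leaves it unmodified.

```python
def auth_headers(token):
    """Build request headers, refusing tokens that carry a Bearer prefix."""
    if not token:
        raise ValueError("token is empty")
    if token.lower().startswith("bearer "):
        raise ValueError("pass the plain JWT; do not add a 'Bearer ' prefix")
    # The token goes in x-api-key as-is, not in an Authorization header.
    return {"x-api-key": token, "Content-Type": "application/json"}


print(auth_headers("<YOUR_JWT_TOKEN>"))
```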
For diagnosing failing flows end-to-end → load the power-automate-debug skill.
For building and deploying new flows → load the power-automate-build skill.
Weekly Installs: 2.8K
Repository: github/awesome-copilot
GitHub Stars: 27.0K
First Seen: Mar 6, 2026
Security Audits: Gen Agent Trust Hub (Pass), Socket (Pass), Snyk (Pass)
Installed on: codex (2.8K), gemini-cli (2.8K), opencode (2.8K), cursor (2.7K), github-copilot (2.7K), kimi-cli (2.7K)