flowstudio-power-automate-debug by github/awesome-copilot
npx skills add https://github.com/github/awesome-copilot --skill flowstudio-power-automate-debug
A step-by-step diagnostic process for investigating failing Power Automate cloud flows through the FlowStudio MCP server.
Prerequisite: A FlowStudio MCP server must be reachable with a valid JWT. See the flowstudio-power-automate-mcp skill for connection setup.
Subscribe at https://mcp.flowstudio.app
Always call tools/list first to confirm available tool names and their parameter schemas. Tool names and parameters may change between server versions. This skill covers response shapes, behavioral notes, and diagnostic patterns: things tools/list cannot tell you. If this document disagrees with tools/list or a real API response, the API wins.
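The check described above can be sketched as follows. This is a minimal sketch, assuming the server returns the standard MCP tools/list result shape (a "tools" array whose items carry a "name"); the sample response and the REQUIRED_TOOLS set are illustrative, built only from tool names used elsewhere in this guide. Note that tools/list is its own JSON-RPC method, not a tools/call target.

```python
import json

# Tool names this guide relies on (taken from the examples in this document)
REQUIRED_TOOLS = {
    "get_live_flow_runs",
    "get_live_flow_run_error",
    "get_live_flow_run_action_outputs",
    "update_live_flow",
}

def tools_list_payload(request_id=1):
    """Build the JSON-RPC body for an MCP tools/list request."""
    return json.dumps({"jsonrpc": "2.0", "id": request_id,
                       "method": "tools/list", "params": {}}).encode()

def missing_tools(tools_list_result, required=REQUIRED_TOOLS):
    """Return the required tool names that are absent from a tools/list result."""
    available = {t["name"] for t in tools_list_result.get("tools", [])}
    return sorted(set(required) - available)

# Hypothetical server response, for illustration only
sample = {"tools": [{"name": "get_live_flow_runs", "inputSchema": {}},
                    {"name": "get_live_flow_run_error", "inputSchema": {}}]}
print(missing_tools(sample))
```

If the returned list is non-empty, the server version probably differs from the one this guide was written against, and the tool names should be taken from tools/list instead.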
import json
import urllib.error
import urllib.request

MCP_URL = "https://mcp.flowstudio.app/mcp"
MCP_TOKEN = "<YOUR_JWT_TOKEN>"

def mcp(tool, **kwargs):
    """Call one MCP tool and return its decoded JSON result."""
    payload = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "tools/call",
                          "params": {"name": tool, "arguments": kwargs}}).encode()
    req = urllib.request.Request(MCP_URL, data=payload,
        headers={"x-api-key": MCP_TOKEN, "Content-Type": "application/json",
                 "User-Agent": "FlowStudio-MCP/1.0"})
    try:
        resp = urllib.request.urlopen(req, timeout=120)
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"MCP HTTP {e.code}: {body[:200]}") from e
    raw = json.loads(resp.read())
    if "error" in raw:
        raise RuntimeError(f"MCP error: {json.dumps(raw['error'])}")
    # The tool result arrives as a JSON string inside the first content item
    return json.loads(raw["result"]["content"][0]["text"])

ENV = "<environment-id>"  # e.g. Default-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
If you have a FlowStudio for Teams subscription, get_store_flow_errors returns per-run failure data including action names and remediation hints in a single call — no need to walk through live API steps.
# Quick failure summary (FLOW_ID is the flow's UUID, obtained from list_live_flows)
summary = mcp("get_store_flow_summary", environmentName=ENV, flowName=FLOW_ID)
# {"totalRuns": 100, "failRuns": 10, "failRate": 0.1,
#  "averageDurationSeconds": 29.4, "maxDurationSeconds": 158.9,
#  "firstFailRunRemediation": "<hint or null>"}
print(f"Fail rate: {summary['failRate']:.0%} over {summary['totalRuns']} runs")
# Per-run error details (requires active monitoring to be configured)
errors = mcp("get_store_flow_errors", environmentName=ENV, flowName=FLOW_ID)
if errors:
    for r in errors[:3]:
        print(r["startTime"], "|", r.get("failedActions"), "|", r.get("remediationHint"))
    # If errors confirms the failing action → jump to Step 6 (apply fix)
else:
    # Store doesn't have run-level detail for this flow; use live tools (Steps 2–5)
    pass
For the full governance record (description, complexity, tier, connector list):
record = mcp("get_store_flow", environmentName=ENV, flowName=FLOW_ID)
# {"displayName": "My Flow", "state": "Started",
# "runPeriodTotal": 100, "runPeriodFailRate": 0.1, "runPeriodFails": 10,
# "runPeriodDurationAverage": 29410.8, ← milliseconds
# "runError": "{\"code\": \"EACCES\", ...}", ← JSON string, parse it
# "description": "...", "tier": "Premium", "complexity": "{...}"}
if record.get("runError"):
    last_err = json.loads(record["runError"])
    print("Last run error:", last_err)
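Because runError is a JSON string and runPeriodDurationAverage is in milliseconds, a pair of small normalizing helpers can make the record easier to work with. The sketch below uses only the field names from the sample record above; the helper names and the sample values are illustrative, not part of the FlowStudio API.

```python
import json

def parse_run_error(record):
    """Decode the runError JSON string.

    Returns None when the field is absent or empty, and wraps the raw
    value under "unparsed" when it is not valid JSON.
    """
    raw = record.get("runError")
    if not raw:
        return None
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return {"unparsed": raw}

def average_duration_seconds(record):
    """Convert runPeriodDurationAverage from milliseconds to seconds."""
    ms = record.get("runPeriodDurationAverage")
    return None if ms is None else ms / 1000.0

# Illustrative record using the field names from the sample above
sample = {"runPeriodDurationAverage": 29410.8,
          "runError": "{\"code\": \"EACCES\"}"}
print(parse_run_error(sample), average_duration_seconds(sample))
```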
result = mcp("list_live_flows", environmentName=ENV)
# Returns a wrapper object: {mode, flows, totalCount, error}
target = next(f for f in result["flows"] if "My Flow Name" in f["displayName"])
FLOW_ID = target["id"] # plain UUID — use directly as flowName
print(FLOW_ID)
runs = mcp("get_live_flow_runs", environmentName=ENV, flowName=FLOW_ID, top=5)
# Returns direct array (newest first):
# [{"name": "08584296068667933411438594643CU15",
# "status": "Failed",
# "startTime": "2026-02-25T06:13:38.6910688Z",
# "endTime": "2026-02-25T06:15:24.1995008Z",
# "triggerName": "manual",
# "error": {"code": "ActionFailed", "message": "An action failed..."}},
# {"name": "...", "status": "Succeeded", "error": null, ...}]
for r in runs:
    print(r["name"], r["status"], r["startTime"])
RUN_ID = next(r["name"] for r in runs if r["status"] == "Failed")
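The bare next(...) above raises StopIteration when none of the fetched runs failed, and the timestamps carry seven fractional digits, which not every datetime parser accepts. The following is a minimal sketch of a safer selection plus a duration calculation, assuming the run shape shown in the sample; the helper names are hypothetical.

```python
import re
from datetime import datetime, timezone

def parse_ts(ts):
    """Parse a run timestamp such as 2026-02-25T06:13:38.6910688Z (UTC)."""
    m = re.fullmatch(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?Z", ts)
    if not m:
        raise ValueError(f"unexpected timestamp format: {ts!r}")
    base, frac = m.groups()
    # Keep at most microsecond precision (6 digits); extra digits are dropped
    micros = int((frac or "0")[:6].ljust(6, "0"))
    dt = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S")
    return dt.replace(microsecond=micros, tzinfo=timezone.utc)

def first_failed(runs):
    """Return the newest failed run, or None when no run in the list failed."""
    return next((r for r in runs if r.get("status") == "Failed"), None)

def duration_seconds(run):
    """Elapsed wall-clock time of a run in seconds."""
    return (parse_ts(run["endTime"]) - parse_ts(run["startTime"])).total_seconds()

# Values copied from the sample response above
sample_runs = [{"name": "08584296068667933411438594643CU15", "status": "Failed",
                "startTime": "2026-02-25T06:13:38.6910688Z",
                "endTime": "2026-02-25T06:15:24.1995008Z"}]
failed = first_failed(sample_runs)
if failed is not None:
    print(failed["name"], round(duration_seconds(failed), 1), "s")
```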
err = mcp("get_live_flow_run_error",
          environmentName=ENV, flowName=FLOW_ID, runName=RUN_ID)
# Returns:
# {
#   "runName": "08584296068667933411438594643CU15",
#   "failedActions": [
#     {"actionName": "Apply_to_each_prepare_workers", "status": "Failed",
#      "error": {"code": "ActionFailed", "message": "An action failed..."},
#      "startTime": "...", "endTime": "..."},
#     {"actionName": "HTTP_find_AD_User_by_Name", "status": "Failed",
#      "code": "NotSpecified", "startTime": "...", "endTime": "..."}
#   ],
#   "allActions": [
#     {"actionName": "Apply_to_each", "status": "Skipped"},
#     {"actionName": "Compose_WeekEnd", "status": "Succeeded"},
#     ...
#   ]
# }
# failedActions is ordered outer-to-inner. The ROOT cause is the LAST entry:
root = err["failedActions"][-1]
print(f"Root action: {root['actionName']} → code: {root.get('code')}")
# allActions shows every action's status — useful for spotting what was Skipped
# See common-errors.md to decode the error code.
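In the sample response, one failed action carries its code under a nested "error" object while the other has "code" at the top level. A small helper that checks both places keeps the root-cause lookup from silently printing None. This is a sketch based only on the two shapes visible in the sample above; other shapes may exist.

```python
def root_cause(err):
    """Return (action_name, code, message) for the innermost failed action.

    Returns None when the response lists no failed actions.
    """
    failed = err.get("failedActions") or []
    if not failed:
        return None
    root = failed[-1]  # outer-to-inner ordering: the last entry is innermost
    nested = root.get("error") or {}
    code = root.get("code") or nested.get("code")
    message = root.get("message") or nested.get("message")
    return root.get("actionName"), code, message

# Shapes copied from the sample response above
sample_err = {"failedActions": [
    {"actionName": "Apply_to_each_prepare_workers", "status": "Failed",
     "error": {"code": "ActionFailed", "message": "An action failed..."}},
    {"actionName": "HTTP_find_AD_User_by_Name", "status": "Failed",
     "code": "NotSpecified"},
]}
print(root_cause(sample_err))
```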
defn = mcp("get_live_flow", environmentName=ENV, flowName=FLOW_ID)
actions = defn["properties"]["definition"]["actions"]
print(list(actions.keys()))
Find the failing action in the definition. Inspect its inputs expression to understand what data it expects.
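The top-level keys printed above do not include actions nested inside loops, scopes, or conditions, and the failing action is often nested (in the earlier sample it sits inside an Apply to each). A recursive lookup can be sketched as below, assuming the usual workflow definition layout where containers keep children under "actions", conditions add "else", and switches add "cases" and "default". The sample definition is hypothetical.

```python
def find_action(actions, name, path=()):
    """Depth-first search for an action by name.

    Returns (path_tuple, action_dict), or None when the name is not found.
    """
    for key, action in (actions or {}).items():
        here = path + (key,)
        if key == name:
            return here, action
        # Child containers: "actions" (Scope/Foreach/Until/If-true branch),
        # "else" (If-false branch), "cases" and "default" (Switch).
        children = [action.get("actions"),
                    (action.get("else") or {}).get("actions"),
                    (action.get("default") or {}).get("actions")]
        children += [c.get("actions") for c in (action.get("cases") or {}).values()]
        for child in children:
            found = find_action(child, name, here)
            if found:
                return found
    return None

# Hypothetical definition fragment, for illustration only
sample_actions = {
    "Apply_to_each_prepare_workers": {
        "type": "Foreach",
        "actions": {"HTTP_find_AD_User_by_Name": {"type": "Http", "inputs": {}}},
    },
    "Compose_WeekEnd": {"type": "Compose", "inputs": "@utcNow()"},
}
path, action = find_action(sample_actions, "HTTP_find_AD_User_by_Name")
print(" > ".join(path), action["type"])
```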
For each action leading up to the failure, inspect its runtime output:
for action_name in ["Compose_WeekEnd", "HTTP_Get_Data", "Parse_JSON"]:
    result = mcp("get_live_flow_run_action_outputs",
                 environmentName=ENV,
                 flowName=FLOW_ID,
                 runName=RUN_ID,
                 actionName=action_name)
    # Returns an array; single-element when actionName is provided
    out = result[0] if result else {}
    print(action_name, out.get("status"))
    print(json.dumps(out.get("outputs", {}), indent=2)[:500])
⚠️ Output payloads from array-processing actions can be very large. Always slice (e.g. [:500]) before printing.
If the error mentions InvalidTemplate or a function name (for example, split on a null value):
# Example: action uses split(item()?['Name'], ' ')
# → null Name in the source data
result = mcp("get_live_flow_run_action_outputs",
             environmentName=ENV, flowName=FLOW_ID, runName=RUN_ID,
             actionName="Compose_Names")
# Returns a single-element array; index [0] to get the action object
if not result:
    print("No outputs returned for Compose_Names")
    names = []
else:
    names = result[0].get("outputs", {}).get("body") or []
nulls = [x for x in names if x.get("Name") is None]
print(f"{len(nulls)} records with null Name")
If the expression triggerBody()?['fieldName'] returns null, the field name is likely wrong. Check the trigger output shape with:
mcp("get_live_flow_run_action_outputs", environmentName=ENV, flowName=FLOW_ID, runName=RUN_ID, actionName="<trigger-action-name>")
For connection or authorization failures, look for ConnectionAuthorizationFailed: the connection owner must match the service account running the flow. This cannot be fixed via the API; fix it in the Power Automate designer.
For expression/data issues:
defn = mcp("get_live_flow", environmentName=ENV, flowName=FLOW_ID)
acts = defn["properties"]["definition"]["actions"]
# Example: fix split on potentially-null Name
acts["Compose_Names"]["inputs"] = "@coalesce(item()?['Name'], 'Unknown')"
conn_refs = defn["properties"]["connectionReferences"]
result = mcp("update_live_flow",
             environmentName=ENV,
             flowName=FLOW_ID,
             definition=defn["properties"]["definition"],
             connectionReferences=conn_refs)
print(result.get("error"))  # None = success
⚠️ update_live_flow always returns an error key. A value of null (Python None) means success.
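Since the error key is always present, checking for the key itself says nothing; only its value matters. The sketch below shows one way to patch a copy of the definition (so the original stays available for rollback) and to fail loudly on a non-null error. Both helper names are hypothetical, and the patch helper only handles top-level actions.

```python
import copy

def patched_definition(definition, action_name, new_inputs):
    """Return a deep copy of the definition with one top-level action's inputs replaced."""
    patched = copy.deepcopy(definition)
    if action_name not in patched["actions"]:
        raise KeyError(f"action not found at top level: {action_name}")
    patched["actions"][action_name]["inputs"] = new_inputs
    return patched

def ensure_updated(result):
    """Raise if an update_live_flow result carries a non-null error value."""
    if result.get("error") is not None:
        raise RuntimeError(f"update_live_flow failed: {result['error']}")
    return result

# Illustrative definition fragment
original = {"actions": {"Compose_Names": {"type": "Compose",
                                          "inputs": "@split(item()?['Name'], ' ')"}}}
fixed = patched_definition(original, "Compose_Names",
                           "@coalesce(item()?['Name'], 'Unknown')")
print(fixed["actions"]["Compose_Names"]["inputs"])
```

Keeping the untouched original around means a failed experiment can be reverted by sending the old definition back through update_live_flow.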
import time

# Resubmit the failed run
resubmit = mcp("resubmit_live_flow_run",
               environmentName=ENV, flowName=FLOW_ID, runName=RUN_ID)
print(resubmit)
# Wait ~30 s then check
time.sleep(30)
new_runs = mcp("get_live_flow_runs", environmentName=ENV, flowName=FLOW_ID, top=3)
print(new_runs[0]["status"])  # Succeeded = done
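A fixed 30-second wait can report the old failed run if the resubmitted one has not appeared or finished yet. Polling with a timeout is one alternative, sketched below. It assumes that a resubmit produces a run with a new name and that the terminal status names are as listed; both are assumptions to verify against real responses. The fetch function is injected so the loop can be exercised without a server.

```python
import time

# Assumed terminal status names; confirm against actual get_live_flow_runs output
TERMINAL = {"Succeeded", "Failed", "Cancelled", "TimedOut"}

def wait_for_new_run(fetch_runs, known_run_names, timeout=180, interval=10,
                     sleep=time.sleep):
    """Poll until a run not in known_run_names reaches a terminal status.

    Returns that run, or None if the timeout elapses first.
    """
    deadline = time.monotonic() + timeout
    while True:
        for run in fetch_runs():
            if run["name"] not in known_run_names and run["status"] in TERMINAL:
                return run
        if time.monotonic() >= deadline:
            return None
        sleep(interval)

# Simulated responses standing in for successive get_live_flow_runs calls
responses = iter([
    [{"name": "old", "status": "Failed"}],
    [{"name": "new", "status": "Running"}, {"name": "old", "status": "Failed"}],
    [{"name": "new", "status": "Succeeded"}, {"name": "old", "status": "Failed"}],
])
finished = wait_for_new_run(lambda: next(responses), {"old"},
                            timeout=60, interval=0, sleep=lambda s: None)
print(finished)
```

Against a real server, fetch_runs could be a lambda wrapping the get_live_flow_runs call, with known_run_names collected before the resubmit.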
For flows with a Request (HTTP) trigger, use trigger_live_flow instead of resubmit_live_flow_run to test with custom payloads:
# First inspect what the trigger expects
schema = mcp("get_live_flow_http_schema",
             environmentName=ENV, flowName=FLOW_ID)
print("Expected body schema:", schema.get("triggerSchema"))
print("Response schemas:", schema.get("responseSchemas"))
# Trigger with a test payload
result = mcp("trigger_live_flow",
             environmentName=ENV,
             flowName=FLOW_ID,
             body={"name": "Test User", "value": 42})
print(f"Status: {result['status']}, Body: {result.get('body')}")
trigger_live_flow handles AAD-authenticated triggers automatically. It only works for flows with a Request (HTTP) trigger type.
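Before triggering, it can help to compare the test payload against the trigger schema so that a bad payload is not mistaken for a flow bug. The following is a rough sketch, assuming triggerSchema is a JSON Schema object with optional "required" and "properties" keys; it checks only top-level presence and basic types and is not a full JSON Schema validator. The sample schema is hypothetical.

```python
def payload_problems(schema, payload):
    """List obvious top-level mismatches between a payload and a JSON Schema object."""
    problems = []
    if not schema:
        return problems  # no schema declared, nothing to check
    for name in schema.get("required", []):
        if name not in payload:
            problems.append(f"missing required field: {name}")
    types = {"string": str, "integer": int, "number": (int, float),
             "boolean": bool, "object": dict, "array": list}
    for name, spec in schema.get("properties", {}).items():
        declared = spec.get("type")
        # Skip union types (lists) and unknown type names
        expected = types.get(declared) if isinstance(declared, str) else None
        if name in payload and expected and not isinstance(payload[name], expected):
            problems.append(f"{name}: expected {declared}")
    return problems

# Hypothetical trigger schema, for illustration only
sample_schema = {"type": "object", "required": ["name", "value"],
                 "properties": {"name": {"type": "string"},
                                "value": {"type": "integer"}}}
print(payload_problems(sample_schema, {"name": "Test User", "value": 42}))
print(payload_problems(sample_schema, {"name": 5}))
```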
| Symptom | First Tool to Call | What to Look For |
|---|---|---|
| Flow shows as Failed | get_live_flow_run_error | failedActions[-1]["actionName"] = root cause |
| Expression crash | get_live_flow_run_action_outputs on prior action | null / wrong-type fields in output body |
| Flow never starts | get_live_flow | check properties.state = "Started" |
| Action returns wrong data | get_live_flow_run_action_outputs | actual output body vs expected |
| Fix applied but still fails | get_live_flow_runs after resubmit | new run status field |

flowstudio-power-automate-mcp: core connection setup and operation reference
flowstudio-power-automate-build: build and deploy new flows

First seen: Mar 8, 2026