npx skills add https://github.com/vercel/workflow --skill workflow
workflow Documentation
Your knowledge of workflow is outdated.
The workflow documentation outlined below matches the installed version of the Workflow DevKit. Follow these instructions before starting on any workflow-related tasks:
Search the bundled documentation in node_modules/workflow/docs/:
glob "node_modules/workflow/docs/**/*.mdx"
grep "your query" node_modules/workflow/docs/
Documentation structure in node_modules/workflow/docs/:
- getting-started/ - Framework setup (next.mdx, express.mdx, hono.mdx, etc.)
- foundations/ - Core concepts (workflows-and-steps.mdx, hooks.mdx, streaming.mdx, etc.)
- api-reference/workflow/ - API docs (sleep.mdx, create-hook.mdx, fatal-error.mdx, etc.)
- api-reference/workflow-api/ - Client API (start.mdx, get-run.mdx, resume-hook.mdx, etc.)
- ai/ - AI SDK integration docs
- errors/ - Error code documentation
Related packages also include bundled docs:
- @workflow/ai: node_modules/@workflow/ai/docs/ - DurableAgent and AI integration
- @workflow/core: node_modules/@workflow/core/docs/ - Core runtime (foundations, how-it-works)
- @workflow/next: node_modules/@workflow/next/docs/ - Next.js integration
When in doubt, update to the latest version of the Workflow DevKit.
Directives:
"use workflow"; // First line - makes async function durable
"use step"; // First line - makes function a cached, retryable unit
Essential imports:
// Workflow primitives
import { sleep, fetch, createHook, createWebhook, getWritable } from "workflow";
import { FatalError, RetryableError } from "workflow";
import { getWorkflowMetadata, getStepMetadata } from "workflow";
// API operations
import { start, getRun, resumeHook, resumeWebhook } from "workflow/api";
// Framework integrations
import { withWorkflow } from "workflow/next";
import { workflow } from "workflow/vite";
import { workflow } from "workflow/astro";
// Or use modules: ["workflow/nitro"] for Nitro/Nuxt
// AI agent
import { DurableAgent } from "@workflow/ai/agent";
"use workflow" functions run in a sandboxed VM. "use step" functions have full Node.js access. Put your logic in steps and use the workflow function purely for orchestration.
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
// Steps have full Node.js and npm access
async function fetchUserData(userId: string) {
"use step";
const response = await fetch(`https://api.example.com/users/${userId}`);
return response.json();
}
async function processWithAI(data: any) {
"use step";
// AI SDK works in steps without workarounds
return await generateText({
model: openai("gpt-4"),
prompt: `Process: ${JSON.stringify(data)}`,
});
}
// Workflow orchestrates steps - no sandbox issues
export async function dataProcessingWorkflow(userId: string) {
"use workflow";
const data = await fetchUserData(userId);
const processed = await processWithAI(data);
return { success: true, processed };
}
Benefits: steps retry automatically, their results are persisted for replay, and they run without sandbox restrictions.
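To make the "persisted for replay" point concrete, here is a toy model of the idea. It is a sketch under stated assumptions, not the DevKit's implementation: `StepLog`, `runStep`, `orchestrate`, and the step IDs are hypothetical names, and the real runtime is asynchronous. A finished step stores its result in a log, and a later run of the same orchestration reads from the log instead of executing the step again.

```typescript
// Toy model of step-result persistence. All names here are hypothetical;
// the real runtime is asynchronous and considerably more involved.
type StepLog = Map<string, unknown>;

let executions = 0; // counts how many times a step body actually ran

function runStep<T>(log: StepLog, id: string, body: () => T): T {
  if (log.has(id)) {
    return log.get(id) as T; // replay: reuse the persisted result
  }
  executions += 1;
  const result = body();
  log.set(id, result); // persist so a later replay can skip the work
  return result;
}

// The "workflow" only orchestrates; all real work happens inside steps.
function orchestrate(log: StepLog, userId: string): { success: boolean; processed: string } {
  const data = runStep(log, "fetchUserData", () => `user:${userId}`);
  const processed = runStep(log, "processWithAI", () => data.toUpperCase());
  return { success: true, processed };
}

const stepLog: StepLog = new Map();
const firstRun = orchestrate(stepLog, "42"); // executes both step bodies
const secondRun = orchestrate(stepLog, "42"); // served entirely from the log
```

In this model the second run costs nothing beyond two map lookups, which is why keeping the orchestration function free of side effects matters.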
When you need logic directly in a workflow function (not in a step), these restrictions apply:
| Limitation | Workaround |
|---|---|
| No fetch() | import { fetch } from "workflow" then globalThis.fetch = fetch |
| No setTimeout/setInterval | Use sleep("5s") from "workflow" |
| No Node.js modules (fs, crypto, etc.) | Move to a step function |
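The duration strings that appear in this document ("5s", "5m", "1h") follow a compact number-plus-unit form. The sketch below shows how such strings map to milliseconds. `durationToMs` and `UNIT_MS` are hypothetical helpers written for illustration, and the grammar is an assumption; the library's own parser may accept more forms.

```typescript
// Hypothetical helper: converts strings like "5s", "5m", or "1h" to milliseconds.
// Assumes a simple <number><unit> grammar; the library's own parser may differ.
const UNIT_MS: Record<string, number> = {
  ms: 1,
  s: 1_000,
  m: 60_000,
  h: 3_600_000,
  d: 86_400_000,
};

function durationToMs(input: string): number {
  const match = /^(\d+(?:\.\d+)?)\s*(ms|s|m|h|d)$/.exec(input.trim());
  if (!match) {
    throw new Error(`Unrecognized duration: "${input}"`);
  }
  return Number(match[1]) * UNIT_MS[match[2]];
}
```

A helper like this is only useful for logging or validation outside the workflow; inside a workflow function the string is passed to sleep() directly.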
Example - Using fetch in workflow context:
import { fetch } from "workflow";
export async function myWorkflow() {
"use workflow";
globalThis.fetch = fetch; // Required for AI SDK and HTTP libraries
// Now generateText() and other libraries work
}
Note: DurableAgent from @workflow/ai handles the fetch assignment automatically.
Use DurableAgent to build AI agents that maintain state and survive interruptions. It handles the workflow sandbox automatically (no manual globalThis.fetch needed).
import { DurableAgent } from "@workflow/ai/agent";
import { getWritable } from "workflow";
import { z } from "zod";
import type { UIMessageChunk } from "ai";
async function lookupData({ query }: { query: string }) {
"use step";
// Step functions have full Node.js access
return `Results for "${query}"`;
}
export async function myAgentWorkflow(userMessage: string) {
"use workflow";
const agent = new DurableAgent({
model: "anthropic/claude-sonnet-4-5",
system: "You are a helpful assistant.",
tools: {
lookupData: {
description: "Search for information",
inputSchema: z.object({ query: z.string() }),
execute: lookupData,
},
},
});
const result = await agent.stream({
messages: [{ role: "user", content: userMessage }],
writable: getWritable<UIMessageChunk>(),
maxSteps: 10,
});
return result.messages;
}
Key points:
- getWritable<UIMessageChunk>() streams output to the workflow run's default stream
- Tool execute functions that need Node.js/npm access should use "use step"
- Tool execute functions that use workflow primitives (sleep(), createHook()) should NOT use "use step"; they run at the workflow level
- maxSteps limits the number of LLM calls (default is unlimited)
- For multi-turn conversations, pass result.messages plus new user messages to subsequent agent.stream() calls
For more details on DurableAgent, check the AI docs in node_modules/@workflow/ai/docs/.
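The multi-turn point can be sketched as a small pure function that builds the input for the next agent.stream() call. The `ChatMessage` shape and the `nextTurnMessages` helper are hypothetical simplifications; real agent messages carry more fields than role and content.

```typescript
// Hypothetical message shape; real agent messages carry more fields.
type ChatMessage = { role: "user" | "assistant"; content: string };

// Build the input for the next agent.stream() call: everything the previous
// turn returned, followed by the new user message. The input is not mutated.
function nextTurnMessages(previous: ChatMessage[], userText: string): ChatMessage[] {
  return [...previous, { role: "user", content: userText }];
}

// Stand-in for result.messages from a first turn.
const afterFirstTurn: ChatMessage[] = [
  { role: "user", content: "Find the order" },
  { role: "assistant", content: "Order 17 found." },
];
const secondTurnInput = nextTurnMessages(afterFirstTurn, "Cancel it");
```

Returning a new array rather than pushing onto the old one keeps the earlier turn's messages intact if they are stored or replayed elsewhere.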
Use start() to launch workflows from API routes. start() cannot be called directly in workflow context — wrap it in a step function.
import { start } from "workflow/api";
// From an API route — works directly
export async function POST() {
const run = await start(myWorkflow, [arg1, arg2]);
return Response.json({ runId: run.runId });
}
// No-args workflow
const run = await start(noArgWorkflow);
Starting child workflows from inside a workflow — must use a step:
import { sleep } from "workflow";
import { start } from "workflow/api";
// Wrap start() in a step function
async function triggerChild(data: string) {
"use step";
const run = await start(childWorkflow, [data]);
return run.runId;
}
export async function parentWorkflow() {
"use workflow";
const childRunId = await triggerChild("some data"); // Fire-and-forget via step
await sleep("1h");
}
start() returns immediately — it doesn't wait for the workflow to complete. Use run.returnValue to await completion.
Hooks let workflows wait for external data. Use createHook() inside a workflow and resumeHook() from API routes. Deterministic tokens are for createHook() + resumeHook() (server-side) only. createWebhook() always generates random tokens — do not pass a token option to createWebhook().
import { createHook } from "workflow";
export async function approvalWorkflow() {
"use workflow";
const hook = createHook<{ approved: boolean }>({
token: "approval-123", // deterministic token for external systems
});
const result = await hook; // Workflow suspends here
return result.approved;
}
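One way to keep a deterministic token consistent between the workflow that creates the hook and the route that resumes it is to derive it from a shared helper. The `approvalToken` function below is a hypothetical convention, not part of the library; the "approval:" prefix mirrors the token format used in the testing examples of this document.

```typescript
// Hypothetical convention: derive the hook token from a stable business ID so
// the workflow and the API route can compute the same token independently.
function approvalToken(documentId: string): string {
  if (documentId.length === 0) {
    throw new Error("documentId must not be empty");
  }
  return `approval:${documentId}`;
}

const tokenInWorkflow = approvalToken("doc-123"); // would be passed to createHook({ token })
const tokenInApiRoute = approvalToken("doc-123"); // would be passed to resumeHook(token, data)
```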
Hooks implement AsyncIterable — use for await...of to receive multiple events:
import { createHook } from "workflow";
export async function chatWorkflow(channelId: string) {
"use workflow";
const hook = createHook<{ text: string; done?: boolean }>({
token: `chat-${channelId}`,
});
for await (const event of hook) {
await processMessage(event.text);
if (event.done) break;
}
}
Each resumeHook(token, payload) call delivers the next value to the loop.
import { resumeHook } from "workflow/api";
export async function POST(req: Request) {
const { token, data } = await req.json();
await resumeHook(token, data);
return new Response("ok");
}
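The delivery behavior described above, where each resume call feeds the next iteration of the loop, can be modeled with a small in-memory stand-in. `ToyHook`, `ChatEvent`, and `collectUntilDone` are hypothetical names for illustration only; this is not how the DevKit implements hooks, which persist payloads and suspend the workflow between events.

```typescript
// Toy in-memory stand-in for a hook. NOT the Workflow DevKit implementation.
class ToyHook<T> implements AsyncIterable<T> {
  private queue: T[] = [];
  private waiters: Array<(value: T) => void> = [];

  // Plays the role of resumeHook(token, payload): deliver one payload.
  resume(payload: T): void {
    const waiter = this.waiters.shift();
    if (waiter) waiter(payload);
    else this.queue.push(payload); // nobody waiting yet, so buffer it
  }

  private next(): Promise<T> {
    if (this.queue.length > 0) return Promise.resolve(this.queue.shift() as T);
    return new Promise<T>((resolve) => this.waiters.push(resolve));
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    while (true) yield await this.next();
  }
}

type ChatEvent = { text: string; done?: boolean };

// Mirrors the shape of the chat loop: consume events until one is marked done.
async function collectUntilDone(hook: ToyHook<ChatEvent>): Promise<string[]> {
  const seen: string[] = [];
  for await (const event of hook) {
    seen.push(event.text);
    if (event.done) break;
  }
  return seen;
}
```

Payloads are delivered in the order they were sent, whether they arrive before the loop starts waiting or while it is already suspended.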
Use FatalError for permanent failures (no retry), RetryableError for transient failures:
import { FatalError, RetryableError } from "workflow";
// Check 429 first: it also falls in the 4xx range and would otherwise be treated as fatal
if (res.status === 429) {
  throw new RetryableError("Rate limited", { retryAfter: "5m" });
}
if (res.status >= 400 && res.status < 500) {
  throw new FatalError(`Client error: ${res.status}`);
}
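Because 429 lies inside the 400 to 499 range, the order of the checks decides whether a rate-limited request is retried or abandoned. The sketch below isolates that decision in a pure function so it can be tested without the library. `classifyStatus` and `Outcome` are hypothetical names, and treating 5xx responses as transient is an assumption of this example rather than something the source states.

```typescript
type Outcome = "ok" | "retry" | "fatal";

// Hypothetical policy. 429 is checked first because it also satisfies the
// general 4xx test; 5xx responses are treated as transient here by assumption.
function classifyStatus(httpStatus: number): Outcome {
  if (httpStatus === 429) return "retry";
  if (httpStatus >= 400 && httpStatus < 500) return "fatal";
  if (httpStatus >= 500) return "retry";
  return "ok";
}
```

Inside a step, a "retry" outcome would correspond to throwing RetryableError and a "fatal" outcome to throwing FatalError.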
All data passed to/from workflows and steps must be serializable.
Supported types: string, number, boolean, null, undefined, bigint, plain objects, arrays, Date, RegExp, URL, URLSearchParams, Map, Set, Headers, ArrayBuffer, typed arrays, Request, Response, ReadableStream, WritableStream.
Not supported: Functions, class instances, Symbols, WeakMap/WeakSet. Pass data, not callbacks.
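A pre-flight check can catch unsupported values before they reach a workflow or step boundary. The function below is an illustrative sketch, not the library's serializer: `isSerializable` is a hypothetical helper, it covers only part of the supported list (URL, Headers, Request, Response, and the stream types are left out for brevity), and it does not guard against cyclic references.

```typescript
// Illustrative pre-flight check, not the library's serializer. Covers only a
// subset of the supported list and does not handle cyclic references.
function isSerializable(value: unknown): boolean {
  if (value === null || value === undefined) return true;
  switch (typeof value) {
    case "string":
    case "number":
    case "boolean":
    case "bigint":
      return true;
    case "function":
    case "symbol":
      return false;
  }
  // From here on, value is an object.
  if (Array.isArray(value)) return value.every((item) => isSerializable(item));
  if (value instanceof Date || value instanceof RegExp) return true;
  if (value instanceof ArrayBuffer || ArrayBuffer.isView(value)) return true;
  if (value instanceof WeakMap || value instanceof WeakSet) return false;
  if (value instanceof Set) return [...value].every((item) => isSerializable(item));
  if (value instanceof Map) {
    return [...value].every(([k, v]) => isSerializable(k) && isSerializable(v));
  }
  const proto = Object.getPrototypeOf(value);
  if (proto !== Object.prototype && proto !== null) return false; // class instance
  return Object.values(value as Record<string, unknown>).every((item) => isSerializable(item));
}
```

The prototype test is what separates plain objects from class instances: anything whose prototype is not Object.prototype or null is rejected.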
Use getWritable() to stream data from workflows. getWritable() can be called in both workflow and step contexts, but you cannot interact with the stream (call getWriter(), write(), close()) directly in a workflow function. The stream must be passed to step functions for actual I/O, or steps can call getWritable() themselves.
Get the stream in a workflow, pass it to a step:
import { getWritable } from "workflow";
export async function myWorkflow() {
"use workflow";
const writable = getWritable();
await writeData(writable, "hello world");
}
async function writeData(writable: WritableStream, chunk: string) {
"use step";
const writer = writable.getWriter();
try {
await writer.write(chunk);
} finally {
writer.releaseLock();
}
}
Call getWritable() directly inside a step (no need to pass it):
import { getWritable } from "workflow";
async function streamData(chunk: string) {
"use step";
const writer = getWritable().getWriter();
try {
await writer.write(chunk);
} finally {
writer.releaseLock();
}
}
Use getWritable({ namespace: 'name' }) to create multiple independent streams for different types of data. This is useful for separating logs from primary output, different log levels, agent outputs, metrics, or any distinct data channels. Long-running workflows benefit from namespaced streams because you can replay only the important events (e.g., final results) while keeping verbose logs in a separate stream.
Example: Log levels and agent output separation:
import { getWritable } from "workflow";
type LogEntry = { level: "debug" | "info" | "warn" | "error"; message: string; timestamp: number };
type AgentOutput = { type: "thought" | "action" | "result"; content: string };
async function logDebug(message: string) {
"use step";
const writer = getWritable<LogEntry>({ namespace: "logs:debug" }).getWriter();
try {
await writer.write({ level: "debug", message, timestamp: Date.now() });
} finally {
writer.releaseLock();
}
}
async function logInfo(message: string) {
"use step";
const writer = getWritable<LogEntry>({ namespace: "logs:info" }).getWriter();
try {
await writer.write({ level: "info", message, timestamp: Date.now() });
} finally {
writer.releaseLock();
}
}
async function emitAgentThought(thought: string) {
"use step";
const writer = getWritable<AgentOutput>({ namespace: "agent:thoughts" }).getWriter();
try {
await writer.write({ type: "thought", content: thought });
} finally {
writer.releaseLock();
}
}
async function emitAgentResult(result: string) {
"use step";
// Important results go to the default stream for easy replay
const writer = getWritable<AgentOutput>().getWriter();
try {
await writer.write({ type: "result", content: result });
} finally {
writer.releaseLock();
}
}
export async function agentWorkflow(task: string) {
"use workflow";
await logInfo(`Starting task: ${task}`);
await logDebug("Initializing agent context");
await emitAgentThought("Analyzing the task requirements...");
// ... agent processing ...
await emitAgentResult("Task completed successfully");
await logInfo("Workflow finished");
}
Consuming namespaced streams:
import { start, getRun } from "workflow/api";
import { agentWorkflow } from "./workflows/agent";
export async function POST(request: Request) {
const run = await start(agentWorkflow, ["process data"]);
// Access specific streams by namespace
const results = run.getReadable({ namespace: undefined }); // Default stream (important results)
const infoLogs = run.getReadable({ namespace: "logs:info" });
const debugLogs = run.getReadable({ namespace: "logs:debug" });
const thoughts = run.getReadable({ namespace: "agent:thoughts" });
// Return only important results for most clients
return new Response(results, { headers: { "Content-Type": "application/json" } });
}
// Resume from a specific point (useful for long sessions)
export async function GET(request: Request) {
const { searchParams } = new URL(request.url);
const runId = searchParams.get("runId")!;
const startIndex = parseInt(searchParams.get("startIndex") || "0", 10);
const run = getRun(runId);
// Resume only the important stream, skip verbose debug logs
const stream = run.getReadable({ startIndex });
return new Response(stream);
}
Pro tip: For very long-running sessions (50+ minutes), namespaced streams help manage replay performance. Put verbose/debug output in separate namespaces so you can replay just the important events quickly.
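The interaction between namespaces and startIndex can be pictured with a toy in-memory store. This is a sketch under assumptions, not the DevKit's storage layer: `ToyStreams` is a hypothetical class, the empty-string key for the default stream is an invention of this example, and startIndex is assumed to mean "skip this many chunks from the beginning of the stream".

```typescript
// Toy in-memory model of namespaced streams; not the DevKit's storage layer.
// The empty-string key for the default stream is an assumption of this sketch.
class ToyStreams {
  private chunks = new Map<string, unknown[]>();

  write(chunk: unknown, namespace = ""): void {
    const list = this.chunks.get(namespace) ?? [];
    list.push(chunk);
    this.chunks.set(namespace, list);
  }

  // Returns the chunks of one namespace, skipping the first `startIndex` items.
  read(options: { namespace?: string; startIndex?: number } = {}): unknown[] {
    const list = this.chunks.get(options.namespace ?? "") ?? [];
    return list.slice(options.startIndex ?? 0);
  }
}

const streams = new ToyStreams();
streams.write({ level: "debug", message: "Initializing agent context" }, "logs:debug");
streams.write({ type: "result", content: "partial" });
streams.write({ level: "debug", message: "still working" }, "logs:debug");
streams.write({ type: "result", content: "Task completed successfully" });

// A reconnecting client that already saw one result resumes from index 1
// and never touches the debug namespace.
const resumed = streams.read({ startIndex: 1 });
```

Because each namespace has its own index space, verbose debug output never shifts the positions a client uses to resume the default stream.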
# Check workflow endpoints are reachable
npx workflow health
npx workflow health --port 3001 # Non-default port
# Visual dashboard for runs
npx workflow web
npx workflow web <run_id>
# CLI inspection (use --json for machine-readable output, --help for full usage)
npx workflow inspect runs
npx workflow inspect run <run_id>
# For Vercel-deployed projects, specify backend and project
npx workflow inspect runs --backend vercel --project <project-name> --team <team-slug>
npx workflow inspect run <run_id> --backend vercel --project <project-name> --team <team-slug>
# Open Vercel dashboard in browser for a specific run
npx workflow inspect run <run_id> --web
npx workflow web <run_id> --backend vercel --project <project-name> --team <team-slug>
# Cancel a running workflow
npx workflow cancel <run_id>
npx workflow cancel <run_id> --backend vercel --project <project-name> --team <team-slug>
# --env defaults to "production"; use --env preview for preview deployments
Debugging tips:
- Use --json (-j) on any command for machine-readable output
- Use --web to open the Vercel Observability dashboard in your browser
- Use --help on any command for full usage details
Workflow DevKit provides a Vitest plugin for testing workflows in-process, with no running server required.
Unit testing steps: Steps are just functions; without the compiler, "use step" is a no-op. Test them directly:
import { describe, it, expect } from "vitest";
import { createUser } from "./user-signup";
describe("createUser step", () => {
it("should create a user", async () => {
const user = await createUser("test@example.com");
expect(user.email).toBe("test@example.com");
});
});
Integration testing: Use @workflow/vitest for workflows using sleep(), hooks, webhooks, or retries:
// vitest.integration.config.ts
import { defineConfig } from "vitest/config";
import { workflow } from "@workflow/vitest";
export default defineConfig({
plugins: [workflow()],
test: {
include: ["**/*.integration.test.ts"],
testTimeout: 60_000,
},
});
// approval.integration.test.ts
import { describe, it, expect } from "vitest";
import { start, getRun, resumeHook } from "workflow/api";
import { waitForHook, waitForSleep } from "@workflow/vitest";
import { approvalWorkflow } from "./approval";
describe("approvalWorkflow", () => {
it("should publish when approved", async () => {
const run = await start(approvalWorkflow, ["doc-123"]);
// Wait for the hook, then resume it
await waitForHook(run, { token: "approval:doc-123" });
await resumeHook("approval:doc-123", { approved: true, reviewer: "alice" });
// Wait for sleep, then wake it up
const sleepId = await waitForSleep(run);
await getRun(run.runId).wakeUp({ correlationIds: [sleepId] });
const result = await run.returnValue;
expect(result).toEqual({ status: "published", reviewer: "alice" });
});
});
Testing webhooks: Use resumeWebhook() with a Request object — no HTTP server needed:
import { start, resumeWebhook } from "workflow/api";
import { waitForHook } from "@workflow/vitest";
const run = await start(ingestWorkflow, ["ep-1"]);
const hook = await waitForHook(run); // Discovers the random webhook token
await resumeWebhook(hook.token, new Request("https://example.com/webhook", {
method: "POST",
body: JSON.stringify({ event: "order.created" }),
}));
Key APIs:
- start(): trigger a workflow
- run.returnValue: await workflow completion
- waitForHook(run, { token? }) / waitForSleep(run): wait for the workflow to reach a pause point
- resumeHook(token, data) / resumeWebhook(token, request): resume paused workflows
- getRun(runId).wakeUp({ correlationIds }): skip sleep() calls
Best practices:
- Keep unit tests and integration tests (which use the workflow() plugin) in separate configs
- Set a generous testTimeout; workflows may run longer than typical unit tests
- vi.mock() does not work in integration tests because step dependencies are bundled by esbuild
Weekly installs: 1.0K
GitHub stars: 1.8K
First seen: Jan 28, 2026
Security audits: Gen Agent Trust Hub (Pass), Socket (Pass), Snyk (Warn)
Installed on: codex (959), opencode (957), gemini-cli (945), github-copilot (939), amp (926), kimi-cli (920)