effect-ts by kastalien-research/thoughtbox-dot-claude
npx skills add https://github.com/kastalien-research/thoughtbox-dot-claude --skill effect-tsEffect-TS 是一个功能型 TypeScript 库,提供类型化效果、结构化并发和健壮的运行时。本技能涵盖正确的使用模式,并解决 LLM 生成内容中的常见误解。
import { Effect, Layer, Context, Fiber, Schedule, Cache, Scope } from "effect";
import { Schema, JSONSchema } from "@effect/schema";
核心类型签名:
Effect<Success, Error, Requirements>
// ↑ ↑ ↑
// | | └── 依赖项(通过 Layers 提供)
// | └── 预期错误(已类型化,必须处理)
// └── 成功值
LLM 输出通常包含错误的 API。 使用此表进行纠正:
| 错误(常见于 AI 输出) | 正确 |
|---|---|
Effect.cachedWithTTL(...) | Cache.make({ timeToLive: Duration }) |
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
Effect.cachedInvalidateWithTTL(...) | cache.invalidate(key) / cache.invalidateAll() |
Effect.match(...) | Effect.either + Either.match,或 Effect.catchTag |
| "thread-local storage" | 通过 FiberRef 实现 "fiber-local storage" |
| JSON Schema Draft 2020-12 | @effect/schema 生成 Draft-07 |
| fibers 被 "cancelled" | fibers 被 "terminated" 或 "interrupted" |
| 所有队列都有背压 | 只有 bounded 队列有;sliding/dropping 队列没有 |
--only=production | --omit=dev (npm 7+) |
Effect 区分:
E 通道中类型化,必须处理// 预期错误 - 已类型化
const fetchUser = (id: string): Effect.Effect<User, UserNotFoundError | NetworkError> => ...
// 处理预期错误
const handled = pipe(
fetchUser("123"),
Effect.catchTag("UserNotFoundError", (e) => Effect.succeed(defaultUser)),
Effect.catchTag("NetworkError", (e) => Effect.retry(schedule))
);
// 意外错误(缺陷)- 由运行时捕获
Effect.catchAllDefect(program, (defect) =>
Console.error("Unexpected error", defect)
);
Fibers 是具有 原生中断 能力的轻量级虚拟线程:
// Fork 一个 fiber
const fiber = yield* Effect.fork(longRunningTask);
// 中断它(例如,当 MCP 客户端断开连接时)
yield* Fiber.interrupt(fiber);
// 结构化并发:子 fiber 随父级自动终止
const parent = Effect.gen(function* () {
yield* Effect.fork(backgroundTask); // 父级结束时自动中断
yield* mainTask;
});
// Daemon fibers 比其父级存活更久
yield* Effect.forkDaemon(longLivedBackgroundTask);
// 先成功者获胜;其他自动中断
const result = yield* Effect.race(
fetchFromCache,
fetchFromDatabase
);
// 最多 5 个并发处理 50 个文档
const results = yield* Effect.all(documents.map(processDoc), {
concurrency: 5 // 不是 "worker pool" - 限制并发任务数
});
// Bounded - 应用背压(队列满时 offer 会挂起)
const bounded = yield* Queue.bounded<string>(100);
// Dropping - 队列满时丢弃新项(无背压)
const dropping = yield* Queue.dropping<string>(100);
// Sliding - 队列满时丢弃最旧项(无背压)
const sliding = yield* Queue.sliding<string>(100);
Layers 构造服务而不泄露依赖关系:
// 定义服务
class Database extends Context.Tag("Database")<
Database,
{ query: (sql: string) => Effect.Effect<Result> }
>() {}
// 创建 layer(依赖关系在构造时处理)
const DatabaseLive = Layer.effect(
Database,
Effect.gen(function* () {
const config = yield* Config; // 依赖项在此注入
return {
query: (sql) => Effect.tryPromise(() => runQuery(sql, config))
};
})
);
// 提供给程序
const runnable = program.pipe(Effect.provide(DatabaseLive));
// 用于测试 - 交换实现
const DatabaseTest = Layer.succeed(Database, {
query: () => Effect.succeed(mockResult)
});
const program = pipe(
Effect.tryPromise(() => openConnection()),
Effect.ensuring(Console.log("Cleanup")) // 成功、失败或中断时都会运行
);
const withConnection = Effect.acquireUseRelease(
Effect.tryPromise(() => db.connect()), // 获取
(conn) => Effect.tryPromise(() => conn.query("SELECT *")), // 使用
(conn) => Effect.promise(() => conn.close()) // 释放(始终运行)
);
Effect.scoped(
Effect.gen(function* () {
const file = yield* openFile("data.txt"); // 获取
const data = yield* file.read();
return data;
}) // 作用域关闭时文件自动释放
);
不存在 Effect.cachedWithTTL。 使用 Cache 模块:
import { Cache } from "effect";
const cache = yield* Cache.make({
capacity: 100,
timeToLive: Duration.minutes(5),
lookup: (key: string) => fetchExpensiveData(key)
});
// 使用缓存
const value = yield* cache.get("my-key");
// 使失效
yield* cache.invalidate("my-key");
yield* cache.invalidateAll();
import { Schedule } from "effect";
// 使用指数退避重试 3 次
const policy = Schedule.exponential("100 millis").pipe(
Schedule.intersect(Schedule.recurs(3))
);
const robust = Effect.retry(unstableOperation, policy);
// 重试直到条件满足
const untilSuccess = Effect.retry(operation, {
until: (err) => err.code === "RATE_LIMITED"
});
@effect/schema 生成 JSON Schema Draft-07(不是 2020-12):
import { Schema, JSONSchema } from "@effect/schema";
const User = Schema.Struct({
id: Schema.String,
age: Schema.Number.pipe(Schema.positive())
});
// 生成 JSON Schema (Draft-07)
const jsonSchema = JSONSchema.make(User);
// { "$schema": "http://json-schema.org/draft-07/schema#", ... }
// 解码(解析)
const user = Schema.decodeUnknownSync(User)(rawData);
// 编码
const json = Schema.encodeSync(User)(user);
Effect 具有原生 OpenTelemetry 集成:
import { NodeSdk } from "@effect/opentelemetry";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
// 向任何 effect 添加追踪
const traced = Effect.withSpan("processRequest")(myEffect);
// 带上下文的日志记录
yield* Effect.log("Processing request");
yield* Effect.annotateLogs("requestId", "abc-123");
// FiberRef 用于 fiber-local 上下文传播
const RequestId = FiberRef.unsafeMake<string>("");
yield* FiberRef.set(RequestId, "req-456");
| 场景 | 建议 |
|---|---|
| 简单的 MCP 工具(< 100 LOC) | 使用 FastMCP 或原生 SDK |
| 团队不熟悉函数式编程 | 学习曲线陡峭;考虑 NestJS |
| 包大小至关重要 | Effect 至少增加 15-25kb gzipped |
| 现有的 NestJS/TypeORM 代码库 | 与基于类的 DI 不匹配 |
const searchTool = Effect.gen(function* () {
const args = yield* parseArgs(input);
const db = yield* Database;
const results = yield* db.query(args.query);
return formatResults(results);
}).pipe(
Effect.catchTag("ParseError", () =>
Effect.fail({ code: -32602, message: "Invalid params" })
),
Effect.catchTag("DatabaseError", () =>
Effect.fail({ code: -32603, message: "Internal error" })
)
);
// 每个 MCP 请求都有自己的作用域
const handleRequest = (request: MCPRequest) =>
Effect.scoped(
Effect.gen(function* () {
// 此处获取的资源在请求完成时自动释放
const tempFile = yield* createTempFile();
const result = yield* processRequest(request, tempFile);
return result;
})
);
每周安装量
247
代码仓库
GitHub 星标数
6
首次出现
2026年1月20日
安全审计
安装于
opencode216
codex185
gemini-cli179
github-copilot176
cursor163
amp149
Effect-TS is a functional TypeScript library providing typed effects, structured concurrency, and a robust runtime. This skill covers correct usage patterns and addresses common misconceptions from LLM-generated content.
import { Effect, Layer, Context, Fiber, Schedule, Cache, Scope } from "effect";
import { Schema, JSONSchema } from "@effect/schema";
Core Type Signature:
Effect<Success, Error, Requirements>
// ↑ ↑ ↑
// | | └── Dependencies (provided via Layers)
// | └── Expected errors (typed, must be handled)
// └── Success value
LLM outputs often contain incorrect APIs. Use this table to correct them:
| Wrong (common in AI outputs) | Correct |
|---|---|
Effect.cachedWithTTL(...) | Cache.make({ timeToLive: Duration }) |
Effect.cachedInvalidateWithTTL(...) | cache.invalidate(key) / cache.invalidateAll() |
Effect.match(...) | Effect.either + Either.match, or Effect.catchTag |
| "thread-local storage" | "fiber-local storage" via FiberRef |
| JSON Schema Draft 2020-12 | @effect/schema generates Draft-07 |
| fibers are "cancelled" | fibers are "terminated" or "interrupted" |
| all queues have back-pressure | only bounded queues; sliding/dropping do not |
--only=production | --omit=dev (npm 7+) |
Effect distinguishes between:
E channel, must be handled// Expected error - typed
const fetchUser = (id: string): Effect.Effect<User, UserNotFoundError | NetworkError> => ...
// Handle expected errors
const handled = pipe(
fetchUser("123"),
Effect.catchTag("UserNotFoundError", (e) => Effect.succeed(defaultUser)),
Effect.catchTag("NetworkError", (e) => Effect.retry(schedule))
);
// Unexpected errors (defects) - captured by runtime
Effect.catchAllDefect(program, (defect) =>
Console.error("Unexpected error", defect)
);
Fibers are lightweight virtual threads with native interruption :
// Fork a fiber
const fiber = yield* Effect.fork(longRunningTask);
// Interrupt it (e.g., when MCP client disconnects)
yield* Fiber.interrupt(fiber);
// Structured concurrency: child fibers auto-terminate with parent
const parent = Effect.gen(function* () {
yield* Effect.fork(backgroundTask); // Auto-interrupted when parent ends
yield* mainTask;
});
// Daemon fibers outlive their parent
yield* Effect.forkDaemon(longLivedBackgroundTask);
// First to succeed wins; other is automatically interrupted
const result = yield* Effect.race(
fetchFromCache,
fetchFromDatabase
);
// Process 50 documents with max 5 concurrent
const results = yield* Effect.all(documents.map(processDoc), {
concurrency: 5 // NOT a "worker pool" - limits concurrent tasks
});
// Bounded - applies back-pressure (offer suspends when full)
const bounded = yield* Queue.bounded<string>(100);
// Dropping - discards new items when full (no back-pressure)
const dropping = yield* Queue.dropping<string>(100);
// Sliding - discards oldest items when full (no back-pressure)
const sliding = yield* Queue.sliding<string>(100);
Layers construct services without leaking dependencies:
// Define a service
class Database extends Context.Tag("Database")<
Database,
{ query: (sql: string) => Effect.Effect<Result> }
>() {}
// Create layer (dependencies handled at construction)
const DatabaseLive = Layer.effect(
Database,
Effect.gen(function* () {
const config = yield* Config; // Dependency injected here
return {
query: (sql) => Effect.tryPromise(() => runQuery(sql, config))
};
})
);
// Provide to program
const runnable = program.pipe(Effect.provide(DatabaseLive));
// For testing - swap implementation
const DatabaseTest = Layer.succeed(Database, {
query: () => Effect.succeed(mockResult)
});
const program = pipe(
Effect.tryPromise(() => openConnection()),
Effect.ensuring(Console.log("Cleanup")) // Runs on success, failure, OR interrupt
);
const withConnection = Effect.acquireUseRelease(
Effect.tryPromise(() => db.connect()), // Acquire
(conn) => Effect.tryPromise(() => conn.query("SELECT *")), // Use
(conn) => Effect.promise(() => conn.close()) // Release (always runs)
);
Effect.scoped(
Effect.gen(function* () {
const file = yield* openFile("data.txt"); // Acquired
const data = yield* file.read();
return data;
}) // File automatically released when scope closes
);
There is noEffect.cachedWithTTL. Use the Cache module:
import { Cache } from "effect";
const cache = yield* Cache.make({
capacity: 100,
timeToLive: Duration.minutes(5),
lookup: (key: string) => fetchExpensiveData(key)
});
// Use the cache
const value = yield* cache.get("my-key");
// Invalidate
yield* cache.invalidate("my-key");
yield* cache.invalidateAll();
import { Schedule } from "effect";
// Retry 3 times with exponential backoff
const policy = Schedule.exponential("100 millis").pipe(
Schedule.intersect(Schedule.recurs(3))
);
const robust = Effect.retry(unstableOperation, policy);
// Retry until condition
const untilSuccess = Effect.retry(operation, {
until: (err) => err.code === "RATE_LIMITED"
});
@effect/schema generates JSON Schema Draft-07 (not 2020-12):
import { Schema, JSONSchema } from "@effect/schema";
const User = Schema.Struct({
id: Schema.String,
age: Schema.Number.pipe(Schema.positive())
});
// Generate JSON Schema (Draft-07)
const jsonSchema = JSONSchema.make(User);
// { "$schema": "http://json-schema.org/draft-07/schema#", ... }
// Decode (parse)
const user = Schema.decodeUnknownSync(User)(rawData);
// Encode
const json = Schema.encodeSync(User)(user);
Effect has native OpenTelemetry integration:
import { NodeSdk } from "@effect/opentelemetry";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
// Add tracing to any effect
const traced = Effect.withSpan("processRequest")(myEffect);
// Logging with context
yield* Effect.log("Processing request");
yield* Effect.annotateLogs("requestId", "abc-123");
// FiberRef for fiber-local context propagation
const RequestId = FiberRef.unsafeMake<string>("");
yield* FiberRef.set(RequestId, "req-456");
| Scenario | Recommendation |
|---|---|
| Simple MCP tool (< 100 LOC) | Use FastMCP or vanilla SDK |
| Team unfamiliar with FP | Steep learning curve; consider NestJS |
| Bundle size critical | Effect adds 15-25kb gzipped minimum |
| Existing NestJS/TypeORM codebase | Impedance mismatch with class-based DI |
const searchTool = Effect.gen(function* () {
const args = yield* parseArgs(input);
const db = yield* Database;
const results = yield* db.query(args.query);
return formatResults(results);
}).pipe(
Effect.catchTag("ParseError", () =>
Effect.fail({ code: -32602, message: "Invalid params" })
),
Effect.catchTag("DatabaseError", () =>
Effect.fail({ code: -32603, message: "Internal error" })
)
);
// Each MCP request gets its own scope
const handleRequest = (request: MCPRequest) =>
Effect.scoped(
Effect.gen(function* () {
// Resources acquired here auto-release when request completes
const tempFile = yield* createTempFile();
const result = yield* processRequest(request, tempFile);
return result;
})
);
Weekly Installs
247
Repository
GitHub Stars
6
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust HubFailSocketPassSnykPass
Installed on
opencode216
codex185
gemini-cli179
github-copilot176
cursor163
amp149
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
106,200 周安装