fp-ts Do Notation by whatiskadudoing/fp-ts-skills
npx skills add https://github.com/whatiskadudoing/fp-ts-skills --skill 'fp-ts Do Notation'
Do notation is fp-ts's answer to callback hell. It provides a way to write sequential, imperative-looking code while maintaining functional purity and type safety.
Without Do notation, chaining dependent operations leads to deeply nested code:
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
// BAD: Nested chain hell
const processOrder = (orderId: string) =>
pipe(
fetchOrder(orderId),
TE.chain((order) =>
pipe(
fetchUser(order.userId),
TE.chain((user) =>
pipe(
fetchInventory(order.productId),
TE.chain((inventory) =>
pipe(
validateStock(inventory, order.quantity),
TE.chain((validated) =>
pipe(
calculatePrice(order, user.discount),
TE.chain((price) =>
createInvoice(order, user, price) // Five levels deep just to keep order and user in scope
)
)
)
)
)
)
)
)
)
)
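Problems with nested chains: every dependent step adds another level of indentation, earlier values stay reachable only because of the nesting, and inserting or reordering a step means re-indenting everything below it.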
Do notation flattens the structure and keeps all values in scope:
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
// GOOD: Flat, readable Do notation
const processOrder = (orderId: string) =>
pipe(
TE.Do,
TE.bind('order', () => fetchOrder(orderId)),
TE.bind('user', ({ order }) => fetchUser(order.userId)),
TE.bind('inventory', ({ order }) => fetchInventory(order.productId)),
TE.bind('validated', ({ inventory, order }) => validateStock(inventory, order.quantity)),
TE.bind('price', ({ order, user }) => calculatePrice(order, user.discount)),
TE.bind('invoice', ({ order, user, price }) => createInvoice(order, user, price))
)
Do - Starting Point
Do creates an empty context object {} wrapped in your monad:
import * as TE from 'fp-ts/TaskEither'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
TE.Do // TaskEither<never, {}>
E.Do // Either<never, {}>
O.Do // Option<{}>
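As a quick check of what Do actually holds, the sketch below compares it with a hand-built empty context. It uses Either and Option rather than TaskEither so that it runs synchronously, and it assumes an fp-ts 2.x release in which these modules export Do. The `answer` field is only an illustration.

```typescript
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import { pipe } from 'fp-ts/function'

// Do is the success case of the monad wrapping an empty object.
const eitherDo = E.Do
const optionDo = O.Do

// Binding a first field grows the context from {} to { answer: number }.
const withAnswer = pipe(
  E.Do,
  E.bind('answer', () => E.right(42))
)

console.log(E.isRight(eitherDo), O.isSome(optionDo)) // true true
console.log(withAnswer) // { _tag: 'Right', right: { answer: 42 } }
```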
bindTo - Initialize with First Value
Use bindTo when you already have a value and want to start Do notation:
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
// Instead of:
pipe(
TE.Do,
TE.bind('user', () => fetchUser(userId))
)
// Use bindTo for cleaner initialization:
pipe(
fetchUser(userId),
TE.bindTo('user'),
TE.bind('orders', ({ user }) => fetchOrders(user.id))
)
bindTo is semantically equivalent to TE.map(user => ({ user })) but more readable.
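The equivalence can be checked directly. This is a minimal sketch using Either so that it runs synchronously. `findUser` is a hypothetical lookup invented for the example, and the same comparison should hold for TaskEither once the tasks are run.

```typescript
import * as E from 'fp-ts/Either'
import { pipe } from 'fp-ts/function'

// Hypothetical lookup used only for illustration.
const findUser = (id: string): E.Either<string, { id: string }> =>
  id === '' ? E.left('missing id') : E.right({ id })

// Both pipelines wrap the value in an object under the key "user".
const viaBindTo = pipe(findUser('u1'), E.bindTo('user'))
const viaMap = pipe(findUser('u1'), E.map((user) => ({ user })))

// A Left passes through bindTo untouched, just as it does through map.
const failed = pipe(findUser(''), E.bindTo('user'))
```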
bind - Sequential Dependent Operations
Use bind when the next operation depends on previous values:
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
const getUserWithPosts = (userId: string) =>
pipe(
TE.Do,
TE.bind('user', () => fetchUser(userId)), // First: get user
TE.bind('posts', ({ user }) => fetchPosts(user.id)), // Then: use user.id
TE.bind('comments', ({ posts }) => // Then: use posts
TE.traverseArray(fetchComments)(posts.map(p => p.id))
)
)
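Key characteristics of bind: steps run one after another, each callback receives every value bound so far, and the first Left stops the pipeline so later steps never run.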
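The short-circuiting behaviour of bind can be observed with a small sketch. The `step` helper and the `calls` log are hypothetical and exist only to record which steps actually execute.

```typescript
import * as E from 'fp-ts/Either'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/function'

const calls: string[] = []

// Hypothetical step: records its name when it runs, then yields the given result.
const step = <A>(name: string, result: E.Either<string, A>): TE.TaskEither<string, A> =>
  () => {
    calls.push(name)
    return Promise.resolve(result)
  }

const program = pipe(
  TE.Do,
  TE.bind('user', () => step('user', E.right({ id: 'u1' }))),
  // This step reads the context and fails.
  TE.bind('posts', ({ user }) => step<string[]>('posts', E.left(`no posts for ${user.id}`))),
  // Never runs: the previous step produced a Left.
  TE.bind('comments', () => step('comments', E.right(['hi'])))
)
```

Running `program()` should resolve to a Left carrying the message from the `posts` step, with `calls` containing only `user` and `posts`.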
apS - Parallel Independent Operations
Use apS when operations are independent and can run in parallel:
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
const getDashboardData = (userId: string) =>
pipe(
TE.Do,
TE.bind('user', () => fetchUser(userId)),
// These three are INDEPENDENT - use apS for parallel execution
TE.apS('notifications', fetchNotifications(userId)),
TE.apS('settings', fetchSettings(userId)),
TE.apS('recentActivity', fetchRecentActivity(userId))
)
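Key characteristics of apS: it takes a ready-made TaskEither rather than a function, so it cannot read values from the context; for TaskEither it is built on the parallel Apply instance, so the effect starts concurrently with the pipeline that precedes it.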
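To see the difference in scheduling, the sketch below logs when each task starts and ends. The `tracked` helper and the 20 ms delays are assumptions made for the example. With bind the second task starts only after the first has ended, whereas with apS both tasks start before either one ends.

```typescript
import * as E from 'fp-ts/Either'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/function'

// Hypothetical task: logs its start and end, resolving after `ms` milliseconds.
const tracked = (name: string, ms: number, log: string[]): TE.TaskEither<never, string> =>
  () =>
    new Promise<E.Either<never, string>>((resolve) => {
      log.push(`start ${name}`)
      setTimeout(() => {
        log.push(`end ${name}`)
        resolve(E.right(name))
      }, ms)
    })

// bind: "b" is created and started only after "a" has finished.
const sequentialLog: string[] = []
const sequential = pipe(
  TE.Do,
  TE.bind('a', () => tracked('a', 20, sequentialLog)),
  TE.bind('b', () => tracked('b', 20, sequentialLog))
)

// apS: both tasks are handed over up front and started together.
const parallelLog: string[] = []
const parallel = pipe(
  TE.Do,
  TE.apS('a', tracked('a', 20, parallelLog)),
  TE.apS('b', tracked('b', 20, parallelLog))
)
```

The start order of the two parallel tasks is an implementation detail, so code should not rely on it.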
let - Computed/Derived Values
Use let for synchronous computations derived from existing values:
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
const processPayment = (orderId: string) =>
pipe(
TE.Do,
TE.bind('order', () => fetchOrder(orderId)),
TE.bind('user', ({ order }) => fetchUser(order.userId)),
// Computed values - no async operation needed
TE.let('subtotal', ({ order }) => order.items.reduce((sum, i) => sum + i.price, 0)),
TE.let('discount', ({ user, subtotal }) => subtotal * (user.discountPercent / 100)),
TE.let('total', ({ subtotal, discount }) => subtotal - discount),
TE.bind('payment', ({ total, user }) => chargeCard(user.paymentMethod, total))
)
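Key characteristics of let: the callback is a plain synchronous function of the context, its return value is added under the given name, and it cannot fail, so it never changes the error channel.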
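A synchronous variant of the same idea, sketched with Either so the result can be inspected without running a task. It assumes an fp-ts 2.x release that exports `let` from Either. `Item`, `nonEmpty`, and `priceOrder` are hypothetical names.

```typescript
import * as E from 'fp-ts/Either'
import { pipe } from 'fp-ts/function'

interface Item {
  price: number
}

// Hypothetical validation: the only step that can fail.
const nonEmpty = (items: Item[]): E.Either<string, Item[]> =>
  items.length > 0 ? E.right(items) : E.left('empty order')

const priceOrder = (items: Item[], discountPercent: number) =>
  pipe(
    E.Do,
    E.bind('items', () => nonEmpty(items)),
    // Each let adds a derived value; none of them can produce a Left.
    E.let('subtotal', ({ items }) => items.reduce((sum, i) => sum + i.price, 0)),
    E.let('discount', ({ subtotal }) => subtotal * (discountPercent / 100)),
    E.let('total', ({ subtotal, discount }) => subtotal - discount)
  )

const priced = priceOrder([{ price: 40 }, { price: 60 }], 25)
const rejected = priceOrder([], 25)
```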
| Situation | Use | Reason |
|---|---|---|
| Next operation needs previous result | bind | Sequential dependency |
| Operations are independent | apS | Can parallelize |
| Need to transform/compute | let | Synchronous, always succeeds |
| Starting with existing value | bindTo | Cleaner than Do + bind |
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
// SLOW: Sequential execution with bind (3 seconds total)
const slowDashboard = pipe(
TE.Do,
TE.bind('users', () => fetchUsers()), // 1 second
TE.bind('products', () => fetchProducts()), // 1 second (waits for users)
TE.bind('orders', () => fetchOrders()) // 1 second (waits for products)
)
// FAST: Parallel execution with apS (1 second total)
const fastDashboard = pipe(
TE.Do,
TE.apS('users', fetchUsers()), // 1 second
TE.apS('products', fetchProducts()), // 1 second (runs in parallel)
TE.apS('orders', fetchOrders()) // 1 second (runs in parallel)
)
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
// AWKWARD: apS cannot read `order` from the context, so the parallel branch has to refetch it
const getOrderDetails = (orderId: string) =>
pipe(
TE.Do,
// Sequential: need order first
TE.bind('order', () => fetchOrder(orderId)),
// apS receives a ready-made TaskEither, not a function of the context
TE.apS('user', pipe(
fetchOrder(orderId), // Second fetch of the same order
TE.chain((order) => fetchUser(order.userId))
))
// Better pattern: bind first, then parallel for truly independent operations
)
// BETTER: Restructure for clarity
const getOrderDetailsBetter = (orderId: string) =>
pipe(
fetchOrder(orderId),
TE.bindTo('order'),
TE.bind('user', ({ order }) => fetchUser(order.userId)),
// These need only orderId, not the context, so apS can run them in parallel
TE.apS('shippingOptions', fetchShippingOptions(orderId)),
TE.apS('paymentMethods', fetchPaymentMethods(orderId)),
TE.let('canCheckout', ({ user, shippingOptions }) =>
user.verified && shippingOptions.length > 0
)
)
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
import * as E from 'fp-ts/Either'
interface RegistrationInput {
email: string
password: string
name: string
}
interface User {
id: string
email: string
name: string
}
const registerUser = (input: RegistrationInput): TE.TaskEither<Error, User> =>
pipe(
TE.Do,
// Validate input (synchronous)
TE.bind('validated', () => pipe(
validateEmail(input.email),
E.chain(() => validatePassword(input.password)),
E.map(() => input),
TE.fromEither
)),
// Check if email exists (async)
TE.bind('emailAvailable', ({ validated }) =>
checkEmailAvailable(validated.email)
),
// Hash password (async, CPU-intensive)
TE.bind('hashedPassword', ({ validated }) =>
hashPassword(validated.password)
),
// Create user in database
TE.bind('user', ({ validated, hashedPassword }) =>
createUser({
email: validated.email,
name: validated.name,
passwordHash: hashedPassword
})
),
// Send welcome email (result ignored, but a failure here still fails the registration)
TE.chainFirst(({ user }) => sendWelcomeEmail(user.email)),
// Return just the user
TE.map(({ user }) => user)
)
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
interface CheckoutResult {
orderId: string
paymentId: string
estimatedDelivery: Date
}
const checkout = (
userId: string,
cartId: string,
shippingAddressId: string
): TE.TaskEither<CheckoutError, CheckoutResult> =>
pipe(
TE.Do,
// Fetch independent data in parallel
TE.apS('cart', fetchCart(cartId)),
TE.apS('user', fetchUser(userId)),
TE.apS('shippingAddress', fetchAddress(shippingAddressId)),
// Validate cart has items
TE.bind('validatedCart', ({ cart }) =>
cart.items.length === 0
? TE.left(new CheckoutError('Cart is empty'))
: TE.right(cart)
),
// Check inventory for all items (parallel)
TE.bind('inventoryCheck', ({ validatedCart }) =>
pipe(
validatedCart.items,
TE.traverseArray((item) => checkInventory(item.productId, item.quantity))
)
),
// Calculate totals (synchronous)
TE.let('subtotal', ({ validatedCart }) =>
validatedCart.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
),
TE.let('tax', ({ subtotal, shippingAddress }) =>
calculateTax(subtotal, shippingAddress.state)
),
TE.let('shippingCost', ({ shippingAddress, validatedCart }) =>
calculateShipping(shippingAddress, validatedCart.totalWeight)
),
TE.let('total', ({ subtotal, tax, shippingCost }) =>
subtotal + tax + shippingCost
),
// Process payment
TE.bind('payment', ({ user, total }) =>
processPayment(user.defaultPaymentMethod, total)
),
// Create order
TE.bind('order', ({ user, validatedCart, shippingAddress, payment, total }) =>
createOrder({
userId: user.id,
items: validatedCart.items,
shippingAddressId: shippingAddress.id,
paymentId: payment.id,
total
})
),
// Reserve inventory
TE.chainFirst(({ order, validatedCart }) =>
pipe(
validatedCart.items,
TE.traverseArray((item) => reserveInventory(item.productId, item.quantity, order.id))
)
),
// Clear cart
TE.chainFirst(({ cart }) => clearCart(cart.id)),
// Calculate delivery estimate
TE.let('estimatedDelivery', ({ shippingAddress }) =>
calculateDeliveryDate(shippingAddress)
),
// Return result
TE.map(({ order, payment, estimatedDelivery }) => ({
orderId: order.id,
paymentId: payment.id,
estimatedDelivery
}))
)
import { pipe } from 'fp-ts/function'
import * as RTE from 'fp-ts/ReaderTaskEither'
import * as TE from 'fp-ts/TaskEither'
// Define dependencies
interface Deps {
userRepo: UserRepository
orderRepo: OrderRepository
paymentService: PaymentService
emailService: EmailService
logger: Logger
}
// Use RTE.Do for dependency-injected workflows
const processRefund = (
orderId: string,
reason: string
): RTE.ReaderTaskEither<Deps, RefundError, RefundResult> =>
pipe(
RTE.Do,
// Access dependencies via RTE.ask
RTE.bind('deps', () => RTE.ask<Deps>()),
// Fetch order
RTE.bind('order', ({ deps }) =>
RTE.fromTaskEither(deps.orderRepo.findById(orderId))
),
// Validate refund is possible
RTE.bind('validatedOrder', ({ order }) =>
order.status !== 'completed'
? RTE.left(new RefundError('Order not eligible for refund'))
: RTE.right(order)
),
// Process refund with payment service
RTE.bind('refund', ({ deps, validatedOrder }) =>
RTE.fromTaskEither(
deps.paymentService.refund(validatedOrder.paymentId, validatedOrder.total)
)
),
// Update order status
RTE.bind('updatedOrder', ({ deps, validatedOrder, refund }) =>
RTE.fromTaskEither(
deps.orderRepo.update(validatedOrder.id, {
status: 'refunded',
refundId: refund.id,
refundReason: reason
})
)
),
// Fetch user for email
RTE.bind('user', ({ deps, validatedOrder }) =>
RTE.fromTaskEither(deps.userRepo.findById(validatedOrder.userId))
),
// Send notification (result ignored, but a failure still fails the refund)
RTE.chainFirst(({ deps, user, refund }) =>
RTE.fromTaskEither(
deps.emailService.sendRefundConfirmation(user.email, refund)
)
),
// Log the refund
RTE.chainFirst(({ deps, order, refund }) =>
RTE.fromTaskEither(
deps.logger.info('Refund processed', { orderId: order.id, refundId: refund.id })
)
),
// Return result
RTE.map(({ refund, updatedOrder }) => ({
refundId: refund.id,
orderId: updatedOrder.id,
amount: refund.amount,
status: 'completed'
}))
)
// Execute with dependencies
const runRefund = (deps: Deps, orderId: string, reason: string) =>
processRefund(orderId, reason)(deps)()
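Because the dependencies are supplied only at the very end, a workflow like this can be exercised with in-memory stand-ins. The sketch below is a reduced, hypothetical variant (`MailDeps`, `findUser`, `sendEmail`, and `notifyUser` are not part of the example above). It also shows an alternative to keeping `deps` in the context, in which each dependency call is lifted into its own ReaderTaskEither helper.

```typescript
import * as RTE from 'fp-ts/ReaderTaskEither'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/function'

// Hypothetical, trimmed-down dependencies for illustration.
interface User {
  id: string
  email: string
}
interface MailDeps {
  findUser: (id: string) => TE.TaskEither<Error, User>
  sendEmail: (to: string, subject: string) => TE.TaskEither<Error, void>
}

// A ReaderTaskEither is a function from dependencies to a TaskEither.
const findUser = (id: string): RTE.ReaderTaskEither<MailDeps, Error, User> =>
  (deps) => deps.findUser(id)
const sendEmail = (to: string, subject: string): RTE.ReaderTaskEither<MailDeps, Error, void> =>
  (deps) => deps.sendEmail(to, subject)

const notifyUser = (userId: string): RTE.ReaderTaskEither<MailDeps, Error, User> =>
  pipe(
    findUser(userId),
    RTE.bindTo('user'),
    RTE.chainFirst(({ user }) => sendEmail(user.email, 'Refund processed')),
    RTE.map(({ user }) => user)
  )

// In-memory stand-ins: no network, no database.
const sent: string[] = []
const stubDeps: MailDeps = {
  findUser: (id) => TE.right({ id, email: `${id}@example.com` }),
  sendEmail: (to) => TE.fromIO(() => { sent.push(to) })
}

// Supplying deps yields a TaskEither; invoking that yields a Promise of an Either.
const run = (userId: string) => notifyUser(userId)(stubDeps)()
```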
import { pipe } from 'fp-ts/function'
import * as E from 'fp-ts/Either'
import * as TE from 'fp-ts/TaskEither'
import * as A from 'fp-ts/Array'
import { sequenceS } from 'fp-ts/Apply'
// For parallel validation that accumulates ALL errors (not short-circuit)
// Use Apply.sequenceS instead of Do notation
interface ValidationError {
field: string
message: string
}
type ValidationResult<A> = E.Either<ValidationError[], A>
const validateUserInput = (input: Record<string, unknown>): ValidationResult<ValidUser> => {
const validateField = <A>(
field: string,
value: unknown,
validator: (v: unknown) => E.Either<string, A>
): ValidationResult<A> =>
pipe(
validator(value),
E.mapLeft((message) => [{ field, message }])
)
// Use sequenceS with validation applicative to collect ALL errors
return pipe(
sequenceS(E.getApplicativeValidation(A.getSemigroup<ValidationError>()))({
email: validateField('email', input.email, validateEmail),
password: validateField('password', input.password, validatePassword),
age: validateField('age', input.age, validateAge),
name: validateField('name', input.name, validateName)
})
)
}
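// For TaskEither, sequenceS(TE.ApplicativePar) runs the checks in parallel but returns only the first Left.
// To accumulate errors, pass TE.getApplicativeTaskValidation(T.ApplyPar, semigroup) instead,
// where T is fp-ts/Task and semigroup is an Array semigroup of ValidationError.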
const validateUserAsync = (input: UserInput): TE.TaskEither<ValidationError[], ValidUser> =>
pipe(
sequenceS(TE.ApplicativePar)({
emailUnique: checkEmailUnique(input.email),
usernameAvailable: checkUsernameAvailable(input.username),
phoneValid: validatePhoneNumber(input.phone)
}),
TE.map(({ emailUnique, usernameAvailable, phoneValid }) => ({
...input,
emailVerified: emailUnique,
usernameVerified: usernameAvailable,
phoneVerified: phoneValid
}))
)
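The difference between running in parallel and accumulating errors is easy to miss, so here is a small contrast. `checkEmail` and `checkUsername` are hypothetical checks that both fail, and errors are plain strings to keep the sketch short.

```typescript
import * as A from 'fp-ts/Array'
import * as T from 'fp-ts/Task'
import * as TE from 'fp-ts/TaskEither'
import { sequenceS } from 'fp-ts/Apply'

type Errors = string[]

// Hypothetical async checks that both fail.
const checkEmail: TE.TaskEither<Errors, string> = TE.left(['email taken'])
const checkUsername: TE.TaskEither<Errors, string> = TE.left(['username taken'])

// Parallel execution, but only one Left survives.
const firstErrorOnly = sequenceS(TE.ApplicativePar)({
  email: checkEmail,
  username: checkUsername
})

// Parallel execution, with every Left concatenated by the array semigroup.
const allErrors = sequenceS(
  TE.getApplicativeTaskValidation(T.ApplyPar, A.getSemigroup<string>())
)({
  email: checkEmail,
  username: checkUsername
})
```

Under these assumptions `firstErrorOnly()` should resolve to a Left containing only the email error, while `allErrors()` should resolve to a Left containing both.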
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
import * as A from 'fp-ts/Array'
interface DashboardData {
user: User
stats: UserStats
recentOrders: Order[]
recommendations: Product[]
notifications: Notification[]
}
const loadDashboard = (userId: string): TE.TaskEither<Error, DashboardData> =>
pipe(
TE.Do,
// First, get user (required for everything)
TE.bind('user', () => fetchUser(userId)),
// These are all independent - parallel execution
TE.apS('stats', fetchUserStats(userId)),
TE.apS('recentOrders', fetchRecentOrders(userId)),
TE.apS('notifications', fetchNotifications(userId)),
// Recommendations depend on user preferences
TE.bind('recommendations', ({ user }) =>
fetchRecommendations(user.preferences)
),
// Enhance orders with product details (depends on recentOrders)
TE.bind('ordersWithProducts', ({ recentOrders }) =>
pipe(
recentOrders,
A.map((order) =>
pipe(
fetchProductDetails(order.productId),
TE.map((product) => ({ ...order, product }))
)
),
TE.sequenceArray
)
),
// Compute derived data
TE.let('unreadCount', ({ notifications }) =>
notifications.filter((n) => !n.read).length
),
TE.let('totalSpent', ({ recentOrders }) =>
recentOrders.reduce((sum, o) => sum + o.total, 0)
),
// Return final shape
TE.map(({ user, stats, ordersWithProducts, recommendations, notifications }) => ({
user,
stats,
recentOrders: ordersWithProducts,
recommendations,
notifications
}))
)
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
const deleteAccount = (userId: string, confirmationCode: string) =>
pipe(
TE.Do,
TE.bind('user', () => fetchUser(userId)),
// Guard: Check confirmation code
TE.bind('confirmed', ({ user }) =>
confirmationCode === user.deleteConfirmationCode
? TE.right(true)
: TE.left(new Error('Invalid confirmation code'))
),
// Guard: Check no pending orders
TE.bind('pendingOrders', ({ user }) => fetchPendingOrders(user.id)),
TE.bind('canDelete', ({ pendingOrders }) =>
pendingOrders.length === 0
? TE.right(true)
: TE.left(new Error('Cannot delete account with pending orders'))
),
// Proceed with deletion
TE.bind('deleted', ({ user }) => deleteUserAccount(user.id))
)
Use chainFirst when you want to perform a side effect but keep the original value:
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
const createPost = (input: PostInput) =>
pipe(
TE.Do,
TE.bind('post', () => savePost(input)),
// Log creation (side effect, ignore result)
TE.chainFirst(({ post }) => logPostCreation(post.id)),
// Notify followers (side effect, ignore result)
TE.chainFirst(({ post }) => notifyFollowers(post.authorId, post.id)),
// Index for search (side effect, ignore result)
TE.chainFirst(({ post }) => indexForSearch(post)),
// Return just the post
TE.map(({ post }) => post)
)
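One detail worth checking is what "ignore result" means here: chainFirst discards the success value of the effect but still waits for it, and a failure in the effect fails the whole pipeline. The sketch below illustrates this with hypothetical `savePost`, `logOk`, and `logFails` helpers, and shows one way to make an effect best-effort by recovering from its error first.

```typescript
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/function'

interface Post {
  id: string
}

// Hypothetical persistence and logging steps.
const savePost = (id: string): TE.TaskEither<Error, Post> => TE.right({ id })
const logOk = (_post: Post): TE.TaskEither<Error, number> => TE.right(200)
const logFails = (_post: Post): TE.TaskEither<Error, number> =>
  TE.left(new Error('log service down'))

// The 200 returned by logOk is discarded; the Post flows through unchanged.
const kept = pipe(savePost('p1'), TE.chainFirst(logOk))

// The effect's failure becomes the pipeline's failure.
const failed = pipe(savePost('p1'), TE.chainFirst(logFails))

// Best-effort variant: recover from the effect's error before chaining it.
const bestEffort = pipe(
  savePost('p1'),
  TE.chainFirst((post) =>
    pipe(logFails(post), TE.orElse(() => TE.right<Error, number>(0)))
  )
)
```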
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
const processOrder = (orderId: string, promoCode?: string) =>
pipe(
TE.Do,
TE.bind('order', () => fetchOrder(orderId)),
// Conditionally apply promo code
TE.bind('discount', ({ order }) =>
promoCode
? validatePromoCode(promoCode, order.total)
: TE.right(0)
),
TE.let('finalTotal', ({ order, discount }) => order.total - discount)
)
import { pipe } from 'fp-ts/function'
import * as TE from 'fp-ts/TaskEither'
import * as A from 'fp-ts/Array'
const processOrders = (orderIds: string[]) =>
pipe(
TE.Do,
// Fetch all orders in parallel
TE.bind('orders', () =>
pipe(
orderIds,
A.map(fetchOrder),
TE.sequenceArray
)
),
// Process each order
TE.bind('processed', ({ orders }) =>
pipe(
orders,
A.map(processOrder),
TE.sequenceArray
)
),
// Aggregate results
TE.let('summary', ({ processed }) => ({
total: processed.length,
successful: processed.filter((p) => p.status === 'success').length,
failed: processed.filter((p) => p.status === 'failed').length
}))
)
// SLOW: 3 sequential API calls
pipe(
TE.Do,
TE.bind('a', () => fetchA()), // 100ms
TE.bind('b', () => fetchB()), // 100ms
TE.bind('c', () => fetchC()) // 100ms
) // Total: 300ms
// FAST: 3 parallel API calls
pipe(
TE.Do,
TE.apS('a', fetchA()), // 100ms
TE.apS('b', fetchB()), // 100ms (parallel)
TE.apS('c', fetchC()) // 100ms (parallel)
) // Total: ~100ms
// AVOID: Wrapping a pure computation in TE.right just to use bind
TE.bind('total', ({ items }) => TE.right(items.reduce((s, i) => s + i.price, 0)))
// RIGHT: Using let for pure computation
TE.let('total', ({ items }) => items.reduce((s, i) => s + i.price, 0))
// SLOW: N+1 queries
pipe(
TE.Do,
TE.bind('orders', () => fetchOrders(userId)),
TE.bind('products', ({ orders }) =>
pipe(
orders,
A.map((o) => fetchProduct(o.productId)), // N queries!
TE.sequenceArray
)
)
)
// FAST: Batch query
pipe(
TE.Do,
TE.bind('orders', () => fetchOrders(userId)),
TE.bind('products', ({ orders }) =>
fetchProductsByIds(orders.map((o) => o.productId)) // 1 query
)
)
// INEFFICIENT: Rebuilding large context
pipe(
TE.Do,
TE.bind('hugeData', () => fetchHugeData()),
TE.map(({ hugeData }) => ({ hugeData, processed: true })), // New context object, but it still references hugeData
TE.bind('more', () => fetchMore()) // hugeData still in context
)
// BETTER: Extract what you need early
pipe(
TE.Do,
TE.bind('hugeData', () => fetchHugeData()),
TE.let('summary', ({ hugeData }) => summarize(hugeData)), // Extract summary
TE.map(({ summary }) => summary) // Drop hugeData from context
)
Do notation transforms deeply nested callback chains into flat, readable pipelines:
| Function | Purpose | When to Use |
|---|---|---|
| Do | Start empty context | Beginning of chain |
| bindTo | Start with value | When you have initial value |
| bind | Sequential operation | Depends on previous values |
| apS | Parallel operation | Independent of other values |
| let | Pure computation | Derive values synchronously |
| chainFirst | Side effect | Run an effect and keep the current context (a failure still propagates) |
Key principles:
- Use bind for dependencies, apS for independence
- Use let for pure computations, never bind with TE.right
- Combine with sequenceArray/traverseArray for collections
- Use chainFirst for side effects that shouldn't affect the result
Do notation is the key to writing maintainable fp-ts code. Master it, and functional programming becomes as readable as imperative code while retaining all its benefits.