golang-samber-ro by samber/cc-skills-golang
npx skills add https://github.com/samber/cc-skills-golang --skill golang-samber-ro角色设定: 你是一位 Go 工程师,当数据流是异步或无限时,你会选择使用响应式流。你使用 samber/ro 来构建声明式管道,而不是手动编写 goroutine/通道连接逻辑,但你也知道何时使用简单的切片 + samber/lo 就足够了。
思考模式: 在设计高级响应式管道或在冷/热可观察对象、主题以及组合操作符之间进行选择时,请使用 ultrathink。错误的架构会导致资源泄漏或事件丢失。
ReactiveX 的 Go 语言实现。采用泛型优先、类型安全、可组合的管道,用于处理异步数据流,具备自动背压、错误传播、上下文集成和资源清理功能。包含 150+ 个操作符、5 种主题类型、40+ 个插件。
官方资源:
此技能介绍并非详尽无遗。更多信息请参考库文档和代码示例。Context7 可以作为发现平台提供帮助。
对于复杂的异步管道,Go 的通道 + goroutine 会变得难以管理:需要手动关闭通道、冗长的 goroutine 生命周期管理、跨嵌套 select 语句的错误传播,以及缺乏可组合的操作符。samber/ro 通过声明式、可链式调用的流操作符解决了这些问题。
何时使用哪种工具:
| 场景 | 工具 |
|---|
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
| 原因 |
|---|
| 转换切片(map、filter、reduce) | samber/lo | 有限的、同步的、急切的——不需要流开销 |
| 带错误处理的简单 goroutine 扇出 | errgroup | 标准库、轻量级、适用于有限的并发 |
| 无限事件流(WebSocket、定时器、文件监视器) | samber/ro | 具备背压、重试、超时、组合功能的声明式管道 |
| 来自多个异步源的实时数据丰富 | samber/ro | CombineLatest/Zip 组合依赖流,无需手动 select |
| 具有多个消费者共享一个源的发布/订阅 | samber/ro | 热可观察对象(Share/Subjects)原生处理多播 |
主要区别:lo vs ro
| 方面 | samber/lo | samber/ro |
|---|---|---|
| 数据 | 有限的切片 | 无限的流 |
| 执行 | 同步、阻塞 | 异步、非阻塞 |
| 求值 | 急切(分配中间切片) | 惰性(处理到达的项) |
| 时序 | 立即 | 时间感知(延迟、节流、间隔、超时) |
| 错误模型 | 每次调用返回 (T, error) | 错误通道通过管道传播 |
| 用例 | 集合转换 | 事件驱动、实时、异步管道 |
go get github.com/samber/ro
四个构建模块:
onNext(T)、onError(error)、onComplete()Pipe 链接.Wait() 来阻塞,或调用 .Unsubscribe() 来取消observable := ro.Pipe2(
ro.RangeWithInterval(0, 5, 1*time.Second),
ro.Filter(func(x int) bool { return x%2 == 0 }),
ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)
observable.Subscribe(ro.NewObserver(
func(s string) { fmt.Println(s) }, // onNext
func(err error) { log.Println(err) }, // onError
func() { fmt.Println("Done!") }, // onComplete
))
// 输出:"even-0", "even-2", "even-4", "Done!"
// 或者同步收集:
values, err := ro.Collect(observable)
冷(默认):每次 .Subscribe() 都会启动一个新的独立执行。安全且可预测——默认使用。
热:多个订阅者共享一个执行。当源开销较大(WebSocket、数据库轮询)或订阅者必须看到相同事件时使用。
| 使用转换 | 行为 |
|---|---|
Share() | 冷 → 热,带引用计数。最后一个订阅取消时拆除 |
ShareReplay(n) | 与 Share 相同 + 为迟到的订阅者缓冲最后 N 个值 |
Connectable() | 冷 → 热,但等待显式的 .Connect() 调用 |
| Subjects | 原生热——直接调用 .Send()、.Error()、.Complete() |
| 主题 | 构造函数 |
| --- | --- |
PublishSubject | NewPublishSubject[T]() |
BehaviorSubject | NewBehaviorSubject[T](initial) |
ReplaySubject | NewReplaySubject[T](bufferSize) |
AsyncSubject | NewAsyncSubject[T]() |
UnicastSubject | NewUnicastSubject[T](bufferSize) |
关于主题详情和热可观察对象模式,请参阅主题指南。
| 类别 | 关键操作符 | 用途 |
|---|---|---|
| 创建 | Just、FromSlice、FromChannel、Range、Interval、Defer、Future | 从各种源创建可观察对象 |
| 转换 | Map、MapErr、FlatMap、Scan、Reduce、GroupBy | 转换或累积流值 |
| 过滤 | Filter、Take、TakeLast、Skip、Distinct、Find、First、Last | 选择性发出值 |
| 组合 | Merge、Concat、Zip2–Zip6、CombineLatest2–CombineLatest5、Race | 合并多个可观察对象 |
| 错误 | Catch、OnErrorReturn、OnErrorResumeNextWith、Retry、RetryWithConfig | 从错误中恢复 |
| 时序 | Delay、DelayEach、Timeout、ThrottleTime、SampleTime、BufferWithTime | 控制发出时序 |
| 副作用 | Tap/Do、TapOnNext、TapOnError、TapOnComplete | 观察而不改变流 |
| 终端 | Collect、ToSlice、ToChannel、ToMap | 将流消费为 Go 类型 |
使用类型化的 Pipe2、Pipe3 ... Pipe25 以获得跨操作符链的编译时类型安全。非类型化的 Pipe 使用 any 并失去类型检查。
关于完整的操作符目录(150+ 个操作符及其签名),请参阅操作符指南。
| 错误 | 失败原因 | 修复方法 |
|---|---|---|
使用 ro.OnNext() 而没有错误处理程序 | 错误被静默丢弃——生产环境中隐藏 bug | 使用 ro.NewObserver(onNext, onError, onComplete) 并包含全部 3 个回调 |
使用非类型化的 Pipe() 而不是 Pipe2/Pipe3 | 失去编译时类型安全,错误在运行时出现 | 使用 Pipe2、Pipe3...Pipe25 进行类型化操作符链 |
忘记在无限流上调用 .Unsubscribe() | Goroutine 泄漏——可观察对象永远运行 | 使用 TakeUntil(signal)、上下文取消或显式 Unsubscribe() |
在冷流足够时使用 Share() | 不必要的复杂性,生命周期更难推理 | 仅当多个消费者需要同一流时才使用热可观察对象 |
对有限的切片转换使用 samber/ro | 为同步操作带来流开销(goroutine、订阅) | 使用 samber/lo——它更简单、更快,专为切片构建 |
| 未传播上下文以进行取消 | 流忽略关闭信号,导致终止时资源泄漏 | 在管道中链接 ContextWithTimeout 或 ThrowOnContextCancel |
NewObserver(onNext, onError, onComplete),而不仅仅是 OnNext。未处理的错误会导致静默数据丢失Collect() 进行同步消费 — 当流是有限的且你需要 []T 时,Collect 会阻塞直到完成,并返回切片 + 错误Pipe2、Pipe3...Pipe25 在编译时捕获类型不匹配。将非类型化的 Pipe 保留用于动态操作符链Take(n)、TakeUntil(signal)、Timeout(d) 或上下文取消。无界流会导致 goroutine 泄漏Tap/Do 进行可观测性 — 在不改变流的情况下记录、追踪或计量发出事件。链接 TapOnError 进行错误监控samber/lo — 如果数据是有限的切片,并且你需要 Map/Filter/Reduce,请使用 lo。当数据随时间到达、来自多个源或需要重试/超时/背压时,再使用 ro40+ 个插件通过特定领域的操作符扩展了 ro:
| 类别 | 插件 | 导入路径前缀 |
|---|---|---|
| 编码 | JSON、CSV、Base64、Gob | plugins/encoding/... |
| 网络 | HTTP、I/O、FSNotify | plugins/http、plugins/io、plugins/fsnotify |
| 调度 | Cron、ICS | plugins/cron、plugins/ics |
| 可观测性 | Zap、Slog、Zerolog、Logrus、Sentry、Oops | plugins/observability/...、plugins/samber/oops |
| 速率限制 | Native、Ulule | plugins/ratelimit/... |
| 数据 | Bytes、Strings、Sort、Strconv、Regexp、Template | plugins/bytes、plugins/strings 等 |
| 系统 | Process、Signal | plugins/proc、plugins/signal |
关于完整的插件目录及其导入路径和使用示例,请参阅插件生态系统。
关于现实世界的响应式模式(重试+超时、WebSocket 扇出、优雅关闭、流组合),请参阅模式。
如果你在 samber/ro 中遇到 bug 或意外行为,请在 github.com/samber/ro/issues 提交 issue。
samber/cc-skills-golang@golang-samber-lo 技能,了解有限切片转换(Map、Filter、Reduce、GroupBy)——当数据已经在切片中时使用 losamber/cc-skills-golang@golang-samber-mo 技能,了解可与 ro 管道组合的单子类型(Option、Result、Either)samber/cc-skills-golang@golang-samber-hot 技能,了解内存缓存(也可作为 ro 插件使用)samber/cc-skills-golang@golang-concurrency 技能,了解当响应式流过度复杂时的 goroutine/通道模式samber/cc-skills-golang@golang-observability 技能,了解在生产环境中监控响应式管道每周安装次数
96
代码仓库
GitHub 星标数
276
首次出现
3 天前
安全审计
安装于
opencode91
gemini-cli88
codex88
cursor88
kimi-cli87
amp87
Persona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.
Thinking mode: Use ultrathink when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.
Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.
Official Resources:
This skill is not exhaustive. Please refer to library documentation and code examples for more informations. Context7 can help as a discoverability platform.
Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators. samber/ro solves this with declarative, chainable stream operators.
When to use which tool:
| Scenario | Tool | Why |
|---|---|---|
| Transform a slice (map, filter, reduce) | samber/lo | Finite, synchronous, eager — no stream overhead needed |
| Simple goroutine fan-out with error handling | errgroup | Standard lib, lightweight, sufficient for bounded concurrency |
| Infinite event stream (WebSocket, tickers, file watcher) | samber/ro | Declarative pipeline with backpressure, retry, timeout, combine |
| Real-time data enrichment from multiple async sources | samber/ro | CombineLatest/Zip compose dependent streams without manual select |
| Pub/sub with multiple consumers sharing one source |
Key differences: lo vs ro
| Aspect | samber/lo | samber/ro |
|---|---|---|
| Data | Finite slices | Infinite streams |
| Execution | Synchronous, blocking | Asynchronous, non-blocking |
| Evaluation | Eager (allocates intermediate slices) | Lazy (processes items as they arrive) |
| Timing | Immediate | Time-aware (delay, throttle, interval, timeout) |
| Error model | Return (T, error) per call | Error channel propagates through pipeline |
| Use case | Collection transforms | Event-driven, real-time, async pipelines |
go get github.com/samber/ro
Four building blocks:
onNext(T), onError(error), onComplete()Pipe.Wait() to block or .Unsubscribe() to cancelobservable := ro.Pipe2(
ro.RangeWithInterval(0, 5, 1*time.Second),
ro.Filter(func(x int) bool { return x%2 == 0 }),
ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)
observable.Subscribe(ro.NewObserver(
func(s string) { fmt.Println(s) }, // onNext
func(err error) { log.Println(err) }, // onError
func() { fmt.Println("Done!") }, // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"
// Or collect synchronously:
values, err := ro.Collect(observable)
Cold (default): each .Subscribe() starts a new independent execution. Safe and predictable — use by default.
Hot : multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.
| Convert with | Behavior |
|---|---|
Share() | Cold → hot with reference counting. Last unsubscribe tears down |
ShareReplay(n) | Same as Share + buffers last N values for late subscribers |
Connectable() | Cold → hot, but waits for explicit .Connect() call |
| Subjects | Natively hot — call .Send(), .Error(), .Complete() directly |
For subject details and hot observable patterns, see Subjects Guide.
| Category | Key operators | Purpose |
|---|---|---|
| Creation | Just, FromSlice, FromChannel, Range, Interval, Defer, Future | Create observables from various sources |
| Transform | Map, MapErr, , , , |
Use typed Pipe2, Pipe3 ... Pipe25 for compile-time type safety across operator chains. The untyped Pipe uses any and loses type checking.
For the complete operator catalog (150+ operators with signatures), see Operators Guide.
| Mistake | Why it fails | Fix |
|---|---|---|
Using ro.OnNext() without error handler | Errors are silently dropped — bugs hide in production | Use ro.NewObserver(onNext, onError, onComplete) with all 3 callbacks |
Using untyped Pipe() instead of Pipe2/Pipe3 | Loses compile-time type safety, errors surface at runtime | Use Pipe2, Pipe3...Pipe25 for typed operator chains |
NewObserver(onNext, onError, onComplete), not just OnNext. Unhandled errors cause silent data lossCollect() for synchronous consumption — when the stream is finite and you need []T, Collect blocks until complete and returns the slice + errorPipe2, Pipe3...Pipe25 catch type mismatches at compile time. Reserve untyped Pipe for dynamic operator chains40+ plugins extend ro with domain-specific operators:
| Category | Plugins | Import path prefix |
|---|---|---|
| Encoding | JSON, CSV, Base64, Gob | plugins/encoding/... |
| Network | HTTP, I/O, FSNotify | plugins/http, plugins/io, plugins/fsnotify |
| Scheduling | Cron, ICS | plugins/cron, plugins/ics |
| Observability | Zap, Slog, Zerolog, Logrus, Sentry, Oops |
For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.
For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.
If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.
samber/cc-skills-golang@golang-samber-lo skill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slicesamber/cc-skills-golang@golang-samber-mo skill for monadic types (Option, Result, Either) that compose with ro pipelinessamber/cc-skills-golang@golang-samber-hot skill for in-memory caching (also available as an ro plugin)samber/cc-skills-golang@golang-concurrency skill for goroutine/channel patterns when reactive streams are overkillsamber/cc-skills-golang@golang-observability skill for monitoring reactive pipelines in productionWeekly Installs
96
Repository
GitHub Stars
276
First Seen
3 days ago
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
opencode91
gemini-cli88
codex88
cursor88
kimi-cli87
amp87
免费AI数据抓取智能体:自动化收集、丰富与存储网站/API数据
1,100 周安装
samber/ro| Hot observables (Share/Subjects) handle multicast natively |
| Subject | Constructor |
| --- | --- |
PublishSubject | NewPublishSubject[T]() |
BehaviorSubject | NewBehaviorSubject[T](initial) |
ReplaySubject | NewReplaySubject[T](bufferSize) |
AsyncSubject | NewAsyncSubject[T]() |
UnicastSubject | NewUnicastSubject[T](bufferSize) |
FlatMapScanReduceGroupBy| Transform or accumulate stream values |
| Filter | Filter, Take, TakeLast, Skip, Distinct, Find, First, Last | Selectively emit values |
| Combine | Merge, Concat, Zip2–Zip6, CombineLatest2–CombineLatest5, Race | Merge multiple observables |
| Error | Catch, OnErrorReturn, OnErrorResumeNextWith, Retry, RetryWithConfig | Recover from errors |
| Timing | Delay, DelayEach, Timeout, ThrottleTime, SampleTime, BufferWithTime | Control emission timing |
| Side effect | Tap/Do, TapOnNext, TapOnError, TapOnComplete | Observe without altering stream |
| Terminal | Collect, ToSlice, ToChannel, ToMap | Consume stream into Go types |
Forgetting .Unsubscribe() on infinite streams | Goroutine leak — the observable runs forever | Use TakeUntil(signal), context cancellation, or explicit Unsubscribe() |
Using Share() when cold is sufficient | Unnecessary complexity, harder to reason about lifecycle | Use hot observables only when multiple consumers need the same stream |
Using samber/ro for finite slice transforms | Stream overhead (goroutines, subscriptions) for a synchronous operation | Use samber/lo — it's simpler, faster, and purpose-built for slices |
| Not propagating context for cancellation | Streams ignore shutdown signals, causing resource leaks on termination | Chain ContextWithTimeout or ThrowOnContextCancel in the pipeline |
Take(n)TakeUntil(signal)Timeout(d)Tap/Do for observability — log, trace, or meter emissions without altering the stream. Chain TapOnError for error monitoringsamber/lo for simple transforms — if the data is a finite slice and you need Map/Filter/Reduce, use lo. Reach for ro when data arrives over time, from multiple sources, or needs retry/timeout/backpressureplugins/observability/..., plugins/samber/oops |
| Rate limiting | Native, Ulule | plugins/ratelimit/... |
| Data | Bytes, Strings, Sort, Strconv, Regexp, Template | plugins/bytes, plugins/strings, etc. |
| System | Process, Signal | plugins/proc, plugins/signal |