kotlin-coroutines-flows by affaan-m/everything-claude-code
npx skills add https://github.com/affaan-m/everything-claude-code --skill kotlin-coroutines-flows
适用于 Android 和 Kotlin 多平台项目的结构化并发模式、基于 Flow 的响应式流以及协程测试。
Application
└── viewModelScope (ViewModel)
└── coroutineScope { } (结构化子作用域)
├── async { } (并发任务)
└── async { } (并发任务)
始终使用结构化并发——绝不使用 GlobalScope:
// BAD: GlobalScope is not bound to any lifecycle, so the coroutine can leak
GlobalScope.launch { fetchData() }
// GOOD: scoped to the ViewModel lifecycle
viewModelScope.launch { fetchData() }
// GOOD: scoped to the composable lifecycle
LaunchedEffect(key) { fetchData() }
使用 coroutineScope + async 处理并行工作:
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
/**
 * Loads the three dashboard parts concurrently and assembles them into a [Dashboard].
 *
 * Each repository call is started with `async` inside one `coroutineScope`,
 * and the results are awaited only when the [Dashboard] is constructed.
 */
suspend fun loadDashboard(): Dashboard = coroutineScope {
    val recentItems = async { itemRepository.getRecent() }
    val todayStats = async { statsRepository.getToday() }
    val currentProfile = async { userRepository.getCurrent() }

    Dashboard(
        items = recentItems.await(),
        stats = todayStats.await(),
        profile = currentProfile.await(),
    )
}
当子协程失败不应取消兄弟协程时,使用 supervisorScope:
/**
 * Starts the three sync operations as children of a `supervisorScope`,
 * so a failure in one child does not cancel its siblings.
 *
 * NOTE(review): no child handles its own exceptions here — confirm failures are caught
 * inside syncItems/syncStats/syncSettings or by a CoroutineExceptionHandler.
 */
suspend fun syncAll() = supervisorScope {
    launch { syncItems() } // a failure here won't cancel syncStats
    launch { syncStats() }
    launch { syncSettings() }
}
/**
 * Observes all items as domain models.
 *
 * Re-emits whenever `itemDao.observeAll()` emits, converting every entity with `toDomain()`.
 * The mapped DAO flow is returned directly; re-collecting it inside a `flow { }` builder
 * only added an extra layer without changing the emitted values.
 */
fun observeItems(): Flow<List<Item>> =
    itemDao.observeAll()
        .map { entities -> entities.map { it.toDomain() } }
/**
 * Exposes the user's progress as a [StateFlow] shared in `viewModelScope`.
 *
 * Sharing starts with `SharingStarted.WhileSubscribed(5_000)` and the flow holds
 * `UserProgress.EMPTY` until the use case emits its first value.
 */
class DashboardViewModel(
    observeProgress: ObserveUserProgressUseCase,
) : ViewModel() {

    val progress: StateFlow<UserProgress> =
        observeProgress().stateIn(
            viewModelScope,
            SharingStarted.WhileSubscribed(5_000),
            UserProgress.EMPTY,
        )
}
WhileSubscribed(5_000) 会在最后一个订阅者离开后保持上游活跃 5 秒——在配置变更时无需重启即可存活。
// Merges the latest items, theme and profile into a single HomeState,
// shared in viewModelScope with a default HomeState() as the initial value.
val uiState: StateFlow<HomeState> =
    combine(
        itemRepository.observeItems(),
        settingsRepository.observeTheme(),
        userRepository.observeProfile()
    ) { latestItems, currentTheme, currentProfile ->
        HomeState(items = latestItems, theme = currentTheme, profile = currentProfile)
    }.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5_000),
        initialValue = HomeState(),
    )
// Debounce search input.
// `catch` is applied to each inner search flow rather than to the whole pipeline:
// a failed search yields an empty result list, and the pipeline keeps running so
// later queries are still processed. (A `catch` placed after `flatMapLatest` would
// end collection on the first failure.) Errors raised by `searchQuery` itself are
// no longer converted to an empty list.
searchQuery
    .debounce(300)
    .distinctUntilChanged()
    .flatMapLatest { query ->
        repository.search(query).catch { emit(emptyList()) }
    }
    .collect { results -> _state.update { it.copy(results = results) } }
/**
 * Fetches data once, retrying with exponential backoff.
 *
 * Only an `IOException` is retried, and only while `attempt < 3`; the wait before
 * each retry is 1000 ms doubled per attempt (1000, 2000, 4000). Any other failure
 * is not retried.
 */
fun fetchWithRetry(): Flow<Data> =
    flow { emit(api.fetch()) }
        .retryWhen { cause, attempt ->
            val shouldRetry = cause is IOException && attempt < 3
            if (shouldRetry) {
                // 1000L shl n == 1000L * 2^n
                delay(1000L shl attempt.toInt())
            }
            shouldRetry
        }
/**
 * ViewModel that publishes one-off UI effects through a [SharedFlow].
 */
class ItemListViewModel : ViewModel() {

    /** One-off events the UI reacts to. */
    sealed interface Effect {
        data class ShowSnackbar(val message: String) : Effect
        data class NavigateTo(val route: String) : Effect
    }

    private val _effects = MutableSharedFlow<Effect>()

    /** Read-only view of the effect stream. */
    val effects: SharedFlow<Effect> = _effects.asSharedFlow()

    /** Deletes the item, then emits a snackbar effect. */
    private fun deleteItem(id: String) {
        viewModelScope.launch {
            repository.delete(id)
            val confirmation = Effect.ShowSnackbar("Item deleted")
            _effects.emit(confirmation)
        }
    }
}
// Collect the ViewModel's one-off effects inside a composable
LaunchedEffect(Unit) {
    viewModel.effects.collect { uiEffect ->
        when (uiEffect) {
            is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(uiEffect.message)
            is Effect.NavigateTo -> navController.navigate(uiEffect.route)
        }
    }
}
// CPU-intensive work: run on Dispatchers.Default
withContext(Dispatchers.Default) { parseJson(largePayload) }
// IO-bound work: run on Dispatchers.IO
withContext(Dispatchers.IO) { database.query() }
// Main thread (UI) work; Dispatchers.Main is the default dispatcher in viewModelScope
withContext(Dispatchers.Main) { updateUi() }
在 KMP 中,使用 Dispatchers.Default 和 Dispatchers.Main(所有平台可用)。Dispatchers.IO 仅适用于 JVM/Android——在其他平台上使用 Dispatchers.Default 或通过依赖注入提供。
长时间运行的循环必须检查取消状态:
/**
 * Processes every item in order, checking for cancellation before each one.
 */
suspend fun processItems(items: List<Item>) = coroutineScope {
    items.forEach { item ->
        // Throws CancellationException if the scope has been cancelled
        ensureActive()
        process(item)
    }
}
// Set the loading flag while fetching and clear it in `finally`, so it is
// reset whether the fetch succeeds, fails, or the coroutine is cancelled.
viewModelScope.launch {
    try {
        _state.update { it.copy(isLoading = true) }
        val data = repository.fetch()
        _state.update { it.copy(data = data) }
    } finally {
        _state.update { it.copy(isLoading = false) } // always runs, even on cancellation
    }
}
@Test
fun `search updates item list`() = runTest {
    val repository = FakeItemRepository()
    repository.emit(testItems)
    val viewModel = ItemListViewModel(GetItemsUseCase(repository))

    viewModel.state.test {
        // Initial state, before any search is issued
        assertEquals(ItemListState(), awaitItem())

        viewModel.onSearch("query")

        val loadingState = awaitItem()
        assertTrue(loadingState.isLoading)

        val loadedState = awaitItem()
        assertFalse(loadedState.isLoading)
        assertEquals(1, loadedState.items.size)
    }
}
@Test
fun `parallel load completes correctly`() = runTest {
    val viewModel = DashboardViewModel(itemRepo = FakeItemRepo(), statsRepo = FakeStatsRepo())

    viewModel.load()
    // Let the scheduled coroutines run before reading the state
    advanceUntilIdle()

    val finalState = viewModel.state.value
    assertNotNull(finalState.items)
    assertNotNull(finalState.stats)
}
/**
 * In-memory [ItemRepository] for tests, backed by a [MutableStateFlow].
 */
class FakeItemRepository : ItemRepository {
    private val backingItems = MutableStateFlow<List<Item>>(emptyList())

    override fun observeItems(): Flow<List<Item>> = backingItems

    /** Replaces the current item list with [items]. */
    fun emit(items: List<Item>) {
        backingItems.value = items
    }

    /** Returns a successful [Result] holding the current items whose category equals [category]. */
    override suspend fun getItemsByCategory(category: String): Result<List<Item>> {
        val matching = backingItems.value.filter { it.category == category }
        return Result.success(matching)
    }
}
应避免的反模式:
- 使用 GlobalScope——会导致协程泄漏,无法结构化取消
- 在没有作用域的情况下于 init {} 中收集 Flow——应使用 viewModelScope.launch
- 将 MutableStateFlow 与可变集合一起使用——始终使用不可变副本:_state.update { it.copy(list = it.list + newItem) }
- 捕获 CancellationException——应让其传播以实现正确的取消
- 使用 flowOn(Dispatchers.Main) 进行收集——收集调度器是调用者的调度器
- 在 @Composable 中创建 Flow 而不使用 remember——每次重组都会重新创建 Flow

查看技能:compose-multiplatform-patterns 了解 Flow 的 UI 消费方式。查看技能:android-clean-architecture 了解协程在各层中的位置。
每周安装量
17
代码库
GitHub 星标数
72.1K
首次出现时间
1 天前
安全审计
已安装于
codex17
github-copilot15
kimi-cli15
amp15
cline15
gemini-cli15
Patterns for structured concurrency, Flow-based reactive streams, and coroutine testing in Android and Kotlin Multiplatform projects.
Application
└── viewModelScope (ViewModel)
└── coroutineScope { } (structured child)
├── async { } (concurrent task)
└── async { } (concurrent task)
Always use structured concurrency — never GlobalScope:
// BAD: GlobalScope is not bound to any lifecycle, so the coroutine can leak
GlobalScope.launch { fetchData() }
// GOOD: scoped to the ViewModel lifecycle
viewModelScope.launch { fetchData() }
// GOOD: scoped to the composable lifecycle
LaunchedEffect(key) { fetchData() }
Use coroutineScope + async for parallel work:
/**
 * Loads the three dashboard parts concurrently and assembles them into a [Dashboard].
 *
 * Each repository call is started with `async` inside one `coroutineScope`,
 * and the results are awaited only when the [Dashboard] is constructed.
 */
suspend fun loadDashboard(): Dashboard = coroutineScope {
    val recentItems = async { itemRepository.getRecent() }
    val todayStats = async { statsRepository.getToday() }
    val currentProfile = async { userRepository.getCurrent() }

    Dashboard(
        items = recentItems.await(),
        stats = todayStats.await(),
        profile = currentProfile.await(),
    )
}
Use supervisorScope when child failures should not cancel siblings:
/**
 * Starts the three sync operations as children of a `supervisorScope`,
 * so a failure in one child does not cancel its siblings.
 *
 * NOTE(review): no child handles its own exceptions here — confirm failures are caught
 * inside syncItems/syncStats/syncSettings or by a CoroutineExceptionHandler.
 */
suspend fun syncAll() = supervisorScope {
    launch { syncItems() } // a failure here won't cancel syncStats
    launch { syncStats() }
    launch { syncSettings() }
}
/**
 * Observes all items as domain models.
 *
 * Re-emits whenever `itemDao.observeAll()` emits, converting every entity with `toDomain()`.
 * The mapped DAO flow is returned directly; re-collecting it inside a `flow { }` builder
 * only added an extra layer without changing the emitted values.
 */
fun observeItems(): Flow<List<Item>> =
    itemDao.observeAll()
        .map { entities -> entities.map { it.toDomain() } }
/**
 * Exposes the user's progress as a [StateFlow] shared in `viewModelScope`.
 *
 * Sharing starts with `SharingStarted.WhileSubscribed(5_000)` and the flow holds
 * `UserProgress.EMPTY` until the use case emits its first value.
 */
class DashboardViewModel(
    observeProgress: ObserveUserProgressUseCase,
) : ViewModel() {

    val progress: StateFlow<UserProgress> =
        observeProgress().stateIn(
            viewModelScope,
            SharingStarted.WhileSubscribed(5_000),
            UserProgress.EMPTY,
        )
}
WhileSubscribed(5_000) keeps the upstream active for 5 seconds after the last subscriber leaves — survives configuration changes without restarting.
// Merges the latest items, theme and profile into a single HomeState,
// shared in viewModelScope with a default HomeState() as the initial value.
val uiState: StateFlow<HomeState> =
    combine(
        itemRepository.observeItems(),
        settingsRepository.observeTheme(),
        userRepository.observeProfile()
    ) { latestItems, currentTheme, currentProfile ->
        HomeState(items = latestItems, theme = currentTheme, profile = currentProfile)
    }.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5_000),
        initialValue = HomeState(),
    )
// Debounce search input.
// `catch` is applied to each inner search flow rather than to the whole pipeline:
// a failed search yields an empty result list, and the pipeline keeps running so
// later queries are still processed. (A `catch` placed after `flatMapLatest` would
// end collection on the first failure.) Errors raised by `searchQuery` itself are
// no longer converted to an empty list.
searchQuery
    .debounce(300)
    .distinctUntilChanged()
    .flatMapLatest { query ->
        repository.search(query).catch { emit(emptyList()) }
    }
    .collect { results -> _state.update { it.copy(results = results) } }
/**
 * Fetches data once, retrying with exponential backoff.
 *
 * Only an `IOException` is retried, and only while `attempt < 3`; the wait before
 * each retry is 1000 ms doubled per attempt (1000, 2000, 4000). Any other failure
 * is not retried.
 */
fun fetchWithRetry(): Flow<Data> =
    flow { emit(api.fetch()) }
        .retryWhen { cause, attempt ->
            val shouldRetry = cause is IOException && attempt < 3
            if (shouldRetry) {
                // 1000L shl n == 1000L * 2^n
                delay(1000L shl attempt.toInt())
            }
            shouldRetry
        }
/**
 * ViewModel that publishes one-off UI effects through a [SharedFlow].
 */
class ItemListViewModel : ViewModel() {

    /** One-off events the UI reacts to. */
    sealed interface Effect {
        data class ShowSnackbar(val message: String) : Effect
        data class NavigateTo(val route: String) : Effect
    }

    private val _effects = MutableSharedFlow<Effect>()

    /** Read-only view of the effect stream. */
    val effects: SharedFlow<Effect> = _effects.asSharedFlow()

    /** Deletes the item, then emits a snackbar effect. */
    private fun deleteItem(id: String) {
        viewModelScope.launch {
            repository.delete(id)
            val confirmation = Effect.ShowSnackbar("Item deleted")
            _effects.emit(confirmation)
        }
    }
}
// Collect the ViewModel's one-off effects inside a composable
LaunchedEffect(Unit) {
    viewModel.effects.collect { uiEffect ->
        when (uiEffect) {
            is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(uiEffect.message)
            is Effect.NavigateTo -> navController.navigate(uiEffect.route)
        }
    }
}
// CPU-intensive work: run on Dispatchers.Default
withContext(Dispatchers.Default) { parseJson(largePayload) }
// IO-bound work: run on Dispatchers.IO
withContext(Dispatchers.IO) { database.query() }
// Main thread (UI) work; Dispatchers.Main is the default dispatcher in viewModelScope
withContext(Dispatchers.Main) { updateUi() }
In KMP, use Dispatchers.Default and Dispatchers.Main (available on all platforms). Dispatchers.IO is JVM/Android only — use Dispatchers.Default on other platforms or provide via DI.
Long-running loops must check for cancellation:
/**
 * Processes every item in order, checking for cancellation before each one.
 */
suspend fun processItems(items: List<Item>) = coroutineScope {
    items.forEach { item ->
        // Throws CancellationException if the scope has been cancelled
        ensureActive()
        process(item)
    }
}
// Set the loading flag while fetching and clear it in `finally`, so it is
// reset whether the fetch succeeds, fails, or the coroutine is cancelled.
viewModelScope.launch {
    try {
        _state.update { it.copy(isLoading = true) }
        val data = repository.fetch()
        _state.update { it.copy(data = data) }
    } finally {
        _state.update { it.copy(isLoading = false) } // always runs, even on cancellation
    }
}
@Test
fun `search updates item list`() = runTest {
    val repository = FakeItemRepository()
    repository.emit(testItems)
    val viewModel = ItemListViewModel(GetItemsUseCase(repository))

    viewModel.state.test {
        // Initial state, before any search is issued
        assertEquals(ItemListState(), awaitItem())

        viewModel.onSearch("query")

        val loadingState = awaitItem()
        assertTrue(loadingState.isLoading)

        val loadedState = awaitItem()
        assertFalse(loadedState.isLoading)
        assertEquals(1, loadedState.items.size)
    }
}
@Test
fun `parallel load completes correctly`() = runTest {
    val viewModel = DashboardViewModel(itemRepo = FakeItemRepo(), statsRepo = FakeStatsRepo())

    viewModel.load()
    // Let the scheduled coroutines run before reading the state
    advanceUntilIdle()

    val finalState = viewModel.state.value
    assertNotNull(finalState.items)
    assertNotNull(finalState.stats)
}
/**
 * In-memory [ItemRepository] for tests, backed by a [MutableStateFlow].
 */
class FakeItemRepository : ItemRepository {
    private val backingItems = MutableStateFlow<List<Item>>(emptyList())

    override fun observeItems(): Flow<List<Item>> = backingItems

    /** Replaces the current item list with [items]. */
    fun emit(items: List<Item>) {
        backingItems.value = items
    }

    /** Returns a successful [Result] holding the current items whose category equals [category]. */
    override suspend fun getItemsByCategory(category: String): Result<List<Item>> {
        val matching = backingItems.value.filter { it.category == category }
        return Result.success(matching)
    }
}
Anti-patterns to avoid:
- Using GlobalScope — leaks coroutines, no structured cancellation
- Collecting Flows in init {} without a scope — use viewModelScope.launch
- Using MutableStateFlow with mutable collections — always use immutable copies: _state.update { it.copy(list = it.list + newItem) }
- Catching CancellationException — let it propagate for proper cancellation
- Using flowOn(Dispatchers.Main) to collect — the collection dispatcher is the caller's dispatcher
- Creating a Flow in a @Composable without remember — recreates the flow on every recomposition

See skill: compose-multiplatform-patterns for UI consumption of Flows. See skill: android-clean-architecture for where coroutines fit in layers.
Weekly Installs
17
Repository
GitHub Stars
72.1K
First Seen
1 day ago
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
codex17
github-copilot15
kimi-cli15
amp15
cline15
gemini-cli15
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
106,200 周安装
remember