async-io-model by tursodatabase/turso
npx skills add https://github.com/tursodatabase/turso --skill async-io-model
Turso uses cooperative yielding with explicit state machines instead of Rust async/await.
pub enum IOCompletions {
    Single(Completion),
}

#[must_use]
pub enum IOResult<T> {
    Done(T),           // Operation complete, here's the result
    IO(IOCompletions), // Need I/O, call me again after completions finish
}
Functions returning IOResult must be called repeatedly until Done.
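The caller's side of this contract can be sketched as a small driver loop. This is a minimal illustration, not Turso's actual I/O loop: the `Completion` here is a hypothetical stand-in built on `Rc<Cell<bool>>`, `ReadLen` and `drive` are invented names, and the "I/O" is completed by hand where a real backend would do the work.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Hypothetical stand-in for the real Completion; only the shape matters here.
#[derive(Clone)]
struct Completion(Rc<Cell<bool>>);

impl Completion {
    fn new() -> Self {
        Completion(Rc::new(Cell::new(false)))
    }
    fn finished(&self) -> bool {
        self.0.get()
    }
    // In a real system the I/O backend would mark the completion finished.
    fn complete(&self) {
        self.0.set(true)
    }
}

enum IOCompletions {
    Single(Completion),
}

#[must_use]
enum IOResult<T> {
    Done(T),
    IO(IOCompletions),
}

// Toy operation (hypothetical): yields on the first call, returns a value afterwards.
struct ReadLen {
    pending: Option<Completion>,
}

impl ReadLen {
    fn step(&mut self) -> Result<IOResult<usize>, String> {
        if let Some(c) = &self.pending {
            if c.finished() {
                return Ok(IOResult::Done(4096));
            }
            // Still waiting: hand the same completion back to the caller.
            return Ok(IOResult::IO(IOCompletions::Single(c.clone())));
        }
        let c = Completion::new();
        self.pending = Some(c.clone());
        Ok(IOResult::IO(IOCompletions::Single(c)))
    }
}

// Call the operation again and again until it reports Done.
// Returns the value together with the number of calls it took.
fn drive(op: &mut ReadLen) -> Result<(usize, u32), String> {
    let mut calls = 0;
    loop {
        calls += 1;
        match op.step()? {
            IOResult::Done(v) => return Ok((v, calls)),
            // Pretend the I/O ran to completion before the next call.
            IOResult::IO(IOCompletions::Single(c)) => c.complete(),
        }
    }
}
```

Calling `drive(&mut ReadLen { pending: None })` takes two calls to `step`: one that yields and one that returns the value.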
A Completion tracks a single I/O operation:
pub struct Completion { /* ... */ }

impl Completion {
    pub fn finished(&self) -> bool;
    pub fn succeeded(&self) -> bool;
    pub fn get_error(&self) -> Option<CompletionError>;
}
To wait for multiple I/O operations, use CompletionGroup:
let mut group = CompletionGroup::new(|_| {});
// Add individual completions
group.add(&completion1);
group.add(&completion2);
// Build into single completion that finishes when all complete
let combined = group.build();
io_yield_one!(combined);
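The "finishes when all complete" behaviour can be illustrated with a toy model. Everything below is an assumption made for illustration: `ToyCompletion`, `ToyGroup` and `ToyCombined` are not the real types, and the toy polls its members, whereas the real `CompletionGroup` takes a callback.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Toy model only: these are NOT the real Completion / CompletionGroup types.
#[derive(Clone)]
struct ToyCompletion(Rc<Cell<bool>>);

impl ToyCompletion {
    fn new() -> Self {
        ToyCompletion(Rc::new(Cell::new(false)))
    }
    fn complete(&self) {
        self.0.set(true)
    }
    fn finished(&self) -> bool {
        self.0.get()
    }
}

struct ToyGroup {
    members: Vec<ToyCompletion>,
}

impl ToyGroup {
    fn new() -> Self {
        ToyGroup { members: Vec::new() }
    }
    fn add(&mut self, c: &ToyCompletion) {
        self.members.push(c.clone())
    }
    fn build(self) -> ToyCombined {
        ToyCombined { members: self.members }
    }
}

struct ToyCombined {
    members: Vec<ToyCompletion>,
}

impl ToyCombined {
    // The combined completion is finished only once every member has finished.
    fn finished(&self) -> bool {
        self.members.iter().all(|c| c.finished())
    }
}

// Returns the combined state after one member completes, then after both do.
fn group_demo() -> (bool, bool) {
    let (a, b) = (ToyCompletion::new(), ToyCompletion::new());
    let mut group = ToyGroup::new();
    group.add(&a);
    group.add(&b);
    let combined = group.build();
    a.complete();
    let after_one = combined.finished();
    b.complete();
    (after_one, combined.finished())
}
```

The point of the sketch is that a caller yields once on the combined completion instead of once per member.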
CompletionGroup features:
- Cancel with group.cancel()

return_if_io!
Unwraps an IOResult and propagates the IO variant up the call stack:
let result = return_if_io!(some_io_operation());
// Only reaches here if operation returned Done
io_yield_one!
Yields a single completion:
io_yield_one!(completion); // Returns Ok(IOResult::IO(Single(completion)))
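To make the control flow of the two macros concrete, here is one way they could be written. This is a sketch under assumptions, not the definitions from core/types.rs or core/util.rs: the `String` error type, the unit-like `Completion`, and the helper functions `inner`, `outer` and `describe` are all hypothetical.

```rust
// Placeholder types; the real ones carry far more information.
struct Completion;

enum IOCompletions {
    Single(Completion),
}

enum IOResult<T> {
    Done(T),
    IO(IOCompletions),
}

type Result<T> = std::result::Result<T, String>;

// Assumed shape: unwrap Done, return early on IO (and on Err via `?`).
macro_rules! return_if_io {
    ($expr:expr) => {
        match $expr? {
            IOResult::Done(v) => v,
            IOResult::IO(io) => return Ok(IOResult::IO(io)),
        }
    };
}

// Assumed shape: return from the enclosing function with a single completion.
macro_rules! io_yield_one {
    ($c:expr) => {
        return Ok(IOResult::IO(IOCompletions::Single($c)))
    };
}

// Hypothetical leaf operation: yields unless the data is already available.
fn inner(ready: bool) -> Result<IOResult<u32>> {
    if !ready {
        io_yield_one!(Completion);
    }
    Ok(IOResult::Done(7))
}

// Hypothetical caller: IO from `inner` is propagated, Done is unwrapped.
fn outer(ready: bool) -> Result<IOResult<u32>> {
    let v = return_if_io!(inner(ready));
    Ok(IOResult::Done(v + 1))
}

fn describe(ready: bool) -> &'static str {
    match outer(ready) {
        Ok(IOResult::Done(8)) => "done(8)",
        Ok(IOResult::Done(_)) => "done(other)",
        Ok(IOResult::IO(_)) => "io",
        Err(_) => "err",
    }
}
```

Both macros expand to an early `return`, which is why any code placed before them runs again on every re-entry.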
Operations that may yield use explicit state enums:
enum MyOperationState {
    Start,
    WaitingForRead { page: PageRef },
    Processing { data: Vec<u8> },
    Done,
}
The function loops, matching on state and transitioning:
fn my_operation(&mut self) -> Result<IOResult<Output>> {
    loop {
        match &mut self.state {
            MyOperationState::Start => {
                let (page, completion) = start_read();
                self.state = MyOperationState::WaitingForRead { page };
                io_yield_one!(completion);
            }
            MyOperationState::WaitingForRead { page } => {
                let data = page.get_contents();
                self.state = MyOperationState::Processing { data: data.to_vec() };
                // No yield, continue loop
            }
            MyOperationState::Processing { data } => {
                let result = process(data);
                self.state = MyOperationState::Done;
                return Ok(IOResult::Done(result));
            }
            MyOperationState::Done => unreachable!(),
        }
    }
}
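A self-contained version of the same loop shows that each state body runs exactly once even though the function is entered twice. The names here (`Step`, `ReadState`, `Checksum`) are hypothetical, `Step::Yield` is a simplified `IOResult::IO` without a completion, and the read is faked with a fixed byte vector. The sketch also takes the state by value with `std::mem::replace`, a design choice that lets fields move into the next state without cloning.

```rust
// Simplified IOResult: Yield carries no completion in this sketch.
#[derive(Debug, PartialEq)]
enum Step<T> {
    Done(T),
    Yield,
}

enum ReadState {
    Start,
    WaitingForRead { page: Vec<u8> },
    Processing { data: Vec<u8> },
    Done,
}

struct Checksum {
    state: ReadState,
    trace: Vec<&'static str>, // records which state bodies ran, in order
}

impl Checksum {
    fn new() -> Self {
        Checksum { state: ReadState::Start, trace: Vec::new() }
    }

    fn step(&mut self) -> Step<u32> {
        loop {
            // Take the state by value so its fields can move into the next state.
            match std::mem::replace(&mut self.state, ReadState::Done) {
                ReadState::Start => {
                    self.trace.push("start");
                    // Pretend a read was submitted; the page is filled by the time we resume.
                    self.state = ReadState::WaitingForRead { page: vec![1, 2, 3] };
                    return Step::Yield;
                }
                ReadState::WaitingForRead { page } => {
                    self.trace.push("read");
                    self.state = ReadState::Processing { data: page };
                    // No yield: continue with the next loop iteration.
                }
                ReadState::Processing { data } => {
                    self.trace.push("process");
                    // The state was already set to Done by the replace above.
                    return Step::Done(data.iter().map(|&b| u32::from(b)).sum());
                }
                ReadState::Done => unreachable!("stepped after completion"),
            }
        }
    }
}

// Steps the operation twice and reports the results plus the trace.
fn run_checksum() -> (Vec<Step<u32>>, Vec<&'static str>) {
    let mut op = Checksum::new();
    let results = vec![op.step(), op.step()];
    (results, op.trace)
}
```

The first call yields from `Start`. The second call resumes in `WaitingForRead`, falls through to `Processing` without yielding, and returns the sum of the bytes.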
State mutations before yield points cause bugs on re-entry.
fn bad_example(&mut self) -> Result<IOResult<()>> {
    self.counter += 1; // Mutates state
    return_if_io!(something_that_might_yield()); // If yields, re-entry will increment again!
    Ok(IOResult::Done(()))
}
If something_that_might_yield() returns IO, the caller waits for the completion and then calls bad_example() again, so counter is incremented twice (or more, if the operation yields several times).
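The double increment is easy to reproduce in isolation. In this sketch `IOResult::IO` carries no completion, and `Flaky`, `Op` and `run_until_done` are hypothetical names: `Flaky::poll` stands in for an operation that needs I/O on its first call only.

```rust
// Simplified IOResult without a completion payload (an assumption for brevity).
enum IOResult<T> {
    Done(T),
    IO,
}

// Hypothetical operation that needs I/O on its first call only.
struct Flaky {
    first_call: bool,
}

impl Flaky {
    fn poll(&mut self) -> IOResult<()> {
        if self.first_call {
            self.first_call = false;
            IOResult::IO
        } else {
            IOResult::Done(())
        }
    }
}

struct Op {
    counter: u32,
    io: Flaky,
}

impl Op {
    fn new() -> Self {
        Op { counter: 0, io: Flaky { first_call: true } }
    }

    // Mutates before the yield point: the increment runs on every entry.
    fn bad(&mut self) -> IOResult<()> {
        self.counter += 1;
        if let IOResult::IO = self.io.poll() {
            return IOResult::IO;
        }
        IOResult::Done(())
    }

    // Mutates after the yield point: the increment runs once.
    fn good(&mut self) -> IOResult<()> {
        if let IOResult::IO = self.io.poll() {
            return IOResult::IO;
        }
        self.counter += 1;
        IOResult::Done(())
    }
}

// Re-enters `f` until it reports Done, then returns the final counter.
fn run_until_done(mut op: Op, f: fn(&mut Op) -> IOResult<()>) -> u32 {
    while let IOResult::IO = f(&mut op) {}
    op.counter
}
```

With a single yield, `run_until_done(Op::new(), Op::bad)` ends with a counter of 2, while the `Op::good` variant ends with 1.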
// Option 1: mutate only after the yield point
fn good_example(&mut self) -> Result<IOResult<()>> {
    return_if_io!(something_that_might_yield());
    self.counter += 1; // Only reached once, after IO completes
    Ok(IOResult::Done(()))
}

// Option 2: track progress in an explicit state enum
enum State { Start, AfterIO }

fn good_example(&mut self) -> Result<IOResult<()>> {
    loop {
        match self.state {
            State::Start => {
                // Don't mutate shared state here
                self.state = State::AfterIO;
                return_if_io!(something_that_might_yield());
            }
            State::AfterIO => {
                self.counter += 1; // Safe: only entered once
                return Ok(IOResult::Done(()));
            }
        }
    }
}
| Pattern | Problem |
|---|---|
| vec.push(x); return_if_io!(...) | Vec grows on each re-entry |
| idx += 1; return_if_io!(...) | Index advances multiple times |
| map.insert(k,v); return_if_io!(...) | Duplicate inserts or overwrites |
| flag = true; return_if_io!(...) | Usually OK, but check logic |
Encode progress in state variants:
// Good: index is part of state, preserved across yields
enum ProcessState {
    Start,
    ProcessingItem { idx: usize, items: Vec<Item> },
    Done,
}

// Loop advances idx only when transitioning states
ProcessingItem { idx, items } => {
    return_if_io!(process_item(&items[idx]));
    if idx + 1 < items.len() {
        self.state = ProcessingItem { idx: idx + 1, items };
    } else {
        self.state = Done;
    }
}
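A runnable variant of this pattern is sketched below. It is a toy under stated assumptions: `Summer`, its `io_done` flag and `sum_with_yields` are hypothetical, `process_item` is faked so that every item yields exactly once, and `IOResult::IO` carries no completion. On a yield the sketch puts the same `idx` back into the state, so the index advances only on a real transition.

```rust
// Simplified IOResult without a completion payload (an assumption for brevity).
enum IOResult<T> {
    Done(T),
    IO,
}

enum ProcessState {
    Start,
    ProcessingItem { idx: usize, items: Vec<u32> },
    Done,
}

struct Summer {
    state: ProcessState,
    input: Vec<u32>,
    total: u32,
    io_done: bool, // hypothetical: has the current item's fake I/O completed?
}

impl Summer {
    // Hypothetical per-item operation: yields once, then returns the item.
    fn process_item(&mut self, item: u32) -> IOResult<u32> {
        if !self.io_done {
            self.io_done = true;
            return IOResult::IO;
        }
        self.io_done = false;
        IOResult::Done(item)
    }

    fn step(&mut self) -> IOResult<u32> {
        loop {
            match std::mem::replace(&mut self.state, ProcessState::Done) {
                ProcessState::Start => {
                    let items = std::mem::take(&mut self.input);
                    if !items.is_empty() {
                        self.state = ProcessState::ProcessingItem { idx: 0, items };
                    }
                }
                ProcessState::ProcessingItem { idx, items } => {
                    match self.process_item(items[idx]) {
                        IOResult::IO => {
                            // Restore the same state: idx has not advanced.
                            self.state = ProcessState::ProcessingItem { idx, items };
                            return IOResult::IO;
                        }
                        IOResult::Done(v) => {
                            self.total += v; // reached once per item
                            if idx + 1 < items.len() {
                                self.state = ProcessState::ProcessingItem { idx: idx + 1, items };
                            }
                            // Otherwise the state stays Done (set by the replace above).
                        }
                    }
                }
                ProcessState::Done => return IOResult::Done(self.total),
            }
        }
    }
}

// Drives the operation to completion; returns (total, number of yields).
fn sum_with_yields(items: Vec<u32>) -> (u32, u32) {
    let mut op = Summer { state: ProcessState::Start, input: items, total: 0, io_done: false };
    let mut yields = 0;
    loop {
        match op.step() {
            IOResult::IO => yields += 1,
            IOResult::Done(total) => return (total, yields),
        }
    }
}
```

Each item is added to the total exactly once, however many times `step` is re-entered.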
Key files:
- core/types.rs - IOResult, IOCompletions, return_if_io!, return_and_restore_if_io!
- core/io/completions.rs - Completion, CompletionGroup
- core/util.rs - io_yield_one! macro
- core/state_machine.rs - Generic StateMachine wrapper
- core/storage/btree.rs - Many state machine examples
- core/storage/pager.rs - CompletionGroup usage examples

Re-entrancy bugs often only manifest under specific I/O timing. Use:
- testing/simulator/
- testing/concurrent-simulator/

See also the docs/manual.md section on I/O.