rust-async-patterns by wshobson/agents
npx skills add https://github.com/wshobson/agents --skill rust-async-patterns
使用 Tokio 运行时的异步 Rust 编程生产模式,包括任务、通道、流和错误处理。
Future (lazy) → poll() → Ready(value) | Pending
                ↑           ↓
              Waker ← Runtime schedules
| 概念 | 用途 |
|---|---|
| `Future` | 可能稍后完成的惰性计算 |
| `async fn` | 返回 `impl Future` 的函数 |
| `await` | 挂起直到 future 完成 |
| Task | 并发运行的已生成 future |
| Runtime | 轮询 future 的执行器 |
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
use tokio::time::{sleep, Duration};
use anyhow::Result;
/// Program entry point; `#[tokio::main]` runs this async body on a Tokio runtime.
///
/// Installs the default `tracing` formatter, performs one demo fetch and prints the result.
#[tokio::main]
async fn main() -> Result<()> {
    // Install the tracing subscriber before any work is done.
    tracing_subscriber::fmt::init();

    // A failed fetch is propagated to the caller through `?`.
    let endpoint = "https://api.example.com";
    let payload = fetch_data(endpoint).await?;
    println!("Got: {}", payload);

    Ok(())
}
/// Stand-in for a real request: waits 100 ms, then returns a string naming `url`.
async fn fetch_data(url: &str) -> Result<String> {
    // Simulated latency; uses the async sleep so the worker thread is not blocked.
    let simulated_latency = Duration::from_millis(100);
    sleep(simulated_latency).await;

    let body = format!("Data from {}", url);
    Ok(body)
}
use tokio::task::JoinSet;
use anyhow::Result;
// 生成多个并发任务
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
let mut set = JoinSet::new();
for url in urls {
set.spawn(async move {
fetch_data(&url).await
});
}
let mut results = Vec::new();
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(data)) => results.push(data),
Ok(Err(e)) => tracing::error!("Task failed: {}", e),
Err(e) => tracing::error!("Join error: {}", e),
}
}
Ok(results)
}
// 带并发限制
use futures::stream::{self, StreamExt};
/// Fetches all `urls` with at most `limit` requests in flight at once.
///
/// Each URL yields one entry in the returned `Vec`; entries appear in
/// completion order (`buffer_unordered`), not input order.
///
/// A `limit` of 0 is treated as 1: with a zero-sized buffer no future would
/// ever be started and the returned future would never complete.
async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
    // Guarantee forward progress even when the caller passes 0.
    let max_in_flight = limit.max(1);

    stream::iter(urls)
        .map(|url| async move { fetch_data(&url).await })
        .buffer_unordered(max_in_flight) // Max concurrent tasks
        .collect()
        .await
}
// 选择最先完成的
use tokio::select;
/// Starts both fetches and returns the result of whichever finishes first.
///
/// The first completed result is returned as-is, whether it is `Ok` or `Err`;
/// `select!` drops the future that lost the race.
async fn race_requests(url1: &str, url2: &str) -> Result<String> {
    let winner = select! {
        first = fetch_data(url1) => first,
        second = fetch_data(url2) => second,
    };
    winner
}
use tokio::sync::{mpsc, broadcast, oneshot, watch};
/// Multi-producer, single-consumer channel demo.
///
/// A spawned producer sends one message through a cloned sender; this
/// function then drains the channel until every sender has been dropped.
async fn mpsc_example() {
    let (tx, mut rx) = mpsc::channel::<String>(100);

    // Spawn producer with its own sender handle.
    let producer_tx = tx.clone();
    tokio::spawn(async move {
        producer_tx
            .send("Hello".to_string())
            .await
            .expect("receiver stays alive until the consume loop ends");
    });

    // Drop the original sender: `recv()` only yields `None` once *all*
    // senders are gone, so keeping `tx` alive would make the loop below hang.
    drop(tx);

    // Consume until the channel is closed.
    while let Some(msg) = rx.recv().await {
        println!("Got: {}", msg);
    }
}
/// Broadcast channel demo: multi-producer, multi-consumer.
///
/// Two subscribers are registered before a single send, so each of them
/// receives its own copy of the message.
async fn broadcast_example() {
    // The receiver returned by `channel` is discarded; subscribers are created explicitly.
    let (sender, _) = broadcast::channel::<String>(100);
    let mut first_rx = sender.subscribe();
    let mut second_rx = sender.subscribe();

    let event = "Event".to_string();
    sender.send(event).unwrap();

    // Both receivers get the message.
    let _ = first_rx.recv().await;
    let _ = second_rx.recv().await;
}
/// Oneshot channel demo: a single value, sent exactly once.
///
/// A spawned task sends the reply; this function awaits it and returns it.
/// Panics if the sender is dropped without sending.
async fn oneshot_example() -> String {
    let (reply_tx, reply_rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        let reply = "Result".to_string();
        reply_tx.send(reply).unwrap();
    });

    let received = reply_rx.await;
    received.unwrap()
}
/// Watch channel demo: single producer, multi-consumer, latest value only.
///
/// A spawned observer prints every change it sees and stops cleanly once the
/// sender has been dropped.
async fn watch_example() {
    let (tx, mut rx) = watch::channel("initial".to_string());

    tokio::spawn(async move {
        // `changed()` returns `Err` after the sender is dropped (which happens
        // when this function returns); end the loop instead of panicking.
        while rx.changed().await.is_ok() {
            println!("New value: {}", *rx.borrow());
        }
    });

    tx.send("updated".to_string()).unwrap();
}
use anyhow::{Context, Result, bail};
use thiserror::Error;
/// Typed error for library-style code; `Display` text comes from the `#[error]` attributes.
#[derive(Error, Debug)]
pub enum ServiceError {
/// Wraps a `reqwest::Error`; `#[from]` lets `?` convert it automatically.
#[error("Network error: {0}")]
Network(#[from] reqwest::Error),
/// Wraps a `sqlx::Error`; `#[from]` lets `?` convert it automatically.
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
/// A lookup found nothing; the payload is the text shown after "Not found:".
#[error("Not found: {0}")]
NotFound(String),
/// An operation exceeded its time budget; the payload is printed with `{:?}`.
#[error("Timeout after {0:?}")]
Timeout(std::time::Duration),
}
/// Application-level flow using `anyhow`: fetch, then parse, attaching a
/// context message to whichever step fails.
async fn process_request(id: &str) -> Result<Response> {
    let fetched = fetch_data(id).await;
    let body = fetched.context("Failed to fetch data")?;

    // The parse result, with context attached, is the function's result.
    parse_response(&body).context("Failed to parse response")
}
/// Library-style lookup that reports failures through `ServiceError`.
///
/// A query error is converted by `?`; a missing row becomes
/// `ServiceError::NotFound` carrying the requested id.
async fn get_user(id: &str) -> Result<User, ServiceError> {
    let maybe_user = db.query(id).await?;
    maybe_user.ok_or_else(|| ServiceError::NotFound(id.to_string()))
}
// 超时包装器
use tokio::time::timeout;
/// Runs `future` with a deadline of `duration`.
///
/// Returns the future's own result if it finishes in time, otherwise
/// `ServiceError::Timeout(duration)`.
async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
    F: std::future::Future<Output = Result<T, ServiceError>>,
{
    match timeout(duration, future).await {
        // Finished in time: pass the inner result through untouched.
        Ok(inner) => inner,
        // Deadline elapsed first.
        Err(_elapsed) => Err(ServiceError::Timeout(duration)),
    }
}
use tokio::signal;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
async fn run_server() -> Result<()> {
// 方法 1:CancellationToken
let token = CancellationToken::new();
let token_clone = token.clone();
// 生成尊重取消的任务
tokio::spawn(async move {
loop {
tokio::select! {
_ = token_clone.cancelled() => {
tracing::info!("Task shutting down");
break;
}
_ = do_work() => {}
}
}
});
// 等待关闭信号
signal::ctrl_c().await?;
tracing::info!("Shutdown signal received");
// 取消所有任务
token.cancel();
// 给任务时间进行清理
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
}
// 方法 2:用于关闭的广播通道
async fn run_with_broadcast() -> Result<()> {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let mut rx = shutdown_tx.subscribe();
tokio::spawn(async move {
tokio::select! {
_ = rx.recv() => {
tracing::info!("Received shutdown");
}
_ = async { loop { do_work().await } } => {}
}
});
signal::ctrl_c().await?;
let _ = shutdown_tx.send(());
Ok(())
}
use async_trait::async_trait;
/// Async storage interface for `Entity` values.
/// `#[async_trait]` is what allows `async fn` in this trait.
#[async_trait]
pub trait Repository {
/// Looks up an entity by `id`.
async fn get(&self, id: &str) -> Result<Entity>;
/// Persists `entity`.
async fn save(&self, entity: &Entity) -> Result<()>;
/// Removes the entity identified by `id`.
async fn delete(&self, id: &str) -> Result<()>;
}
/// `Repository` implementation backed by a `sqlx` Postgres connection pool.
pub struct PostgresRepository {
    pool: sqlx::PgPool,
}

#[async_trait]
impl Repository for PostgresRepository {
    /// Selects the row whose id equals `id`; exactly one row is expected (`fetch_one`).
    async fn get(&self, id: &str) -> Result<Entity> {
        let row = sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
            .fetch_one(&self.pool)
            .await?;
        Ok(row)
    }

    /// Upsert: inserts the entity, or overwrites `data` when the id already exists.
    async fn save(&self, entity: &Entity) -> Result<()> {
        // The second line of the SQL literal stays flush-left so the query text is unchanged.
        sqlx::query!(
            "INSERT INTO entities (id, data) VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET data = $2",
            entity.id,
            entity.data
        )
        .execute(&self.pool)
        .await
        .map(|_done| ())
        .map_err(Into::into)
    }

    /// Deletes the row whose id equals `id`.
    async fn delete(&self, id: &str) -> Result<()> {
        sqlx::query!("DELETE FROM entities WHERE id = $1", id)
            .execute(&self.pool)
            .await
            .map(|_done| ())
            .map_err(Into::into)
    }
}
/// Trait-object usage: works with any `Repository` through `&dyn Repository`.
///
/// Loads the entity for `id` and saves it back; the save result is returned.
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
    let loaded = repo.get(id).await?;

    // Process...

    let saved = repo.save(&loaded).await;
    saved
}
use futures::stream::{self, Stream, StreamExt};
use async_stream::stream;
// 从异步迭代器创建流
fn numbers_stream() -> impl Stream<Item = i32> {
stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
yield i;
}
}
}
/// Keeps the even numbers from `numbers_stream`, doubles them, and prints the collected list.
async fn process_stream() {
    // Filter and map in a single pass: odd values are dropped, even ones doubled.
    let evens_doubled: Vec<_> = numbers_stream()
        .filter_map(|n| futures::future::ready(if n % 2 == 0 { Some(n * 2) } else { None }))
        .collect()
        .await;

    println!("{:?}", evens_doubled);
}
// 分块处理
async fn process_in_chunks() {
let stream = numbers_stream();
let mut chunks = stream.chunks(3);
while let Some(chunk) = chunks.next().await {
println!("Processing chunk: {:?}", chunk);
}
}
/// Interleaves two `numbers_stream` instances and prints every item as it arrives.
async fn merge_streams() {
    let merged = stream::select(numbers_stream(), numbers_stream());

    let print_item = |n: i32| async move {
        println!("Got: {}", n);
    };

    merged.for_each(print_item).await;
}
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore};
/// String-to-string cache behind an async `RwLock` (preferred for read-heavy access).
struct Cache {
    data: RwLock<HashMap<String, String>>,
}

impl Cache {
    /// Returns a clone of the value stored under `key`, if any.
    async fn get(&self, key: &str) -> Option<String> {
        let entries = self.data.read().await;
        entries.get(key).cloned()
    }

    /// Stores `value` under `key`, replacing any previous value.
    async fn set(&self, key: String, value: String) {
        let mut entries = self.data.write().await;
        entries.insert(key, value);
    }
}
/// Fixed-size connection pool; a semaphore caps how many connections are checked out.
struct Pool {
    semaphore: Semaphore,
    // A std mutex is used on purpose: the guard is never held across an
    // `.await`, and it can be locked synchronously from `Drop`.
    connections: std::sync::Mutex<Vec<Connection>>,
}

impl Pool {
    /// Creates a pool holding `size` connections and `size` permits.
    fn new(size: usize) -> Self {
        Self {
            semaphore: Semaphore::new(size),
            connections: std::sync::Mutex::new((0..size).map(|_| Connection::new()).collect()),
        }
    }

    /// Waits for a free permit, then checks out one idle connection.
    ///
    /// The connection goes back to the pool when the returned guard is dropped.
    async fn acquire(&self) -> PooledConnection<'_> {
        let permit = self
            .semaphore
            .acquire()
            .await
            .expect("pool semaphore is never closed");
        // The lock guard is a temporary and is released at the end of this statement.
        let conn = self
            .idle()
            .pop()
            .expect("a held permit guarantees an idle connection");
        PooledConnection { pool: self, conn: Some(conn), _permit: permit }
    }

    /// Locks the idle list; a poisoned lock is recovered since the `Vec` stays usable.
    fn idle(&self) -> std::sync::MutexGuard<'_, Vec<Connection>> {
        self.connections
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}

/// A checked-out connection together with the permit that reserves its slot.
struct PooledConnection<'a> {
    pool: &'a Pool,
    conn: Option<Connection>,
    _permit: tokio::sync::SemaphorePermit<'a>,
}

impl Drop for PooledConnection<'_> {
    fn drop(&mut self) {
        // Return the connection synchronously. Spawning a task here cannot
        // work: the borrowed `&Pool` is not `'static`, and the permit would
        // be released before the connection is back in the idle list.
        if let Some(conn) = self.conn.take() {
            self.pool.idle().push(conn);
        }
        // Fields are dropped after this body, so `_permit` is released only
        // once the connection is available again.
    }
}
// 启用 tokio-console 进行运行时调试
// Cargo.toml: tokio = { features = ["tracing"] }
// 运行:RUSTFLAGS="--cfg tokio_unstable" cargo run
// 然后:tokio-console
// 为异步函数添加仪器
use tracing::instrument;
/// Example of an instrumented async function.
/// `#[instrument(skip(pool))]` wraps each call in a tracing span; `pool` is left out of the recorded fields.
// NOTE(review): the body is elided in this snippet and does not yet produce the declared `Result<User>`.
#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
// Emitted at DEBUG level.
tracing::debug!("Fetching user");
// ...
}
// Track task spawning.
// `.instrument(..)` is provided by the `tracing::Instrument` extension trait,
// which must be in scope (the `instrument` attribute macro is a different item).
use tracing::Instrument;

// `%worker_id` records the id using its `Display` implementation.
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(async move {
    // Enters span when polled
}.instrument(span));
- `tokio::select!` - 用于竞争 future
- `JoinSet` - 用于管理多个任务
- `CancellationToken`
- `std::thread::sleep`
- `?` 传播或记录
每周安装量
6.0K
代码仓库
GitHub 星标数
32.2K
首次出现
2026年1月20日
安全审计
安装于
claude-code 4.8K
opencode 3.8K
gemini-cli 3.7K
codex 3.6K
cursor 3.4K
github-copilot 3.3K
Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.
Future (lazy) → poll() → Ready(value) | Pending
                ↑           ↓
              Waker ← Runtime schedules
| Concept | Purpose |
|---|---|
| `Future` | Lazy computation that may complete later |
| `async fn` | Function returning `impl Future` |
| `await` | Suspend until future completes |
| Task | Spawned future running concurrently |
| Runtime | Executor that polls futures |
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
use tokio::time::{sleep, Duration};
use anyhow::Result;
/// Program entry point; `#[tokio::main]` runs this async body on a Tokio runtime.
///
/// Installs the default `tracing` formatter, performs one demo fetch and prints the result.
#[tokio::main]
async fn main() -> Result<()> {
    // Install the tracing subscriber before any work is done.
    tracing_subscriber::fmt::init();

    // A failed fetch is propagated to the caller through `?`.
    let endpoint = "https://api.example.com";
    let payload = fetch_data(endpoint).await?;
    println!("Got: {}", payload);

    Ok(())
}
/// Stand-in for a real request: waits 100 ms, then returns a string naming `url`.
async fn fetch_data(url: &str) -> Result<String> {
    // Simulated latency; uses the async sleep so the worker thread is not blocked.
    let simulated_latency = Duration::from_millis(100);
    sleep(simulated_latency).await;

    let body = format!("Data from {}", url);
    Ok(body)
}
use tokio::task::JoinSet;
use anyhow::Result;
// Spawn multiple concurrent tasks
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
let mut set = JoinSet::new();
for url in urls {
set.spawn(async move {
fetch_data(&url).await
});
}
let mut results = Vec::new();
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(data)) => results.push(data),
Ok(Err(e)) => tracing::error!("Task failed: {}", e),
Err(e) => tracing::error!("Join error: {}", e),
}
}
Ok(results)
}
// With concurrency limit
use futures::stream::{self, StreamExt};
/// Fetches all `urls` with at most `limit` requests in flight at once.
///
/// Each URL yields one entry in the returned `Vec`; entries appear in
/// completion order (`buffer_unordered`), not input order.
///
/// A `limit` of 0 is treated as 1: with a zero-sized buffer no future would
/// ever be started and the returned future would never complete.
async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
    // Guarantee forward progress even when the caller passes 0.
    let max_in_flight = limit.max(1);

    stream::iter(urls)
        .map(|url| async move { fetch_data(&url).await })
        .buffer_unordered(max_in_flight) // Max concurrent tasks
        .collect()
        .await
}
// Select first to complete
use tokio::select;
/// Starts both fetches and returns the result of whichever finishes first.
///
/// The first completed result is returned as-is, whether it is `Ok` or `Err`;
/// `select!` drops the future that lost the race.
async fn race_requests(url1: &str, url2: &str) -> Result<String> {
    let winner = select! {
        first = fetch_data(url1) => first,
        second = fetch_data(url2) => second,
    };
    winner
}
use tokio::sync::{mpsc, broadcast, oneshot, watch};
/// Multi-producer, single-consumer channel demo.
///
/// A spawned producer sends one message through a cloned sender; this
/// function then drains the channel until every sender has been dropped.
async fn mpsc_example() {
    let (tx, mut rx) = mpsc::channel::<String>(100);

    // Spawn producer with its own sender handle.
    let producer_tx = tx.clone();
    tokio::spawn(async move {
        producer_tx
            .send("Hello".to_string())
            .await
            .expect("receiver stays alive until the consume loop ends");
    });

    // Drop the original sender: `recv()` only yields `None` once *all*
    // senders are gone, so keeping `tx` alive would make the loop below hang.
    drop(tx);

    // Consume until the channel is closed.
    while let Some(msg) = rx.recv().await {
        println!("Got: {}", msg);
    }
}
/// Broadcast channel demo: multi-producer, multi-consumer.
///
/// Two subscribers are registered before a single send, so each of them
/// receives its own copy of the message.
async fn broadcast_example() {
    // The receiver returned by `channel` is discarded; subscribers are created explicitly.
    let (sender, _) = broadcast::channel::<String>(100);
    let mut first_rx = sender.subscribe();
    let mut second_rx = sender.subscribe();

    let event = "Event".to_string();
    sender.send(event).unwrap();

    // Both receivers get the message.
    let _ = first_rx.recv().await;
    let _ = second_rx.recv().await;
}
/// Oneshot channel demo: a single value, sent exactly once.
///
/// A spawned task sends the reply; this function awaits it and returns it.
/// Panics if the sender is dropped without sending.
async fn oneshot_example() -> String {
    let (reply_tx, reply_rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        let reply = "Result".to_string();
        reply_tx.send(reply).unwrap();
    });

    let received = reply_rx.await;
    received.unwrap()
}
/// Watch channel demo: single producer, multi-consumer, latest value only.
///
/// A spawned observer prints every change it sees and stops cleanly once the
/// sender has been dropped.
async fn watch_example() {
    let (tx, mut rx) = watch::channel("initial".to_string());

    tokio::spawn(async move {
        // `changed()` returns `Err` after the sender is dropped (which happens
        // when this function returns); end the loop instead of panicking.
        while rx.changed().await.is_ok() {
            println!("New value: {}", *rx.borrow());
        }
    });

    tx.send("updated".to_string()).unwrap();
}
use anyhow::{Context, Result, bail};
use thiserror::Error;
/// Typed error for library-style code; `Display` text comes from the `#[error]` attributes.
#[derive(Error, Debug)]
pub enum ServiceError {
/// Wraps a `reqwest::Error`; `#[from]` lets `?` convert it automatically.
#[error("Network error: {0}")]
Network(#[from] reqwest::Error),
/// Wraps a `sqlx::Error`; `#[from]` lets `?` convert it automatically.
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
/// A lookup found nothing; the payload is the text shown after "Not found:".
#[error("Not found: {0}")]
NotFound(String),
/// An operation exceeded its time budget; the payload is printed with `{:?}`.
#[error("Timeout after {0:?}")]
Timeout(std::time::Duration),
}
/// Application-level flow using `anyhow`: fetch, then parse, attaching a
/// context message to whichever step fails.
async fn process_request(id: &str) -> Result<Response> {
    let fetched = fetch_data(id).await;
    let body = fetched.context("Failed to fetch data")?;

    // The parse result, with context attached, is the function's result.
    parse_response(&body).context("Failed to parse response")
}
/// Library-style lookup that reports failures through `ServiceError`.
///
/// A query error is converted by `?`; a missing row becomes
/// `ServiceError::NotFound` carrying the requested id.
async fn get_user(id: &str) -> Result<User, ServiceError> {
    let maybe_user = db.query(id).await?;
    maybe_user.ok_or_else(|| ServiceError::NotFound(id.to_string()))
}
// Timeout wrapper
use tokio::time::timeout;
/// Runs `future` with a deadline of `duration`.
///
/// Returns the future's own result if it finishes in time, otherwise
/// `ServiceError::Timeout(duration)`.
async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
    F: std::future::Future<Output = Result<T, ServiceError>>,
{
    match timeout(duration, future).await {
        // Finished in time: pass the inner result through untouched.
        Ok(inner) => inner,
        // Deadline elapsed first.
        Err(_elapsed) => Err(ServiceError::Timeout(duration)),
    }
}
use tokio::signal;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
async fn run_server() -> Result<()> {
// Method 1: CancellationToken
let token = CancellationToken::new();
let token_clone = token.clone();
// Spawn task that respects cancellation
tokio::spawn(async move {
loop {
tokio::select! {
_ = token_clone.cancelled() => {
tracing::info!("Task shutting down");
break;
}
_ = do_work() => {}
}
}
});
// Wait for shutdown signal
signal::ctrl_c().await?;
tracing::info!("Shutdown signal received");
// Cancel all tasks
token.cancel();
// Give tasks time to cleanup
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
}
// Method 2: Broadcast channel for shutdown
async fn run_with_broadcast() -> Result<()> {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let mut rx = shutdown_tx.subscribe();
tokio::spawn(async move {
tokio::select! {
_ = rx.recv() => {
tracing::info!("Received shutdown");
}
_ = async { loop { do_work().await } } => {}
}
});
signal::ctrl_c().await?;
let _ = shutdown_tx.send(());
Ok(())
}
use async_trait::async_trait;
/// Async storage interface for `Entity` values.
/// `#[async_trait]` is what allows `async fn` in this trait.
#[async_trait]
pub trait Repository {
/// Looks up an entity by `id`.
async fn get(&self, id: &str) -> Result<Entity>;
/// Persists `entity`.
async fn save(&self, entity: &Entity) -> Result<()>;
/// Removes the entity identified by `id`.
async fn delete(&self, id: &str) -> Result<()>;
}
/// `Repository` implementation backed by a `sqlx` Postgres connection pool.
pub struct PostgresRepository {
    pool: sqlx::PgPool,
}

#[async_trait]
impl Repository for PostgresRepository {
    /// Selects the row whose id equals `id`; exactly one row is expected (`fetch_one`).
    async fn get(&self, id: &str) -> Result<Entity> {
        let row = sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
            .fetch_one(&self.pool)
            .await?;
        Ok(row)
    }

    /// Upsert: inserts the entity, or overwrites `data` when the id already exists.
    async fn save(&self, entity: &Entity) -> Result<()> {
        // The second line of the SQL literal stays flush-left so the query text is unchanged.
        sqlx::query!(
            "INSERT INTO entities (id, data) VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET data = $2",
            entity.id,
            entity.data
        )
        .execute(&self.pool)
        .await
        .map(|_done| ())
        .map_err(Into::into)
    }

    /// Deletes the row whose id equals `id`.
    async fn delete(&self, id: &str) -> Result<()> {
        sqlx::query!("DELETE FROM entities WHERE id = $1", id)
            .execute(&self.pool)
            .await
            .map(|_done| ())
            .map_err(Into::into)
    }
}
/// Trait-object usage: works with any `Repository` through `&dyn Repository`.
///
/// Loads the entity for `id` and saves it back; the save result is returned.
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
    let loaded = repo.get(id).await?;

    // Process...

    let saved = repo.save(&loaded).await;
    saved
}
use futures::stream::{self, Stream, StreamExt};
use async_stream::stream;
// Create stream from async iterator
fn numbers_stream() -> impl Stream<Item = i32> {
stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
yield i;
}
}
}
/// Keeps the even numbers from `numbers_stream`, doubles them, and prints the collected list.
async fn process_stream() {
    // Filter and map in a single pass: odd values are dropped, even ones doubled.
    let evens_doubled: Vec<_> = numbers_stream()
        .filter_map(|n| futures::future::ready(if n % 2 == 0 { Some(n * 2) } else { None }))
        .collect()
        .await;

    println!("{:?}", evens_doubled);
}
// Chunked processing
async fn process_in_chunks() {
let stream = numbers_stream();
let mut chunks = stream.chunks(3);
while let Some(chunk) = chunks.next().await {
println!("Processing chunk: {:?}", chunk);
}
}
/// Interleaves two `numbers_stream` instances and prints every item as it arrives.
async fn merge_streams() {
    let merged = stream::select(numbers_stream(), numbers_stream());

    let print_item = |n: i32| async move {
        println!("Got: {}", n);
    };

    merged.for_each(print_item).await;
}
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore};
/// String-to-string cache behind an async `RwLock` (preferred for read-heavy access).
struct Cache {
    data: RwLock<HashMap<String, String>>,
}

impl Cache {
    /// Returns a clone of the value stored under `key`, if any.
    async fn get(&self, key: &str) -> Option<String> {
        let entries = self.data.read().await;
        entries.get(key).cloned()
    }

    /// Stores `value` under `key`, replacing any previous value.
    async fn set(&self, key: String, value: String) {
        let mut entries = self.data.write().await;
        entries.insert(key, value);
    }
}
/// Fixed-size connection pool; a semaphore caps how many connections are checked out.
struct Pool {
    semaphore: Semaphore,
    // A std mutex is used on purpose: the guard is never held across an
    // `.await`, and it can be locked synchronously from `Drop`.
    connections: std::sync::Mutex<Vec<Connection>>,
}

impl Pool {
    /// Creates a pool holding `size` connections and `size` permits.
    fn new(size: usize) -> Self {
        Self {
            semaphore: Semaphore::new(size),
            connections: std::sync::Mutex::new((0..size).map(|_| Connection::new()).collect()),
        }
    }

    /// Waits for a free permit, then checks out one idle connection.
    ///
    /// The connection goes back to the pool when the returned guard is dropped.
    async fn acquire(&self) -> PooledConnection<'_> {
        let permit = self
            .semaphore
            .acquire()
            .await
            .expect("pool semaphore is never closed");
        // The lock guard is a temporary and is released at the end of this statement.
        let conn = self
            .idle()
            .pop()
            .expect("a held permit guarantees an idle connection");
        PooledConnection { pool: self, conn: Some(conn), _permit: permit }
    }

    /// Locks the idle list; a poisoned lock is recovered since the `Vec` stays usable.
    fn idle(&self) -> std::sync::MutexGuard<'_, Vec<Connection>> {
        self.connections
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}

/// A checked-out connection together with the permit that reserves its slot.
struct PooledConnection<'a> {
    pool: &'a Pool,
    conn: Option<Connection>,
    _permit: tokio::sync::SemaphorePermit<'a>,
}

impl Drop for PooledConnection<'_> {
    fn drop(&mut self) {
        // Return the connection synchronously. Spawning a task here cannot
        // work: the borrowed `&Pool` is not `'static`, and the permit would
        // be released before the connection is back in the idle list.
        if let Some(conn) = self.conn.take() {
            self.pool.idle().push(conn);
        }
        // Fields are dropped after this body, so `_permit` is released only
        // once the connection is available again.
    }
}
// Enable tokio-console for runtime debugging
// Cargo.toml: tokio = { features = ["tracing"] }
// Run: RUSTFLAGS="--cfg tokio_unstable" cargo run
// Then: tokio-console
// Instrument async functions
use tracing::instrument;
/// Example of an instrumented async function.
/// `#[instrument(skip(pool))]` wraps each call in a tracing span; `pool` is left out of the recorded fields.
// NOTE(review): the body is elided in this snippet and does not yet produce the declared `Result<User>`.
#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
// Emitted at DEBUG level.
tracing::debug!("Fetching user");
// ...
}
// Track task spawning.
// `.instrument(..)` is provided by the `tracing::Instrument` extension trait,
// which must be in scope (the `instrument` attribute macro is a different item).
use tracing::Instrument;

// `%worker_id` records the id using its `Display` implementation.
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(async move {
    // Enters span when polled
}.instrument(span));
- `tokio::select!` - For racing futures
- `JoinSet` - For managing multiple tasks
- `CancellationToken`
- `std::thread::sleep` in async
- `?` or log
Weekly Installs
6.0K
Repository
GitHub Stars
32.2K
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
claude-code 4.8K
opencode 3.8K
gemini-cli 3.7K
codex 3.6K
cursor 3.4K
github-copilot 3.3K
99,500 周安装