async-python-patterns by wshobson/agents
npx skills add https://github.com/wshobson/agents --skill async-python-patterns使用 asyncio、并发编程模式以及 async/await 构建高性能、非阻塞系统的异步 Python 应用程序实施综合指南。
在采用异步之前,请考虑它是否适合您的用例。
| 用例 | 推荐方法 |
|---|---|
| 大量并发网络/数据库调用 | asyncio |
| CPU 密集型计算 | multiprocessing 或线程池 |
| 混合 I/O + CPU | 使用 asyncio.to_thread() 卸载 CPU 工作 |
| 简单脚本,连接数少 | 同步(更简单,更易于调试) |
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
| 高并发 Web API | 异步框架(FastAPI、aiohttp) |
关键规则: 在调用路径内保持完全同步或完全异步。混合使用会创建隐藏的阻塞和复杂性。
事件循环是 asyncio 的核心,负责管理和调度异步任务。
关键特性:
使用 async def 定义的函数,可以暂停和恢复。
语法:
async def my_coroutine():
result = await some_async_operation()
return result
在事件循环上并发运行的已调度协程。
表示异步操作最终结果的低级对象。
支持 async with 以进行适当清理的资源。
支持 async for 以遍历异步数据源的对象。
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# Python 3.7+
asyncio.run(main())
import asyncio
async def fetch_data(url: str) -> dict:
"""异步从 URL 获取数据。"""
await asyncio.sleep(1) # 模拟 I/O
return {"url": url, "data": "result"}
async def main():
result = await fetch_data("https://api.example.com")
print(result)
asyncio.run(main())
import asyncio
from typing import List
async def fetch_user(user_id: int) -> dict:
"""获取用户数据。"""
await asyncio.sleep(0.5)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
"""并发获取多个用户。"""
tasks = [fetch_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return results
async def main():
user_ids = [1, 2, 3, 4, 5]
users = await fetch_all_users(user_ids)
print(f"Fetched {len(users)} users")
asyncio.run(main())
import asyncio
async def background_task(name: str, delay: int):
"""长时间运行的后台任务。"""
print(f"{name} started")
await asyncio.sleep(delay)
print(f"{name} completed")
return f"Result from {name}"
async def main():
# 创建任务
task1 = asyncio.create_task(background_task("Task 1", 2))
task2 = asyncio.create_task(background_task("Task 2", 1))
# 执行其他工作
print("Main: doing other work")
await asyncio.sleep(0.5)
# 等待任务完成
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())
import asyncio
from typing import List, Optional
async def risky_operation(item_id: int) -> dict:
"""可能失败的操作。"""
await asyncio.sleep(0.1)
if item_id % 3 == 0:
raise ValueError(f"Item {item_id} failed")
return {"id": item_id, "status": "success"}
async def safe_operation(item_id: int) -> Optional[dict]:
"""带有错误处理的包装器。"""
try:
return await risky_operation(item_id)
except ValueError as e:
print(f"Error: {e}")
return None
async def process_items(item_ids: List[int]):
"""处理多个项目并处理错误。"""
tasks = [safe_operation(iid) for iid in item_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤掉失败项
successful = [r for r in results if r is not None and not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"Success: {len(successful)}, Failed: {len(failed)}")
return successful
asyncio.run(process_items([1, 2, 3, 4, 5, 6]))
import asyncio
async def slow_operation(delay: int) -> str:
"""耗时的操作。"""
await asyncio.sleep(delay)
return f"Completed after {delay}s"
async def with_timeout():
"""带超时执行操作。"""
try:
result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(with_timeout())
import asyncio
from typing import Optional
class AsyncDatabaseConnection:
"""异步数据库连接上下文管理器。"""
def __init__(self, dsn: str):
self.dsn = dsn
self.connection: Optional[object] = None
async def __aenter__(self):
print("Opening connection")
await asyncio.sleep(0.1) # 模拟连接
self.connection = {"dsn": self.dsn, "connected": True}
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing connection")
await asyncio.sleep(0.1) # 模拟清理
self.connection = None
async def query_database():
"""使用异步上下文管理器。"""
async with AsyncDatabaseConnection("postgresql://localhost") as conn:
print(f"Using connection: {conn}")
await asyncio.sleep(0.2) # 模拟查询
return {"rows": 10}
asyncio.run(query_database())
import asyncio
from typing import AsyncIterator
async def async_range(start: int, end: int, delay: float = 0.1) -> AsyncIterator[int]:
"""带有延迟的异步生成器,生成数字。"""
for i in range(start, end):
await asyncio.sleep(delay)
yield i
async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
"""异步获取分页数据。"""
for page in range(1, max_pages + 1):
await asyncio.sleep(0.2) # 模拟 API 调用
yield {
"page": page,
"url": f"{url}?page={page}",
"data": [f"item_{page}_{i}" for i in range(5)]
}
async def consume_async_iterator():
"""消费异步迭代器。"""
async for number in async_range(1, 5):
print(f"Number: {number}")
print("\nFetching pages:")
async for page_data in fetch_pages("https://api.example.com/items", 3):
print(f"Page {page_data['page']}: {len(page_data['data'])} items")
asyncio.run(consume_async_iterator())
import asyncio
from asyncio import Queue
from typing import Optional
async def producer(queue: Queue, producer_id: int, num_items: int):
"""生产项目并将其放入队列。"""
for i in range(num_items):
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
await asyncio.sleep(0.1)
await queue.put(None) # 发送完成信号
async def consumer(queue: Queue, consumer_id: int):
"""从队列中消费项目。"""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"Consumer {consumer_id} processing: {item}")
await asyncio.sleep(0.2) # 模拟工作
queue.task_done()
async def producer_consumer_example():
"""运行生产者-消费者模式。"""
queue = Queue(maxsize=10)
# 创建任务
producers = [
asyncio.create_task(producer(queue, i, 5))
for i in range(2)
]
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
# 等待生产者完成
await asyncio.gather(*producers)
# 等待队列清空
await queue.join()
# 取消消费者任务
for c in consumers:
c.cancel()
asyncio.run(producer_consumer_example())
import asyncio
from typing import List
async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
"""进行带速率限制的 API 调用。"""
async with semaphore:
print(f"Calling {url}")
await asyncio.sleep(0.5) # 模拟 API 调用
return {"url": url, "status": 200}
async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
"""进行带速率限制的多个请求。"""
semaphore = asyncio.Semaphore(max_concurrent)
tasks = [api_call(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
results = await rate_limited_requests(urls, max_concurrent=3)
print(f"Completed {len(results)} requests")
asyncio.run(main())
import asyncio
class AsyncCounter:
"""线程安全的异步计数器。"""
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
"""安全地递增计数器。"""
async with self.lock:
current = self.value
await asyncio.sleep(0.01) # 模拟工作
self.value = current + 1
async def get_value(self) -> int:
"""获取当前值。"""
async with self.lock:
return self.value
async def worker(counter: AsyncCounter, worker_id: int):
"""递增计数器的工作线程。"""
for _ in range(10):
await counter.increment()
print(f"Worker {worker_id} incremented")
async def test_counter():
"""测试并发计数器。"""
counter = AsyncCounter()
workers = [asyncio.create_task(worker(counter, i)) for i in range(5)]
await asyncio.gather(*workers)
final_value = await counter.get_value()
print(f"Final counter value: {final_value}")
asyncio.run(test_counter())
import asyncio
import aiohttp
from typing import List, Dict
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
"""获取单个 URL。"""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
text = await response.text()
return {
"url": url,
"status": response.status,
"length": len(text)
}
except Exception as e:
return {"url": url, "error": str(e)}
async def scrape_urls(urls: List[str]) -> List[Dict]:
"""并发爬取多个 URL。"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/404",
]
results = await scrape_urls(urls)
for result in results:
print(result)
asyncio.run(main())
import asyncio
from typing import List, Optional
# 模拟的异步数据库客户端
class AsyncDB:
"""模拟的异步数据库。"""
async def execute(self, query: str) -> List[dict]:
"""执行查询。"""
await asyncio.sleep(0.1)
return [{"id": 1, "name": "Example"}]
async def fetch_one(self, query: str) -> Optional[dict]:
"""获取单行数据。"""
await asyncio.sleep(0.1)
return {"id": 1, "name": "Example"}
async def get_user_data(db: AsyncDB, user_id: int) -> dict:
"""并发获取用户及相关数据。"""
user_task = db.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
orders_task = db.execute(f"SELECT * FROM orders WHERE user_id = {user_id}")
profile_task = db.fetch_one(f"SELECT * FROM profiles WHERE user_id = {user_id}")
user, orders, profile = await asyncio.gather(user_task, orders_task, profile_task)
return {
"user": user,
"orders": orders,
"profile": profile
}
async def main():
db = AsyncDB()
user_data = await get_user_data(db, 1)
print(user_data)
asyncio.run(main())
import asyncio
from typing import Set
# 模拟的 WebSocket 连接
class WebSocket:
"""模拟的 WebSocket。"""
def __init__(self, client_id: str):
self.client_id = client_id
async def send(self, message: str):
"""发送消息。"""
print(f"Sending to {self.client_id}: {message}")
await asyncio.sleep(0.01)
async def recv(self) -> str:
"""接收消息。"""
await asyncio.sleep(1)
return f"Message from {self.client_id}"
class WebSocketServer:
"""简单的 WebSocket 服务器。"""
def __init__(self):
self.clients: Set[WebSocket] = set()
async def register(self, websocket: WebSocket):
"""注册新客户端。"""
self.clients.add(websocket)
print(f"Client {websocket.client_id} connected")
async def unregister(self, websocket: WebSocket):
"""注销客户端。"""
self.clients.remove(websocket)
print(f"Client {websocket.client_id} disconnected")
async def broadcast(self, message: str):
"""向所有客户端广播消息。"""
if self.clients:
tasks = [client.send(message) for client in self.clients]
await asyncio.gather(*tasks)
async def handle_client(self, websocket: WebSocket):
"""处理单个客户端连接。"""
await self.register(websocket)
try:
async for message in self.message_iterator(websocket):
await self.broadcast(f"{websocket.client_id}: {message}")
finally:
await self.unregister(websocket)
async def message_iterator(self, websocket: WebSocket):
"""遍历来自客户端的消息。"""
for _ in range(3): # 模拟 3 条消息
yield await websocket.recv()
import asyncio
import aiohttp
async def with_connection_pool():
"""使用连接池以提高效率。"""
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [session.get(f"https://api.example.com/item/{i}") for i in range(50)]
responses = await asyncio.gather(*tasks)
return responses
async def batch_process(items: List[str], batch_size: int = 10):
"""分批处理项目。"""
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
tasks = [process_item(item) for item in batch]
await asyncio.gather(*tasks)
print(f"Processed batch {i // batch_size + 1}")
async def process_item(item: str):
"""处理单个项目。"""
await asyncio.sleep(0.1)
return f"Processed: {item}"
切勿使用同步操作阻塞事件循环。单个阻塞调用会停滞所有并发任务。
# 错误 - 阻塞整个事件循环
async def fetch_data_bad():
import time
import requests
time.sleep(1) # 阻塞!
response = requests.get(url) # 同样阻塞!
# 正确 - 使用异步原生库(例如,用于异步 HTTP 的 httpx)
import httpx
async def fetch_data_good(url: str):
await asyncio.sleep(1)
async with httpx.AsyncClient() as client:
response = await client.get(url)
使用 asyncio.to_thread() 包装阻塞代码(Python 3.9+):
当必须使用同步库时,将其卸载到线程池:
import asyncio
from pathlib import Path
async def read_file_async(path: str) -> str:
"""读取文件而不阻塞事件循环。"""
# asyncio.to_thread() 在线程池中运行同步代码
return await asyncio.to_thread(Path(path).read_text)
async def call_sync_library(data: dict) -> dict:
"""包装同步库调用。"""
# 适用于同步数据库驱动、文件 I/O、CPU 工作
return await asyncio.to_thread(sync_library.process, data)
使用 run_in_executor() 的低级方法:
import asyncio
import concurrent.futures
from typing import Any
def blocking_operation(data: Any) -> Any:
"""CPU 密集型阻塞操作。"""
import time
time.sleep(1)
return data * 2
async def run_in_executor(data: Any) -> Any:
"""在线程池中运行阻塞操作。"""
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_operation, data)
return result
async def main():
results = await asyncio.gather(*[run_in_executor(i) for i in range(5)])
print(results)
asyncio.run(main())
# 错误 - 返回协程对象,不执行
result = async_function()
# 正确
result = await async_function()
# 错误 - 阻塞事件循环
import time
async def bad():
time.sleep(1) # 阻塞!
# 正确
async def good():
await asyncio.sleep(1) # 非阻塞
async def cancelable_task():
"""处理取消的任务。"""
try:
while True:
await asyncio.sleep(1)
print("Working...")
except asyncio.CancelledError:
print("Task cancelled, cleaning up...")
# 执行清理
raise # 重新抛出以传播取消
# 错误 - 不能直接从同步代码调用异步代码
def sync_function():
result = await async_function() # 语法错误!
# 正确
def sync_function():
result = asyncio.run(async_function())
import asyncio
import pytest
# 使用 pytest-asyncio
@pytest.mark.asyncio
async def test_async_function():
"""测试异步函数。"""
result = await fetch_data("https://api.example.com")
assert result is not None
@pytest.mark.asyncio
async def test_with_timeout():
"""带超时测试。"""
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_operation(5), timeout=1.0)
每周安装量
6.1K
仓库
GitHub 星标数
32.2K
首次出现
2026 年 1 月 20 日
安全审计
安装于
opencode4.7K
gemini-cli4.6K
codex4.4K
claude-code4.2K
cursor4.0K
github-copilot4.0K
Comprehensive guidance for implementing asynchronous Python applications using asyncio, concurrent programming patterns, and async/await for building high-performance, non-blocking systems.
Before adopting async, consider whether it's the right choice for your use case.
| Use Case | Recommended Approach |
|---|---|
| Many concurrent network/DB calls | asyncio |
| CPU-bound computation | multiprocessing or thread pool |
| Mixed I/O + CPU | Offload CPU work with asyncio.to_thread() |
| Simple scripts, few connections | Sync (simpler, easier to debug) |
| Web APIs with high concurrency | Async frameworks (FastAPI, aiohttp) |
Key Rule: Stay fully sync or fully async within a call path. Mixing creates hidden blocking and complexity.
The event loop is the heart of asyncio, managing and scheduling asynchronous tasks.
Key characteristics:
Functions defined with async def that can be paused and resumed.
Syntax:
async def my_coroutine():
result = await some_async_operation()
return result
Scheduled coroutines that run concurrently on the event loop.
Low-level objects representing eventual results of async operations.
Resources that support async with for proper cleanup.
Objects that support async for for iterating over async data sources.
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# Python 3.7+
asyncio.run(main())
import asyncio
async def fetch_data(url: str) -> dict:
"""Fetch data from URL asynchronously."""
await asyncio.sleep(1) # Simulate I/O
return {"url": url, "data": "result"}
async def main():
result = await fetch_data("https://api.example.com")
print(result)
asyncio.run(main())
import asyncio
from typing import List
async def fetch_user(user_id: int) -> dict:
"""Fetch user data."""
await asyncio.sleep(0.5)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
"""Fetch multiple users concurrently."""
tasks = [fetch_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return results
async def main():
user_ids = [1, 2, 3, 4, 5]
users = await fetch_all_users(user_ids)
print(f"Fetched {len(users)} users")
asyncio.run(main())
import asyncio
async def background_task(name: str, delay: int):
"""Long-running background task."""
print(f"{name} started")
await asyncio.sleep(delay)
print(f"{name} completed")
return f"Result from {name}"
async def main():
# Create tasks
task1 = asyncio.create_task(background_task("Task 1", 2))
task2 = asyncio.create_task(background_task("Task 2", 1))
# Do other work
print("Main: doing other work")
await asyncio.sleep(0.5)
# Wait for tasks
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())
import asyncio
from typing import List, Optional
async def risky_operation(item_id: int) -> dict:
"""Operation that might fail."""
await asyncio.sleep(0.1)
if item_id % 3 == 0:
raise ValueError(f"Item {item_id} failed")
return {"id": item_id, "status": "success"}
async def safe_operation(item_id: int) -> Optional[dict]:
"""Wrapper with error handling."""
try:
return await risky_operation(item_id)
except ValueError as e:
print(f"Error: {e}")
return None
async def process_items(item_ids: List[int]):
"""Process multiple items with error handling."""
tasks = [safe_operation(iid) for iid in item_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out failures
successful = [r for r in results if r is not None and not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"Success: {len(successful)}, Failed: {len(failed)}")
return successful
asyncio.run(process_items([1, 2, 3, 4, 5, 6]))
import asyncio
async def slow_operation(delay: int) -> str:
"""Operation that takes time."""
await asyncio.sleep(delay)
return f"Completed after {delay}s"
async def with_timeout():
"""Execute operation with timeout."""
try:
result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(with_timeout())
import asyncio
from typing import Optional
class AsyncDatabaseConnection:
"""Async database connection context manager."""
def __init__(self, dsn: str):
self.dsn = dsn
self.connection: Optional[object] = None
async def __aenter__(self):
print("Opening connection")
await asyncio.sleep(0.1) # Simulate connection
self.connection = {"dsn": self.dsn, "connected": True}
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing connection")
await asyncio.sleep(0.1) # Simulate cleanup
self.connection = None
async def query_database():
"""Use async context manager."""
async with AsyncDatabaseConnection("postgresql://localhost") as conn:
print(f"Using connection: {conn}")
await asyncio.sleep(0.2) # Simulate query
return {"rows": 10}
asyncio.run(query_database())
import asyncio
from typing import AsyncIterator
async def async_range(start: int, end: int, delay: float = 0.1) -> AsyncIterator[int]:
"""Async generator that yields numbers with delay."""
for i in range(start, end):
await asyncio.sleep(delay)
yield i
async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
"""Fetch paginated data asynchronously."""
for page in range(1, max_pages + 1):
await asyncio.sleep(0.2) # Simulate API call
yield {
"page": page,
"url": f"{url}?page={page}",
"data": [f"item_{page}_{i}" for i in range(5)]
}
async def consume_async_iterator():
"""Consume async iterator."""
async for number in async_range(1, 5):
print(f"Number: {number}")
print("\nFetching pages:")
async for page_data in fetch_pages("https://api.example.com/items", 3):
print(f"Page {page_data['page']}: {len(page_data['data'])} items")
asyncio.run(consume_async_iterator())
import asyncio
from asyncio import Queue
from typing import Optional
async def producer(queue: Queue, producer_id: int, num_items: int):
"""Produce items and put them in queue."""
for i in range(num_items):
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
await asyncio.sleep(0.1)
await queue.put(None) # Signal completion
async def consumer(queue: Queue, consumer_id: int):
"""Consume items from queue."""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"Consumer {consumer_id} processing: {item}")
await asyncio.sleep(0.2) # Simulate work
queue.task_done()
async def producer_consumer_example():
"""Run producer-consumer pattern."""
queue = Queue(maxsize=10)
# Create tasks
producers = [
asyncio.create_task(producer(queue, i, 5))
for i in range(2)
]
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
# Wait for producers
await asyncio.gather(*producers)
# Wait for queue to be empty
await queue.join()
# Cancel consumers
for c in consumers:
c.cancel()
asyncio.run(producer_consumer_example())
import asyncio
from typing import List
async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
"""Make API call with rate limiting."""
async with semaphore:
print(f"Calling {url}")
await asyncio.sleep(0.5) # Simulate API call
return {"url": url, "status": 200}
async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
"""Make multiple requests with rate limiting."""
semaphore = asyncio.Semaphore(max_concurrent)
tasks = [api_call(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
results = await rate_limited_requests(urls, max_concurrent=3)
print(f"Completed {len(results)} requests")
asyncio.run(main())
import asyncio
class AsyncCounter:
"""Thread-safe async counter."""
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
"""Safely increment counter."""
async with self.lock:
current = self.value
await asyncio.sleep(0.01) # Simulate work
self.value = current + 1
async def get_value(self) -> int:
"""Get current value."""
async with self.lock:
return self.value
async def worker(counter: AsyncCounter, worker_id: int):
"""Worker that increments counter."""
for _ in range(10):
await counter.increment()
print(f"Worker {worker_id} incremented")
async def test_counter():
"""Test concurrent counter."""
counter = AsyncCounter()
workers = [asyncio.create_task(worker(counter, i)) for i in range(5)]
await asyncio.gather(*workers)
final_value = await counter.get_value()
print(f"Final counter value: {final_value}")
asyncio.run(test_counter())
import asyncio
import aiohttp
from typing import List, Dict
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
"""Fetch single URL."""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
text = await response.text()
return {
"url": url,
"status": response.status,
"length": len(text)
}
except Exception as e:
return {"url": url, "error": str(e)}
async def scrape_urls(urls: List[str]) -> List[Dict]:
"""Scrape multiple URLs concurrently."""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/404",
]
results = await scrape_urls(urls)
for result in results:
print(result)
asyncio.run(main())
import asyncio
from typing import List, Optional
# Simulated async database client
class AsyncDB:
"""Simulated async database."""
async def execute(self, query: str) -> List[dict]:
"""Execute query."""
await asyncio.sleep(0.1)
return [{"id": 1, "name": "Example"}]
async def fetch_one(self, query: str) -> Optional[dict]:
"""Fetch single row."""
await asyncio.sleep(0.1)
return {"id": 1, "name": "Example"}
async def get_user_data(db: AsyncDB, user_id: int) -> dict:
"""Fetch user and related data concurrently."""
user_task = db.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
orders_task = db.execute(f"SELECT * FROM orders WHERE user_id = {user_id}")
profile_task = db.fetch_one(f"SELECT * FROM profiles WHERE user_id = {user_id}")
user, orders, profile = await asyncio.gather(user_task, orders_task, profile_task)
return {
"user": user,
"orders": orders,
"profile": profile
}
async def main():
db = AsyncDB()
user_data = await get_user_data(db, 1)
print(user_data)
asyncio.run(main())
import asyncio
from typing import Set
# Simulated WebSocket connection
class WebSocket:
"""Simulated WebSocket."""
def __init__(self, client_id: str):
self.client_id = client_id
async def send(self, message: str):
"""Send message."""
print(f"Sending to {self.client_id}: {message}")
await asyncio.sleep(0.01)
async def recv(self) -> str:
"""Receive message."""
await asyncio.sleep(1)
return f"Message from {self.client_id}"
class WebSocketServer:
"""Simple WebSocket server."""
def __init__(self):
self.clients: Set[WebSocket] = set()
async def register(self, websocket: WebSocket):
"""Register new client."""
self.clients.add(websocket)
print(f"Client {websocket.client_id} connected")
async def unregister(self, websocket: WebSocket):
"""Unregister client."""
self.clients.remove(websocket)
print(f"Client {websocket.client_id} disconnected")
async def broadcast(self, message: str):
"""Broadcast message to all clients."""
if self.clients:
tasks = [client.send(message) for client in self.clients]
await asyncio.gather(*tasks)
async def handle_client(self, websocket: WebSocket):
"""Handle individual client connection."""
await self.register(websocket)
try:
async for message in self.message_iterator(websocket):
await self.broadcast(f"{websocket.client_id}: {message}")
finally:
await self.unregister(websocket)
async def message_iterator(self, websocket: WebSocket):
"""Iterate over messages from client."""
for _ in range(3): # Simulate 3 messages
yield await websocket.recv()
import asyncio
import aiohttp
async def with_connection_pool():
"""Use connection pool for efficiency."""
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [session.get(f"https://api.example.com/item/{i}") for i in range(50)]
responses = await asyncio.gather(*tasks)
return responses
async def batch_process(items: List[str], batch_size: int = 10):
"""Process items in batches."""
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
tasks = [process_item(item) for item in batch]
await asyncio.gather(*tasks)
print(f"Processed batch {i // batch_size + 1}")
async def process_item(item: str):
"""Process single item."""
await asyncio.sleep(0.1)
return f"Processed: {item}"
Never block the event loop with synchronous operations. A single blocking call stalls all concurrent tasks.
# BAD - blocks the entire event loop
async def fetch_data_bad():
import time
import requests
time.sleep(1) # Blocks!
response = requests.get(url) # Also blocks!
# GOOD - use async-native libraries (e.g., httpx for async HTTP)
import httpx
async def fetch_data_good(url: str):
await asyncio.sleep(1)
async with httpx.AsyncClient() as client:
response = await client.get(url)
Wrapping Blocking Code withasyncio.to_thread() (Python 3.9+):
When you must use synchronous libraries, offload to a thread pool:
import asyncio
from pathlib import Path
async def read_file_async(path: str) -> str:
"""Read file without blocking event loop."""
# asyncio.to_thread() runs sync code in a thread pool
return await asyncio.to_thread(Path(path).read_text)
async def call_sync_library(data: dict) -> dict:
"""Wrap a synchronous library call."""
# Useful for sync database drivers, file I/O, CPU work
return await asyncio.to_thread(sync_library.process, data)
Lower-level approach withrun_in_executor():
import asyncio
import concurrent.futures
from typing import Any
def blocking_operation(data: Any) -> Any:
"""CPU-intensive blocking operation."""
import time
time.sleep(1)
return data * 2
async def run_in_executor(data: Any) -> Any:
"""Run blocking operation in thread pool."""
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_operation, data)
return result
async def main():
results = await asyncio.gather(*[run_in_executor(i) for i in range(5)])
print(results)
asyncio.run(main())
# Wrong - returns coroutine object, doesn't execute
result = async_function()
# Correct
result = await async_function()
# Wrong - blocks event loop
import time
async def bad():
time.sleep(1) # Blocks!
# Correct
async def good():
await asyncio.sleep(1) # Non-blocking
async def cancelable_task():
"""Task that handles cancellation."""
try:
while True:
await asyncio.sleep(1)
print("Working...")
except asyncio.CancelledError:
print("Task cancelled, cleaning up...")
# Perform cleanup
raise # Re-raise to propagate cancellation
# Wrong - can't call async from sync directly
def sync_function():
result = await async_function() # SyntaxError!
# Correct
def sync_function():
result = asyncio.run(async_function())
import asyncio
import pytest
# Using pytest-asyncio
@pytest.mark.asyncio
async def test_async_function():
"""Test async function."""
result = await fetch_data("https://api.example.com")
assert result is not None
@pytest.mark.asyncio
async def test_with_timeout():
"""Test with timeout."""
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_operation(5), timeout=1.0)
Weekly Installs
6.1K
Repository
GitHub Stars
32.2K
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykWarn
Installed on
opencode4.7K
gemini-cli4.6K
codex4.4K
claude-code4.2K
cursor4.0K
github-copilot4.0K
99,500 周安装