asyncio by bobmatnyc/claude-mpm-skills
npx skills add https://github.com/bobmatnyc/claude-mpm-skills --skill asyncio

Python 的 asyncio 库支持使用 async/await 语法编写并发代码。它非常适合 I/O 密集型操作,如 HTTP 请求、数据库查询、文件操作和 WebSocket 连接。asyncio 提供非阻塞执行,无需处理线程或多进程的复杂性。
主要特性:
安装:
# asyncio 是内置库(Python 3.7+)
# 异步 HTTP 客户端
pip install aiohttp
# 异步 HTTP 请求(替代方案)
pip install httpx
# 异步数据库驱动
pip install asyncpg aiomysql motor # PostgreSQL、MySQL、MongoDB
# 支持异步的 FastAPI
pip install fastapi uvicorn[standard]
# 异步测试
pip install pytest-asyncio
import asyncio
async def hello():
    """Minimal coroutine: greet across a one-second non-blocking pause."""
    print("Hello")
    # Yields control to the event loop while waiting — nothing is blocked.
    await asyncio.sleep(1)
    print("World")
    return "Done"


# asyncio.run() creates an event loop, drives the coroutine to completion,
# and closes the loop — the standard entry point for async programs.
result = asyncio.run(hello())
print(result)  # "Done"
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
关键点:
- async def 定义协程函数
- await 暂停执行直到可等待对象完成
- asyncio.run() 是异步程序的入口点

import asyncio
import time
async def task(name, duration):
    """Simulate an async task that takes `duration` seconds."""
    print(f"{name}: Starting (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"{name}: Complete")
    return f"{name} result"

async def run_concurrent():
    """Run multiple tasks concurrently and time the whole batch."""
    start = time.time()
    # Sequential (slow) — 6 seconds total:
    # result1 = await task("Task 1", 3)
    # result2 = await task("Task 2", 2)
    # result3 = await task("Task 3", 1)
    # Concurrent (fast) — 3 seconds total: gather schedules all three
    # coroutines at once and waits for them together.
    results = await asyncio.gather(
        task("Task 1", 3),
        task("Task 2", 2),
        task("Task 3", 1)
    )
    elapsed = time.time() - start
    print(f"Total time: {elapsed:.2f}s")
    print(f"Results: {results}")

asyncio.run(run_concurrent())
# Output: Total time: 3.00s (tasks ran concurrently)
import asyncio
async def background_task(name):
    """Long-running background task: five 1-second iterations."""
    for i in range(5):
        print(f"{name}: iteration {i}")
        await asyncio.sleep(1)
    return f"{name} complete"

async def main():
    # create_task() schedules the coroutine on the running loop immediately.
    task1 = asyncio.create_task(background_task("Task-1"))
    task2 = asyncio.create_task(background_task("Task-2"))
    # Both tasks make progress while main does other work.
    print("Main: doing other work")
    await asyncio.sleep(2)
    # Awaiting a task waits for its completion and yields its return value.
    result1 = await task1
    result2 = await task2
    print(f"Results: {result1}, {result2}")

asyncio.run(main())
import asyncio
async def risky_operation(fail=False):
    """Operation that may raise ValueError after a 1-second delay."""
    await asyncio.sleep(1)
    if fail:
        raise ValueError("Operation failed")
    return "Success"

async def handle_errors():
    """Demonstrate per-await try/except and gather's return_exceptions."""
    # Individual try/except around a single await.
    try:
        result = await risky_operation(fail=True)
    except ValueError as e:
        print(f"Caught error: {e}")
        result = "Fallback value"
    # gather with return_exceptions=True collects exceptions as results
    # instead of aborting the whole batch on the first failure.
    results = await asyncio.gather(
        risky_operation(fail=False),
        risky_operation(fail=True),
        risky_operation(fail=False),
        return_exceptions=True
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(handle_errors())
import asyncio
# Modern approach (Python 3.7+)
async def main():
    print("Main coroutine")
    await asyncio.sleep(1)

asyncio.run(main())  # creates a loop, runs main, closes the loop

# Manual loop management (advanced use cases).
# NOTE(review): some_coroutine and callback_function are not defined in
# this snippet — they are illustrative placeholders; confirm before use.
async def manual_example():
    loop = asyncio.get_event_loop()
    # Schedule a coroutine as a task on the loop.
    task = loop.create_task(some_coroutine())
    # Schedule a plain callback after a delay.
    loop.call_later(5, callback_function)
    # Run until the task completes.
    result = await task
    return result

# Get the currently running event loop.
async def get_current_loop():
    """Inspect the running loop and schedule a plain callback on it."""
    loop = asyncio.get_running_loop()
    print(f"Loop: {loop}")
    # Schedule a callback to run when the loop regains control.
    loop.call_soon(lambda: print("Callback executed"))
    await asyncio.sleep(0)  # yield once so the callback gets to run
import asyncio
from datetime import datetime
def callback(name, loop):
    """Plain (non-async) callback invoked by the event loop."""
    print(f"{datetime.now()}: {name} callback executed")
    # Stop the loop after the callback, if desired:
    # loop.stop()

async def schedule_callbacks():
    """Demonstrate call_soon / call_later / call_at scheduling."""
    loop = asyncio.get_running_loop()
    # Run as soon as the loop regains control.
    loop.call_soon(callback, "Immediate", loop)
    # Run after a 2-second delay.
    loop.call_later(2, callback, "Delayed 2s", loop)
    # Run at an absolute loop-clock time (loop.time() is monotonic).
    loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)
    # Keep the coroutine alive long enough for all callbacks to fire.
    await asyncio.sleep(5)

asyncio.run(schedule_callbacks())
import asyncio
import time
def blocking_io():
    """CPU-bound or blocking I/O operation (blocks its thread for 2s)."""
    print("Blocking operation started")
    time.sleep(2)  # blocking sleep — must never run on the event-loop thread
    print("Blocking operation complete")
    return "Blocking result"

async def run_in_executor():
    """Run blocking code in a thread pool without stalling the loop."""
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,  # None selects the loop's default ThreadPoolExecutor
        blocking_io
    )
    print(f"Result: {result}")

# Run blocking operations concurrently.
async def concurrent_blocking():
    """Fan three blocking calls out to worker threads at once."""
    loop = asyncio.get_running_loop()
    # These run in pool threads, so the event loop stays responsive.
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io)
    )
    print(f"All results: {results}")

asyncio.run(concurrent_blocking())
import asyncio
# 共享资源
counter = 0
lock = asyncio.Lock()
async def increment_with_lock(name):
"""使用锁保护递增计数器。"""
global counter
async with lock:
# 临界区 - 一次只有一个任务
print(f"{name}: acquired lock")
current = counter
await asyncio.sleep(0.1) # 模拟处理
counter = current + 1
print(f"{name}: released lock, counter={counter}")
async def increment_without_lock(name):
"""无锁递增 - 竞态条件!"""
global counter
current = counter
await asyncio.sleep(0.1) # 竞态条件窗口
counter = current + 1
print(f"{name}: counter={counter}")
async def test_locks():
global counter
# 无锁(竞态条件)
counter = 0
await asyncio.gather(
increment_without_lock("Task-1"),
increment_without_lock("Task-2"),
increment_without_lock("Task-3")
)
print(f"Without lock: {counter}") # 通常错误(< 3)
# 有锁(正确)
counter = 0
await asyncio.gather(
increment_with_lock("Task-1"),
increment_with_lock("Task-2"),
increment_with_lock("Task-3")
)
print(f"With lock: {counter}") # 总是 3
asyncio.run(test_locks())
import asyncio
# Limit concurrency: at most 2 holders at once.
semaphore = asyncio.Semaphore(2)

async def limited_operation(name):
    """Operation gated by the shared semaphore."""
    print(f"{name}: waiting for semaphore")
    async with semaphore:
        print(f"{name}: acquired semaphore")
        await asyncio.sleep(2)  # simulate work while holding a slot
        print(f"{name}: releasing semaphore")

async def test_semaphore():
    # Launch 5 tasks; the semaphore lets only 2 proceed at a time.
    await asyncio.gather(
        limited_operation("Task-1"),
        limited_operation("Task-2"),
        limited_operation("Task-3"),
        limited_operation("Task-4"),
        limited_operation("Task-5")
    )

asyncio.run(test_semaphore())
# Only 2 tasks hold the semaphore at any moment.
import asyncio
event = asyncio.Event()

async def waiter(name):
    """Block until the shared event is set."""
    print(f"{name}: waiting for event")
    await event.wait()  # suspends until event.set() is called
    print(f"{name}: event received!")

async def setter():
    """Set the event after a 2-second delay."""
    await asyncio.sleep(2)
    print("Setter: setting event")
    event.set()  # wakes every waiter at once

async def test_event():
    # Three waiters plus the setter run concurrently.
    await asyncio.gather(
        waiter("Waiter-1"),
        waiter("Waiter-2"),
        waiter("Waiter-3"),
        setter()
    )

asyncio.run(test_event())
import asyncio
import random
async def producer(queue, name):
"""生产项目并添加到队列。"""
for i in range(5):
item = f"{name}-item-{i}"
await queue.put(item)
print(f"{name}: produced {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
# 发送完成信号
await queue.put(None)
async def consumer(queue, name):
"""从队列消费项目。"""
while True:
item = await queue.get() # 阻塞直到有项目可用
if item is None: # 关闭信号
queue.task_done()
break
print(f"{name}: consumed {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def test_queue():
queue = asyncio.Queue(maxsize=10)
# 创建生产者和消费者
await asyncio.gather(
producer(queue, "Producer-1"),
producer(queue, "Producer-2"),
consumer(queue, "Consumer-1"),
consumer(queue, "Consumer-2"),
consumer(queue, "Consumer-3")
)
# 等待所有项目被处理
await queue.join()
print("All tasks complete")
asyncio.run(test_queue())
import asyncio
condition = asyncio.Condition()
items = []
async def consumer(name):
"""等待项目可用。"""
async with condition:
# 等待直到有项目可用
await condition.wait_for(lambda: len(items) > 0)
item = items.pop(0)
print(f"{name}: consumed {item}")
async def producer(name):
"""添加项目并通知消费者。"""
async with condition:
item = f"{name}-item"
items.append(item)
print(f"{name}: produced {item}")
# 通知一个等待的消费者
condition.notify(n=1)
# 或通知所有:condition.notify_all()
async def test_condition():
await asyncio.gather(
consumer("Consumer-1"),
consumer("Consumer-2"),
producer("Producer-1"),
producer("Producer-2")
)
asyncio.run(test_condition())
import asyncio
import aiohttp
async def fetch_url(session, url):
    """Fetch one URL and summarize its status and body length."""
    async with session.get(url) as response:
        status = response.status
        text = await response.text()
        return {"url": url, "status": status, "length": len(text)}

async def fetch_multiple_urls():
    """Fetch several URLs concurrently over one shared session."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/json",
    ]
    # One ClientSession is reused for all requests (connection pooling).
    async with aiohttp.ClientSession() as session:
        # Concurrent requests via gather.
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(f"{result['url']}: {result['status']} ({result['length']} bytes)")

asyncio.run(fetch_multiple_urls())
import asyncio
import aiohttp
from typing import Dict, Any
async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Dict[str, Any]:
    """Fetch `url` as JSON, retrying with exponential backoff.

    Returns {"success": True, "data": ...} on success, or
    {"success": False, "error": ...} after the final failed attempt.
    """
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response.raise_for_status()  # raise ClientError on 4xx/5xx
                data = await response.json()
                return {"success": True, "data": data}
        except aiohttp.ClientError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                return {"success": False, "error": str(e)}
            # Exponential backoff: 1s, 2s, 4s, ...
            await asyncio.sleep(2 ** attempt)
        except asyncio.TimeoutError:
            # aiohttp raises asyncio.TimeoutError when ClientTimeout expires.
            print(f"Attempt {attempt + 1} timed out")
            if attempt == max_retries - 1:
                return {"success": False, "error": "Timeout"}
            await asyncio.sleep(2 ** attempt)

async def parallel_api_calls():
    """Fetch several URLs in parallel, tolerating individual failures."""
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/status/500",  # will fail
        "https://httpbin.org/delay/1",
    ]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_with_retry(session, url) for url in urls],
            return_exceptions=True  # keep going if a task raises
        )
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                print(f"{url}: Exception - {result}")
            elif result["success"]:
                print(f"{url}: Success")
            else:
                print(f"{url}: Failed - {result['error']}")

asyncio.run(parallel_api_calls())
from aiohttp import web
import asyncio
async def handle_hello(request):
    """GET /hello — greet by optional ?name= query parameter."""
    name = request.query.get("name", "World")
    return web.json_response({"message": f"Hello, {name}!"})

async def handle_post(request):
    """POST /process — echo the JSON body after simulated async work."""
    data = await request.json()
    await asyncio.sleep(1)
    return web.json_response({
        "received": data,
        "status": "processed"
    })

async def handle_stream(request):
    """GET /stream — send 10 chunks, one every half second."""
    response = web.StreamResponse()
    await response.prepare(request)  # send headers before streaming body
    for i in range(10):
        await response.write(f"Chunk {i}\n".encode())
        await asyncio.sleep(0.5)
    await response.write_eof()
    return response

# Create the application and wire up routes.
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)

# Run the server (blocking call).
if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)
import asyncio
import aiohttp
async def websocket_client():
    """Connect to an echo WebSocket server, send, then read replies."""
    url = "wss://echo.websocket.org"
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(url) as ws:
            # Send a text frame and a JSON-encoded frame.
            await ws.send_str("Hello WebSocket")
            await ws.send_json({"type": "greeting", "data": "test"})
            # Iterate incoming frames until close/error.
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")
                    if msg.data == "close":
                        await ws.close()
                        break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"Error: {ws.exception()}")
                    break

asyncio.run(websocket_client())
import asyncio
import asyncpg
async def database_operations():
    """Async PostgreSQL operations via an asyncpg connection pool."""
    # The pool keeps 5-20 connections open and hands them out on demand.
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20
    )
    try:
        # Borrow a connection; it returns to the pool on exit.
        async with pool.acquire() as conn:
            # Parameterized query ($1 placeholders prevent SQL injection).
            rows = await conn.fetch(
                "SELECT id, name, email FROM users WHERE active = $1",
                True
            )
            for row in rows:
                print(f"User: {row['name']} ({row['email']})")
            # Insert data.
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "Alice", "alice@example.com"
            )
            # Transaction: both statements commit or roll back together.
            async with conn.transaction():
                await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
                await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")
    finally:
        await pool.close()

asyncio.run(database_operations())
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
async def mongodb_operations():
    """Async MongoDB operations via Motor."""
    # Create the client (connects lazily on first operation).
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users
    try:
        # Insert one document.
        result = await collection.insert_one({
            "name": "Alice",
            "email": "alice@example.com",
            "age": 30
        })
        print(f"Inserted ID: {result.inserted_id}")
        # Query with an async cursor.
        cursor = collection.find({"age": {"$gte": 25}})
        async for document in cursor:
            print(f"User: {document['name']}")
        # Update one document.
        await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )
        # Aggregation pipeline: average age of users >= 25.
        pipeline = [
            {"$match": {"age": {"$gte": 25}}},
            {"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
        ]
        async for result in collection.aggregate(pipeline):
            print(f"Average age: {result['avg_age']}")
    finally:
        client.close()

asyncio.run(mongodb_operations())
import asyncio
import asyncpg
from typing import Optional
class DatabasePool:
    """Thin wrapper around an asyncpg pool with convenience query methods.

    Call connect() before using any query method; each method borrows a
    pooled connection for the duration of a single statement.
    """

    def __init__(self, dsn: str):
        self.dsn = dsn
        # Created lazily by connect(); None until then.
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """Create the connection pool (5-20 connections)."""
        self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)

    async def close(self):
        """Close the pool if it was created."""
        if self.pool:
            await self.pool.close()

    async def execute(self, query: str, *args):
        """Run a statement; returns the server status string."""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """Run a query and return all rows."""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args):
        """Run a query and return the first row (or None)."""
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

# Usage
async def use_pool():
    """Connect, query, and always close the pool."""
    db = DatabasePool("postgresql://user:pass@localhost/mydb")
    await db.connect()
    try:
        rows = await db.fetch("SELECT * FROM users")
        for row in rows:
            print(row)
    finally:
        await db.close()

asyncio.run(use_pool())
from fastapi import FastAPI, HTTPException
import asyncio
import httpx
app = FastAPI()

@app.get("/")
async def root():
    """Simple async endpoint."""
    return {"message": "Hello World"}

@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
    """Wait asynchronously without blocking other requests."""
    await asyncio.sleep(seconds)
    return {"message": f"Waited {seconds} seconds"}

@app.get("/fetch")
async def fetch_external():
    """Proxy a single external API call."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/json")
        return response.json()

@app.get("/parallel")
async def parallel_requests():
    """Fan out three external calls concurrently and combine the results."""
    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(
            client.get("https://httpbin.org/delay/1"),
            client.get("https://httpbin.org/delay/2"),
            client.get("https://httpbin.org/json")
        )
        return {
            "results": [r.json() for r in responses]
        }
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()

async def send_email(email: str, message: str):
    """Simulate a slow email send (5 seconds)."""
    print(f"Sending email to {email}")
    await asyncio.sleep(5)
    print(f"Email sent to {email}: {message}")

@app.post("/send-notification")
async def send_notification(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """Queue the email via FastAPI's BackgroundTasks and return at once."""
    background_tasks.add_task(send_email, email, message)
    # Respond immediately; the email is sent after the response.
    return {"status": "notification queued"}

# Alternative: create the task manually.
@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
    """Fire-and-forget via asyncio.create_task.

    NOTE(review): the task reference is discarded; asyncio keeps only a
    weak reference to running tasks, so this can be garbage-collected
    mid-flight. Keep a strong reference in production code.
    """
    asyncio.create_task(send_email(email, message))
    return {"status": "notification queued"}
# BUG FIX: HTTPException is raised below but was missing from this import.
from fastapi import FastAPI, Depends, HTTPException
import asyncpg

app = FastAPI()

# Database pool (global, created on startup).
db_pool = None

async def get_db():
    """Dependency: yield a pooled connection for the request's duration."""
    async with db_pool.acquire() as conn:
        yield conn

@app.on_event("startup")
async def startup():
    """Create the database pool when the app starts."""
    global db_pool
    db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb"
    )

@app.on_event("shutdown")
async def shutdown():
    """Close the database pool on shutdown."""
    await db_pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
    """Fetch one user via the pooled-connection dependency."""
    user = await conn.fetchrow(
        "SELECT * FROM users WHERE id = $1",
        user_id
    )
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(user)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio

app = FastAPI()

# Active connections (shared; single event loop, so no lock is needed).
active_connections: List[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a client and re-broadcast every message it sends."""
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            # Receive a message from this client.
            data = await websocket.receive_text()
            # Broadcast to all clients; iterate a snapshot so concurrent
            # disconnects can't mutate the list mid-iteration.
            for connection in list(active_connections):
                await connection.send_text(f"Broadcast: {data}")
    except WebSocketDisconnect:
        active_connections.remove(websocket)
        print("Client disconnected")

# Background task that sends periodic updates.
async def broadcast_updates():
    """Send a heartbeat to every client; drop connections that fail."""
    while True:
        await asyncio.sleep(5)
        # BUG FIX: the original removed elements from active_connections
        # while iterating over it (which skips entries) and used a bare
        # `except:` (which swallows even CancelledError). Iterate a
        # snapshot, collect the dead connections, then remove them.
        dead = []
        for connection in list(active_connections):
            try:
                await connection.send_text("Periodic update")
            except Exception:
                dead.append(connection)
        for connection in dead:
            if connection in active_connections:
                active_connections.remove(connection)

@app.on_event("startup")
async def startup():
    """Launch the periodic broadcaster when the app starts."""
    asyncio.create_task(broadcast_updates())
import asyncio
async def slow_operation():
    """Operation that takes 10 seconds."""
    await asyncio.sleep(10)
    return "Result"

async def with_timeout():
    """Run the operation but give up after 5 seconds."""
    try:
        # wait_for cancels slow_operation when the timeout expires.
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())
import asyncio
async def cancellable_task():
    """Task that cooperates with cancellation."""
    try:
        for i in range(10):
            print(f"Working: {i}")
            await asyncio.sleep(1)  # cancellation is delivered at awaits
        return "Complete"
    except asyncio.CancelledError:
        print("Task was cancelled")
        # Do cleanup here, then...
        raise  # ...re-raise so the cancellation propagates to the awaiter

async def cancel_example():
    """Start a task, let it run 3 seconds, then cancel it."""
    task = asyncio.create_task(cancellable_task())
    await asyncio.sleep(3)
    # Request cancellation; it takes effect at the task's next await.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: task was cancelled")

asyncio.run(cancel_example())
import asyncio
class AsyncResource:
    """Async context manager for awaitable resource acquisition/release."""

    async def __aenter__(self):
        """Asynchronous setup: simulate acquiring a connection."""
        print("Acquiring resource")
        await asyncio.sleep(1)
        self.connection = "connected"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Asynchronous cleanup: runs even if the body raised."""
        print("Releasing resource")
        await asyncio.sleep(1)
        self.connection = None

async def use_resource():
    """Use the resource inside `async with`; cleanup is automatic."""
    async with AsyncResource() as resource:
        print(f"Using resource: {resource.connection}")
        await asyncio.sleep(1)
    # __aexit__ has already run here.

asyncio.run(use_resource())
import asyncio
from datetime import datetime
class Debouncer:
    """Coalesce rapid calls: only the last call within `delay` executes."""

    def __init__(self, delay: float):
        self.delay = delay
        self.task = None  # pending delayed invocation, if any

    async def call(self, func, *args, **kwargs):
        """Schedule func(*args, **kwargs) after the delay, cancelling any
        previously scheduled call that has not fired yet."""
        if self.task:
            self.task.cancel()

        async def fire_later():
            await asyncio.sleep(self.delay)
            await func(*args, **kwargs)

        self.task = asyncio.create_task(fire_later())

async def api_call(query: str):
    """Pretend API call: just log the query with a timestamp."""
    print(f"{datetime.now()}: API call with query: {query}")

async def debounce_example():
    """Three rapid calls — only the final one ("query3") executes."""
    debouncer = Debouncer(delay=1.0)
    # Rapid-fire calls: each new call cancels the previous pending one.
    await debouncer.call(api_call, "query1")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query2")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query3")
    # Wait out the debounce window so the surviving call can fire.
    await asyncio.sleep(2)

asyncio.run(debounce_example())
# Output: only the "query3" API call executes
import asyncio
from datetime import datetime
class RateLimiter:
    """Allow at most `max_calls` operations per `period` seconds.

    A semaphore caps concurrent holders at max_calls, and a sliding
    window of recent admission timestamps enforces the rate over time.
    """

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.semaphore = asyncio.Semaphore(max_calls)
        self.calls = []  # monotonic timestamps of recent admissions

    async def __aenter__(self):
        """Block until a rate slot is available, then claim it."""
        await self.semaphore.acquire()
        now = asyncio.get_event_loop().time()
        # Drop admissions that have fallen out of the sliding window.
        self.calls = [t for t in self.calls if now - t < self.period]
        if len(self.calls) >= self.max_calls:
            # Sleep until the oldest admission leaves the window.
            sleep_time = self.period - (now - self.calls[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
            # BUG FIX: the original appended the pre-sleep timestamp,
            # under-counting the window after waiting. Refresh `now` so
            # the admission time is recorded accurately.
            now = asyncio.get_event_loop().time()
        self.calls.append(now)
        return self

    async def __aexit__(self, *args):
        """Release the concurrency slot."""
        self.semaphore.release()

async def rate_limited_operation(limiter, name):
    """Operation gated by the rate limiter."""
    async with limiter:
        print(f"{datetime.now()}: {name}")
        await asyncio.sleep(0.1)

async def rate_limit_example():
    """At most 3 calls are admitted per 2-second window."""
    limiter = RateLimiter(max_calls=3, period=2.0)
    # Attempt 6 calls; the second batch waits for the window to clear.
    await asyncio.gather(*[
        rate_limited_operation(limiter, f"Call-{i}")
        for i in range(6)
    ])

asyncio.run(rate_limit_example())
import asyncio
import logging
# Enable asyncio debug mode (logs slow callbacks, never-awaited coroutines).
# Alternatively, set the environment variable:
# PYTHONASYNCIODEBUG=1 python script.py

# Configure logging so asyncio's debug output is visible.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def debug_example():
    logger.debug("Starting operation")
    await asyncio.sleep(1)
    logger.debug("Operation complete")

# BUG FIX: the original called asyncio.run(main(), debug=True) before any
# `main` existed, so the snippet crashed with NameError. Run the coroutine
# defined above instead, with debug mode enabled.
asyncio.run(debug_example(), debug=True)
import asyncio
import time
async def problematic_code():
    """Coroutine that wrongly performs a blocking sleep (demo of the bug)."""
    print("Starting")
    # WRONG: time.sleep() blocks the entire event loop for 2 seconds;
    # real code must use `await asyncio.sleep(2)` instead.
    time.sleep(2)
    print("Complete")

# Run with debug=True so asyncio warns about the blocking callback.
asyncio.run(problematic_code(), debug=True)
# Warning: Executing <Task> took 2.001 seconds
import asyncio
async def track_tasks():
    """List every task known to the running loop, with status details."""
    pending = asyncio.all_tasks()
    print(f"Total tasks: {len(pending)}")
    for t in pending:
        print(f" - {t.get_name()}: {t}")
        if not t.done():
            continue
        # Finished tasks expose either a result or the raised exception.
        try:
            print(f" Result: {t.result()}")
        except Exception as e:
            print(f" Exception: {e}")

async def main():
    """Spawn a long sleeper, inspect all tasks, then cancel the sleeper."""
    sleeper = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
    tracker = asyncio.create_task(track_tasks(), name="tracking")
    await tracker
    sleeper.cancel()

asyncio.run(main())
# test_async.py
# NOTE(review): some_async_function, aiohttp, and create_async_client are
# assumed to exist elsewhere in the test suite — they are not defined here.
import pytest
import asyncio

# Mark the test as async so pytest-asyncio drives it on an event loop.
@pytest.mark.asyncio
async def test_async_function():
    """Test an async function by awaiting it directly."""
    result = await some_async_function()
    assert result == "expected"

@pytest.mark.asyncio
async def test_async_http():
    """Test an async HTTP client end to end."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/json") as response:
            assert response.status == 200
            data = await response.json()
            assert "slideshow" in data

# Async fixture.
# NOTE(review): with modern pytest-asyncio, async fixtures should use
# @pytest_asyncio.fixture — confirm against the installed plugin version.
@pytest.fixture
async def async_client():
    """Yield an async client and close it after the test."""
    client = await create_async_client()
    yield client
    await client.close()

@pytest.mark.asyncio
async def test_with_fixture(async_client):
    """Consume the async fixture."""
    result = await async_client.fetch_data()
    assert result is not None
Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, file operations, and WebSocket connections. asyncio provides non-blocking execution without the complexity of threading or multiprocessing.
Key Features :
Installation :
# asyncio is built-in (Python 3.7+)
# Async HTTP client
pip install aiohttp
# Async HTTP requests (alternative)
pip install httpx
# Async database drivers
pip install asyncpg aiomysql motor # PostgreSQL, MySQL, MongoDB
# FastAPI with async support
pip install fastapi uvicorn[standard]
# Async testing
pip install pytest-asyncio
import asyncio
async def hello():
"""Basic async function (coroutine)."""
print("Hello")
await asyncio.sleep(1) # Async sleep (non-blocking)
print("World")
return "Done"
# Run async function
result = asyncio.run(hello())
print(result) # "Done"
Key Points :
- async def defines a coroutine function
- await suspends execution until the awaitable completes
- asyncio.run() is the entry point for async programs

import asyncio
import time
async def task(name, duration):
"""Simulate async task."""
print(f"{name}: Starting (duration: {duration}s)")
await asyncio.sleep(duration)
print(f"{name}: Complete")
return f"{name} result"
async def run_concurrent():
"""Run multiple tasks concurrently."""
start = time.time()
# Sequential (slow) - 6 seconds total
# result1 = await task("Task 1", 3)
# result2 = await task("Task 2", 2)
# result3 = await task("Task 3", 1)
# Concurrent (fast) - 3 seconds total
results = await asyncio.gather(
task("Task 1", 3),
task("Task 2", 2),
task("Task 3", 1)
)
elapsed = time.time() - start
print(f"Total time: {elapsed:.2f}s")
print(f"Results: {results}")
asyncio.run(run_concurrent())
# Output: Total time: 3.00s (tasks ran concurrently)
import asyncio
async def background_task(name):
"""Long-running background task."""
for i in range(5):
print(f"{name}: iteration {i}")
await asyncio.sleep(1)
return f"{name} complete"
async def main():
# Create task (starts immediately)
task1 = asyncio.create_task(background_task("Task-1"))
task2 = asyncio.create_task(background_task("Task-2"))
# Do other work while tasks run
print("Main: doing other work")
await asyncio.sleep(2)
# Wait for tasks to complete
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())
import asyncio
async def risky_operation(fail=False):
"""Operation that might fail."""
await asyncio.sleep(1)
if fail:
raise ValueError("Operation failed")
return "Success"
async def handle_errors():
# Individual try/except
try:
result = await risky_operation(fail=True)
except ValueError as e:
print(f"Caught error: {e}")
result = "Fallback value"
# Gather with error handling
results = await asyncio.gather(
risky_operation(fail=False),
risky_operation(fail=True),
risky_operation(fail=False),
return_exceptions=True # Return exceptions instead of raising
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(handle_errors())
import asyncio
# Modern approach (Python 3.7+)
async def main():
print("Main coroutine")
await asyncio.sleep(1)
asyncio.run(main()) # Creates loop, runs main, closes loop
# Manual loop management (advanced use cases)
async def manual_example():
loop = asyncio.get_event_loop()
# Schedule coroutine
task = loop.create_task(some_coroutine())
# Schedule callback
loop.call_later(5, callback_function)
# Run until complete
result = await task
return result
# Get current event loop
async def get_current_loop():
loop = asyncio.get_running_loop()
print(f"Loop: {loop}")
# Schedule callback in event loop
loop.call_soon(lambda: print("Callback executed"))
await asyncio.sleep(0) # Let callback execute
import asyncio
from datetime import datetime
def callback(name, loop):
"""Callback function (not async)."""
print(f"{datetime.now()}: {name} callback executed")
# Stop loop after callback
# loop.stop()
async def schedule_callbacks():
loop = asyncio.get_running_loop()
# Schedule immediate callback
loop.call_soon(callback, "Immediate", loop)
# Schedule callback after delay
loop.call_later(2, callback, "Delayed 2s", loop)
# Schedule callback at specific time
loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)
# Wait for callbacks to execute
await asyncio.sleep(5)
asyncio.run(schedule_callbacks())
import asyncio
import time
def blocking_io():
"""CPU-intensive or blocking I/O operation."""
print("Blocking operation started")
time.sleep(2) # Blocks thread
print("Blocking operation complete")
return "Blocking result"
async def run_in_executor():
"""Run blocking code in thread pool."""
loop = asyncio.get_running_loop()
# Run in default executor (thread pool)
result = await loop.run_in_executor(
None, # Use default executor
blocking_io
)
print(f"Result: {result}")
# Run blocking operations concurrently
async def concurrent_blocking():
loop = asyncio.get_running_loop()
# These run in thread pool, don't block event loop
results = await asyncio.gather(
loop.run_in_executor(None, blocking_io),
loop.run_in_executor(None, blocking_io),
loop.run_in_executor(None, blocking_io)
)
print(f"All results: {results}")
asyncio.run(concurrent_blocking())
import asyncio
# Shared resource
counter = 0
lock = asyncio.Lock()
async def increment_with_lock(name):
"""Increment counter with lock protection."""
global counter
async with lock:
# Critical section - only one task at a time
print(f"{name}: acquired lock")
current = counter
await asyncio.sleep(0.1) # Simulate processing
counter = current + 1
print(f"{name}: released lock, counter={counter}")
async def increment_without_lock(name):
"""Increment without lock - race condition!"""
global counter
current = counter
await asyncio.sleep(0.1) # Race condition window
counter = current + 1
print(f"{name}: counter={counter}")
async def test_locks():
global counter
# Without lock (race condition)
counter = 0
await asyncio.gather(
increment_without_lock("Task-1"),
increment_without_lock("Task-2"),
increment_without_lock("Task-3")
)
print(f"Without lock: {counter}") # Often wrong (< 3)
# With lock (correct)
counter = 0
await asyncio.gather(
increment_with_lock("Task-1"),
increment_with_lock("Task-2"),
increment_with_lock("Task-3")
)
print(f"With lock: {counter}") # Always 3
asyncio.run(test_locks())
import asyncio
# Limit concurrent operations
semaphore = asyncio.Semaphore(2) # Max 2 concurrent
async def limited_operation(name):
"""Operation limited by semaphore."""
print(f"{name}: waiting for semaphore")
async with semaphore:
print(f"{name}: acquired semaphore")
await asyncio.sleep(2) # Simulate work
print(f"{name}: releasing semaphore")
async def test_semaphore():
# Create 5 tasks, but only 2 run concurrently
await asyncio.gather(
limited_operation("Task-1"),
limited_operation("Task-2"),
limited_operation("Task-3"),
limited_operation("Task-4"),
limited_operation("Task-5")
)
asyncio.run(test_semaphore())
# Only 2 tasks hold semaphore at any time
import asyncio
event = asyncio.Event()
async def waiter(name):
"""Wait for event to be set."""
print(f"{name}: waiting for event")
await event.wait() # Block until event is set
print(f"{name}: event received!")
async def setter():
"""Set event after delay."""
await asyncio.sleep(2)
print("Setter: setting event")
event.set() # Wake up all waiters
async def test_event():
# Create waiters
await asyncio.gather(
waiter("Waiter-1"),
waiter("Waiter-2"),
waiter("Waiter-3"),
setter()
)
asyncio.run(test_event())
import asyncio
import random
async def producer(queue, name):
"""Produce items and add to queue."""
for i in range(5):
item = f"{name}-item-{i}"
await queue.put(item)
print(f"{name}: produced {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
# Signal completion
await queue.put(None)
async def consumer(queue, name):
"""Consume items from queue."""
while True:
item = await queue.get() # Block until item available
if item is None: # Shutdown signal
queue.task_done()
break
print(f"{name}: consumed {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def test_queue():
queue = asyncio.Queue(maxsize=10)
# Create producers and consumers
await asyncio.gather(
producer(queue, "Producer-1"),
producer(queue, "Producer-2"),
consumer(queue, "Consumer-1"),
consumer(queue, "Consumer-2"),
consumer(queue, "Consumer-3")
)
# Wait for all items to be processed
await queue.join()
print("All tasks complete")
asyncio.run(test_queue())
import asyncio
condition = asyncio.Condition()
items = []
async def consumer(name):
"""Wait for items to be available."""
async with condition:
# Wait until items are available
await condition.wait_for(lambda: len(items) > 0)
item = items.pop(0)
print(f"{name}: consumed {item}")
async def producer(name):
"""Add items and notify consumers."""
async with condition:
item = f"{name}-item"
items.append(item)
print(f"{name}: produced {item}")
# Notify one waiting consumer
condition.notify(n=1)
# Or notify all: condition.notify_all()
async def test_condition():
await asyncio.gather(
consumer("Consumer-1"),
consumer("Consumer-2"),
producer("Producer-1"),
producer("Producer-2")
)
asyncio.run(test_condition())
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Fetch single URL."""
async with session.get(url) as response:
status = response.status
text = await response.text()
return {"url": url, "status": status, "length": len(text)}
async def fetch_multiple_urls():
"""Fetch multiple URLs concurrently."""
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/json",
]
async with aiohttp.ClientSession() as session:
# Concurrent requests
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
asyncio.run(fetch_multiple_urls())
import asyncio
import aiohttp
from typing import Dict, Any
async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Dict[str, Any]:
    """Fetch *url* as JSON, retrying transient failures with exponential backoff.

    Args:
        session: Shared aiohttp client session.
        url: Target URL; the response body is parsed as JSON.
        max_retries: Maximum number of attempts.

    Returns:
        ``{"success": True, "data": ...}`` on success, otherwise
        ``{"success": False, "error": ...}`` after the final failure.
    """
    for attempt in range(max_retries):
        try:
            # Per-request timeout so one slow call cannot stall a retry cycle.
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response.raise_for_status()  # Raise for 4xx/5xx
                data = await response.json()
                return {"success": True, "data": data}
        except aiohttp.ClientError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                return {"success": False, "error": str(e)}
            # Exponential backoff: 1s, 2s, 4s, ...
            await asyncio.sleep(2 ** attempt)
        except asyncio.TimeoutError:
            print(f"Attempt {attempt + 1} timed out")
            if attempt == max_retries - 1:
                return {"success": False, "error": "Timeout"}
            await asyncio.sleep(2 ** attempt)
    # Fix vs. original: reached when max_retries < 1, which previously
    # fell off the loop and implicitly returned None.
    return {"success": False, "error": "No attempts made"}
async def parallel_api_calls():
    """Make parallel API calls with error handling."""
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/status/500",  # Will fail
        "https://httpbin.org/delay/1",
    ]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_with_retry(session, url) for url in urls],
            return_exceptions=True  # Don't stop on errors; failures become values
        )
        # Results align positionally with urls.
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                print(f"{url}: Exception - {result}")
            elif result["success"]:
                print(f"{url}: Success")
            else:
                print(f"{url}: Failed - {result['error']}")

asyncio.run(parallel_api_calls())
from aiohttp import web
import asyncio

async def handle_hello(request):
    """GET /hello?name=... -> JSON greeting (defaults to 'World')."""
    name = request.query.get("name", "World")
    return web.json_response({"message": f"Hello, {name}!"})

async def handle_post(request):
    """POST /process: echo the JSON body after simulated async work."""
    data = await request.json()
    # Simulate async processing
    await asyncio.sleep(1)
    return web.json_response({
        "received": data,
        "status": "processed"
    })

async def handle_stream(request):
    """GET /stream: chunked streaming response, one chunk every 0.5 s."""
    response = web.StreamResponse()
    # Headers must be sent before the first body write.
    await response.prepare(request)
    for i in range(10):
        await response.write(f"Chunk {i}\n".encode())
        await asyncio.sleep(0.5)
    await response.write_eof()
    return response

# Create application and register routes
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)

# Run server (blocking call; aiohttp manages the event loop itself)
if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)
import asyncio
import aiohttp

async def websocket_client():
    """Connect to WebSocket server."""
    url = "wss://echo.websocket.org"
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(url) as ws:
            # Send messages
            await ws.send_str("Hello WebSocket")
            await ws.send_json({"type": "greeting", "data": "test"})
            # Receive messages; async iteration ends when the socket closes.
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")
                    if msg.data == "close":
                        await ws.close()
                        break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"Error: {ws.exception()}")
                    break

asyncio.run(websocket_client())
import asyncio
import asyncpg

async def database_operations():
    """Async PostgreSQL operations."""
    # Create connection pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20
    )
    try:
        # Acquire connection from pool; returned automatically on exit.
        async with pool.acquire() as conn:
            # Execute query — $1-style placeholders are server-side
            # parameters (safe against SQL injection).
            rows = await conn.fetch(
                "SELECT id, name, email FROM users WHERE active = $1",
                True
            )
            for row in rows:
                print(f"User: {row['name']} ({row['email']})")
            # Insert data
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "Alice", "alice@example.com"
            )
            # Transaction: both statements commit or roll back together.
            async with conn.transaction():
                await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
                await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")
    finally:
        await pool.close()

asyncio.run(database_operations())
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def mongodb_operations():
    """Async MongoDB operations."""
    # Create client (connects lazily on first operation)
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users
    try:
        # Insert document
        result = await collection.insert_one({
            "name": "Alice",
            "email": "alice@example.com",
            "age": 30
        })
        print(f"Inserted ID: {result.inserted_id}")
        # Find documents — the cursor is consumed with async iteration.
        cursor = collection.find({"age": {"$gte": 25}})
        async for document in cursor:
            print(f"User: {document['name']}")
        # Update document
        await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )
        # Aggregation pipeline
        pipeline = [
            {"$match": {"age": {"$gte": 25}}},
            {"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
        ]
        async for result in collection.aggregate(pipeline):
            print(f"Average age: {result['avg_age']}")
    finally:
        # NOTE(review): motor's close() appears to be synchronous here
        # (not awaited) — confirm against the pinned motor version.
        client.close()

asyncio.run(mongodb_operations())
import asyncio
import asyncpg
from typing import Optional
class DatabasePool:
    """Async PostgreSQL connection-pool manager (thin asyncpg wrapper).

    Usable directly via connect()/close(), or as an async context manager:

        async with DatabasePool(dsn) as db:
            rows = await db.fetch("SELECT 1")
    """

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None  # created by connect()

    async def connect(self):
        """Create connection pool."""
        self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)

    async def close(self):
        """Close connection pool (no-op if never connected)."""
        if self.pool:
            await self.pool.close()

    async def __aenter__(self):
        """Support `async with DatabasePool(dsn) as db:`."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    def _require_pool(self):
        """Return the live pool, failing loudly if connect() was never called.

        Fix vs. original: using the pool before connect() raised an opaque
        AttributeError on None; now it is an explicit RuntimeError.
        """
        if self.pool is None:
            raise RuntimeError("DatabasePool is not connected; call connect() first")
        return self.pool

    async def execute(self, query: str, *args):
        """Execute a statement."""
        async with self._require_pool().acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """Fetch multiple rows."""
        async with self._require_pool().acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args):
        """Fetch single row."""
        async with self._require_pool().acquire() as conn:
            return await conn.fetchrow(query, *args)
# Usage
async def use_pool():
    """Connect, query, and always close the pool."""
    db = DatabasePool("postgresql://user:pass@localhost/mydb")
    await db.connect()
    try:
        # Execute operations
        rows = await db.fetch("SELECT * FROM users")
        for row in rows:
            print(row)
    finally:
        # Guarantees the pool is released even if a query raises.
        await db.close()

asyncio.run(use_pool())
from fastapi import FastAPI, HTTPException
import asyncio
import httpx

app = FastAPI()

@app.get("/")
async def root():
    """Simple async endpoint."""
    return {"message": "Hello World"}

@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
    """Endpoint with async delay (non-blocking: other requests proceed)."""
    await asyncio.sleep(seconds)
    return {"message": f"Waited {seconds} seconds"}

@app.get("/fetch")
async def fetch_external():
    """Fetch data from external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/json")
        return response.json()

@app.get("/parallel")
async def parallel_requests():
    """Make parallel API calls over one shared client."""
    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(
            client.get("https://httpbin.org/delay/1"),
            client.get("https://httpbin.org/delay/2"),
            client.get("https://httpbin.org/json")
        )
        return {
            "results": [r.json() for r in responses]
        }
from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def send_email(email: str, message: str):
    """Simulate a slow email send (5 s)."""
    print(f"Sending email to {email}")
    await asyncio.sleep(5)  # Simulate slow email service
    print(f"Email sent to {email}: {message}")

@app.post("/send-notification")
async def send_notification(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """Queue the email and respond without waiting for it."""
    # BackgroundTasks runs the coroutine after the response is sent.
    background_tasks.add_task(send_email, email, message)
    # Return immediately
    return {"status": "notification queued"}
# Alternative: manual task creation.
# The event loop keeps only weak references to tasks, so a task created
# without a saved reference may be garbage-collected before it finishes
# (documented asyncio pitfall) — keep strong references until done.
_background_tasks: set = set()

@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
    """Fire-and-forget email send; returns immediately."""
    task = asyncio.create_task(send_email(email, message))
    _background_tasks.add(task)
    # Drop the reference once the task completes.
    task.add_done_callback(_background_tasks.discard)
    return {"status": "notification queued"}
# Fix vs. original: HTTPException is raised in get_user below but was
# missing from this import line, causing a NameError on a 404.
from fastapi import FastAPI, Depends, HTTPException
import asyncpg

app = FastAPI()

# Database pool (global; created on startup, closed on shutdown)
db_pool = None

async def get_db():
    """Dependency: yield a pooled connection, returned to the pool afterwards."""
    async with db_pool.acquire() as conn:
        yield conn
@app.on_event("startup")
async def startup():
"""Initialize database pool on startup."""
global db_pool
db_pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/mydb"
)
@app.on_event("shutdown")
async def shutdown():
"""Close database pool on shutdown."""
await db_pool.close()
@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
"""Get user with async database dependency."""
user = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1",
user_id
)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return dict(user)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio

app = FastAPI()

# Active connections: module-level registry shared by all handlers.
active_connections: List[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint: echo every received message to all clients."""
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            # Receive message
            data = await websocket.receive_text()
            # Broadcast to all connections.
            # NOTE(review): a send to an already-dead peer can raise here
            # and terminate this handler — consider per-send error handling.
            for connection in active_connections:
                await connection.send_text(f"Broadcast: {data}")
    except WebSocketDisconnect:
        active_connections.remove(websocket)
        print("Client disconnected")
# Background task to send periodic updates
async def broadcast_updates():
    """Every 5 s, push a heartbeat to all clients; drop dead connections.

    Fixes vs. original: (1) removing from `active_connections` while
    iterating it skipped elements; (2) the bare `except:` also swallowed
    KeyboardInterrupt/SystemExit and the task's own CancelledError.
    """
    while True:
        await asyncio.sleep(5)
        dead = []
        # Iterate over a snapshot so removals don't disturb iteration.
        for connection in list(active_connections):
            try:
                await connection.send_text("Periodic update")
            except Exception:
                dead.append(connection)
        for connection in dead:
            if connection in active_connections:
                active_connections.remove(connection)
@app.on_event("startup")
async def startup():
"""Start background update task."""
asyncio.create_task(broadcast_updates())
import asyncio

async def slow_operation():
    """Takes 10 s — deliberately longer than the timeout below."""
    await asyncio.sleep(10)
    return "Result"

async def with_timeout():
    """Run operation with timeout."""
    try:
        # wait_for cancels slow_operation() once the timeout expires.
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())
import asyncio

async def cancellable_task():
    """Task that can be cancelled."""
    try:
        for i in range(10):
            print(f"Working: {i}")
            await asyncio.sleep(1)
        return "Complete"
    except asyncio.CancelledError:
        print("Task was cancelled")
        # Cleanup would go here
        raise  # Re-raise to propagate cancellation

async def cancel_example():
    """Example of task cancellation."""
    task = asyncio.create_task(cancellable_task())
    # Let it run for a bit
    await asyncio.sleep(3)
    # Cancel the task (takes effect at its next await point)
    task.cancel()
    try:
        await task  # raises CancelledError once cancellation completes
    except asyncio.CancelledError:
        print("Confirmed: task was cancelled")

asyncio.run(cancel_example())
import asyncio

class AsyncResource:
    """Async context manager for resource management."""

    async def __aenter__(self):
        """Async setup; returns the ready-to-use resource."""
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async setup
        self.connection = "connected"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async cleanup; returning None lets exceptions propagate."""
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async cleanup
        self.connection = None

async def use_resource():
    """Use async resource."""
    async with AsyncResource() as resource:
        print(f"Using resource: {resource.connection}")
        await asyncio.sleep(1)
    # Resource automatically cleaned up

asyncio.run(use_resource())
import asyncio
from datetime import datetime
class Debouncer:
    """Collapse bursts of calls: only the most recent call within
    `delay` seconds of quiet actually runs."""

    def __init__(self, delay: float):
        # Seconds to wait after the latest call before firing.
        self.delay = delay
        # The pending (not-yet-fired) delayed invocation, if any.
        self.task = None

    async def call(self, func, *args, **kwargs):
        """Schedule `func(*args, **kwargs)` after the delay, cancelling
        any previously scheduled call."""
        pending = self.task
        if pending:
            pending.cancel()

        async def _fire():
            await asyncio.sleep(self.delay)
            await func(*args, **kwargs)

        self.task = asyncio.create_task(_fire())
async def api_call(query: str):
    """Simulated API call."""
    print(f"{datetime.now()}: API call with query: {query}")

async def debounce_example():
    """Example of debouncing."""
    debouncer = Debouncer(delay=1.0)
    # Rapid calls - only last one executes: each call cancels the
    # previous pending one before its 1 s delay elapses.
    await debouncer.call(api_call, "query1")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query2")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query3")
    # Wait for debounced call
    await asyncio.sleep(2)

asyncio.run(debounce_example())
# Output: Only "query3" API call executes
import asyncio
from datetime import datetime
class RateLimiter:
    """Limit concurrency and rate of async operations.

    At most `max_calls` operations may be in flight at once (semaphore),
    and at most `max_calls` may start within any `period`-second window
    (sliding timestamp window).
    """

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.semaphore = asyncio.Semaphore(max_calls)
        self.calls = []  # loop-clock start times of recent calls

    async def __aenter__(self):
        """Acquire rate limit slot (may sleep until the window frees up)."""
        await self.semaphore.acquire()
        # Fix vs. original: get_event_loop() is deprecated inside a
        # coroutine — use the running loop's monotonic clock.
        loop = asyncio.get_running_loop()
        now = loop.time()
        # Drop calls that fell outside the sliding window.
        self.calls = [t for t in self.calls if now - t < self.period]
        if len(self.calls) >= self.max_calls:
            # Wait until the oldest call in the window expires.
            sleep_time = self.period - (now - self.calls[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
            # Fix vs. original: re-read the clock after sleeping; the
            # original recorded the pre-sleep time, under-counting delay.
            now = loop.time()
            self.calls = [t for t in self.calls if now - t < self.period]
        self.calls.append(now)
        return self

    async def __aexit__(self, *args):
        """Release the concurrency slot."""
        self.semaphore.release()
async def rate_limited_operation(limiter, name):
    """Operation with rate limiting."""
    async with limiter:
        print(f"{datetime.now()}: {name}")
        await asyncio.sleep(0.1)

async def rate_limit_example():
    """Example of rate limiting."""
    # Max 3 calls per 2 seconds
    limiter = RateLimiter(max_calls=3, period=2.0)
    # Try to make 6 calls; the second batch is throttled by the limiter.
    await asyncio.gather(*[
        rate_limited_operation(limiter, f"Call-{i}")
        for i in range(6)
    ])

asyncio.run(rate_limit_example())
import asyncio
import logging

# Enable asyncio debug mode by passing debug=True to asyncio.run().
# (Fix vs. original: the page called asyncio.run(main(), debug=True) at
# import time with no `main` defined, raising NameError.)
# Or set environment variable:
#   PYTHONASYNCIODEBUG=1 python script.py

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def debug_example():
    """Log before/after an awaited operation (visible at DEBUG level)."""
    logger.debug("Starting operation")
    await asyncio.sleep(1)
    logger.debug("Operation complete")

asyncio.run(debug_example(), debug=True)
import asyncio
import time

async def problematic_code():
    """Deliberately broken example: blocks the loop with time.sleep."""
    print("Starting")
    # BAD: Blocking sleep
    time.sleep(2)  # This blocks the event loop!
    print("Complete")

# Run with debug mode to detect blocking
asyncio.run(problematic_code(), debug=True)
# Warning: Executing <Task> took 2.001 seconds
import asyncio

async def track_tasks():
    """Print every task the running event loop knows about."""
    # all_tasks() must be called from within a running loop.
    tasks = asyncio.all_tasks()
    print(f"Total tasks: {len(tasks)}")
    for task in tasks:
        print(f" - {task.get_name()}: {task}")
        # Check if task is done
        if task.done():
            try:
                result = task.result()
                print(f" Result: {result}")
            except Exception as e:
                print(f" Exception: {e}")

# Create some tasks
async def main():
    task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
    task2 = asyncio.create_task(track_tasks(), name="tracking")
    await task2
    # Stop the 5 s sleeper so the program exits promptly.
    task1.cancel()

asyncio.run(main())
# test_async.py
import pytest
import pytest_asyncio  # required for async fixtures (pytest-asyncio >= 0.21)
import asyncio
import aiohttp  # Fix vs. original: used below but was never imported

# Mark test as async
@pytest.mark.asyncio
async def test_async_function():
    """Test async function."""
    result = await some_async_function()
    assert result == "expected"

@pytest.mark.asyncio
async def test_async_http():
    """Test async HTTP client."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/json") as response:
            assert response.status == 200
            data = await response.json()
            assert "slideshow" in data

# Async fixture.
# Fix vs. original: a plain @pytest.fixture on an async generator hands the
# test an un-awaited async generator under modern pytest-asyncio; use
# @pytest_asyncio.fixture instead.
@pytest_asyncio.fixture
async def async_client():
    """Async test fixture."""
    client = await create_async_client()
    yield client
    await client.close()

@pytest.mark.asyncio
async def test_with_fixture(async_client):
    """Test using async fixture."""
    result = await async_client.fetch_data()
    assert result is not None
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock():
    """Test with async mock."""
    # AsyncMock() returns an awaitable that resolves to return_value.
    mock_func = AsyncMock(return_value="mocked result")
    result = await mock_func()
    assert result == "mocked result"
    mock_func.assert_called_once()

@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
    """Test with patched async function."""
    # NOTE(review): some_function_that_calls_async is assumed to be defined
    # elsewhere and to call module.async_function internally — confirm.
    mock_async.return_value = {"status": "success"}
    result = await some_function_that_calls_async()
    assert result["status"] == "success"
    mock_async.assert_called_once()
import asyncio
import time

async def slow_task(n):
    # Stand-in for 1 s of I/O-bound work.
    await asyncio.sleep(1)
    return n * 2

async def optimized():
    """Parallel execution."""
    start = time.time()
    # Sequential (slow) - 5 seconds
    # results = []
    # for i in range(5):
    #     result = await slow_task(i)
    #     results.append(result)
    # Parallel (fast) - 1 second: all five sleeps overlap
    results = await asyncio.gather(*[slow_task(i) for i in range(5)])
    elapsed = time.time() - start
    print(f"Time: {elapsed:.2f}s, Results: {results}")

asyncio.run(optimized())
import asyncio
import aiohttp

# BAD: Create new session for each request — a fresh connection
# (and TLS handshake) per iteration, with no pooling or reuse.
async def bad_pattern():
    for i in range(10):
        async with aiohttp.ClientSession() as session:
            async with session.get("https://httpbin.org/json") as response:
                await response.json()
# GOOD: Reuse one session (one connection pool) across all requests.
async def good_pattern():
    """Fetch the same endpoint 10x concurrently over a shared session.

    Fix vs. original: each ClientResponse is now entered with
    `async with`, releasing its connection back to the pool; the
    original never released any of the ten responses.
    """
    async with aiohttp.ClientSession() as session:

        async def fetch_json(url):
            async with session.get(url) as response:
                return await response.json()

        await asyncio.gather(*[
            fetch_json("https://httpbin.org/json")
            for _ in range(10)
        ])
import asyncio

# BAD: Blocking I/O in async function
async def bad_file_read():
    with open("large_file.txt") as f:  # Blocks event loop!
        data = f.read()
    return data
# GOOD: Use async file I/O or run in executor
async def good_file_read():
    """Read large_file.txt without blocking the event loop.

    Fix vs. original: the lambda `open(...).read()` never closed the
    file handle; a `with` block guarantees closure even if read() raises.
    The blocking read runs in the default thread-pool executor.
    """
    loop = asyncio.get_running_loop()

    def _read_file():
        with open("large_file.txt") as f:
            return f.read()

    # None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, _read_file)
# BETTER: Use aiofiles for async file I/O (third-party: pip install aiofiles)
import aiofiles

async def better_file_read():
    # Truly async reads; no thread pool needed.
    async with aiofiles.open("large_file.txt") as f:
        data = await f.read()
    return data
# WRONG: calling a coroutine function without awaiting it
async def bad():
    result = async_function()  # Returns coroutine, doesn't execute!
    print(result)  # Prints: <coroutine object>

# CORRECT
async def good():
    result = await async_function()  # Actually executes
    print(result)

# WRONG: blocking sleep
import time

async def bad():
    time.sleep(5)  # Blocks entire event loop!

# CORRECT
async def good():
    await asyncio.sleep(5)  # Non-blocking

# WRONG: no cancellation handling
async def bad():
    await asyncio.sleep(10)
    # No cleanup if cancelled

# CORRECT
async def good():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Cleanup resources
        await cleanup()
        raise  # Re-raise to propagate

# WRONG (pre-3.7 style manual loop management)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# CORRECT (Python 3.7+)
asyncio.run(main())

# WRONG: session leak
async def bad():
    session = aiohttp.ClientSession()
    response = await session.get(url)
    # Session never closed - resource leak!

# CORRECT
async def good():
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
    # Session automatically closed
# Run async script
python script.py
# Run with debug mode
PYTHONASYNCIODEBUG=1 python script.py
# Run tests
pytest -v --asyncio-mode=auto
# Install async dependencies
pip install aiohttp asyncpg motor pytest-asyncio
import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, Any
When using asyncio, consider these complementary skills:
# FastAPI async endpoint pattern
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

app = FastAPI()

# Async database setup.
# NOTE(review): this sketch assumes `select`, `User`, `HTTPException`,
# `EmailSchema`, and `send_email_async` are imported/defined elsewhere;
# none of them are defined in this snippet.
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    # Yield-style dependency: the session closes when the request finishes.
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Async database query
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# Background tasks with asyncio
@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
    # Non-blocking background task
    asyncio.create_task(send_email_async(email))
    return {"status": "email queued"}
# Testing async functions with pytest
import pytest
import pytest_asyncio
from httpx import AsyncClient

# Async test fixture.
# NOTE(review): httpx >= 0.27 removed the `app=` shortcut; newer versions
# require transport=ASGITransport(app=app) — verify against the pinned httpx.
@pytest_asyncio.fixture
async def async_client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

# Async test function
@pytest.mark.asyncio
async def test_get_user(async_client):
    response = await async_client.get("/users/1")
    assert response.status_code == 200
    assert response.json()["id"] == 1

# Testing concurrent operations
@pytest.mark.asyncio
async def test_concurrent_requests():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Run 10 requests concurrently
        responses = await asyncio.gather(
            *[client.get(f"/users/{i}") for i in range(1, 11)]
        )
        assert all(r.status_code == 200 for r in responses)

# Mock async dependencies.
# NOTE(review): AsyncMock needs `from unittest.mock import AsyncMock`,
# which this snippet does not include.
@pytest_asyncio.fixture
async def mock_db():
    # Setup mock database
    db = AsyncMock()
    yield db
    # Cleanup
Common Async Pitfalls:
Blocking the Event Loop
# ❌ WRONG: Blocking call in async function
async def bad_function():
    time.sleep(5)  # Blocks entire event loop!
    return "done"

# ✅ CORRECT: Use asyncio.sleep
async def good_function():
    await asyncio.sleep(5)  # Releases event loop
    return "done"
Debugging Race Conditions
# Add logging to track execution order
import logging
logging.basicConfig(level=logging.DEBUG)

async def debug_task(name):
    """Log start/finish around a 1 s await; returns its name."""
    logging.debug(f"{name}: Starting")
    await asyncio.sleep(1)
    logging.debug(f"{name}: Finished")
    return name

# Run with detailed tracing.
# Fix vs. original: asyncio.gather() was called directly with no running
# event loop, which fails on Python 3.10+ — wrap it in a coroutine first.
async def _run_both():
    return await asyncio.gather(debug_task("A"), debug_task("B"))

asyncio.run(_run_both(), debug=True)
Deadlock Detection
# Use timeout to detect deadlocks.
# Fix vs. original: the bare top-level `await` was a SyntaxError in a
# module; the check must live inside a coroutine.
async def check_with_timeout():
    """Fail fast instead of hanging: bound the await with a timeout."""
    try:
        result = await asyncio.wait_for(some_async_function(), timeout=5.0)
    except asyncio.TimeoutError:
        logging.error("Deadlock detected: operation timed out")
        # Investigate what's blocking
Inspecting Running Tasks
[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]
Python Version Compatibility: This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.
Weekly Installs
160
Repository
GitHub Stars
27
First Seen
Jan 23, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
codex: 137
opencode: 137
gemini-cli: 136
cursor: 132
github-copilot: 130
claude-code: 121
Skills CLI 使用指南:AI Agent 技能包管理器安装与管理教程
33,600 周安装
# Check all pending tasks.
# Fix vs. original: asyncio.all_tasks() raises RuntimeError when called
# with no running event loop, so the inspection must run inside one.
async def inspect_pending_tasks():
    """Print the name, completion state, and current coroutine of each task."""
    tasks = asyncio.all_tasks()
    for task in tasks:
        print(f"Task: {task.get_name()}, Done: {task.done()}")
        if not task.done():
            print(f" Current coro: {task.get_coro()}")