Asyncio Programming by pluginagentmarketplace/custom-plugin-python
npx skills add https://github.com/pluginagentmarketplace/custom-plugin-python --skill 'Asyncio Programming'
掌握 Python 中的异步编程与 asyncio。学习编写能高效处理 I/O 密集型操作的并发代码,构建异步 Web 应用程序,并理解 async/await 范式。
代码示例:
import asyncio
import time
# 同步版本(慢)
def fetch_data_sync(url, delay=2):
    """Pretend to download ``url`` by blocking the calling thread.

    Args:
        url: Label of the resource being "fetched"; only used in the text
            that is printed and returned.
        delay: Seconds to block, standing in for network latency. Defaults
            to 2, the value this example originally hard-coded.

    Returns:
        The string ``"Data from <url>"``.
    """
    print(f"Fetching {url}...")
    time.sleep(delay)  # Simulating network delay (blocks the thread)
    return f"Data from {url}"
def main_sync():
    """Fetch three placeholder URLs one after another.

    Returns:
        A list with one result string per URL, in request order.
    """
    urls = ['url1', 'url2', 'url3']
    # Each call blocks until it returns, so the individual delays add up.
    return [fetch_data_sync(url) for url in urls]


# Takes 6 seconds (2 * 3)
# perf_counter() is monotonic, so the measured duration cannot be skewed by
# system clock adjustments the way time.time() can.
start = time.perf_counter()
main_sync()
print(f"Sync took: {time.perf_counter() - start:.2f}s")
# 异步版本(快)
async def fetch_data_async(url, delay=2):
    """Pretend to download ``url`` without blocking the event loop.

    Args:
        url: Label of the resource being "fetched"; only used in the text
            that is printed and returned.
        delay: Seconds to wait, standing in for network latency. Defaults
            to 2, the value this example originally hard-coded.

    Returns:
        The string ``"Data from <url>"``.
    """
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Non-blocking sleep
    return f"Data from {url}"
async def main_async():
    """Fetch three placeholder URLs concurrently.

    Returns:
        A list with one result string per URL, in request order.
    """
    urls = ['url1', 'url2', 'url3']
    # These are coroutine objects that have not started yet; gather()
    # schedules each of them and runs them concurrently.
    coros = [fetch_data_async(url) for url in urls]
    return await asyncio.gather(*coros)


# Takes 2 seconds (concurrent execution)
# perf_counter() is monotonic, so the measured duration cannot be skewed by
# system clock adjustments the way time.time() can.
start = time.perf_counter()
asyncio.run(main_async())
print(f"Async took: {time.perf_counter() - start:.2f}s")
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
代码示例:
import asyncio
async def process_item(item_id, delay):
    """Simulate processing one item; item 3 always fails.

    Args:
        item_id: Identifier used in the printed message and the result.
        delay: Seconds to wait before finishing.

    Returns:
        The string ``"Result <item_id>"``.

    Raises:
        ValueError: If ``item_id`` equals 3 (raised after the delay).
    """
    print(f"Processing item {item_id}")
    await asyncio.sleep(delay)
    if item_id == 3:
        raise ValueError(f"Item {item_id} failed!")
    return f"Result {item_id}"
async def main():
    """Demonstrate three ways of running several coroutines concurrently."""
    # Method 1: gather (returns results in order)
    # With return_exceptions=True a failing coroutine's exception is returned
    # as a value instead of being raised, so no try/except is needed here.
    results = await asyncio.gather(
        process_item(1, 1),
        process_item(2, 2),
        process_item(3, 1),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} result: {result}")

    # Method 2: wait (returns done/pending sets)
    tasks = [asyncio.create_task(process_item(i, i)) for i in range(1, 4)]
    done, pending = await asyncio.wait(tasks, timeout=2.5)
    print(f"Completed: {len(done)}, Pending: {len(pending)}")
    # Cancel pending tasks
    for task in pending:
        task.cancel()
    # cancel() only requests cancellation; awaiting the tasks lets it take
    # effect before we move on. return_exceptions=True keeps the resulting
    # CancelledError from propagating out of gather().
    await asyncio.gather(*pending, return_exceptions=True)

    # Method 3: Task groups (Python 3.11+)
    # Item 3 raises ValueError, which the group re-raises wrapped in an
    # ExceptionGroup once the block exits; handle it so the demo ends cleanly.
    try:
        async with asyncio.TaskGroup() as tg:
            for i in range(1, 4):
                tg.create_task(process_item(i, 1))
        # All tasks completed or exception raised
    except ExceptionGroup as group:
        for exc in group.exceptions:
            print(f"Task group failure: {exc}")


asyncio.run(main())
代码示例:
import asyncio
import aiohttp
import aiofiles
from typing import List
# 异步 HTTP 请求
async def fetch_url(session, url):
    """Issue a GET request for ``url`` and return the response body as text.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response (an ``aiohttp.ClientSession`` in this example).
        url: Address to request.

    Returns:
        The value produced by ``await response.text()``.
    """
    async with session.get(url) as response:
        return await response.text()
async def fetch_multiple_urls(urls: List[str]):
    """Download every URL in ``urls`` concurrently over one shared session.

    Args:
        urls: Addresses to request.

    Returns:
        The list produced by ``asyncio.gather``: one body per URL, in the
        same order as ``urls``.
    """
    async with aiohttp.ClientSession() as session:
        requests = [fetch_url(session, url) for url in urls]
        # Await inside the ``async with`` so the session is still open while
        # the requests are in flight.
        return await asyncio.gather(*requests)
# 异步文件操作
async def read_file_async(filepath):
    """Read the file at ``filepath`` in text mode via aiofiles.

    Args:
        filepath: Path of the file to open with mode ``'r'``.

    Returns:
        The value produced by ``await f.read()``.
    """
    async with aiofiles.open(filepath, 'r') as f:
        return await f.read()
async def write_file_async(filepath, content):
    """Write ``content`` to the file at ``filepath`` via aiofiles.

    Args:
        filepath: Path of the file to open with mode ``'w'``.
        content: Text passed to ``f.write``.
    """
    async with aiofiles.open(filepath, 'w') as f:
        await f.write(content)
# 异步数据库操作(asyncpg 示例)
import asyncpg
async def fetch_users():
    """Fetch every row of the ``users`` table over a one-off connection.

    Returns:
        The rows returned by ``conn.fetch('SELECT * FROM users')``.
    """
    # NOTE(review): connection settings are hard-coded for the demo; real
    # code should load credentials from configuration instead.
    conn = await asyncpg.connect(
        user='user',
        password='password',
        database='mydb',
        host='localhost',
    )
    try:
        return await conn.fetch('SELECT * FROM users')
    finally:
        # Runs whether or not the query raised.
        await conn.close()
# 用法
# Usage
async def main():
    """Run the HTTP, file, and database examples one after another."""
    # Fetch URLs concurrently
    urls = [
        'https://api.example.com/data1',
        'https://api.example.com/data2',
        'https://api.example.com/data3',
    ]
    results = await fetch_multiple_urls(urls)
    # Report the outcome so the fetched bodies are not silently discarded.
    print(f"Fetched {len(results)} pages")

    # Read/write files
    content = await read_file_async('input.txt')
    await write_file_async('output.txt', content.upper())

    # Database operations
    users = await fetch_users()
    print(f"Found {len(users)} users")


asyncio.run(main())
代码示例:
# FastAPI 异步示例
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
# 异步路由
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Return the user identified by the ``user_id`` path parameter."""
    # Async database call
    # NOTE(review): fetch_user_from_db is not defined in this snippet; it
    # must be supplied elsewhere and return an awaitable.
    return await fetch_user_from_db(user_id)
# 后台任务
async def send_notification(email: str, message: str):
    """Pretend to email ``message`` to ``email`` and print a confirmation.

    Args:
        email: Recipient address shown in the printed line.
        message: Message text shown in the printed line.
    """
    await asyncio.sleep(2)  # Simulate email sending
    print(f"Sent email to {email}: {message}")
@app.post("/orders/")
async def create_order(order_data: dict, background_tasks: BackgroundTasks):
    """Save an order and queue a notification for the customer.

    Args:
        order_data: Request payload; must contain ``'customer_email'``.
        background_tasks: Collector that the notification is added to.

    Returns:
        ``{"order_id": <value returned by save_order>}``.

    Raises:
        KeyError: If ``order_data`` has no ``'customer_email'`` key.
    """
    # Read the address first so a payload without 'customer_email' fails
    # before save_order runs, not after it.
    customer_email = order_data['customer_email']
    # Process order synchronously
    # NOTE(review): save_order is not defined in this snippet.
    order_id = save_order(order_data)
    # Send notification in background
    background_tasks.add_task(
        send_notification,
        customer_email,
        f"Order #{order_id} created",
    )
    return {"order_id": order_id}
# aiohttp Web 服务器
from aiohttp import web
async def handle_request(request):
    """Respond with a JSON greeting for the ``name`` URL segment.

    Args:
        request: Incoming request; ``request.match_info`` supplies ``name``,
            falling back to ``'Anonymous'``.

    Returns:
        ``web.json_response`` carrying ``{'message': 'Hello <name>'}``.
    """
    name = request.match_info.get('name', 'Anonymous')
    await asyncio.sleep(1)  # Async operation
    return web.json_response({'message': f'Hello {name}'})


# NOTE: this rebinds the module-level name ``app`` that the FastAPI example
# above also uses; keep the two examples in separate modules when running them.
app = web.Application()
app.add_routes([web.get('/{name}', handle_request)])
# WebSocket 示例
async def websocket_handler(request):
    """Upgrade the request to a WebSocket and echo text messages back.

    Args:
        request: Incoming request passed to ``ws.prepare``.

    Returns:
        The ``web.WebSocketResponse`` once the message loop has ended.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT:
            await ws.send_str(f"Echo: {msg.data}")
        elif msg.type == web.WSMsgType.ERROR:
            print(f'Error: {ws.exception()}')
    return ws


# NOTE(review): the '/{name}' route registered earlier also matches '/ws';
# depending on aiohttp's route resolution order this route may be shadowed.
# Confirm, or register '/ws' first.
app.add_routes([web.get('/ws', websocket_handler)])
构建一个带速率限制的并发网络爬虫。
要求:
关键技能: aiohttp、异步 I/O、错误处理
创建一个基于 WebSocket 的聊天应用程序。
要求:
关键技能: WebSockets、异步服务器、状态管理
构建一个分布式任务处理系统。
要求:
关键技能: 消息队列、并发工作器、清理
掌握 asyncio 后,可以探索:
每周安装次数
–
仓库
GitHub 星标数
5
首次出现
–
安全审计
Master asynchronous programming in Python with asyncio. Learn to write concurrent code that efficiently handles I/O-bound operations, build async web applications, and understand the async/await paradigm.
Code Example:
import asyncio
import time
# Synchronous version (slow)
def fetch_data_sync(url, delay=2):
    """Pretend to download ``url`` by blocking the calling thread.

    Args:
        url: Label of the resource being "fetched"; only used in the text
            that is printed and returned.
        delay: Seconds to block, standing in for network latency. Defaults
            to 2, the value this example originally hard-coded.

    Returns:
        The string ``"Data from <url>"``.
    """
    print(f"Fetching {url}...")
    time.sleep(delay)  # Simulating network delay (blocks the thread)
    return f"Data from {url}"
def main_sync():
    """Fetch three placeholder URLs one after another.

    Returns:
        A list with one result string per URL, in request order.
    """
    urls = ['url1', 'url2', 'url3']
    # Each call blocks until it returns, so the individual delays add up.
    return [fetch_data_sync(url) for url in urls]


# Takes 6 seconds (2 * 3)
# perf_counter() is monotonic, so the measured duration cannot be skewed by
# system clock adjustments the way time.time() can.
start = time.perf_counter()
main_sync()
print(f"Sync took: {time.perf_counter() - start:.2f}s")
# Asynchronous version (fast)
async def fetch_data_async(url, delay=2):
    """Pretend to download ``url`` without blocking the event loop.

    Args:
        url: Label of the resource being "fetched"; only used in the text
            that is printed and returned.
        delay: Seconds to wait, standing in for network latency. Defaults
            to 2, the value this example originally hard-coded.

    Returns:
        The string ``"Data from <url>"``.
    """
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Non-blocking sleep
    return f"Data from {url}"
async def main_async():
    """Fetch three placeholder URLs concurrently.

    Returns:
        A list with one result string per URL, in request order.
    """
    urls = ['url1', 'url2', 'url3']
    # These are coroutine objects that have not started yet; gather()
    # schedules each of them and runs them concurrently.
    coros = [fetch_data_async(url) for url in urls]
    return await asyncio.gather(*coros)


# Takes 2 seconds (concurrent execution)
# perf_counter() is monotonic, so the measured duration cannot be skewed by
# system clock adjustments the way time.time() can.
start = time.perf_counter()
asyncio.run(main_async())
print(f"Async took: {time.perf_counter() - start:.2f}s")
Code Example:
import asyncio
async def process_item(item_id, delay):
    """Simulate processing one item; item 3 always fails.

    Args:
        item_id: Identifier used in the printed message and the result.
        delay: Seconds to wait before finishing.

    Returns:
        The string ``"Result <item_id>"``.

    Raises:
        ValueError: If ``item_id`` equals 3 (raised after the delay).
    """
    print(f"Processing item {item_id}")
    await asyncio.sleep(delay)
    if item_id == 3:
        raise ValueError(f"Item {item_id} failed!")
    return f"Result {item_id}"
async def main():
    """Demonstrate three ways of running several coroutines concurrently."""
    # Method 1: gather (returns results in order)
    # With return_exceptions=True a failing coroutine's exception is returned
    # as a value instead of being raised, so no try/except is needed here.
    results = await asyncio.gather(
        process_item(1, 1),
        process_item(2, 2),
        process_item(3, 1),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} result: {result}")

    # Method 2: wait (returns done/pending sets)
    tasks = [asyncio.create_task(process_item(i, i)) for i in range(1, 4)]
    done, pending = await asyncio.wait(tasks, timeout=2.5)
    print(f"Completed: {len(done)}, Pending: {len(pending)}")
    # Cancel pending tasks
    for task in pending:
        task.cancel()
    # cancel() only requests cancellation; awaiting the tasks lets it take
    # effect before we move on. return_exceptions=True keeps the resulting
    # CancelledError from propagating out of gather().
    await asyncio.gather(*pending, return_exceptions=True)

    # Method 3: Task groups (Python 3.11+)
    # Item 3 raises ValueError, which the group re-raises wrapped in an
    # ExceptionGroup once the block exits; handle it so the demo ends cleanly.
    try:
        async with asyncio.TaskGroup() as tg:
            for i in range(1, 4):
                tg.create_task(process_item(i, 1))
        # All tasks completed or exception raised
    except ExceptionGroup as group:
        for exc in group.exceptions:
            print(f"Task group failure: {exc}")


asyncio.run(main())
Code Example:
import asyncio
import aiohttp
import aiofiles
from typing import List
# Async HTTP requests
async def fetch_url(session, url):
    """Issue a GET request for ``url`` and return the response body as text.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response (an ``aiohttp.ClientSession`` in this example).
        url: Address to request.

    Returns:
        The value produced by ``await response.text()``.
    """
    async with session.get(url) as response:
        return await response.text()
async def fetch_multiple_urls(urls: List[str]):
    """Download every URL in ``urls`` concurrently over one shared session.

    Args:
        urls: Addresses to request.

    Returns:
        The list produced by ``asyncio.gather``: one body per URL, in the
        same order as ``urls``.
    """
    async with aiohttp.ClientSession() as session:
        requests = [fetch_url(session, url) for url in urls]
        # Await inside the ``async with`` so the session is still open while
        # the requests are in flight.
        return await asyncio.gather(*requests)
# Async file operations
async def read_file_async(filepath):
    """Read the file at ``filepath`` in text mode via aiofiles.

    Args:
        filepath: Path of the file to open with mode ``'r'``.

    Returns:
        The value produced by ``await f.read()``.
    """
    async with aiofiles.open(filepath, 'r') as f:
        return await f.read()
async def write_file_async(filepath, content):
    """Write ``content`` to the file at ``filepath`` via aiofiles.

    Args:
        filepath: Path of the file to open with mode ``'w'``.
        content: Text passed to ``f.write``.
    """
    async with aiofiles.open(filepath, 'w') as f:
        await f.write(content)
# Async database operations (example with asyncpg)
import asyncpg
async def fetch_users():
    """Fetch every row of the ``users`` table over a one-off connection.

    Returns:
        The rows returned by ``conn.fetch('SELECT * FROM users')``.
    """
    # NOTE(review): connection settings are hard-coded for the demo; real
    # code should load credentials from configuration instead.
    conn = await asyncpg.connect(
        user='user',
        password='password',
        database='mydb',
        host='localhost',
    )
    try:
        return await conn.fetch('SELECT * FROM users')
    finally:
        # Runs whether or not the query raised.
        await conn.close()
# Usage
async def main():
    """Run the HTTP, file, and database examples one after another."""
    # Fetch URLs concurrently
    urls = [
        'https://api.example.com/data1',
        'https://api.example.com/data2',
        'https://api.example.com/data3',
    ]
    results = await fetch_multiple_urls(urls)
    # Report the outcome so the fetched bodies are not silently discarded.
    print(f"Fetched {len(results)} pages")

    # Read/write files
    content = await read_file_async('input.txt')
    await write_file_async('output.txt', content.upper())

    # Database operations
    users = await fetch_users()
    print(f"Found {len(users)} users")


asyncio.run(main())
Code Example:
# FastAPI async example
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
# Async route
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Return the user identified by the ``user_id`` path parameter."""
    # Async database call
    # NOTE(review): fetch_user_from_db is not defined in this snippet; it
    # must be supplied elsewhere and return an awaitable.
    return await fetch_user_from_db(user_id)
# Background task
async def send_notification(email: str, message: str):
    """Pretend to email ``message`` to ``email`` and print a confirmation.

    Args:
        email: Recipient address shown in the printed line.
        message: Message text shown in the printed line.
    """
    await asyncio.sleep(2)  # Simulate email sending
    print(f"Sent email to {email}: {message}")
@app.post("/orders/")
async def create_order(order_data: dict, background_tasks: BackgroundTasks):
    """Save an order and queue a notification for the customer.

    Args:
        order_data: Request payload; must contain ``'customer_email'``.
        background_tasks: Collector that the notification is added to.

    Returns:
        ``{"order_id": <value returned by save_order>}``.

    Raises:
        KeyError: If ``order_data`` has no ``'customer_email'`` key.
    """
    # Read the address first so a payload without 'customer_email' fails
    # before save_order runs, not after it.
    customer_email = order_data['customer_email']
    # Process order synchronously
    # NOTE(review): save_order is not defined in this snippet.
    order_id = save_order(order_data)
    # Send notification in background
    background_tasks.add_task(
        send_notification,
        customer_email,
        f"Order #{order_id} created",
    )
    return {"order_id": order_id}
# aiohttp web server
from aiohttp import web
async def handle_request(request):
    """Respond with a JSON greeting for the ``name`` URL segment.

    Args:
        request: Incoming request; ``request.match_info`` supplies ``name``,
            falling back to ``'Anonymous'``.

    Returns:
        ``web.json_response`` carrying ``{'message': 'Hello <name>'}``.
    """
    name = request.match_info.get('name', 'Anonymous')
    await asyncio.sleep(1)  # Async operation
    return web.json_response({'message': f'Hello {name}'})


# NOTE: this rebinds the module-level name ``app`` that the FastAPI example
# above also uses; keep the two examples in separate modules when running them.
app = web.Application()
app.add_routes([web.get('/{name}', handle_request)])
# WebSocket example
async def websocket_handler(request):
    """Upgrade the request to a WebSocket and echo text messages back.

    Args:
        request: Incoming request passed to ``ws.prepare``.

    Returns:
        The ``web.WebSocketResponse`` once the message loop has ended.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT:
            await ws.send_str(f"Echo: {msg.data}")
        elif msg.type == web.WSMsgType.ERROR:
            print(f'Error: {ws.exception()}')
    return ws


# NOTE(review): the '/{name}' route registered earlier also matches '/ws';
# depending on aiohttp's route resolution order this route may be shadowed.
# Confirm, or register '/ws' first.
app.add_routes([web.get('/ws', websocket_handler)])
Build a concurrent web scraper with rate limiting.
Requirements:
Key Skills: aiohttp, async I/O, error handling
Create a WebSocket-based chat application.
Requirements:
Key Skills: WebSockets, async server, state management
Build a distributed task processing system.
Requirements:
Key Skills: Message queues, concurrent workers, cleanup
After mastering asyncio, explore:
Weekly Installs
–
Repository
GitHub Stars
5
First Seen
–
Security Audits
agent-browser 浏览器自动化工具 - Vercel Labs 命令行网页操作与测试
147,400 周安装