async-programming by martinholovsky/claude-skills-generator
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill async-programming

风险等级:中等
理由 : 异步编程会引入竞态条件、资源泄漏和基于时间的漏洞。虽然不直接暴露于外部攻击,但不当的异步代码可能导致数据损坏、死锁以及安全敏感的竞态条件,如双重支付或 TOCTOU(检查时间与使用时间)。
您是一位精通 Python (asyncio) 和 Rust (Tokio) 异步编程模式的专家。您编写的并发代码没有竞态条件,能妥善管理资源,并能优雅地处理错误。
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
| 情况 | 方法 |
|---|---|
| 共享可变状态 | 使用 asyncio.Lock 或 RwLock |
| 数据库事务 | 使用原子操作,SELECT FOR UPDATE |
| 资源清理 | 使用异步上下文管理器 |
| 任务协调 | 使用 asyncio.Event, Queue 或 Semaphore |
| 后台任务 | 跟踪任务,处理取消 |
import pytest
import asyncio
@pytest.mark.asyncio
async def test_concurrent_counter_safety():
    """Verify the counter stays consistent under concurrent access (no lost updates)."""
    counter = SafeCounter()  # Not implemented yet - will fail (TDD red phase)
    async def increment_many():
        # 100 awaited increments per worker; every await is a suspension point
        # where another worker may interleave.
        for _ in range(100):
            await counter.increment()
    # Run 10 concurrent incrementers (10 x 100 = 1000 increments total).
    await asyncio.gather(*[increment_many() for _ in range(10)])
    # Must be exactly 1000 (no lost updates from interleaved read-modify-write).
    assert await counter.get() == 1000
@pytest.mark.asyncio
async def test_resource_cleanup_on_cancellation():
    """Verify resources are released even when the owning task is cancelled."""
    from contextlib import asynccontextmanager

    cleanup_called = False

    @asynccontextmanager
    async def tracked_resource():
        # Wrap managed_resource so the test can observe that its cleanup
        # path actually ran.
        async with managed_resource() as resource:
            try:
                yield resource
            finally:
                # Runs on CancelledError too. The original test set the flag
                # *after* the 10 s sleep inside the with-block, so cancellation
                # guaranteed the flag stayed False and the assert could never
                # pass, regardless of managed_resource's correctness.
                nonlocal cleanup_called
                cleanup_called = True

    async def task_with_resource():
        async with tracked_resource():
            await asyncio.sleep(10)  # long operation, cancelled mid-way

    task = asyncio.create_task(task_with_resource())
    await asyncio.sleep(0.1)  # let the task start and enter the sleep
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task
    assert cleanup_called  # cleanup must happen despite cancellation
import asyncio
from contextlib import asynccontextmanager
class SafeCounter:
    """Counter whose read-modify-write cycles are serialized by an asyncio.Lock."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._value = 0

    async def increment(self) -> int:
        """Atomically add one and return the new count."""
        async with self._lock:
            self._value = self._value + 1
            return self._value

    async def get(self) -> int:
        """Return the current count, read under the lock."""
        async with self._lock:
            return self._value
@asynccontextmanager
async def managed_resource():
    """Yield an acquired resource and guarantee its release.

    The finally clause runs on normal exit, on exceptions, and on
    CancelledError alike.
    """
    res = await acquire_resource()
    try:
        yield res
    finally:
        await release_resource(res)
应用性能模式,添加超时,改进错误处理。
# 运行异步测试
pytest tests/ -v --asyncio-mode=auto
# 检查阻塞调用(启用 asyncio 调试模式;`python -m asyncio debug` 并非有效命令)
PYTHONASYNCIODEBUG=1 python app.py
# 运行并发压力测试
pytest tests/ -v -n auto --asyncio-mode=auto
# BAD - sequential execution
async def fetch_all_sequential(urls: list[str]) -> list[str]:
    results = []
    for url in urls:
        result = await fetch(url)  # waits for each one in turn
        results.append(result)
    return results  # total time: sum of all fetch times

# GOOD - concurrent execution
async def fetch_all_concurrent(urls: list[str]) -> list[str]:
    return await asyncio.gather(*[fetch(url) for url in urls])
# total time: max of all fetch times

# BAD - unbounded concurrency (may overwhelm the server)
async def fetch_many(urls: list[str]):
    return await asyncio.gather(*[fetch(url) for url in urls])

# GOOD - bounded concurrency via a semaphore
async def fetch_many_limited(urls: list[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async def fetch_with_limit(url: str):
        # At most `max_concurrent` fetches hold the semaphore at once.
        async with semaphore:
            return await fetch(url)
    return await asyncio.gather(*[fetch_with_limit(url) for url in urls])
# BAD - manual task tracking
async def process_items_manual(items):
    tasks = []
    for item in items:
        task = asyncio.create_task(process(item))
        tasks.append(task)
    return await asyncio.gather(*tasks)

# GOOD - task group with automatic cleanup (Python 3.11+)
async def process_items_taskgroup(items):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process(item)) for item in items]
    # All tasks are complete once the TaskGroup exits.
    return [task.result() for task in tasks]
# siblings are cancelled automatically if any task fails
# BAD - creating a brand-new event loop on every call
def run_async_bad():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(main())
    finally:
        loop.close()

# GOOD - reuse the running loop, or let asyncio.run manage one
def run_async_good():
    return asyncio.run(main())  # handles loop creation and teardown

# GOOD - library code should grab the already-running loop
async def library_function():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # ... use the existing loop ...
# BAD - blocks the event loop
async def process_file_bad(path: str):
    with open(path) as f:  # blocking I/O stalls every task on the loop
        data = f.read()
    # CPU-bound hashing also blocks the loop. (NOTE(review): text mode yields
    # str, which hashlib.sha256 rejects - a latent bug in this BAD example.)
    result = hashlib.sha256(data).hexdigest()
    return result

# GOOD - non-blocking via aiofiles plus an executor for CPU work
import aiofiles
async def process_file_good(path: str):
    async with aiofiles.open(path, 'rb') as f:
        data = await f.read()
    loop = asyncio.get_running_loop()
    # Hash in the default thread pool so the loop keeps serving other tasks.
    result = await loop.run_in_executor(
        None, lambda: hashlib.sha256(data).hexdigest()
    )
    return result
| 组件 | 版本 | 备注 |
|---|---|---|
| Python | 3.11+ | asyncio 改进,TaskGroup |
| Rust | 1.75+ | 稳定的异步支持 |
| Tokio | 1.35+ | 异步运行时 |
| aioredis | 使用 redis-py | 更好的维护性 |
# Python 异步生态系统
asyncio # 核心异步
aiohttp # HTTP 客户端
asyncpg # PostgreSQL
aiofiles # 文件 I/O
pytest-asyncio # 测试
import asyncio
class SafeCounter:
    """Counter safe for concurrent use within a single event loop."""

    def __init__(self):
        # The lock serializes every read-modify-write on _value.
        self._lock = asyncio.Lock()
        self._value = 0

    async def increment(self) -> int:
        """Bump the counter by one and report the new total."""
        async with self._lock:
            self._value += 1
            new_total = self._value
        return new_total

    async def get(self) -> int:
        """Read the current total under the lock."""
        async with self._lock:
            return self._value
from sqlalchemy.ext.asyncio import AsyncSession
async def transfer_safe(db: AsyncSession, from_id: int, to_id: int, amount: int):
    """Atomically move `amount` between two accounts using row locks.

    Raises:
        ValueError: if `amount` is not positive, either account is missing,
            or the source balance is insufficient.
    """
    if amount <= 0:
        # A negative amount would silently reverse the transfer direction -
        # a real security hole in money-movement code.
        raise ValueError("transfer amount must be positive")
    async with db.begin():
        stmt = (
            select(Account)
            .where(Account.id.in_([from_id, to_id]))
            .with_for_update()  # lock both rows for the transaction's duration
        )
        accounts = {a.id: a for a in (await db.execute(stmt)).scalars()}
        # Fail with a clear error instead of a bare KeyError on a bad id.
        if from_id not in accounts or to_id not in accounts:
            raise ValueError("account not found")
        if accounts[from_id].balance < amount:
            raise ValueError("资金不足")
        accounts[from_id].balance -= amount
        accounts[to_id].balance += amount
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_connection():
    """Hand out a pooled connection and always return it, even on cancellation."""
    connection = await pool.acquire()
    try:
        yield connection
    finally:
        # Executed on normal exit, exceptions, and CancelledError.
        await pool.release(connection)
import asyncio, signal
class GracefulApp:
    """Runs background workers and shuts them down cleanly on SIGTERM/SIGINT."""

    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.tasks: set[asyncio.Task] = set()

    def _track(self, task: asyncio.Task) -> None:
        # Keep a strong reference so the task is not garbage-collected,
        # and drop it automatically when it finishes (matches the page's
        # own "ALWAYS track tasks" advice).
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    async def run(self):
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() here has been deprecated since Python 3.10.
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self.shutdown_event.set)
        try:
            self._track(asyncio.create_task(self.worker()))
            await self.shutdown_event.wait()
            for task in list(self.tasks):
                task.cancel()
            # return_exceptions=True swallows the CancelledErrors raised by
            # the workers we just cancelled.
            await asyncio.gather(*self.tasks, return_exceptions=True)
        finally:
            # Unregister handlers so the loop can be reused cleanly.
            for sig in (signal.SIGTERM, signal.SIGINT):
                loop.remove_signal_handler(sig)
| 问题 | 严重性 | 缓解措施 |
|---|---|---|
| 竞态条件 | 高 | 使用锁或原子操作 |
| TOCTOU | 高 | 原子数据库操作 |
| 资源泄漏 | 中 | 上下文管理器 |
| CVE-2024-12254 | 高 | 升级 Python |
| 死锁 | 中 | 锁顺序,超时 |
# RACE CONDITION - read/await/write pattern
class UserSession:
    async def update(self, key, value):
        current = self.data.get(key, 0)  # read
        await validate(value)  # await = suspension point; other tasks may run
        self.data[key] = current + value  # writes a stale value if the key changed meanwhile

# FIXED - validate outside the lock, update atomically inside it
class SafeUserSession:
    async def update(self, key, value):
        await validate(value)
        async with self._lock:
            # No await between read and write, and both happen under the lock.
            self.data[key] = self.data.get(key, 0) + value

# NEVER - check-then-act race on the cache
async def get_or_fetch(self, key):
    if key not in self.data:
        self.data[key] = await fetch(key)  # two tasks can both miss and both fetch
    return self.data[key]

# ALWAYS - protect the check and the insert with the same lock
async def get_or_fetch(self, key):
    async with self._lock:
        if key not in self.data:
            self.data[key] = await fetch(key)
        return self.data[key]
# NEVER - fire-and-forget; asyncio keeps only a weak reference,
# so the task may be garbage-collected mid-flight
asyncio.create_task(background_work())

# ALWAYS - hold a strong reference and discard it once done
task = asyncio.create_task(background_work())
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)

# NEVER - blocks the event loop and every other task on it
time.sleep(5)

# ALWAYS - use the async equivalents
await asyncio.sleep(5)
result = await loop.run_in_executor(None, cpu_bound_func)
pytest --asyncio-mode=auto

您的目标是创建满足以下条件的异步代码:
关键性能规则:
- 对并发 I/O 操作使用 asyncio.gather
- 对 CPU 密集型工作使用 run_in_executor

安全提醒:
每周安装数
116
代码仓库
GitHub 星标数
29
首次出现
2026年1月20日
安全审计
安装于
codex94
gemini-cli92
opencode92
cursor85
github-copilot84
claude-code73
Risk Level : MEDIUM
Justification : Async programming introduces race conditions, resource leaks, and timing-based vulnerabilities. While not directly exposed to external attacks, improper async code can cause data corruption, deadlocks, and security-sensitive race conditions like double-spending or TOCTOU (time-of-check-time-of-use).
You are an expert in asynchronous programming patterns for Python (asyncio) and Rust (Tokio). You write concurrent code that is free from race conditions, properly manages resources, and handles errors gracefully.
| Situation | Approach |
|---|---|
| Shared mutable state | Use asyncio.Lock or RwLock |
| Database transaction | Use atomic operations, SELECT FOR UPDATE |
| Resource cleanup | Use async context managers |
| Task coordination | Use asyncio.Event, Queue, or Semaphore |
| Background tasks | Track tasks, handle cancellation |
import pytest
import asyncio
@pytest.mark.asyncio
async def test_concurrent_counter_safety():
    """Test counter maintains consistency under concurrent access."""
    counter = SafeCounter()  # Not implemented yet - will fail (TDD red phase)
    async def increment_many():
        # 100 awaited increments per worker; each await is a suspension point
        # where another worker may interleave.
        for _ in range(100):
            await counter.increment()
    # Run 10 concurrent incrementers (10 x 100 = 1000 increments total)
    await asyncio.gather(*[increment_many() for _ in range(10)])
    # Must be exactly 1000 (no lost updates from interleaved read-modify-write)
    assert await counter.get() == 1000
@pytest.mark.asyncio
async def test_resource_cleanup_on_cancellation():
    """Test resources are cleaned up even when the task is cancelled."""
    from contextlib import asynccontextmanager

    cleanup_called = False

    @asynccontextmanager
    async def tracked_resource():
        # Wrap managed_resource so the test can observe that its cleanup
        # path actually ran.
        async with managed_resource() as resource:
            try:
                yield resource
            finally:
                # Runs on CancelledError too. The original test set the flag
                # *after* the 10 s sleep inside the with-block, so cancelling
                # the task guaranteed the flag stayed False and the final
                # assert could never pass.
                nonlocal cleanup_called
                cleanup_called = True

    async def task_with_resource():
        async with tracked_resource():
            await asyncio.sleep(10)  # long operation, cancelled mid-way

    task = asyncio.create_task(task_with_resource())
    await asyncio.sleep(0.1)  # let the task start and enter the sleep
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task
    assert cleanup_called  # cleanup must happen despite cancellation
import asyncio
from contextlib import asynccontextmanager
class SafeCounter:
    """Counter whose read-modify-write cycles are serialized by an asyncio.Lock."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._value = 0

    async def increment(self) -> int:
        """Atomically add one and return the new count."""
        async with self._lock:
            self._value = self._value + 1
            return self._value

    async def get(self) -> int:
        """Return the current count, read under the lock."""
        async with self._lock:
            return self._value
@asynccontextmanager
async def managed_resource():
    """Yield an acquired resource and guarantee its release.

    The finally clause runs on normal exit, on exceptions, and on
    CancelledError alike.
    """
    res = await acquire_resource()
    try:
        yield res
    finally:
        await release_resource(res)
Apply performance patterns, add timeouts, improve error handling.
# Run async tests
pytest tests/ -v --asyncio-mode=auto
# Check for blocking calls (enable asyncio debug mode; `python -m asyncio debug` is not a valid invocation)
PYTHONASYNCIODEBUG=1 python app.py
# Run with concurrency stress test
pytest tests/ -v -n auto --asyncio-mode=auto
# BAD - Sequential execution
async def fetch_all_sequential(urls: list[str]) -> list[str]:
    results = []
    for url in urls:
        result = await fetch(url)  # Waits for each one in turn
        results.append(result)
    return results  # Total time: sum of all fetches

# GOOD - Concurrent execution
async def fetch_all_concurrent(urls: list[str]) -> list[str]:
    return await asyncio.gather(*[fetch(url) for url in urls])
# Total time: max of all fetches

# BAD - Unbounded concurrency (may overwhelm server)
async def fetch_many(urls: list[str]):
    return await asyncio.gather(*[fetch(url) for url in urls])

# GOOD - Bounded concurrency with semaphore
async def fetch_many_limited(urls: list[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async def fetch_with_limit(url: str):
        # At most `max_concurrent` fetches hold the semaphore at once.
        async with semaphore:
            return await fetch(url)
    return await asyncio.gather(*[fetch_with_limit(url) for url in urls])
# BAD - Manual task tracking
async def process_items_manual(items):
    tasks = []
    for item in items:
        task = asyncio.create_task(process(item))
        tasks.append(task)
    return await asyncio.gather(*tasks)

# GOOD - Task groups with automatic cleanup (Python 3.11+)
async def process_items_taskgroup(items):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process(item)) for item in items]
    # All tasks are complete once the TaskGroup exits.
    return [task.result() for task in tasks]
# Automatic cancellation of siblings on any failure
# BAD - Creating a brand-new event loop each time
def run_async_bad():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(main())
    finally:
        loop.close()

# GOOD - Reuse the running loop, or let asyncio.run manage one
def run_async_good():
    return asyncio.run(main())  # Handles loop creation and teardown

# GOOD - Library code should grab the already-running loop
async def library_function():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # ... use the existing loop ...
# BAD - Blocks the event loop
async def process_file_bad(path: str):
    with open(path) as f:  # Blocking I/O stalls every task on the loop
        data = f.read()
    # CPU-bound hashing also blocks the loop. (NOTE(review): text mode yields
    # str, which hashlib.sha256 rejects - a latent bug in this BAD example.)
    result = hashlib.sha256(data).hexdigest()
    return result

# GOOD - Non-blocking with aiofiles plus an executor for CPU work
import aiofiles
async def process_file_good(path: str):
    async with aiofiles.open(path, 'rb') as f:
        data = await f.read()
    loop = asyncio.get_running_loop()
    # Hash in the default thread pool so the loop keeps serving other tasks.
    result = await loop.run_in_executor(
        None, lambda: hashlib.sha256(data).hexdigest()
    )
    return result
| Component | Version | Notes |
|---|---|---|
| Python | 3.11+ | asyncio improvements, TaskGroup |
| Rust | 1.75+ | Stable async |
| Tokio | 1.35+ | Async runtime |
| aioredis | Use redis-py | Better maintenance |
# Python async ecosystem
asyncio # Core async
aiohttp # HTTP client
asyncpg # PostgreSQL
aiofiles # File I/O
pytest-asyncio # Testing
import asyncio
class SafeCounter:
    """Counter safe for concurrent use within a single event loop."""

    def __init__(self):
        # The lock serializes every read-modify-write on _value.
        self._lock = asyncio.Lock()
        self._value = 0

    async def increment(self) -> int:
        """Bump the counter by one and report the new total."""
        async with self._lock:
            self._value += 1
            new_total = self._value
        return new_total

    async def get(self) -> int:
        """Read the current total under the lock."""
        async with self._lock:
            return self._value
from sqlalchemy.ext.asyncio import AsyncSession
async def transfer_safe(db: AsyncSession, from_id: int, to_id: int, amount: int):
    """Atomic transfer between two accounts using row locks.

    Raises:
        ValueError: if `amount` is not positive, either account is missing,
            or the source balance is insufficient.
    """
    if amount <= 0:
        # A negative amount would silently reverse the transfer direction -
        # a real security hole in money-movement code.
        raise ValueError("transfer amount must be positive")
    async with db.begin():
        stmt = (
            select(Account)
            .where(Account.id.in_([from_id, to_id]))
            .with_for_update()  # Lock both rows for the transaction's duration
        )
        accounts = {a.id: a for a in (await db.execute(stmt)).scalars()}
        # Fail with a clear error instead of a bare KeyError on a bad id.
        if from_id not in accounts or to_id not in accounts:
            raise ValueError("account not found")
        if accounts[from_id].balance < amount:
            raise ValueError("Insufficient funds")
        accounts[from_id].balance -= amount
        accounts[to_id].balance += amount
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_connection():
    """Hand out a pooled connection and always return it, even on cancellation."""
    connection = await pool.acquire()
    try:
        yield connection
    finally:
        # Executed on normal exit, exceptions, and CancelledError.
        await pool.release(connection)
import asyncio, signal
class GracefulApp:
    """Runs background workers and shuts them down cleanly on SIGTERM/SIGINT."""

    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.tasks: set[asyncio.Task] = set()

    def _track(self, task: asyncio.Task) -> None:
        # Keep a strong reference so the task is not garbage-collected,
        # and drop it automatically when it finishes (matches the page's
        # own "ALWAYS track tasks" advice).
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    async def run(self):
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() here has been deprecated since Python 3.10.
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self.shutdown_event.set)
        try:
            self._track(asyncio.create_task(self.worker()))
            await self.shutdown_event.wait()
            for task in list(self.tasks):
                task.cancel()
            # return_exceptions=True swallows the CancelledErrors raised by
            # the workers we just cancelled.
            await asyncio.gather(*self.tasks, return_exceptions=True)
        finally:
            # Unregister handlers so the loop can be reused cleanly.
            for sig in (signal.SIGTERM, signal.SIGINT):
                loop.remove_signal_handler(sig)
| Issue | Severity | Mitigation |
|---|---|---|
| Race Conditions | HIGH | Use locks or atomic ops |
| TOCTOU | HIGH | Atomic DB operations |
| Resource Leaks | MEDIUM | Context managers |
| CVE-2024-12254 | HIGH | Upgrade Python |
| Deadlocks | MEDIUM | Lock ordering, timeouts |
# RACE CONDITION - read/await/write pattern
class UserSession:
    async def update(self, key, value):
        current = self.data.get(key, 0)  # Read
        await validate(value)  # Await = suspension point; other tasks may run
        self.data[key] = current + value  # Write stale value if the key changed meanwhile

# FIXED - validate outside lock, atomic update inside
class SafeUserSession:
    async def update(self, key, value):
        await validate(value)
        async with self._lock:
            # No await between read and write, and both happen under the lock.
            self.data[key] = self.data.get(key, 0) + value

# NEVER - check-then-act race on the cache
async def get_or_fetch(self, key):
    if key not in self.data:
        self.data[key] = await fetch(key)  # two tasks can both miss and fetch twice
    return self.data[key]

# ALWAYS - protect the check and the insert with the same lock
async def get_or_fetch(self, key):
    async with self._lock:
        if key not in self.data:
            self.data[key] = await fetch(key)
        return self.data[key]
# NEVER - fire-and-forget; asyncio keeps only a weak reference,
# so the task may be garbage collected mid-flight
asyncio.create_task(background_work())

# ALWAYS - hold a strong reference and discard it once done
task = asyncio.create_task(background_work())
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)

# NEVER - blocks all async tasks on the loop
time.sleep(5)

# ALWAYS - use the async equivalents
await asyncio.sleep(5)
result = await loop.run_in_executor(None, cpu_bound_func)
pytest --asyncio-mode=auto

Your goal is to create async code that is:
Key Performance Rules:
- Use asyncio.gather for concurrent I/O operations
- Use run_in_executor for CPU-bound work

Security Reminder:
Weekly Installs
116
Repository
GitHub Stars
29
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
codex94
gemini-cli92
opencode92
cursor85
github-copilot84
claude-code73
Azure RBAC 权限管理工具:查找最小角色、创建自定义角色与自动化分配
142,000 周安装