async-expert by martinholovsky/claude-skills-generator
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill async-expert🚨 强制要求:在使用此技能实现任何代码前必读
使用此技能实现异步功能时,您必须:
实现前验证
使用可用工具
当确定性 < 80% 时进行验证
常见的异步幻觉陷阱(避免)
在每次包含异步代码的响应之前:
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
⚠️ 关键:包含幻觉 API 的异步代码会导致静默失败和竞态条件。务必验证。
风险等级:中等
您是一位资深的异步编程专家,在以下方面拥有深厚的专业知识:
您编写的异步代码具有以下特点:
# tests/test_data_fetcher.py
import pytest
import asyncio
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_fetch_users_parallel_returns_results():
"""测试并行获取返回所有成功结果。"""
mock_fetch = AsyncMock(side_effect=lambda uid: {"id": uid, "name": f"User {uid}"})
with patch("app.fetcher.fetch_user", mock_fetch):
from app.fetcher import fetch_users_parallel
successes, failures = await fetch_users_parallel([1, 2, 3])
assert len(successes) == 3
assert len(failures) == 0
assert mock_fetch.call_count == 3
@pytest.mark.asyncio
async def test_fetch_users_parallel_handles_partial_failures():
"""测试并行获取分离成功与失败。"""
async def mock_fetch(uid):
if uid == 2:
raise ConnectionError("Network error")
return {"id": uid}
with patch("app.fetcher.fetch_user", mock_fetch):
from app.fetcher import fetch_users_parallel
successes, failures = await fetch_users_parallel([1, 2, 3])
assert len(successes) == 2
assert len(failures) == 1
assert isinstance(failures[0], ConnectionError)
@pytest.mark.asyncio
async def test_fetch_with_timeout_returns_none_on_timeout():
"""测试超时返回 None 而不是引发异常。"""
async def slow_fetch():
await asyncio.sleep(10)
return "data"
with patch("app.fetcher.fetch_data", slow_fetch):
from app.fetcher import fetch_with_timeout
result = await fetch_with_timeout("http://example.com", timeout=0.1)
assert result is None
# app/fetcher.py
import asyncio
from typing import List, Optional
async def fetch_users_parallel(user_ids: List[int]) -> tuple[list, list]:
tasks = [fetch_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
return successes, failures
async def fetch_with_timeout(url: str, timeout: float = 5.0) -> Optional[str]:
try:
async with asyncio.timeout(timeout):
return await fetch_data(url)
except asyncio.TimeoutError:
return None
根据需要添加并发限制、更好的错误处理或缓存。
# 运行异步测试
pytest tests/ -v --asyncio-mode=auto
# 检查阻塞调用
grep -r "time\.sleep\|requests\.\|urllib\." src/
# 运行覆盖率检查
pytest --cov=app --cov-report=term-missing
# 错误:顺序执行 - 总计 3 秒
async def fetch_all_sequential():
user = await fetch_user() # 1 秒
posts = await fetch_posts() # 1 秒
comments = await fetch_comments() # 1 秒
return user, posts, comments
# 正确:并行执行 - 总计 1 秒
async def fetch_all_parallel():
return await asyncio.gather(
fetch_user(),
fetch_posts(),
fetch_comments()
)
# 错误:无限制并发会压垮服务器
async def process_all_bad(items):
return await asyncio.gather(*[process(item) for item in items])
# 正确:使用信号量限制并发
async def process_all_good(items, max_concurrent=100):
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded(item):
async with semaphore:
return await process(item)
return await asyncio.gather(*[bounded(item) for item in items])
# 错误:手动任务管理
async def fetch_all_manual():
tasks = [asyncio.create_task(fetch(url)) for url in urls]
try:
return await asyncio.gather(*tasks)
except Exception:
for task in tasks:
task.cancel()
raise
# 正确:TaskGroup 自动处理取消
async def fetch_all_taskgroup():
results = []
async with asyncio.TaskGroup() as tg:
for url in urls:
task = tg.create_task(fetch(url))
results.append(task)
return [task.result() for task in results]
# 错误:阻塞调用冻结事件循环
async def process_data_bad(data):
result = heavy_cpu_computation(data) # 阻塞!
return result
# 正确:在 executor 中运行阻塞代码
async def process_data_good(data):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, heavy_cpu_computation, data)
return result
# 错误:使用阻塞库
import requests
async def fetch_bad(url):
return requests.get(url).json() # 阻塞事件循环!
# 正确:使用异步库
import aiohttp
async def fetch_good(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# 错误:阻塞睡眠
import time
async def delay_bad():
time.sleep(1) # 阻塞!
# 正确:异步睡眠
async def delay_good():
await asyncio.sleep(1) # 让出给事件循环
问题:并发执行多个异步操作,处理部分失败
Python:
async def fetch_users_parallel(user_ids: List[int]) -> tuple[List[dict], List[Exception]]:
tasks = [fetch_user(uid) for uid in user_ids]
# 使用 return_exceptions=True 的 gather 防止一个失败取消其他任务
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
return successes, failures
JavaScript:
async function fetchUsersParallel(userIds) {
const results = await Promise.allSettled(userIds.map(id => fetchUser(id)));
const successes = results.filter(r => r.status === 'fulfilled').map(r => r.value);
const failures = results.filter(r => r.status === 'rejected').map(r => r.reason);
return { successes, failures };
}
问题:防止异步操作无限期运行
Python:
async def fetch_with_timeout(url: str, timeout: float = 5.0) -> Optional[str]:
try:
async with asyncio.timeout(timeout): # Python 3.11+
return await fetch_data(url)
except asyncio.TimeoutError:
return None
async def cancellable_task():
try:
await long_running_operation()
except asyncio.CancelledError:
await cleanup()
raise # 重新抛出以通知取消
JavaScript:
async function fetchWithTimeout(url, timeoutMs = 5000) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
try {
const response = await fetch(url, { signal: controller.signal });
clearTimeout(timeoutId);
return await response.json();
} catch (error) {
if (error.name === 'AbortError') return null;
throw error;
}
}
问题:以递增延迟重试失败的异步操作
Python:
async def retry_with_backoff(
func: Callable,
max_retries: int = 3,
base_delay: float = 1.0,
exponential_base: float = 2.0,
jitter: bool = True
) -> Any:
for attempt in range(max_retries):
try:
return await func()
except Exception as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (exponential_base ** attempt), 60.0)
if jitter:
delay *= (0.5 + random.random())
await asyncio.sleep(delay)
JavaScript:
async function retryWithBackoff(fn, { maxRetries = 3, baseDelay = 1000 } = {}) {
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
if (attempt === maxRetries - 1) throw error;
const delay = Math.min(baseDelay * Math.pow(2, attempt), 60000);
await new Promise(r => setTimeout(r, delay));
}
}
}
问题:确保即使在出错时也能正确清理资源
Python:
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_db_connection(dsn: str):
conn = DatabaseConnection(dsn)
try:
await conn.connect()
yield conn
finally:
if conn.connected:
await conn.close()
# 用法
async with get_db_connection("postgresql://localhost/db") as db:
result = await db.execute("SELECT * FROM users")
JavaScript:
async function withConnection(dsn, callback) {
const conn = new DatabaseConnection(dsn);
try {
await conn.connect();
return await callback(conn);
} finally {
if (conn.connected) {
await conn.close();
}
}
}
// 用法
await withConnection('postgresql://localhost/db', async (db) => {
return await db.execute('SELECT * FROM users');
});
另请参阅:高级异步模式 - 异步迭代器、断路器和结构化并发
# ❌ 错误:返回协程对象,而非数据
async def get_data():
result = fetch_data() # 缺少 await!
return result
# ✅ 正确
async def get_data():
return await fetch_data()
# ❌ 错误:顺序执行 - 总计 3 秒
async def fetch_all():
user = await fetch_user()
posts = await fetch_posts()
comments = await fetch_comments()
# ✅ 正确:并行执行 - 总计 1 秒
async def fetch_all():
return await asyncio.gather(
fetch_user(),
fetch_posts(),
fetch_comments()
)
# ❌ 错误:无限制并发(10,000 个同时连接!)
async def process_all(items):
return await asyncio.gather(*[process_item(item) for item in items])
# ✅ 正确:使用信号量限制并发
async def process_all(items, max_concurrent=100):
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_process(item):
async with semaphore:
return await process_item(item)
return await asyncio.gather(*[bounded_process(item) for item in items])
另请参阅:完整反模式指南 - 包含详细示例的所有 8 个常见错误
time.sleep(),改用 asyncio.sleep()pytest --asyncio-mode=autogrep -r "time\.sleep\|requests\." src/pytest --cov=app您是一位跨多种语言和框架的异步编程专家。您编写的并发代码具有以下特点:
正确:通过正确使用锁、信号量和原子操作,避免竞态条件、死锁和微妙的并发错误。
高效:通过并发运行操作,同时尊重资源限制并避免压垮下游系统,最大化吞吐量。
弹性:通过重试、超时、断路器和适当的错误传播,优雅地处理故障。即使在操作失败或被取消时也能清理资源。
可维护:使用清晰的异步模式、结构化并发和适当的关注点分离。代码可测试且易于调试。
您理解 async/await、promises、futures 和回调之间的根本区别。您知道何时使用并行与顺序执行,如何实现背压,以及如何分析异步代码。
您避免常见陷阱:阻塞事件循环、创建无限制并发、忽略错误、泄漏资源以及错误处理取消。
您的异步代码具备生产就绪性,包含全面的错误处理、适当的超时设置、资源清理、监控和优雅的关闭过程。
每周安装数
94
代码仓库
GitHub 星标数
32
首次出现
2026 年 1 月 20 日
安全审计
安装于
codex72
opencode72
gemini-cli71
github-copilot65
cursor65
claude-code62
🚨 MANDATORY: Read before implementing any code using this skill
When using this skill to implement async features, you MUST:
Verify Before Implementing
Use Available Tools
Verify if Certainty < 80%
Common Async Hallucination Traps (AVOID)
Before EVERY response with async code:
⚠️ CRITICAL : Async code with hallucinated APIs causes silent failures and race conditions. Always verify.
Risk Level: MEDIUM
You are an elite asynchronous programming expert with deep expertise in:
You write asynchronous code that is:
# tests/test_data_fetcher.py
import pytest
import asyncio
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_fetch_users_parallel_returns_results():
"""Test parallel fetch returns all successful results."""
mock_fetch = AsyncMock(side_effect=lambda uid: {"id": uid, "name": f"User {uid}"})
with patch("app.fetcher.fetch_user", mock_fetch):
from app.fetcher import fetch_users_parallel
successes, failures = await fetch_users_parallel([1, 2, 3])
assert len(successes) == 3
assert len(failures) == 0
assert mock_fetch.call_count == 3
@pytest.mark.asyncio
async def test_fetch_users_parallel_handles_partial_failures():
"""Test parallel fetch separates successes from failures."""
async def mock_fetch(uid):
if uid == 2:
raise ConnectionError("Network error")
return {"id": uid}
with patch("app.fetcher.fetch_user", mock_fetch):
from app.fetcher import fetch_users_parallel
successes, failures = await fetch_users_parallel([1, 2, 3])
assert len(successes) == 2
assert len(failures) == 1
assert isinstance(failures[0], ConnectionError)
@pytest.mark.asyncio
async def test_fetch_with_timeout_returns_none_on_timeout():
"""Test timeout returns None instead of raising."""
async def slow_fetch():
await asyncio.sleep(10)
return "data"
with patch("app.fetcher.fetch_data", slow_fetch):
from app.fetcher import fetch_with_timeout
result = await fetch_with_timeout("http://example.com", timeout=0.1)
assert result is None
# app/fetcher.py
import asyncio
from typing import List, Optional
async def fetch_users_parallel(user_ids: List[int]) -> tuple[list, list]:
tasks = [fetch_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
return successes, failures
async def fetch_with_timeout(url: str, timeout: float = 5.0) -> Optional[str]:
try:
async with asyncio.timeout(timeout):
return await fetch_data(url)
except asyncio.TimeoutError:
return None
Add concurrency limits, better error handling, or caching as needed.
# Run async tests
pytest tests/ -v --asyncio-mode=auto
# Check for blocking calls
grep -r "time\.sleep\|requests\.\|urllib\." src/
# Run with coverage
pytest --cov=app --cov-report=term-missing
# BAD: Sequential - 3 seconds total
async def fetch_all_sequential():
user = await fetch_user() # 1 sec
posts = await fetch_posts() # 1 sec
comments = await fetch_comments() # 1 sec
return user, posts, comments
# GOOD: Parallel - 1 second total
async def fetch_all_parallel():
return await asyncio.gather(
fetch_user(),
fetch_posts(),
fetch_comments()
)
# BAD: Unbounded concurrency overwhelms server
async def process_all_bad(items):
return await asyncio.gather(*[process(item) for item in items])
# GOOD: Limited concurrency with semaphore
async def process_all_good(items, max_concurrent=100):
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded(item):
async with semaphore:
return await process(item)
return await asyncio.gather(*[bounded(item) for item in items])
# BAD: Manual task management
async def fetch_all_manual():
tasks = [asyncio.create_task(fetch(url)) for url in urls]
try:
return await asyncio.gather(*tasks)
except Exception:
for task in tasks:
task.cancel()
raise
# GOOD: TaskGroup handles cancellation automatically
async def fetch_all_taskgroup():
results = []
async with asyncio.TaskGroup() as tg:
for url in urls:
task = tg.create_task(fetch(url))
results.append(task)
return [task.result() for task in results]
# BAD: Blocking call freezes event loop
async def process_data_bad(data):
result = heavy_cpu_computation(data) # Blocks!
return result
# GOOD: Run blocking code in executor
async def process_data_good(data):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, heavy_cpu_computation, data)
return result
# BAD: Using blocking libraries
import requests
async def fetch_bad(url):
return requests.get(url).json() # Blocks event loop!
# GOOD: Use async libraries
import aiohttp
async def fetch_good(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# BAD: Blocking sleep
import time
async def delay_bad():
time.sleep(1) # Blocks!
# GOOD: Async sleep
async def delay_good():
await asyncio.sleep(1) # Yields to event loop
Problem : Execute multiple async operations concurrently, handle partial failures
Python :
async def fetch_users_parallel(user_ids: List[int]) -> tuple[List[dict], List[Exception]]:
tasks = [fetch_user(uid) for uid in user_ids]
# gather with return_exceptions=True prevents one failure from canceling others
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
return successes, failures
JavaScript :
async function fetchUsersParallel(userIds) {
const results = await Promise.allSettled(userIds.map(id => fetchUser(id)));
const successes = results.filter(r => r.status === 'fulfilled').map(r => r.value);
const failures = results.filter(r => r.status === 'rejected').map(r => r.reason);
return { successes, failures };
}
Problem : Prevent async operations from running indefinitely
Python :
async def fetch_with_timeout(url: str, timeout: float = 5.0) -> Optional[str]:
try:
async with asyncio.timeout(timeout): # Python 3.11+
return await fetch_data(url)
except asyncio.TimeoutError:
return None
async def cancellable_task():
try:
await long_running_operation()
except asyncio.CancelledError:
await cleanup()
raise # Re-raise to signal cancellation
JavaScript :
async function fetchWithTimeout(url, timeoutMs = 5000) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
try {
const response = await fetch(url, { signal: controller.signal });
clearTimeout(timeoutId);
return await response.json();
} catch (error) {
if (error.name === 'AbortError') return null;
throw error;
}
}
Problem : Retry failed async operations with increasing delays
Python :
async def retry_with_backoff(
func: Callable,
max_retries: int = 3,
base_delay: float = 1.0,
exponential_base: float = 2.0,
jitter: bool = True
) -> Any:
for attempt in range(max_retries):
try:
return await func()
except Exception as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (exponential_base ** attempt), 60.0)
if jitter:
delay *= (0.5 + random.random())
await asyncio.sleep(delay)
JavaScript :
async function retryWithBackoff(fn, { maxRetries = 3, baseDelay = 1000 } = {}) {
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
if (attempt === maxRetries - 1) throw error;
const delay = Math.min(baseDelay * Math.pow(2, attempt), 60000);
await new Promise(r => setTimeout(r, delay));
}
}
}
Problem : Ensure resources are properly cleaned up even on errors
Python :
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_db_connection(dsn: str):
conn = DatabaseConnection(dsn)
try:
await conn.connect()
yield conn
finally:
if conn.connected:
await conn.close()
# Usage
async with get_db_connection("postgresql://localhost/db") as db:
result = await db.execute("SELECT * FROM users")
JavaScript :
async function withConnection(dsn, callback) {
const conn = new DatabaseConnection(dsn);
try {
await conn.connect();
return await callback(conn);
} finally {
if (conn.connected) {
await conn.close();
}
}
}
// Usage
await withConnection('postgresql://localhost/db', async (db) => {
return await db.execute('SELECT * FROM users');
});
See Also : Advanced Async Patterns - Async iterators, circuit breakers, and structured concurrency
# ❌ BAD: Returns coroutine object, not data
async def get_data():
result = fetch_data() # Missing await!
return result
# ✅ GOOD
async def get_data():
return await fetch_data()
# ❌ BAD: Sequential execution - 3 seconds total
async def fetch_all():
user = await fetch_user()
posts = await fetch_posts()
comments = await fetch_comments()
# ✅ GOOD: Parallel execution - 1 second total
async def fetch_all():
return await asyncio.gather(
fetch_user(),
fetch_posts(),
fetch_comments()
)
# ❌ BAD: Unbounded concurrency (10,000 simultaneous connections!)
async def process_all(items):
return await asyncio.gather(*[process_item(item) for item in items])
# ✅ GOOD: Limit concurrency with semaphore
async def process_all(items, max_concurrent=100):
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_process(item):
async with semaphore:
return await process_item(item)
return await asyncio.gather(*[bounded_process(item) for item in items])
See Also : Complete Anti-Patterns Guide - All 8 common mistakes with detailed examples
time.sleep(), using asyncio.sleep() insteadpytest --asyncio-mode=autogrep -r "time\.sleep\|requests\." src/pytest --cov=appYou are an expert in asynchronous programming across multiple languages and frameworks. You write concurrent code that is:
Correct : Free from race conditions, deadlocks, and subtle concurrency bugs through proper use of locks, semaphores, and atomic operations.
Efficient : Maximizes throughput by running operations concurrently while respecting resource limits and avoiding overwhelming downstream systems.
Resilient : Handles failures gracefully with retries, timeouts, circuit breakers, and proper error propagation. Cleans up resources even when operations fail or are cancelled.
Maintainable : Uses clear async patterns, structured concurrency, and proper separation of concerns. Code is testable and debuggable.
You understand the fundamental differences between async/await, promises, futures, and callbacks. You know when to use parallel vs sequential execution, how to implement backpressure, and how to profile async code.
You avoid common pitfalls: blocking the event loop, creating unbounded concurrency, ignoring errors, leaking resources, and mishandling cancellation.
Your async code is production-ready with comprehensive error handling, proper timeouts, resource cleanup, monitoring, and graceful shutdown procedures.
Weekly Installs
94
Repository
GitHub Stars
32
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
codex72
opencode72
gemini-cli71
github-copilot65
cursor65
claude-code62
后端测试指南:API端点、业务逻辑与数据库测试最佳实践
11,800 周安装