python-resilience by wshobson/agents
npx skills add https://github.com/wshobson/agents --skill python-resilience构建能够优雅处理瞬时故障、网络问题和服务中断的容错型 Python 应用程序。弹性模式能在依赖项不可靠时保持系统运行。
重试瞬时错误(网络超时、临时服务问题)。不要重试永久错误(无效凭据、错误请求)。
增加重试之间的等待时间,避免压垮正在恢复的服务。
为退避添加随机性,防止多个客户端同时重试时产生惊群效应。
限制尝试次数和总持续时间,防止无限重试循环。
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def call_external_service(request: dict) -> dict:
return httpx.post("https://api.example.com", json=request).json()
使用 tenacity 库实现生产级重试逻辑。对于更简单的情况,可以考虑内置的重试功能或轻量级的自定义实现。
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from tenacity import (
retry,
stop_after_attempt,
stop_after_delay,
wait_exponential_jitter,
retry_if_exception_type,
)
TRANSIENT_ERRORS = (ConnectionError, TimeoutError, OSError)
@retry(
retry=retry_if_exception_type(TRANSIENT_ERRORS),
stop=stop_after_attempt(5) | stop_after_delay(60),
wait=wait_exponential_jitter(initial=1, max=30),
)
def fetch_data(url: str) -> dict:
"""在瞬时故障时自动重试获取数据。"""
response = httpx.get(url, timeout=30)
response.raise_for_status()
return response.json()
白名单特定的瞬时异常。绝不重试:
ValueError, TypeError - 这些是错误,不是瞬时问题AuthenticationError - 无效凭据不会变得有效from tenacity import retry, retry_if_exception_type
import httpx
# 定义可重试的内容
RETRYABLE_EXCEPTIONS = (
ConnectionError,
TimeoutError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
)
@retry(
retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def resilient_api_call(endpoint: str) -> dict:
"""在网络问题上进行重试的 API 调用。"""
return httpx.get(endpoint, timeout=10).json()
重试表示瞬时问题的特定 HTTP 状态码。
from tenacity import retry, retry_if_result, stop_after_attempt
import httpx
RETRY_STATUS_CODES = {429, 502, 503, 504}
def should_retry_response(response: httpx.Response) -> bool:
"""检查响应是否表示可重试的错误。"""
return response.status_code in RETRY_STATUS_CODES
@retry(
retry=retry_if_result(should_retry_response),
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def http_request(method: str, url: str, **kwargs) -> httpx.Response:
"""在瞬时状态码上进行重试的 HTTP 请求。"""
return httpx.request(method, url, timeout=30, **kwargs)
同时处理网络异常和 HTTP 状态码。
from tenacity import (
retry,
retry_if_exception_type,
retry_if_result,
stop_after_attempt,
wait_exponential_jitter,
before_sleep_log,
)
import logging
import httpx
logger = logging.getLogger(__name__)
TRANSIENT_EXCEPTIONS = (
ConnectionError,
TimeoutError,
httpx.ConnectError,
httpx.ReadTimeout,
)
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}
def is_retryable_response(response: httpx.Response) -> bool:
return response.status_code in RETRY_STATUS_CODES
@retry(
retry=(
retry_if_exception_type(TRANSIENT_EXCEPTIONS) |
retry_if_result(is_retryable_response)
),
stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=1, max=30),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def robust_http_call(
method: str,
url: str,
**kwargs,
) -> httpx.Response:
"""具有全面重试处理的 HTTP 调用。"""
return httpx.request(method, url, timeout=30, **kwargs)
跟踪重试行为以便调试和告警。
from tenacity import retry, stop_after_attempt, wait_exponential
import structlog
logger = structlog.get_logger()
def log_retry_attempt(retry_state):
"""记录详细的重试信息。"""
exception = retry_state.outcome.exception()
logger.warning(
"重试操作",
attempt=retry_state.attempt_number,
exception_type=type(exception).__name__,
exception_message=str(exception),
next_wait_seconds=retry_state.next_action.sleep if retry_state.next_action else None,
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, max=10),
before_sleep=log_retry_attempt,
)
def call_with_logging(request: dict) -> dict:
"""带有重试日志记录的外部调用。"""
...
创建可重用的超时装饰器,以实现一致的超时处理。
import asyncio
from functools import wraps
from typing import TypeVar, Callable
T = TypeVar("T")
def with_timeout(seconds: float):
"""为异步函数添加超时的装饰器。"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=seconds,
)
return wrapper
return decorator
@with_timeout(30)
async def fetch_with_timeout(url: str) -> dict:
"""使用 30 秒超时获取 URL。"""
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
堆叠装饰器以将基础设施与业务逻辑分离。
from functools import wraps
from typing import TypeVar, Callable
import structlog
logger = structlog.get_logger()
T = TypeVar("T")
def traced(name: str | None = None):
"""为函数调用添加追踪。"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
span_name = name or func.__name__
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
logger.info("操作开始", operation=span_name)
try:
result = await func(*args, **kwargs)
logger.info("操作完成", operation=span_name)
return result
except Exception as e:
logger.error("操作失败", operation=span_name, error=str(e))
raise
return wrapper
return decorator
# 堆叠多个关注点
@traced("fetch_user_data")
@with_timeout(30)
@retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter())
async def fetch_user_data(user_id: str) -> dict:
"""使用追踪、超时和重试获取用户。"""
...
通过构造函数传递基础设施组件,便于测试。
from dataclasses import dataclass
from typing import Protocol
class Logger(Protocol):
def info(self, msg: str, **kwargs) -> None: ...
def error(self, msg: str, **kwargs) -> None: ...
class MetricsClient(Protocol):
def increment(self, metric: str, tags: dict | None = None) -> None: ...
def timing(self, metric: str, value: float) -> None: ...
@dataclass
class UserService:
"""注入基础设施的服务。"""
repository: UserRepository
logger: Logger
metrics: MetricsClient
async def get_user(self, user_id: str) -> User:
self.logger.info("正在获取用户", user_id=user_id)
start = time.perf_counter()
try:
user = await self.repository.get(user_id)
self.metrics.increment("user.fetch.success")
return user
except Exception as e:
self.metrics.increment("user.fetch.error")
self.logger.error("获取用户失败", user_id=user_id, error=str(e))
raise
finally:
elapsed = time.perf_counter() - start
self.metrics.timing("user.fetch.duration", elapsed)
# 使用模拟对象轻松测试
service = UserService(
repository=FakeRepository(),
logger=FakeLogger(),
metrics=FakeMetrics(),
)
当非关键操作失败时优雅降级。
from typing import TypeVar
from collections.abc import Callable
T = TypeVar("T")
def fail_safe(default: T, log_failure: bool = True):
"""失败时返回默认值而不是引发异常。"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
try:
return await func(*args, **kwargs)
except Exception as e:
if log_failure:
logger.warning(
"操作失败,使用默认值",
function=func.__name__,
error=str(e),
)
return default
return wrapper
return decorator
@fail_safe(default=[])
async def get_recommendations(user_id: str) -> list[str]:
"""获取推荐,失败时返回空列表。"""
...
stop_after_attempt(5) | stop_after_delay(60)每周安装量
2.9K
仓库
GitHub 星标
32.2K
首次出现
2026 年 1 月 30 日
安全审计
安装于
gemini-cli2.3K
opencode2.3K
codex2.3K
claude-code2.3K
cursor2.1K
github-copilot2.0K
Build fault-tolerant Python applications that gracefully handle transient failures, network issues, and service outages. Resilience patterns keep systems running when dependencies are unreliable.
Retry transient errors (network timeouts, temporary service issues). Don't retry permanent errors (invalid credentials, bad requests).
Increase wait time between retries to avoid overwhelming recovering services.
Add randomness to backoff to prevent thundering herd when many clients retry simultaneously.
Cap both attempt count and total duration to prevent infinite retry loops.
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def call_external_service(request: dict) -> dict:
return httpx.post("https://api.example.com", json=request).json()
Use the tenacity library for production-grade retry logic. For simpler cases, consider built-in retry functionality or a lightweight custom implementation.
from tenacity import (
retry,
stop_after_attempt,
stop_after_delay,
wait_exponential_jitter,
retry_if_exception_type,
)
TRANSIENT_ERRORS = (ConnectionError, TimeoutError, OSError)
@retry(
retry=retry_if_exception_type(TRANSIENT_ERRORS),
stop=stop_after_attempt(5) | stop_after_delay(60),
wait=wait_exponential_jitter(initial=1, max=30),
)
def fetch_data(url: str) -> dict:
"""Fetch data with automatic retry on transient failures."""
response = httpx.get(url, timeout=30)
response.raise_for_status()
return response.json()
Whitelist specific transient exceptions. Never retry:
ValueError, TypeError - These are bugs, not transient issues
AuthenticationError - Invalid credentials won't become valid
HTTP 4xx errors (except 429) - Client errors are permanent
from tenacity import retry, retry_if_exception_type import httpx
RETRYABLE_EXCEPTIONS = ( ConnectionError, TimeoutError, httpx.ConnectTimeout, httpx.ReadTimeout, )
@retry( retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS), stop=stop_after_attempt(3), wait=wait_exponential_jitter(initial=1, max=10), ) def resilient_api_call(endpoint: str) -> dict: """Make API call with retry on network issues.""" return httpx.get(endpoint, timeout=10).json()
Retry specific HTTP status codes that indicate transient issues.
from tenacity import retry, retry_if_result, stop_after_attempt
import httpx
RETRY_STATUS_CODES = {429, 502, 503, 504}
def should_retry_response(response: httpx.Response) -> bool:
"""Check if response indicates a retryable error."""
return response.status_code in RETRY_STATUS_CODES
@retry(
retry=retry_if_result(should_retry_response),
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def http_request(method: str, url: str, **kwargs) -> httpx.Response:
"""Make HTTP request with retry on transient status codes."""
return httpx.request(method, url, timeout=30, **kwargs)
Handle both network exceptions and HTTP status codes.
from tenacity import (
retry,
retry_if_exception_type,
retry_if_result,
stop_after_attempt,
wait_exponential_jitter,
before_sleep_log,
)
import logging
import httpx
logger = logging.getLogger(__name__)
TRANSIENT_EXCEPTIONS = (
ConnectionError,
TimeoutError,
httpx.ConnectError,
httpx.ReadTimeout,
)
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}
def is_retryable_response(response: httpx.Response) -> bool:
return response.status_code in RETRY_STATUS_CODES
@retry(
retry=(
retry_if_exception_type(TRANSIENT_EXCEPTIONS) |
retry_if_result(is_retryable_response)
),
stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=1, max=30),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def robust_http_call(
method: str,
url: str,
**kwargs,
) -> httpx.Response:
"""HTTP call with comprehensive retry handling."""
return httpx.request(method, url, timeout=30, **kwargs)
Track retry behavior for debugging and alerting.
from tenacity import retry, stop_after_attempt, wait_exponential
import structlog
logger = structlog.get_logger()
def log_retry_attempt(retry_state):
"""Log detailed retry information."""
exception = retry_state.outcome.exception()
logger.warning(
"Retrying operation",
attempt=retry_state.attempt_number,
exception_type=type(exception).__name__,
exception_message=str(exception),
next_wait_seconds=retry_state.next_action.sleep if retry_state.next_action else None,
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, max=10),
before_sleep=log_retry_attempt,
)
def call_with_logging(request: dict) -> dict:
"""External call with retry logging."""
...
Create reusable timeout decorators for consistent timeout handling.
import asyncio
from functools import wraps
from typing import TypeVar, Callable
T = TypeVar("T")
def with_timeout(seconds: float):
"""Decorator to add timeout to async functions."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=seconds,
)
return wrapper
return decorator
@with_timeout(30)
async def fetch_with_timeout(url: str) -> dict:
"""Fetch URL with 30 second timeout."""
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
Stack decorators to separate infrastructure from business logic.
from functools import wraps
from typing import TypeVar, Callable
import structlog
logger = structlog.get_logger()
T = TypeVar("T")
def traced(name: str | None = None):
"""Add tracing to function calls."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
span_name = name or func.__name__
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
logger.info("Operation started", operation=span_name)
try:
result = await func(*args, **kwargs)
logger.info("Operation completed", operation=span_name)
return result
except Exception as e:
logger.error("Operation failed", operation=span_name, error=str(e))
raise
return wrapper
return decorator
# Stack multiple concerns
@traced("fetch_user_data")
@with_timeout(30)
@retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter())
async def fetch_user_data(user_id: str) -> dict:
"""Fetch user with tracing, timeout, and retry."""
...
Pass infrastructure components through constructors for easy testing.
from dataclasses import dataclass
from typing import Protocol
class Logger(Protocol):
def info(self, msg: str, **kwargs) -> None: ...
def error(self, msg: str, **kwargs) -> None: ...
class MetricsClient(Protocol):
def increment(self, metric: str, tags: dict | None = None) -> None: ...
def timing(self, metric: str, value: float) -> None: ...
@dataclass
class UserService:
"""Service with injected infrastructure."""
repository: UserRepository
logger: Logger
metrics: MetricsClient
async def get_user(self, user_id: str) -> User:
self.logger.info("Fetching user", user_id=user_id)
start = time.perf_counter()
try:
user = await self.repository.get(user_id)
self.metrics.increment("user.fetch.success")
return user
except Exception as e:
self.metrics.increment("user.fetch.error")
self.logger.error("Failed to fetch user", user_id=user_id, error=str(e))
raise
finally:
elapsed = time.perf_counter() - start
self.metrics.timing("user.fetch.duration", elapsed)
# Easy to test with fakes
service = UserService(
repository=FakeRepository(),
logger=FakeLogger(),
metrics=FakeMetrics(),
)
Degrade gracefully when non-critical operations fail.
from typing import TypeVar
from collections.abc import Callable
T = TypeVar("T")
def fail_safe(default: T, log_failure: bool = True):
"""Return default value on failure instead of raising."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
try:
return await func(*args, **kwargs)
except Exception as e:
if log_failure:
logger.warning(
"Operation failed, using default",
function=func.__name__,
error=str(e),
)
return default
return wrapper
return decorator
@fail_safe(default=[])
async def get_recommendations(user_id: str) -> list[str]:
"""Get recommendations, return empty list on failure."""
...
stop_after_attempt(5) | stop_after_delay(60)Weekly Installs
2.9K
Repository
GitHub Stars
32.2K
First Seen
Jan 30, 2026
Security Audits
Gen Agent Trust HubFailSocketPassSnykWarn
Installed on
gemini-cli2.3K
opencode2.3K
codex2.3K
claude-code2.3K
cursor2.1K
github-copilot2.0K
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
102,200 周安装