python-observability by wshobson/agents
npx skills add https://github.com/wshobson/agents --skill python-observability
为 Python 应用程序配备结构化日志、指标和追踪。当生产环境出现问题时,你无需部署新代码即可回答“发生了什么、在哪里发生以及为什么发生”。
在生产环境中,以 JSON 格式输出具有一致字段的日志。机器可读的日志支持强大的查询和告警。对于本地开发,请考虑使用人类可读的格式。
追踪每个服务边界的延迟、流量、错误和饱和度。
为单个请求在所有日志和跨度中贯穿一个唯一 ID,实现端到端追踪。
保持指标标签值有界。无界的标签(如用户 ID)会急剧增加存储成本。
import structlog
# Minimal setup: stamp every event with an ISO-formatted time and render it
# as a single JSON object.
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)
logger = structlog.get_logger()
# Keyword arguments are attached to the event as structured fields.
logger.info("Request processed", user_id="123", duration_ms=45)
配置 structlog 以输出具有一致字段的 JSON。
import logging
import structlog
def configure_logging(log_level: str = "INFO") -> None:
    """Set up structlog to print JSON log lines at or above *log_level*.

    Args:
        log_level: Name of a standard ``logging`` level, case-insensitive
            (for example ``"debug"`` or ``"WARNING"``). The name is looked
            up with ``getattr`` on the ``logging`` module, so an unknown
            name raises ``AttributeError``.
    """
    threshold = getattr(logging, log_level.upper())
    # Processors run in list order; the renderer has to come last.
    pipeline = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ]
    structlog.configure(
        processors=pipeline,
        wrapper_class=structlog.make_filtering_bound_logger(threshold),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )
# 在应用程序启动时初始化
configure_logging("INFO")
logger = structlog.get_logger()
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
每个日志条目都应包含用于过滤和关联的标准字段。
import structlog
from contextvars import ContextVar
# 在上下文中存储关联 ID
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
logger = structlog.get_logger()
def process_request(request: Request) -> Response:
    """Handle *request*, logging its arrival, outcome and duration.

    Args:
        request: Incoming request. Its ``method``, ``path`` and ``user_id``
            attributes are read for the "Request received" log entry.

    Returns:
        Whatever ``handle_request(request)`` returns.

    Raises:
        Exception: Anything raised by ``handle_request`` is logged as
            "Request failed" and then re-raised unchanged.
    """
    # Function-scope import: the snippet's own import lines do not bring in
    # ``time``, which is needed to measure the duration.
    import time

    started = time.perf_counter()
    logger.info(
        "Request received",
        correlation_id=correlation_id.get(),
        method=request.method,
        path=request.path,
        user_id=request.user_id,
    )
    # Only the handler call is guarded, so a problem while emitting the
    # success log is not misreported as a failed request.
    try:
        result = handle_request(request)
    except Exception as e:
        logger.error(
            "Request failed",
            correlation_id=correlation_id.get(),
            error_type=type(e).__name__,
            error_message=str(e),
        )
        raise
    # The original logged an undefined ``elapsed``; measure it explicitly.
    elapsed = (time.perf_counter() - started) * 1000
    logger.info(
        "Request completed",
        correlation_id=correlation_id.get(),
        status_code=200,
        duration_ms=round(elapsed, 2),
    )
    return result
在整个应用程序中一致地使用日志级别。
| 级别 | 目的 | 示例 |
|---|---|---|
| DEBUG | 开发诊断 | 变量值、内部状态 |
| INFO | 请求生命周期、操作 | 请求开始/结束、作业完成 |
| WARNING | 可恢复的异常 | 重试尝试、使用了降级方案 |
| ERROR | 需要关注的失败 | 异常、服务不可用 |
# DEBUG: detailed internal information
logger.debug("Cache lookup", key=cache_key, hit=cache_hit)
# INFO: normal operational events
logger.info("Order created", order_id=order.id, total=order.total)
# WARNING: unusual situations that were handled
logger.warning(
    "Rate limit approaching",
    current_rate=950,
    limit=1000,
    reset_seconds=30,
)
# ERROR: failures that need investigation
# NOTE(review): `e` is not defined in this snippet; presumably the call sits
# inside an `except ... as e:` handler. Confirm when adapting.
logger.error(
    "Payment processing failed",
    order_id=order.id,
    error=str(e),
    payment_provider="stripe",
)
切勿将预期行为记录为 ERROR。用户输入错误密码是 INFO,而不是 ERROR。
在入口处生成唯一 ID,并将其贯穿所有操作。
from contextvars import ContextVar
import uuid
import structlog
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
def set_correlation_id(cid: str | None = None) -> str:
    """Bind a correlation ID to the current context and return it.

    Args:
        cid: ID to use. When it is ``None`` or empty, a random UUID4 string
            is generated instead.

    Returns:
        The ID that was bound.
    """
    if not cid:
        cid = str(uuid.uuid4())
    # Store the ID in the ContextVar and in structlog's context-local
    # bindings.
    correlation_id.set(cid)
    structlog.contextvars.bind_contextvars(correlation_id=cid)
    return cid
# FastAPI 中间件示例
from fastapi import Request
async def correlation_middleware(request: Request, call_next):
    """Attach a correlation ID to the request context and echo it back.

    The ID is taken from the incoming ``X-Correlation-ID`` header; when the
    header is missing or empty a new UUID4 string is generated. The same ID
    is written to the response's ``X-Correlation-ID`` header.
    """
    header_name = "X-Correlation-ID"
    cid = request.headers.get(header_name)
    if not cid:
        cid = str(uuid.uuid4())
    set_correlation_id(cid)
    response = await call_next(request)
    response.headers[header_name] = cid
    return response
传播到出站请求:
import httpx
async def call_downstream_service(endpoint: str, data: dict) -> dict:
    """POST *data* as JSON to *endpoint*, forwarding the correlation ID.

    Args:
        endpoint: URL of the downstream service.
        data: Payload sent as the JSON request body.

    Returns:
        The response body decoded from JSON. The HTTP status is not
        checked, so error responses are decoded like any other.
    """
    # Carry the current context's ID so the downstream logs can be joined.
    outbound_headers = {"X-Correlation-ID": correlation_id.get()}
    async with httpx.AsyncClient() as client:
        reply = await client.post(endpoint, json=data, headers=outbound_headers)
        return reply.json()
为每个服务边界追踪这些指标:
from prometheus_client import Counter, Histogram, Gauge
# Latency: how long requests take
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "Request latency in seconds",
    ["method", "endpoint", "status"],
    # Bucket upper bounds in seconds, from 10 ms up to 10 s.
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)
# Traffic: request rate
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)
# Errors: error rate, broken down by exception type instead of status
ERROR_COUNT = Counter(
    "http_errors_total",
    "Total HTTP errors",
    ["method", "endpoint", "error_type"],
)
# Saturation: resource utilization (declared without labels)
DB_POOL_USAGE = Gauge(
    "db_connection_pool_used",
    "Number of database connections in use",
)
为你的端点添加监控:
import time
from functools import wraps
def track_request(func):
    """Wrap an async request handler so every call records request metrics.

    For each call the wrapper increments ``REQUEST_COUNT`` and observes the
    elapsed time in ``REQUEST_LATENCY``, labelled with the request method,
    the request path and the response status. When the handler raises an
    ``Exception``, ``ERROR_COUNT`` is incremented with the exception's class
    name and the exception is re-raised.

    Args:
        func: Async callable taking the request as its first argument and
            returning an object with a ``status_code`` attribute.

    Returns:
        The wrapping coroutine function.
    """
    @wraps(func)
    async def wrapper(request: Request, *args, **kwargs):
        method = request.method
        # NOTE(review): the raw URL path is used as a label value; paths
        # that embed IDs make this label unbounded. Consider a route
        # template instead.
        endpoint = request.url.path
        # Bound before the ``try`` so the ``finally`` block can always read
        # it. Without this, an exit that is not an ``Exception`` (for
        # example ``asyncio.CancelledError``) hit an unbound ``status`` and
        # the resulting UnboundLocalError replaced the real exception.
        status = "500"
        start = time.perf_counter()
        try:
            response = await func(request, *args, **kwargs)
            status = str(response.status_code)
            return response
        except Exception as e:
            ERROR_COUNT.labels(
                method=method,
                endpoint=endpoint,
                error_type=type(e).__name__,
            ).inc()
            raise
        finally:
            duration = time.perf_counter() - start
            REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
            REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)
    return wrapper
避免使用具有无界值的标签,以防止指标爆炸。
# BAD: user IDs can take millions of distinct values
REQUEST_COUNT.labels(method="GET", user_id=user.id)  # Don't do this!
# GOOD: bounded values only
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")
# If you need per-user metrics, use a different approach:
# - Log the user_id and query logs
# - Use a separate analytics system
# - Bucket users by type/tier
# A metric's label names are fixed when it is declared, and REQUEST_COUNT
# only accepts method/endpoint/status, so the tier breakdown gets its own
# counter.
REQUESTS_BY_TIER = Counter(
    "http_requests_by_tier_total",
    "Total HTTP requests by user tier",
    ["method", "endpoint", "user_tier"],
)
REQUESTS_BY_TIER.labels(
    method="GET",
    endpoint="/users",
    user_tier="premium",  # Bounded set of values
)
为操作创建可重用的计时上下文管理器。
from contextlib import contextmanager
import time
import structlog
logger = structlog.get_logger()
@contextmanager
def timed_operation(name: str, **extra_fields):
    """Time the enclosed block and log how it ended.

    A debug event is logged on entry. On normal exit an info event with the
    duration is logged; if the block raises an ``Exception`` an error event
    with the duration and message is logged and the exception is re-raised.

    Args:
        name: Value logged under the ``operation`` key.
        **extra_fields: Additional key/value pairs added to every event.
    """
    def _elapsed_ms() -> float:
        # Milliseconds since entry, rounded to two decimals for the log.
        return round((time.perf_counter() - start) * 1000, 2)

    start = time.perf_counter()
    logger.debug("Operation started", operation=name, **extra_fields)
    try:
        yield
    except Exception as exc:
        logger.error(
            "Operation failed",
            operation=name,
            duration_ms=_elapsed_ms(),
            error=str(exc),
            **extra_fields,
        )
        raise
    # Reached only when the block finished without raising.
    logger.info(
        "Operation completed",
        operation=name,
        duration_ms=_elapsed_ms(),
        **extra_fields,
    )
# Usage
# NOTE(review): the body uses `await`, so this snippet has to live inside an
# `async def`; `user` and `order_repository` come from the surrounding code.
with timed_operation("fetch_user_orders", user_id=user.id):
    orders = await order_repository.get_by_user(user.id)
使用 OpenTelemetry 设置分布式追踪。
注意: OpenTelemetry 正在积极发展中。请查阅 官方 Python 文档 以获取最新的 API 模式和最佳实践。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
    """Install a global OpenTelemetry tracer provider exporting over OTLP/gRPC.

    Args:
        service_name: Recorded as the ``service.name`` resource attribute on
            every exported span. The original accepted this argument but
            never used it.
        otlp_endpoint: Endpoint passed to the OTLP span exporter.
    """
    # Function-scope import: the snippet's import lines do not include the
    # resources module needed to attach the service name.
    from opentelemetry.sdk.resources import SERVICE_NAME, Resource

    resource = Resource.create({SERVICE_NAME: service_name})
    provider = TracerProvider(resource=resource)
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    # Spans are handed to the exporter through a batching processor.
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
async def process_order(order_id: str) -> Order:
    """Run the order steps inside a parent span with one child span per step.

    The parent span ``process_order`` carries the ``order.id`` attribute;
    ``validate_order``, ``charge_payment`` and ``send_confirmation`` each run
    in their own child span.

    NOTE(review): ``order`` is returned but never assigned in this snippet,
    so the function raises NameError as written. Decide where the order
    object should come from before using it.
    NOTE(review): the three step functions are called without ``await``;
    confirm they are synchronous.
    """
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        with tracer.start_as_current_span("validate_order"):
            validate_order(order_id)
        with tracer.start_as_current_span("charge_payment"):
            charge_payment(order_id)
        with tracer.start_as_current_span("send_confirmation"):
            send_confirmation(order_id)
        return order
每周安装量
3.1K
代码仓库
GitHub 星标数
32.2K
首次出现
2026 年 1 月 30 日
安全审计
安装于
gemini-cli: 2.4K
opencode: 2.4K
codex: 2.4K
claude-code: 2.3K
cursor: 2.2K
github-copilot: 2.2K
Instrument Python applications with structured logs, metrics, and traces. When something breaks in production, you need to answer "what, where, and why" without deploying new code.
Emit logs as JSON with consistent fields for production environments. Machine-readable logs enable powerful queries and alerts. For local development, consider human-readable formats.
Track latency, traffic, errors, and saturation for every service boundary.
Thread a unique ID through all logs and spans for a single request, enabling end-to-end tracing.
Keep metric label values bounded. Unbounded labels (like user IDs) explode storage costs.
import structlog
# Minimal setup: stamp every event with an ISO-formatted time and render it
# as a single JSON object.
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)
logger = structlog.get_logger()
# Keyword arguments are attached to the event as structured fields.
logger.info("Request processed", user_id="123", duration_ms=45)
Configure structlog for JSON output with consistent fields.
import logging
import structlog
def configure_logging(log_level: str = "INFO") -> None:
    """Set up structlog to print JSON log lines at or above *log_level*.

    Args:
        log_level: Name of a standard ``logging`` level, case-insensitive
            (for example ``"debug"`` or ``"WARNING"``). The name is looked
            up with ``getattr`` on the ``logging`` module, so an unknown
            name raises ``AttributeError``.
    """
    threshold = getattr(logging, log_level.upper())
    # Processors run in list order; the renderer has to come last.
    pipeline = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ]
    structlog.configure(
        processors=pipeline,
        wrapper_class=structlog.make_filtering_bound_logger(threshold),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )
# Initialize at application startup
configure_logging("INFO")
logger = structlog.get_logger()
Every log entry should include standard fields for filtering and correlation.
import structlog
from contextvars import ContextVar
# Store correlation ID in context
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
logger = structlog.get_logger()
def process_request(request: Request) -> Response:
    """Handle *request*, logging its arrival, outcome and duration.

    Args:
        request: Incoming request. Its ``method``, ``path`` and ``user_id``
            attributes are read for the "Request received" log entry.

    Returns:
        Whatever ``handle_request(request)`` returns.

    Raises:
        Exception: Anything raised by ``handle_request`` is logged as
            "Request failed" and then re-raised unchanged.
    """
    # Function-scope import: the snippet's own import lines do not bring in
    # ``time``, which is needed to measure the duration.
    import time

    started = time.perf_counter()
    logger.info(
        "Request received",
        correlation_id=correlation_id.get(),
        method=request.method,
        path=request.path,
        user_id=request.user_id,
    )
    # Only the handler call is guarded, so a problem while emitting the
    # success log is not misreported as a failed request.
    try:
        result = handle_request(request)
    except Exception as e:
        logger.error(
            "Request failed",
            correlation_id=correlation_id.get(),
            error_type=type(e).__name__,
            error_message=str(e),
        )
        raise
    # The original logged an undefined ``elapsed``; measure it explicitly.
    elapsed = (time.perf_counter() - started) * 1000
    logger.info(
        "Request completed",
        correlation_id=correlation_id.get(),
        status_code=200,
        duration_ms=round(elapsed, 2),
    )
    return result
Use log levels consistently across the application.
| Level | Purpose | Examples |
|---|---|---|
| DEBUG | Development diagnostics | Variable values, internal state |
| INFO | Request lifecycle, operations | Request start/end, job completion |
| WARNING | Recoverable anomalies | Retry attempts, fallback used |
| ERROR | Failures needing attention | Exceptions, service unavailable |
# DEBUG: detailed internal information
logger.debug("Cache lookup", key=cache_key, hit=cache_hit)
# INFO: normal operational events
logger.info("Order created", order_id=order.id, total=order.total)
# WARNING: unusual situations that were handled
logger.warning(
    "Rate limit approaching",
    current_rate=950,
    limit=1000,
    reset_seconds=30,
)
# ERROR: failures that need investigation
# NOTE(review): `e` is not defined in this snippet; presumably the call sits
# inside an `except ... as e:` handler. Confirm when adapting.
logger.error(
    "Payment processing failed",
    order_id=order.id,
    error=str(e),
    payment_provider="stripe",
)
Never log expected behavior at ERROR. A user entering a wrong password is INFO, not ERROR.
Generate a unique ID at ingress and thread it through all operations.
from contextvars import ContextVar
import uuid
import structlog
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
def set_correlation_id(cid: str | None = None) -> str:
    """Bind a correlation ID to the current context and return it.

    Args:
        cid: ID to use. When it is ``None`` or empty, a random UUID4 string
            is generated instead.

    Returns:
        The ID that was bound.
    """
    if not cid:
        cid = str(uuid.uuid4())
    # Store the ID in the ContextVar and in structlog's context-local
    # bindings.
    correlation_id.set(cid)
    structlog.contextvars.bind_contextvars(correlation_id=cid)
    return cid
# FastAPI middleware example
from fastapi import Request
async def correlation_middleware(request: Request, call_next):
    """Attach a correlation ID to the request context and echo it back.

    The ID is taken from the incoming ``X-Correlation-ID`` header; when the
    header is missing or empty a new UUID4 string is generated. The same ID
    is written to the response's ``X-Correlation-ID`` header.
    """
    header_name = "X-Correlation-ID"
    cid = request.headers.get(header_name)
    if not cid:
        cid = str(uuid.uuid4())
    set_correlation_id(cid)
    response = await call_next(request)
    response.headers[header_name] = cid
    return response
Propagate to outbound requests:
import httpx
async def call_downstream_service(endpoint: str, data: dict) -> dict:
    """POST *data* as JSON to *endpoint*, forwarding the correlation ID.

    Args:
        endpoint: URL of the downstream service.
        data: Payload sent as the JSON request body.

    Returns:
        The response body decoded from JSON. The HTTP status is not
        checked, so error responses are decoded like any other.
    """
    # Carry the current context's ID so the downstream logs can be joined.
    outbound_headers = {"X-Correlation-ID": correlation_id.get()}
    async with httpx.AsyncClient() as client:
        reply = await client.post(endpoint, json=data, headers=outbound_headers)
        return reply.json()
Track these metrics for every service boundary:
from prometheus_client import Counter, Histogram, Gauge
# Latency: how long requests take
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "Request latency in seconds",
    ["method", "endpoint", "status"],
    # Bucket upper bounds in seconds, from 10 ms up to 10 s.
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)
# Traffic: request rate
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)
# Errors: error rate, broken down by exception type instead of status
ERROR_COUNT = Counter(
    "http_errors_total",
    "Total HTTP errors",
    ["method", "endpoint", "error_type"],
)
# Saturation: resource utilization (declared without labels)
DB_POOL_USAGE = Gauge(
    "db_connection_pool_used",
    "Number of database connections in use",
)
Instrument your endpoints:
import time
from functools import wraps
def track_request(func):
    """Wrap an async request handler so every call records request metrics.

    For each call the wrapper increments ``REQUEST_COUNT`` and observes the
    elapsed time in ``REQUEST_LATENCY``, labelled with the request method,
    the request path and the response status. When the handler raises an
    ``Exception``, ``ERROR_COUNT`` is incremented with the exception's class
    name and the exception is re-raised.

    Args:
        func: Async callable taking the request as its first argument and
            returning an object with a ``status_code`` attribute.

    Returns:
        The wrapping coroutine function.
    """
    @wraps(func)
    async def wrapper(request: Request, *args, **kwargs):
        method = request.method
        # NOTE(review): the raw URL path is used as a label value; paths
        # that embed IDs make this label unbounded. Consider a route
        # template instead.
        endpoint = request.url.path
        # Bound before the ``try`` so the ``finally`` block can always read
        # it. Without this, an exit that is not an ``Exception`` (for
        # example ``asyncio.CancelledError``) hit an unbound ``status`` and
        # the resulting UnboundLocalError replaced the real exception.
        status = "500"
        start = time.perf_counter()
        try:
            response = await func(request, *args, **kwargs)
            status = str(response.status_code)
            return response
        except Exception as e:
            ERROR_COUNT.labels(
                method=method,
                endpoint=endpoint,
                error_type=type(e).__name__,
            ).inc()
            raise
        finally:
            duration = time.perf_counter() - start
            REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
            REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)
    return wrapper
Avoid labels with unbounded values to prevent metric explosion.
# BAD: user IDs can take millions of distinct values
REQUEST_COUNT.labels(method="GET", user_id=user.id)  # Don't do this!
# GOOD: bounded values only
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")
# If you need per-user metrics, use a different approach:
# - Log the user_id and query logs
# - Use a separate analytics system
# - Bucket users by type/tier
# A metric's label names are fixed when it is declared, and REQUEST_COUNT
# only accepts method/endpoint/status, so the tier breakdown gets its own
# counter.
REQUESTS_BY_TIER = Counter(
    "http_requests_by_tier_total",
    "Total HTTP requests by user tier",
    ["method", "endpoint", "user_tier"],
)
REQUESTS_BY_TIER.labels(
    method="GET",
    endpoint="/users",
    user_tier="premium",  # Bounded set of values
)
Create a reusable timing context manager for operations.
from contextlib import contextmanager
import time
import structlog
logger = structlog.get_logger()
@contextmanager
def timed_operation(name: str, **extra_fields):
    """Time the enclosed block and log how it ended.

    A debug event is logged on entry. On normal exit an info event with the
    duration is logged; if the block raises an ``Exception`` an error event
    with the duration and message is logged and the exception is re-raised.

    Args:
        name: Value logged under the ``operation`` key.
        **extra_fields: Additional key/value pairs added to every event.
    """
    def _elapsed_ms() -> float:
        # Milliseconds since entry, rounded to two decimals for the log.
        return round((time.perf_counter() - start) * 1000, 2)

    start = time.perf_counter()
    logger.debug("Operation started", operation=name, **extra_fields)
    try:
        yield
    except Exception as exc:
        logger.error(
            "Operation failed",
            operation=name,
            duration_ms=_elapsed_ms(),
            error=str(exc),
            **extra_fields,
        )
        raise
    # Reached only when the block finished without raising.
    logger.info(
        "Operation completed",
        operation=name,
        duration_ms=_elapsed_ms(),
        **extra_fields,
    )
# Usage
# NOTE(review): the body uses `await`, so this snippet has to live inside an
# `async def`; `user` and `order_repository` come from the surrounding code.
with timed_operation("fetch_user_orders", user_id=user.id):
    orders = await order_repository.get_by_user(user.id)
Set up distributed tracing with OpenTelemetry.
Note: OpenTelemetry is actively evolving. Check the official Python documentation for the latest API patterns and best practices.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
    """Install a global OpenTelemetry tracer provider exporting over OTLP/gRPC.

    Args:
        service_name: Recorded as the ``service.name`` resource attribute on
            every exported span. The original accepted this argument but
            never used it.
        otlp_endpoint: Endpoint passed to the OTLP span exporter.
    """
    # Function-scope import: the snippet's import lines do not include the
    # resources module needed to attach the service name.
    from opentelemetry.sdk.resources import SERVICE_NAME, Resource

    resource = Resource.create({SERVICE_NAME: service_name})
    provider = TracerProvider(resource=resource)
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    # Spans are handed to the exporter through a batching processor.
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
async def process_order(order_id: str) -> Order:
    """Run the order steps inside a parent span with one child span per step.

    The parent span ``process_order`` carries the ``order.id`` attribute;
    ``validate_order``, ``charge_payment`` and ``send_confirmation`` each run
    in their own child span.

    NOTE(review): ``order`` is returned but never assigned in this snippet,
    so the function raises NameError as written. Decide where the order
    object should come from before using it.
    NOTE(review): the three step functions are called without ``await``;
    confirm they are synchronous.
    """
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        with tracer.start_as_current_span("validate_order"):
            validate_order(order_id)
        with tracer.start_as_current_span("charge_payment"):
            charge_payment(order_id)
        with tracer.start_as_current_span("send_confirmation"):
            send_confirmation(order_id)
        return order
Weekly Installs
3.1K
Repository
GitHub Stars
32.2K
First Seen
Jan 30, 2026
Security Audits
Gen Agent Trust Hub: Fail · Socket: Pass · Snyk: Pass
Installed on
gemini-cli: 2.4K
opencode: 2.4K
codex: 2.4K
claude-code: 2.3K
cursor: 2.2K
github-copilot: 2.2K
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
102,200 周安装