python-observability-ops by 0xdarkmatter/claude-mods
npx skills add https://github.com/0xdarkmatter/claude-mods --skill python-observability-ops
适用于生产应用程序的日志记录、指标和追踪。
import logging  # required for logging.INFO below; the original snippet omitted it

import structlog

# Configure structlog once at start-up: merge context-local values, add the
# level and an ISO timestamp, then render each event as one JSON line.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    # Events below INFO are filtered out by the bound logger.
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)
logger = structlog.get_logger()

# Usage: the first argument is the event name, keyword arguments become fields.
logger.info("user_created", user_id=123, email="test@example.com")
# Example output (illustrative; the timestamp is the actual emission time):
# {"user_id": 123, "email": "test@example.com", "event": "user_created", "level": "info", "timestamp": "2024-01-15T10:00:00.000000Z"}
import structlog
from contextvars import ContextVar
from uuid import uuid4
# Context variable holding the current request ID; empty string when unset.
request_id_var: ContextVar[str] = ContextVar("request_id", default="")


def bind_request_context(request_id: str | None = None):
    """Bind a request ID to the logging context and return it.

    A falsy ``request_id`` (``None`` or ``""``) is replaced by a freshly
    generated UUID4 string. The chosen ID is stored in ``request_id_var``
    and bound into structlog's context-local storage under ``request_id``.
    """
    if request_id:
        effective_id = request_id
    else:
        effective_id = str(uuid4())

    request_id_var.set(effective_id)
    structlog.contextvars.bind_contextvars(request_id=effective_id)
    return effective_id
# FastAPI middleware (`app` is assumed to be the FastAPI instance created
# elsewhere in the application).
@app.middleware("http")
async def request_context_middleware(request, call_next):
    """Attach a request ID to the logging context for one HTTP request.

    The ID is taken from the incoming ``X-Request-ID`` header, or generated
    when the header is missing or empty. It is bound to the logging context
    for the duration of the request and echoed back on the response.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request down the stack and
            returns the response.

    Returns:
        The downstream response with ``X-Request-ID`` set.
    """
    # Start from a clean slate so values bound earlier in this context
    # cannot leak into this request's log lines.
    structlog.contextvars.clear_contextvars()
    request_id = request.headers.get("X-Request-ID") or str(uuid4())
    bind_request_context(request_id)
    try:
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response
    finally:
        # Runs on success and on error; previously the cleanup was skipped
        # whenever the downstream handler raised.
        structlog.contextvars.clear_contextvars()
Logging, metrics, and tracing for production applications.
import logging  # required for logging.INFO below; the original snippet omitted it

import structlog

# Configure structlog once at start-up: merge context-local values, add the
# level and an ISO timestamp, then render each event as one JSON line.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    # Events below INFO are filtered out by the bound logger.
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)
logger = structlog.get_logger()

# Usage: the first argument is the event name, keyword arguments become fields.
logger.info("user_created", user_id=123, email="test@example.com")
# Example output (illustrative; the timestamp is the actual emission time):
# {"user_id": 123, "email": "test@example.com", "event": "user_created", "level": "info", "timestamp": "2024-01-15T10:00:00.000000Z"}
import structlog
from contextvars import ContextVar
from uuid import uuid4
# Context variable holding the current request ID; empty string when unset.
request_id_var: ContextVar[str] = ContextVar("request_id", default="")


def bind_request_context(request_id: str | None = None):
    """Bind a request ID to the logging context and return it.

    A falsy ``request_id`` (``None`` or ``""``) is replaced by a freshly
    generated UUID4 string. The chosen ID is stored in ``request_id_var``
    and bound into structlog's context-local storage under ``request_id``.
    """
    if request_id:
        effective_id = request_id
    else:
        effective_id = str(uuid4())

    request_id_var.set(effective_id)
    structlog.contextvars.bind_contextvars(request_id=effective_id)
    return effective_id
# FastAPI middleware (`app` is assumed to be the FastAPI instance created
# elsewhere in the application).
@app.middleware("http")
async def request_context_middleware(request, call_next):
    """Attach a request ID to the logging context for one HTTP request.

    The ID is taken from the incoming ``X-Request-ID`` header, or generated
    when the header is missing or empty. It is bound to the logging context
    for the duration of the request and echoed back on the response.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request down the stack and
            returns the response.

    Returns:
        The downstream response with ``X-Request-ID`` set.
    """
    # Start from a clean slate so values bound earlier in this context
    # cannot leak into this request's log lines.
    structlog.contextvars.clear_contextvars()
    request_id = request.headers.get("X-Request-ID") or str(uuid4())
    bind_request_context(request_id)
    try:
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response
    finally:
        # Runs on success and on error; previously the cleanup was skipped
        # whenever the downstream handler raised.
        structlog.contextvars.clear_contextvars()
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import FastAPI, Response
# Module-level metric objects shared by the middleware and the /metrics route.

# Request counter, labelled by method, endpoint and response status.
REQUEST_COUNT = Counter(
    name="http_requests_total",
    documentation="Total HTTP requests",
    labelnames=("method", "endpoint", "status"),
)

# Request duration in seconds, with explicit bucket upper bounds.
REQUEST_LATENCY = Histogram(
    name="http_request_duration_seconds",
    documentation="HTTP request latency",
    labelnames=("method", "endpoint"),
    buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 5.0),
)

# Unlabelled gauge that the middleware increments and decrements.
ACTIVE_CONNECTIONS = Gauge(
    name="active_connections",
    documentation="Number of active connections",
)
# Middleware to record metrics (`app` is assumed to be the FastAPI instance
# created elsewhere in the application).
@app.middleware("http")
async def metrics_middleware(request, call_next):
    """Record request count, latency and in-flight gauge for one request.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request down the stack and
            returns the response.

    Returns:
        The downstream response, unmodified.
    """
    # Local import: the snippet uses `time` but never imports it at module
    # level, which raised NameError on the first request.
    import time

    ACTIVE_CONNECTIONS.inc()
    try:
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start

        # NOTE(review): the raw URL path is used as a label value; paths with
        # embedded IDs would create one time series each. Consider a route
        # template instead.
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.url.path,
            status=response.status_code,
        ).inc()
        REQUEST_LATENCY.labels(
            method=request.method,
            endpoint=request.url.path,
        ).observe(duration)
        return response
    finally:
        # Always undo the increment; previously a raising handler left the
        # gauge permanently one too high.
        ACTIVE_CONNECTIONS.dec()
# Metrics endpoint
@app.get("/metrics")
async def metrics():
    """Return the current metrics in the Prometheus text exposition format."""
    # Local import so this block stands alone; prometheus_client is already a
    # dependency of this module.
    from prometheus_client import CONTENT_TYPE_LATEST

    return Response(
        content=generate_latest(),
        # Use the library's own content type (it carries the exposition
        # format version and charset) instead of a bare "text/plain".
        media_type=CONTENT_TYPE_LATEST,
    )
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# Setup: build a tracer provider, attach a batching OTLP/gRPC exporter, and
# install it as the global provider before any tracer is requested.
provider = TracerProvider()
# NOTE(review): the endpoint has no scheme; depending on the exporter version
# this may select a TLS channel. Confirm whether "http://localhost:4317" or
# insecure=True is needed for a plaintext local collector.
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
# Module-level tracer named after this module; used by the code below.
tracer = trace.get_tracer(__name__)
# Manual instrumentation
async def process_order(order_id: int):
    """Validate and charge an order, tracing each step as its own span.

    A "process_order" span wraps the whole operation and carries the
    ``order_id`` attribute. The "validate_order" and "charge_payment" spans
    are opened one after the other inside it.
    """
    with tracer.start_as_current_span("process_order") as order_span:
        order_span.set_attribute("order_id", order_id)

        # Step 1: validation.
        with tracer.start_as_current_span("validate_order"):
            await validate(order_id)

        # Step 2: payment, reached only if validation did not raise.
        with tracer.start_as_current_span("charge_payment"):
            await charge(order_id)
| 库 | 用途 |
|---|---|
| structlog | 结构化日志记录 |
| prometheus-client | 指标收集 |
| opentelemetry | 分布式追踪 |
| 指标类型 | 使用场景 |
| --- | --- |
| Counter | 总请求数、错误数 |
| Histogram | 延迟、大小 |
| Gauge | 当前连接数、队列大小 |
- ./references/structured-logging.md - structlog 配置、格式化器
- ./references/metrics.md - Prometheus 模式、自定义指标
- ./references/tracing.md - OpenTelemetry、分布式追踪
- ./assets/logging-config.py - 生产环境日志配置

先决条件:
- python-async-ops - 异步上下文传播

相关技能:
- python-fastapi-ops - 用于指标/追踪的 API 中间件
- python-cli-ops - CLI 日志记录模式

集成技能:
- python-database-ops - 数据库查询追踪

每周安装数
1
代码仓库
GitHub 星标数
8
首次出现
1 天前
安全审计
安装于
zencoder1
amp1
cline1
openclaw1
opencode1
cursor1
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import FastAPI, Response
# Module-level metric objects shared by the middleware and the /metrics route.

# Request counter, labelled by method, endpoint and response status.
REQUEST_COUNT = Counter(
    name="http_requests_total",
    documentation="Total HTTP requests",
    labelnames=("method", "endpoint", "status"),
)

# Request duration in seconds, with explicit bucket upper bounds.
REQUEST_LATENCY = Histogram(
    name="http_request_duration_seconds",
    documentation="HTTP request latency",
    labelnames=("method", "endpoint"),
    buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 5.0),
)

# Unlabelled gauge that the middleware increments and decrements.
ACTIVE_CONNECTIONS = Gauge(
    name="active_connections",
    documentation="Number of active connections",
)
# Middleware to record metrics (`app` is assumed to be the FastAPI instance
# created elsewhere in the application).
@app.middleware("http")
async def metrics_middleware(request, call_next):
    """Record request count, latency and in-flight gauge for one request.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request down the stack and
            returns the response.

    Returns:
        The downstream response, unmodified.
    """
    # Local import: the snippet uses `time` but never imports it at module
    # level, which raised NameError on the first request.
    import time

    ACTIVE_CONNECTIONS.inc()
    try:
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start

        # NOTE(review): the raw URL path is used as a label value; paths with
        # embedded IDs would create one time series each. Consider a route
        # template instead.
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.url.path,
            status=response.status_code,
        ).inc()
        REQUEST_LATENCY.labels(
            method=request.method,
            endpoint=request.url.path,
        ).observe(duration)
        return response
    finally:
        # Always undo the increment; previously a raising handler left the
        # gauge permanently one too high.
        ACTIVE_CONNECTIONS.dec()
# Metrics endpoint
@app.get("/metrics")
async def metrics():
    """Return the current metrics in the Prometheus text exposition format."""
    # Local import so this block stands alone; prometheus_client is already a
    # dependency of this module.
    from prometheus_client import CONTENT_TYPE_LATEST

    return Response(
        content=generate_latest(),
        # Use the library's own content type (it carries the exposition
        # format version and charset) instead of a bare "text/plain".
        media_type=CONTENT_TYPE_LATEST,
    )
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# Setup: build a tracer provider, attach a batching OTLP/gRPC exporter, and
# install it as the global provider before any tracer is requested.
provider = TracerProvider()
# NOTE(review): the endpoint has no scheme; depending on the exporter version
# this may select a TLS channel. Confirm whether "http://localhost:4317" or
# insecure=True is needed for a plaintext local collector.
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
# Module-level tracer named after this module; used by the code below.
tracer = trace.get_tracer(__name__)
# Manual instrumentation
async def process_order(order_id: int):
    """Validate and charge an order, tracing each step as its own span.

    A "process_order" span wraps the whole operation and carries the
    ``order_id`` attribute. The "validate_order" and "charge_payment" spans
    are opened one after the other inside it.
    """
    with tracer.start_as_current_span("process_order") as order_span:
        order_span.set_attribute("order_id", order_id)

        # Step 1: validation.
        with tracer.start_as_current_span("validate_order"):
            await validate(order_id)

        # Step 2: payment, reached only if validation did not raise.
        with tracer.start_as_current_span("charge_payment"):
            await charge(order_id)
| Library | Purpose |
|---|---|
| structlog | Structured logging |
| prometheus-client | Metrics collection |
| opentelemetry | Distributed tracing |
| Metric Type | Use Case |
| --- | --- |
| Counter | Total requests, errors |
| Histogram | Latencies, sizes |
| Gauge | Current connections, queue size |
- ./references/structured-logging.md - structlog configuration, formatters
- ./references/metrics.md - Prometheus patterns, custom metrics
- ./references/tracing.md - OpenTelemetry, distributed tracing
- ./assets/logging-config.py - Production logging configuration

Prerequisites:
- python-async-ops - Async context propagation

Related Skills:
- python-fastapi-ops - API middleware for metrics/tracing
- python-cli-ops - CLI logging patterns

Integration Skills:
- python-database-ops - Database query tracing

Weekly Installs
1
Repository
GitHub Stars
8
First Seen
1 day ago
Security Audits
Installed on
zencoder1
amp1
cline1
openclaw1
opencode1
cursor1
AI新闻播客制作技能:实时新闻转对话式播客脚本与音频生成
1,200 周安装