重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
npx skills add https://github.com/cosmix/loom --skill logging-observability

可观测性通过日志、指标和追踪来理解系统行为。本技能提供了以下模式:
import json
import logging
import sys
from datetime import datetime, timezone
from contextvars import ContextVar
from typing import Any

# Context variables that propagate request-scoped tracing identifiers
# across async tasks and threads without passing them through call sites.
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
span_id: ContextVar[str] = ContextVar('span_id', default='')


class StructuredFormatter(logging.Formatter):
    """JSON formatter for structured logging.

    Emits one JSON object per record containing the timestamp, level,
    logger name, rendered message, and the current correlation/span IDs.
    Callers attach extra fields via
    ``logger.info(..., extra={"structured_data": {...}})``.
    """

    def format(self, record: logging.LogRecord) -> str:
        # datetime.utcnow() is deprecated (Python 3.12+) and returns a
        # naive datetime; use an aware UTC timestamp and keep the same
        # "...Z" wire format the original produced.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        log_data = {
            "timestamp": timestamp,
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
            "span_id": span_id.get(),
        }
        # Append the formatted traceback when the record carries one.
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # Merge caller-supplied structured fields (these may override the
        # defaults above by design).
        if hasattr(record, 'structured_data'):
            log_data.update(record.structured_data)
        return json.dumps(log_data)


def setup_logging():
    """Configure the root logger to emit structured JSON to stdout.

    Safe to call more than once: a repeated call no longer attaches a
    duplicate handler (the original did, duplicating every log line).
    """
    root_logger = logging.getLogger()
    if any(isinstance(h.formatter, StructuredFormatter) for h in root_logger.handlers):
        return
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(StructuredFormatter())
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(handler)


# Usage example
logger = logging.getLogger(__name__)
logger.info("User logged in", extra={
    "structured_data": {
        "user_id": "123",
        "ip_address": "192.168.1.1",
        "action": "login"
    }
})
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
/** Context fields attached to every emitted log line. */
interface LogContext {
  correlationId?: string;
  spanId?: string;
  [key: string]: unknown;
}

/** Shape of one JSON log entry written to stdout. */
interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  context: LogContext;
}

/**
 * Minimal structured logger: prints one JSON object per entry and
 * supports immutable child loggers via `withContext`.
 */
class StructuredLogger {
  private context: LogContext = {};

  /** Return a child logger whose context extends this logger's context. */
  withContext(context: LogContext): StructuredLogger {
    const child = new StructuredLogger();
    child.context = { ...this.context, ...context };
    return child;
  }

  /** Serialize and emit a single entry; per-call data overrides context. */
  private emit(
    level: string,
    message: string,
    data?: Record<string, unknown>,
  ): void {
    const merged: LogContext = { ...this.context, ...data };
    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level,
      message,
      context: merged,
    };
    console.log(JSON.stringify(entry));
  }

  debug(message: string, data?: Record<string, unknown>): void {
    this.emit("DEBUG", message, data);
  }

  info(message: string, data?: Record<string, unknown>): void {
    this.emit("INFO", message, data);
  }

  warn(message: string, data?: Record<string, unknown>): void {
    this.emit("WARN", message, data);
  }

  error(message: string, data?: Record<string, unknown>): void {
    this.emit("ERROR", message, data);
  }
}
| 级别 | 用途 | 示例 |
|---|---|---|
| TRACE | 细粒度调试 | 循环迭代、变量值 |
| DEBUG | 诊断信息 | 函数进入/退出、中间状态 |
| INFO | 正常操作 | 请求开始、作业完成、用户操作 |
| WARN | 潜在问题 | 已弃用的API使用、重试尝试、慢查询 |
| ERROR | 需要关注的失败 | 捕获异常、操作失败 |
| FATAL | 严重故障 | 系统无法继续、数据损坏 |
# Log level usage examples.
# NOTE(review): `item`, `order`, and `e` are assumed to come from the
# surrounding application code — these lines are illustrative, not runnable.
logger.debug("Processing item", extra={"structured_data": {"item_id": item.id}})
logger.info("Order processed successfully", extra={"structured_data": {"order_id": order.id, "total": order.total}})
logger.warning("Rate limit approaching", extra={"structured_data": {"current": 95, "limit": 100}})
logger.error("Payment failed", extra={"structured_data": {"order_id": order.id, "error": str(e)}})
import uuid
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
import time


@dataclass
class Span:
    """A single timed unit of work within a trace."""
    name: str
    trace_id: str
    # 16 lowercase hex chars. uuid4().hex avoids the dashes that
    # str(uuid4())[:16] included (span IDs should be pure hex, W3C-style,
    # and the dashes also cost entropy within the same length).
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)

    def end(self):
        """Mark the span as finished."""
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        """Span duration in milliseconds; 0 while still running."""
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0


# The active span for the current execution context (task/thread).
current_span: ContextVar[Optional[Span]] = ContextVar('current_span', default=None)


class Tracer:
    """Minimal in-process tracer that exports completed spans via logging."""

    def __init__(self, service_name: str):
        self.service_name = service_name

    def start_span(self, name: str, parent: Optional[Span] = None) -> Span:
        """Start a span and make it the active one.

        Trace and parent IDs are inherited from *parent*, or from the
        currently active span when no parent is given.
        """
        parent = parent or current_span.get()
        trace_id = parent.trace_id if parent else uuid.uuid4().hex[:32]
        parent_span_id = parent.span_id if parent else None
        span = Span(
            name=name,
            trace_id=trace_id,
            parent_span_id=parent_span_id,
            attributes={"service": self.service_name}
        )
        # Keep the ContextVar token so end_span() can restore the parent
        # (the original left the ended span active, mis-parenting siblings).
        span._context_token = current_span.set(span)
        return span

    def end_span(self, span: Span):
        """Finish *span*, export it, and restore the previous active span."""
        span.end()
        self._export(span)
        token = getattr(span, '_context_token', None)
        if token is not None:
            current_span.reset(token)

    def _export(self, span: Span):
        """Export the span to the tracing backend (here: a structured log)."""
        # Resolve the logger locally so the class does not depend on a
        # module-level `logger` binding being present.
        logging.getLogger(__name__).info(f"Span completed: {span.name}", extra={
            "structured_data": {
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "parent_span_id": span.parent_span_id,
                "duration_ms": span.duration_ms,
                "attributes": span.attributes
            }
        })
# Context-manager wrapper for spans.
from contextlib import contextmanager


@contextmanager
def trace_span(tracer: Tracer, name: str):
    """Run a block inside a new span; tag the span and re-raise on failure."""
    span = tracer.start_span(name)
    try:
        yield span
    except Exception as exc:
        # Record the failure on the span before propagating it.
        span.attributes.update({"error": True, "error.message": str(exc)})
        raise
    finally:
        # Always close and export the span, on success or failure.
        tracer.end_span(span)
# Usage example.
# NOTE(review): `validate` and `charge` are assumed to be defined by the
# application; `trace_span` itself is synchronous, so the awaited calls run
# inside the enclosing span's context.
tracer = Tracer("order-service")
async def process_order(order_id: str):
    with trace_span(tracer, "process_order") as span:
        span.attributes["order_id"] = order_id
        with trace_span(tracer, "validate_order"):
            await validate(order_id)
        with trace_span(tracer, "charge_payment"):
            await charge(order_id)
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
import time
import threading
# Kinds of metrics supported by this module (functional Enum form;
# members and values are identical to the class-based declaration).
MetricType = Enum('MetricType', {
    'COUNTER': 'counter',
    'GAUGE': 'gauge',
    'HISTOGRAM': 'histogram',
})
@dataclass
class Counter:
    """A monotonically increasing metric (e.g. total requests served)."""
    name: str
    labels: Dict[str, str]
    value: float = 0

    def inc(self, amount: float = 1):
        """Increase the counter by *amount* (1 by default)."""
        self.value = self.value + amount
@dataclass
class Gauge:
    """A metric that can move both up and down (e.g. active connections)."""
    name: str
    labels: Dict[str, str]
    value: float = 0

    def set(self, value: float):
        """Replace the current value."""
        self.value = value

    def inc(self, amount: float = 1):
        """Raise the value by *amount* (1 by default)."""
        self.set(self.value + amount)

    def dec(self, amount: float = 1):
        """Lower the value by *amount* (1 by default)."""
        self.inc(-amount)
@dataclass
class Histogram:
    """Prometheus-style histogram with cumulative bucket counts."""
    name: str
    labels: Dict[str, str]
    buckets: List[float]
    # default_factory replaces the original `= None` mutable-default
    # anti-pattern (the None was immediately clobbered in __post_init__).
    values: List[float] = field(default_factory=list)

    def __post_init__(self):
        # Pre-sort the bucket bounds once (the original re-sorted on every
        # observe); the +Inf bucket catches all observations.
        self._sorted_buckets = sorted(self.buckets) + [float('inf')]
        self._bucket_counts = {b: 0 for b in self._sorted_buckets}
        self._sum = 0
        self._count = 0

    def observe(self, value: float):
        """Record one observation."""
        self.values.append(value)
        self._sum += value
        self._count += 1
        # Cumulative semantics: every bucket whose bound >= value counts it.
        for bucket in self._sorted_buckets:
            if value <= bucket:
                self._bucket_counts[bucket] += 1
class MetricsRegistry:
    """Thread-safe registry that deduplicates metrics by name + labels."""

    def __init__(self):
        self._metrics: Dict[str, Any] = {}
        self._lock = threading.Lock()

    @staticmethod
    def _key(name: str, labels: Dict[str, str]) -> str:
        # Sort label items so the same label mapping built in a different
        # insertion order resolves to the same metric (the original
        # f"{name}:{labels}" key was dict-order sensitive).
        return f"{name}:{sorted((labels or {}).items())}"

    def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
        """Get or create the Counter for (name, labels)."""
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Counter(name, labels or {})
            return self._metrics[key]

    def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
        """Get or create the Gauge for (name, labels)."""
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Gauge(name, labels or {})
            return self._metrics[key]

    def histogram(self, name: str, buckets: List[float], labels: Dict[str, str] = None) -> Histogram:
        """Get or create the Histogram for (name, labels); *buckets* is only
        used on first creation."""
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Histogram(name, labels or {}, buckets)
            return self._metrics[key]
# Usage examples.
metrics = MetricsRegistry()
# Counter for requests; the labels identify the time series.
request_counter = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
request_counter.inc()
# Gauge for currently active connections.
active_connections = metrics.gauge("active_connections")
active_connections.inc()
# ... handle the connection ...
active_connections.dec()
# Histogram for request duration (bucket bounds in seconds).
request_duration = metrics.histogram(
    "http_request_duration_seconds",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
start = time.time()
# ... handle the request ...
request_duration.observe(time.time() - start)
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource  # was missing: Resource is used below
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor


def setup_opentelemetry(service_name: str, otlp_endpoint: str):
    """Initialize OpenTelemetry tracing and metrics with OTLP export.

    Returns a ``(tracer, meter)`` pair for *service_name*.
    """
    resource = Resource.create({"service.name": service_name})
    # Tracing: batch spans and ship them over OTLP/gRPC.
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    )
    trace.set_tracer_provider(trace_provider)
    # Metrics: the original built a MeterProvider with no reader, so the
    # imported OTLPMetricExporter was never used and nothing was exported.
    metric_provider = MeterProvider(
        resource=resource,
        metric_readers=[
            PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=otlp_endpoint))
        ],
    )
    metrics.set_meter_provider(metric_provider)
    # Auto-instrument outbound `requests` calls.
    RequestsInstrumentor().instrument()
    return trace.get_tracer(service_name), metrics.get_meter(service_name)
# Usage with FastAPI.
from fastapi import FastAPI
app = FastAPI()
FastAPIInstrumentor.instrument_app(app)
tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")
# Custom spans.
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    # NOTE(review): `order_repository` is assumed to be defined elsewhere in
    # the application — confirm against the surrounding code.
    with tracer.start_as_current_span("fetch_order") as span:
        span.set_attribute("order.id", order_id)
        order = await order_repository.get(order_id)
        span.set_attribute("order.status", order.status)
        return order
# Logstash pipeline configuration.
input {
  # Tail application log files; each line is already JSON.
  file {
    path => "/var/log/app/*.log"
    codec => json
  }
}
filter {
  # Parse structured JSON logs carried in the message field.
  json {
    source => "message"
  }
  # Route events to a date-based Elasticsearch index.
  mutate {
    add_field => {
      "[@metadata][index]" => "app-logs-%{+YYYY.MM.dd}"
    }
  }
  # Enrich with geolocation when an ip_address field is present.
  geoip {
    source => "ip_address"
    target => "geo"
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
}
# Promtail scrape configuration.
# NOTE(review): indentation was lost in extraction — YAML is
# whitespace-sensitive, so re-indent before use.
scrape_configs:
- job_name: app-logs
  static_configs:
  - targets:
    - localhost
    labels:
      job: app-logs
      __path__: /var/log/app/*.log
  # Extract JSON fields and promote them to Loki labels.
  pipeline_stages:
  - json:
      expressions:
        level: level
        correlation_id: correlation_id
        service: service
  - labels:
      level:
      correlation_id:
      service:
# datadog.yaml
# NOTE(review): indentation was lost in extraction — re-indent before use.
logs_enabled: true
logs_config:
  processing_rules:
  # Drop health-check noise before shipping.
  - type: exclude_at_match
    name: exclude_healthcheck
    pattern: "GET /health"
  # Automatically group multi-line entries (e.g. tracebacks).
  auto_multi_line_detection: true
# Collect logs from files.
# NOTE(review): a `logs:` section normally lives in an integration's
# conf.d file rather than datadog.yaml itself — confirm before deploying.
logs:
- type: file
  path: "/var/log/app/*.log"
  service: "order-service"
  source: "python"
  tags:
  - "env:production"
# Prometheus alerting rules.
# NOTE(review): indentation was lost in extraction — re-indent before use.
groups:
- name: service-alerts
  rules:
  # Fire when more than 5% of requests fail over a 5-minute window.
  - alert: HighErrorRate
    expr: |
      sum(rate(http_requests_total{status=~"5.."}[5m]))
      / sum(rate(http_requests_total[5m])) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "检测到高错误率"
      description: "过去5分钟内错误率为 {{ $value | humanizePercentage }}"
      runbook_url: "https://wiki.example.com/runbooks/high-error-rate"
  # Fire when p95 latency exceeds 1 second for 10 minutes.
  - alert: HighLatency
    expr: |
      histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "检测到高延迟"
      description: "95百分位延迟为 {{ $value }}秒"
  # Fire when a scrape target has been unreachable for 1 minute.
  - alert: ServiceDown
    expr: up == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "服务 {{ $labels.instance }} 已宕机"
      description: "{{ $labels.job }} 已宕机超过1分钟"
| 级别 | 响应时间 | 示例 |
|---|---|---|
| Critical | 立即 | 服务宕机、高错误率、数据丢失 |
| Warning | 工作时间 | 高延迟、接近限制、重试激增 |
| Info | 仅记录 | 部署开始、配置更改 |
使用适当的日志级别:开发时使用DEBUG,正常操作时使用INFO,潜在问题时使用WARN,失败时使用ERROR,严重故障时使用FATAL。
包含上下文:始终在结构化字段中包含关联ID、追踪ID、用户ID和相关的业务标识符。
避免敏感数据:切勿记录密码、令牌、信用卡或个人身份信息。必要时实施自动脱敏。
使用结构化日志记录:JSON日志便于在日志聚合系统(ELK、Loki、Datadog)中进行解析和查询。
一致的字段命名:跨服务标准化字段名称(例如,始终使用correlation_id,而不是有时使用request_id)。
追踪边界:在服务边界、数据库调用、外部API调用和重要操作处创建span。
传播上下文:通过HTTP头部跨服务边界传递追踪ID和span ID(遵循OpenTelemetry标准)。
添加有意义的属性:在span属性中包含业务上下文(user_id、order_id)和技术上下文(db_query、cache_hit)。
适当采样:使用自适应采样 - 追踪100%的错误,根据流量量对成功请求进行采样。
追踪黄金信号:监控四大黄金信号 - 延迟、流量、错误、饱和度。
使用正确的指标类型:计数器用于总量(请求数),仪表盘用于当前值(内存),直方图用于分布(延迟)。
标签基数:保持标签基数较低 - 避免在指标标签中使用像用户ID这样的高基数值。
命名约定:遵循Prometheus命名规范 - http_requests_total(计数器)、process_memory_bytes(仪表盘)、http_request_duration_seconds(直方图)。
基于症状告警:对影响用户的问题(错误率、延迟)进行告警,而不是原因(CPU使用率)。症状表明什么坏了,原因解释为什么坏了。
包含操作手册:每个告警必须链接到包含调查步骤、常见原因和修复程序的操作手册。
使用适当的阈值:基于SLO和历史数据设置阈值,而不是任意值。
告警疲劳:确保告警是可操作的。不可操作的告警会导致告警疲劳并忽略关键问题。
端到端关联:使用关联ID将日志、追踪和指标链接起来,以实现跨系统调试。
集中化:使用集中式日志聚合(ELK、Loki)和追踪收集(Jaeger、Zipkin)以实现跨服务可见性。
测试可观测性:在开发中验证日志记录、追踪和指标 - 不要在生产中发现差距。
import time
import uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware


class ObservabilityMiddleware(BaseHTTPMiddleware):
    """ASGI middleware tying logs, traces, and metrics together per request.

    For each request it: assigns or propagates a correlation ID, opens a
    server span, counts the request, and records its duration.
    """

    def __init__(self, app, tracer, metrics):
        super().__init__(app)
        self.tracer = tracer
        # NOTE(review): `.labels(**labels)` is called on these in dispatch(),
        # which matches the prometheus_client API — not the simple
        # Counter/Histogram registry defined earlier in this document.
        # Confirm which metrics implementation is intended here.
        self.request_counter = metrics.counter("http_requests_total")
        self.request_duration = metrics.histogram(
            "http_request_duration_seconds",
            buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
        )

    async def dispatch(self, request: Request, call_next):
        # Reuse the caller-supplied correlation ID, or mint a new one.
        corr_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
        correlation_id.set(corr_id)
        start_time = time.time()
        with self.tracer.start_as_current_span(
            f"{request.method} {request.url.path}"
        ) as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", str(request.url))
            span.set_attribute("correlation_id", corr_id)
            try:
                response = await call_next(request)
                span.set_attribute("http.status_code", response.status_code)
                # Record request count and latency, labelled by route/status.
                labels = {
                    "method": request.method,
                    "path": request.url.path,
                    "status": str(response.status_code)
                }
                self.request_counter.labels(**labels).inc()
                self.request_duration.labels(**labels).observe(
                    time.time() - start_time
                )
                # Echo the correlation ID back to the client.
                response.headers["X-Correlation-ID"] = corr_id
                return response
            except Exception as e:
                # Mark the span failed and re-raise so the framework's
                # error handling still runs.
                span.set_attribute("error", True)
                span.record_exception(e)
                raise
每周安装数
43
仓库
GitHub星标数
28
首次出现
2026年1月23日
安全审计
安装于
opencode33
codex32
github-copilot30
gemini-cli29
claude-code27
cursor24
Observability enables understanding system behavior through logs, metrics, and traces. This skill provides patterns for:
import json
import logging
import sys
from datetime import datetime, timezone
from contextvars import ContextVar
from typing import Any

# Context variables that propagate request-scoped tracing identifiers
# across async tasks and threads without passing them through call sites.
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
span_id: ContextVar[str] = ContextVar('span_id', default='')


class StructuredFormatter(logging.Formatter):
    """JSON formatter for structured logging.

    Emits one JSON object per record containing the timestamp, level,
    logger name, rendered message, and the current correlation/span IDs.
    Callers attach extra fields via
    ``logger.info(..., extra={"structured_data": {...}})``.
    """

    def format(self, record: logging.LogRecord) -> str:
        # datetime.utcnow() is deprecated (Python 3.12+) and returns a
        # naive datetime; use an aware UTC timestamp and keep the same
        # "...Z" wire format the original produced.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        log_data = {
            "timestamp": timestamp,
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
            "span_id": span_id.get(),
        }
        # Append the formatted traceback when the record carries one.
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # Merge caller-supplied structured fields (these may override the
        # defaults above by design).
        if hasattr(record, 'structured_data'):
            log_data.update(record.structured_data)
        return json.dumps(log_data)


def setup_logging():
    """Configure the root logger to emit structured JSON to stdout.

    Safe to call more than once: a repeated call no longer attaches a
    duplicate handler (the original did, duplicating every log line).
    """
    root_logger = logging.getLogger()
    if any(isinstance(h.formatter, StructuredFormatter) for h in root_logger.handlers):
        return
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(StructuredFormatter())
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(handler)


# Usage
logger = logging.getLogger(__name__)
logger.info("User logged in", extra={
    "structured_data": {
        "user_id": "123",
        "ip_address": "192.168.1.1",
        "action": "login"
    }
})
/** Context fields attached to every emitted log line. */
interface LogContext {
  correlationId?: string;
  spanId?: string;
  [key: string]: unknown;
}

/** Shape of one JSON log entry written to stdout. */
interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  context: LogContext;
}

/**
 * Minimal structured logger: prints one JSON object per entry and
 * supports immutable child loggers via `withContext`.
 */
class StructuredLogger {
  private context: LogContext = {};

  /** Return a child logger whose context extends this logger's context. */
  withContext(context: LogContext): StructuredLogger {
    const child = new StructuredLogger();
    child.context = { ...this.context, ...context };
    return child;
  }

  /** Serialize and emit a single entry; per-call data overrides context. */
  private emit(
    level: string,
    message: string,
    data?: Record<string, unknown>,
  ): void {
    const merged: LogContext = { ...this.context, ...data };
    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level,
      message,
      context: merged,
    };
    console.log(JSON.stringify(entry));
  }

  debug(message: string, data?: Record<string, unknown>): void {
    this.emit("DEBUG", message, data);
  }

  info(message: string, data?: Record<string, unknown>): void {
    this.emit("INFO", message, data);
  }

  warn(message: string, data?: Record<string, unknown>): void {
    this.emit("WARN", message, data);
  }

  error(message: string, data?: Record<string, unknown>): void {
    this.emit("ERROR", message, data);
  }
}
| Level | Usage | Examples |
|---|---|---|
| TRACE | Fine-grained debugging | Loop iterations, variable values |
| DEBUG | Diagnostic information | Function entry/exit, intermediate states |
| INFO | Normal operations | Request started, job completed, user action |
| WARN | Potential issues | Deprecated API usage, retry attempted, slow query |
| ERROR | Failures requiring attention | Exception caught, operation failed |
| FATAL | Critical failures | System cannot continue, data corruption |
# Log level usage examples
# NOTE(review): `item`, `order`, and `e` are assumed to come from the
# surrounding application code — these lines are illustrative, not runnable.
logger.debug("Processing item", extra={"structured_data": {"item_id": item.id}})
logger.info("Order processed successfully", extra={"structured_data": {"order_id": order.id, "total": order.total}})
logger.warning("Rate limit approaching", extra={"structured_data": {"current": 95, "limit": 100}})
logger.error("Payment failed", extra={"structured_data": {"order_id": order.id, "error": str(e)}})
import uuid
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
import time


@dataclass
class Span:
    """A single timed unit of work within a trace."""
    name: str
    trace_id: str
    # 16 lowercase hex chars. uuid4().hex avoids the dashes that
    # str(uuid4())[:16] included (span IDs should be pure hex, W3C-style,
    # and the dashes also cost entropy within the same length).
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)

    def end(self):
        """Mark the span as finished."""
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        """Span duration in milliseconds; 0 while still running."""
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0


# The active span for the current execution context (task/thread).
current_span: ContextVar[Optional[Span]] = ContextVar('current_span', default=None)


class Tracer:
    """Minimal in-process tracer that exports completed spans via logging."""

    def __init__(self, service_name: str):
        self.service_name = service_name

    def start_span(self, name: str, parent: Optional[Span] = None) -> Span:
        """Start a span and make it the active one.

        Trace and parent IDs are inherited from *parent*, or from the
        currently active span when no parent is given.
        """
        parent = parent or current_span.get()
        trace_id = parent.trace_id if parent else uuid.uuid4().hex[:32]
        parent_span_id = parent.span_id if parent else None
        span = Span(
            name=name,
            trace_id=trace_id,
            parent_span_id=parent_span_id,
            attributes={"service": self.service_name}
        )
        # Keep the ContextVar token so end_span() can restore the parent
        # (the original left the ended span active, mis-parenting siblings).
        span._context_token = current_span.set(span)
        return span

    def end_span(self, span: Span):
        """Finish *span*, export it, and restore the previous active span."""
        span.end()
        self._export(span)
        token = getattr(span, '_context_token', None)
        if token is not None:
            current_span.reset(token)

    def _export(self, span: Span):
        """Export the span to the tracing backend (here: a structured log)."""
        # Resolve the logger locally so the class does not depend on a
        # module-level `logger` binding being present.
        logging.getLogger(__name__).info(f"Span completed: {span.name}", extra={
            "structured_data": {
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "parent_span_id": span.parent_span_id,
                "duration_ms": span.duration_ms,
                "attributes": span.attributes
            }
        })
# Context-manager wrapper for spans.
from contextlib import contextmanager


@contextmanager
def trace_span(tracer: Tracer, name: str):
    """Run a block inside a new span; tag the span and re-raise on failure."""
    span = tracer.start_span(name)
    try:
        yield span
    except Exception as exc:
        # Record the failure on the span before propagating it.
        span.attributes.update({"error": True, "error.message": str(exc)})
        raise
    finally:
        # Always close and export the span, on success or failure.
        tracer.end_span(span)
# Usage
# NOTE(review): `validate` and `charge` are assumed to be defined by the
# application; `trace_span` itself is synchronous, so the awaited calls run
# inside the enclosing span's context.
tracer = Tracer("order-service")
async def process_order(order_id: str):
    with trace_span(tracer, "process_order") as span:
        span.attributes["order_id"] = order_id
        with trace_span(tracer, "validate_order"):
            await validate(order_id)
        with trace_span(tracer, "charge_payment"):
            await charge(order_id)
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
import time
import threading
# Kinds of metrics supported by this module (functional Enum form;
# members and values are identical to the class-based declaration).
MetricType = Enum('MetricType', {
    'COUNTER': 'counter',
    'GAUGE': 'gauge',
    'HISTOGRAM': 'histogram',
})
@dataclass
class Counter:
    """A monotonically increasing metric (e.g. total requests served)."""
    name: str
    labels: Dict[str, str]
    value: float = 0

    def inc(self, amount: float = 1):
        """Increase the counter by *amount* (1 by default)."""
        self.value = self.value + amount
@dataclass
class Gauge:
    """A metric that can move both up and down (e.g. active connections)."""
    name: str
    labels: Dict[str, str]
    value: float = 0

    def set(self, value: float):
        """Replace the current value."""
        self.value = value

    def inc(self, amount: float = 1):
        """Raise the value by *amount* (1 by default)."""
        self.set(self.value + amount)

    def dec(self, amount: float = 1):
        """Lower the value by *amount* (1 by default)."""
        self.inc(-amount)
@dataclass
class Histogram:
    """Prometheus-style histogram with cumulative bucket counts."""
    name: str
    labels: Dict[str, str]
    buckets: List[float]
    # default_factory replaces the original `= None` mutable-default
    # anti-pattern (the None was immediately clobbered in __post_init__).
    values: List[float] = field(default_factory=list)

    def __post_init__(self):
        # Pre-sort the bucket bounds once (the original re-sorted on every
        # observe); the +Inf bucket catches all observations.
        self._sorted_buckets = sorted(self.buckets) + [float('inf')]
        self._bucket_counts = {b: 0 for b in self._sorted_buckets}
        self._sum = 0
        self._count = 0

    def observe(self, value: float):
        """Record one observation."""
        self.values.append(value)
        self._sum += value
        self._count += 1
        # Cumulative semantics: every bucket whose bound >= value counts it.
        for bucket in self._sorted_buckets:
            if value <= bucket:
                self._bucket_counts[bucket] += 1
class MetricsRegistry:
    """Thread-safe registry that deduplicates metrics by name + labels."""

    def __init__(self):
        self._metrics: Dict[str, Any] = {}
        self._lock = threading.Lock()

    @staticmethod
    def _key(name: str, labels: Dict[str, str]) -> str:
        # Sort label items so the same label mapping built in a different
        # insertion order resolves to the same metric (the original
        # f"{name}:{labels}" key was dict-order sensitive).
        return f"{name}:{sorted((labels or {}).items())}"

    def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
        """Get or create the Counter for (name, labels)."""
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Counter(name, labels or {})
            return self._metrics[key]

    def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
        """Get or create the Gauge for (name, labels)."""
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Gauge(name, labels or {})
            return self._metrics[key]

    def histogram(self, name: str, buckets: List[float], labels: Dict[str, str] = None) -> Histogram:
        """Get or create the Histogram for (name, labels); *buckets* is only
        used on first creation."""
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Histogram(name, labels or {}, buckets)
            return self._metrics[key]
# Usage
metrics = MetricsRegistry()
# Counter for requests; the labels identify the time series.
request_counter = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
request_counter.inc()
# Gauge for currently active connections.
active_connections = metrics.gauge("active_connections")
active_connections.inc()
# ... handle connection ...
active_connections.dec()
# Histogram for request duration (bucket bounds in seconds).
request_duration = metrics.histogram(
    "http_request_duration_seconds",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
start = time.time()
# ... handle request ...
request_duration.observe(time.time() - start)
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource  # was missing: Resource is used below
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor


def setup_opentelemetry(service_name: str, otlp_endpoint: str):
    """Initialize OpenTelemetry tracing and metrics with OTLP export.

    Returns a ``(tracer, meter)`` pair for *service_name*.
    """
    resource = Resource.create({"service.name": service_name})
    # Tracing: batch spans and ship them over OTLP/gRPC.
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    )
    trace.set_tracer_provider(trace_provider)
    # Metrics: the original built a MeterProvider with no reader, so the
    # imported OTLPMetricExporter was never used and nothing was exported.
    metric_provider = MeterProvider(
        resource=resource,
        metric_readers=[
            PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=otlp_endpoint))
        ],
    )
    metrics.set_meter_provider(metric_provider)
    # Auto-instrument outbound `requests` calls.
    RequestsInstrumentor().instrument()
    return trace.get_tracer(service_name), metrics.get_meter(service_name)
# Usage with FastAPI
from fastapi import FastAPI
app = FastAPI()
FastAPIInstrumentor.instrument_app(app)
tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")
# Custom spans
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    # NOTE(review): `order_repository` is assumed to be defined elsewhere in
    # the application — confirm against the surrounding code.
    with tracer.start_as_current_span("fetch_order") as span:
        span.set_attribute("order.id", order_id)
        order = await order_repository.get(order_id)
        span.set_attribute("order.status", order.status)
        return order
# Logstash pipeline configuration
input {
  # Tail application log files; each line is already JSON.
  file {
    path => "/var/log/app/*.log"
    codec => json
  }
}
filter {
  # Parse structured JSON logs carried in the message field.
  json {
    source => "message"
  }
  # Add Elasticsearch index based on date (kept in @metadata so it is
  # not indexed with the event itself).
  mutate {
    add_field => {
      "[@metadata][index]" => "app-logs-%{+YYYY.MM.dd}"
    }
  }
  # Enrich with geolocation (if IP present).
  geoip {
    source => "ip_address"
    target => "geo"
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
}
# Promtail scrape configuration
# NOTE(review): indentation was lost in extraction — YAML is
# whitespace-sensitive, so re-indent before use.
scrape_configs:
- job_name: app-logs
  static_configs:
  - targets:
    - localhost
    labels:
      job: app-logs
      __path__: /var/log/app/*.log
  # Extract JSON fields as labels
  pipeline_stages:
  - json:
      expressions:
        level: level
        correlation_id: correlation_id
        service: service
  - labels:
      level:
      correlation_id:
      service:
# datadog.yaml
# NOTE(review): indentation was lost in extraction — re-indent before use.
logs_enabled: true
logs_config:
  processing_rules:
  # Drop health-check noise before shipping.
  - type: exclude_at_match
    name: exclude_healthcheck
    pattern: "GET /health"
  # Auto-detect multi-line log entries (e.g. tracebacks).
  auto_multi_line_detection: true
# Log collection from files
# NOTE(review): a `logs:` section normally lives in an integration's
# conf.d file rather than datadog.yaml itself — confirm before deploying.
logs:
- type: file
  path: "/var/log/app/*.log"
  service: "order-service"
  source: "python"
  tags:
  - "env:production"
# Prometheus alerting rules
# NOTE(review): indentation was lost in extraction — re-indent before use.
groups:
- name: service-alerts
  rules:
  # Fire when more than 5% of requests fail over a 5-minute window.
  - alert: HighErrorRate
    expr: |
      sum(rate(http_requests_total{status=~"5.."}[5m]))
      / sum(rate(http_requests_total[5m])) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "High error rate detected"
      description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes"
      runbook_url: "https://wiki.example.com/runbooks/high-error-rate"
  # Fire when p95 latency exceeds 1 second for 10 minutes.
  - alert: HighLatency
    expr: |
      histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "High latency detected"
      description: "95th percentile latency is {{ $value }}s"
  # Fire when a scrape target has been unreachable for 1 minute.
  - alert: ServiceDown
    expr: up == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Service {{ $labels.instance }} is down"
      description: "{{ $labels.job }} has been down for more than 1 minute"
| Level | Response Time | Examples |
|---|---|---|
| Critical | Immediate | Service down, high error rate, data loss |
| Warning | Business hrs | High latency, approaching limits, retry spikes |
| Info | Log only | Deployment started, config changed |
Log at Appropriate Levels : DEBUG for development, INFO for normal operations, WARN for potential issues, ERROR for failures, FATAL for critical failures.
Include Context : Always include correlation IDs, trace IDs, user IDs, and relevant business identifiers in structured fields.
Avoid Sensitive Data : Never log passwords, tokens, credit cards, or PII. Implement automatic redaction when necessary.
Use Structured Logging : JSON logs enable easy parsing and querying in log aggregation systems (ELK, Loki, Datadog).
Consistent Field Names : Standardize field names across services (e.g., always use correlation_id, not sometimes request_id).
Trace Boundaries : Create spans at service boundaries, database calls, external API calls, and significant operations.
Propagate Context : Pass trace IDs and span IDs across service boundaries via HTTP headers (OpenTelemetry standards).
Add Meaningful Attributes : Include business context (user_id, order_id) and technical context (db_query, cache_hit) in span attributes.
Sample Appropriately : Use adaptive sampling - trace 100% of errors, sample successful requests based on traffic volume.
Track Golden Signals : Monitor the Four Golden Signals - latency, traffic, errors, saturation.
Use Correct Metric Types : Counters for totals (requests), Gauges for current values (memory), Histograms for distributions (latency).
Label Cardinality : Keep label cardinality low - avoid high-cardinality values like user IDs in metric labels.
Naming Conventions : Follow Prometheus naming - http_requests_total (counter), process_memory_bytes (gauge), http_request_duration_seconds (histogram).
Alert on Symptoms : Alert on user-impacting issues (error rate, latency), not causes (CPU usage). Symptoms indicate what is broken, causes explain why.
Include Runbooks : Every alert must link to a runbook with investigation steps, common causes, and remediation procedures.
Use Appropriate Thresholds : Set thresholds based on SLOs and historical data, not arbitrary values.
Alert Fatigue : Ensure alerts are actionable. Non-actionable alerts lead to alert fatigue and ignored critical issues.
End-to-End Correlation : Link logs, traces, and metrics using correlation IDs to enable cross-system debugging.
Centralize : Use centralized log aggregation (ELK, Loki) and trace collection (Jaeger, Zipkin) for cross-service visibility.
Test Observability : Verify logging, tracing, and metrics in development - don't discover gaps in production.
import time
import uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware


class ObservabilityMiddleware(BaseHTTPMiddleware):
    """ASGI middleware tying logs, traces, and metrics together per request.

    For each request it: assigns or propagates a correlation ID, opens a
    server span, counts the request, and records its duration.
    """

    def __init__(self, app, tracer, metrics):
        super().__init__(app)
        self.tracer = tracer
        # NOTE(review): `.labels(**labels)` is called on these in dispatch(),
        # which matches the prometheus_client API — not the simple
        # Counter/Histogram registry defined earlier in this document.
        # Confirm which metrics implementation is intended here.
        self.request_counter = metrics.counter("http_requests_total")
        self.request_duration = metrics.histogram(
            "http_request_duration_seconds",
            buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
        )

    async def dispatch(self, request: Request, call_next):
        # Extract or generate correlation ID.
        corr_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
        correlation_id.set(corr_id)
        start_time = time.time()
        with self.tracer.start_as_current_span(
            f"{request.method} {request.url.path}"
        ) as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", str(request.url))
            span.set_attribute("correlation_id", corr_id)
            try:
                response = await call_next(request)
                span.set_attribute("http.status_code", response.status_code)
                # Record metrics labelled by method/route/status.
                labels = {
                    "method": request.method,
                    "path": request.url.path,
                    "status": str(response.status_code)
                }
                self.request_counter.labels(**labels).inc()
                self.request_duration.labels(**labels).observe(
                    time.time() - start_time
                )
                # Add correlation ID to response so clients can report it.
                response.headers["X-Correlation-ID"] = corr_id
                return response
            except Exception as e:
                # Mark the span failed and re-raise so the framework's
                # error handling still runs.
                span.set_attribute("error", True)
                span.record_exception(e)
                raise
Weekly Installs
43
Repository
GitHub Stars
28
First Seen
Jan 23, 2026
Security Audits
Gen Agent Trust Hub: Fail · Socket: Pass · Snyk: Pass
Installed on
opencode33
codex32
github-copilot30
gemini-cli29
claude-code27
cursor24
Azure 升级评估与自动化工具 - 轻松迁移 Functions 计划、托管层级和 SKU
127,000 周安装