重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
microservices-expert by personamanagmentlayer/pcl
npx skills add https://github.com/personamanagmentlayer/pcl --skill microservices-expert
提供微服务架构、设计模式、服务通信和分布式系统挑战的专家指导。
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
from typing import List, Optional
from circuitbreaker import circuit
import asyncio
# Individual microservice: FastAPI application object for the Order Service.
app = FastAPI(title="Order Service", version="1.0.0")
class Order(BaseModel):
    """Order payload accepted and returned by the Order Service."""

    # Order identifier.
    id: str
    # User the payment is charged to.
    user_id: str
    # Line items; forwarded as-is to the inventory check.
    items: List[dict]
    # Amount passed to the payment service.
    total: float
    # Lifecycle state; OrderService.create_order sets it to "confirmed".
    status: str
class OrderService:
    """Coordinate order creation across the inventory and payment services.

    Both outbound calls are decorated with ``circuit(failure_threshold=5,
    recovery_timeout=60)`` and share one ``httpx.AsyncClient``.
    """

    def __init__(self, inventory_url: str, payment_url: str):
        """Remember the downstream base URLs and open an HTTP client.

        Args:
            inventory_url: Base URL of the inventory service.
            payment_url: Base URL of the payment service.
        """
        self.inventory_url = inventory_url
        self.payment_url = payment_url
        self.client = httpx.AsyncClient()

    async def aclose(self) -> None:
        """Close the HTTP client so its pooled connections are released."""
        await self.client.aclose()

    @circuit(failure_threshold=5, recovery_timeout=60)
    async def check_inventory(self, items: List[dict]) -> bool:
        """Ask the inventory service whether ``items`` are available.

        Args:
            items: Order line items, sent as the ``items`` field of the JSON
                request body.

        Returns:
            The ``available`` field of the JSON response.

        Raises:
            Exception: Any transport error, HTTP 5xx reply, or malformed
                response is printed and re-raised to the circuit decorator.
        """
        try:
            response = await self.client.post(
                f"{self.inventory_url}/check",
                json={"items": items},
                timeout=5.0,
            )
            # A 5xx reply is a service failure, not an answer; surface it
            # rather than trying to read a result out of the error body.
            if response.status_code >= 500:
                response.raise_for_status()
            return response.json()["available"]
        except Exception as e:
            print(f"Inventory service error: {e}")
            raise

    @circuit(failure_threshold=5, recovery_timeout=60)
    async def process_payment(self, user_id: str, amount: float) -> dict:
        """Charge ``amount`` to ``user_id`` through the payment service.

        Args:
            user_id: Identifier of the user to charge.
            amount: Amount to charge.

        Returns:
            The decoded JSON body of the payment service response.

        Raises:
            Exception: Any transport error, HTTP 5xx reply, or undecodable
                response is printed and re-raised to the circuit decorator.
        """
        try:
            response = await self.client.post(
                f"{self.payment_url}/charge",
                json={"user_id": user_id, "amount": amount},
                timeout=10.0,
            )
            # Only server errors are raised here: a 4xx reply may carry a
            # decline result that create_order() needs to inspect.
            if response.status_code >= 500:
                response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Payment service error: {e}")
            raise

    async def create_order(self, order: Order) -> Order:
        """Create an order by checking stock, charging, reserving and saving.

        Args:
            order: The order to create; its ``status`` is overwritten.

        Returns:
            The same ``order`` object with ``status`` set to ``"confirmed"``.

        Raises:
            HTTPException: 400 when the items are unavailable or the payment
                result is not ``"success"``.
        """
        # 1. Check inventory.
        if not await self.check_inventory(order.items):
            raise HTTPException(400, "Items not available")

        # 2. Process payment. ``.get`` keeps a reply without a "status" key
        # on the "Payment failed" path instead of raising KeyError.
        payment = await self.process_payment(order.user_id, order.total)
        if payment.get("status") != "success":
            raise HTTPException(400, "Payment failed")

        # NOTE(review): reserve_inventory() and save_order() are not defined
        # on this class in the code shown here -- confirm where they come
        # from. The payment is also charged before these two steps and is
        # not refunded if either of them fails.
        # 3. Reserve inventory.
        await self.reserve_inventory(order.items)

        # 4. Create order record.
        order.status = "confirmed"
        await self.save_order(order)
        return order
@app.post("/orders", response_model=Order)
async def create_order(order: Order):
    """Handle ``POST /orders`` by delegating to ``OrderService.create_order``.

    Args:
        order: Order payload parsed from the request body.

    Returns:
        The order returned by the service, serialized as ``Order``.
    """
    service = OrderService(
        inventory_url="http://inventory-service",
        payment_url="http://payment-service",
    )
    try:
        return await service.create_order(order)
    finally:
        # A fresh service (and HTTP client) is built for every request, so
        # the client must be closed here or its connections leak.
        await service.client.aclose()
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from enum import Enum
from typing import List, Callable
import asyncio
class SagaStep:
    """Pair a saga action with the compensation that undoes it."""

    def __init__(self, action: Callable, compensation: Callable):
        """Store both callables unchanged.

        Args:
            action: Callable that performs the step.
            compensation: Callable that rolls the step back.
        """
        self.action, self.compensation = action, compensation
class SagaOrchestrator:
    """Run saga steps in order and roll back completed ones on failure."""

    def __init__(self):
        """Start with no registered steps and nothing completed."""
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(self, action: Callable, compensation: Callable):
        """Append a step to the saga.

        Args:
            action: Zero-argument callable returning an awaitable; a falsy
                awaited result marks the step as failed.
            compensation: Zero-argument callable returning an awaitable that
                undoes ``action``.
        """
        self.steps.append(SagaStep(action, compensation))

    async def execute(self) -> bool:
        """Run every step in order, compensating on the first failure.

        A step fails when its action raises or returns a falsy value. In
        both cases only the steps that succeeded before it are compensated.

        Returns:
            True when every step succeeded, False otherwise.
        """
        # Forget any earlier run so a re-executed saga never compensates
        # (or double-counts) steps from a previous execution.
        self.completed_steps.clear()
        try:
            for step in self.steps:
                if not await step.action():
                    # The failing step is not recorded as completed, which
                    # matches what happens when an action raises.
                    await self.compensate()
                    return False
                self.completed_steps.append(step)
        except Exception as e:
            print(f"Saga failed: {e}")
            await self.compensate()
            return False
        return True

    async def compensate(self):
        """Undo the completed steps, most recent first.

        Each step is removed from ``completed_steps`` before its
        compensation runs, so calling this twice cannot undo a step twice.
        A failing compensation is printed and the remaining ones still run.
        """
        while self.completed_steps:
            step = self.completed_steps.pop()
            try:
                await step.compensation()
            except Exception as e:
                print(f"Compensation failed: {e}")
# Example: order saga
class OrderSaga:
    """Example saga that creates an order in three compensable steps."""

    async def create_order_with_saga(self, order_data: dict):
        """Reserve stock, charge the user and record the order as one saga.

        Args:
            order_data: Order fields; the keys ``items``, ``user_id``,
                ``total`` and ``id`` are read.

        Returns:
            A dict with ``status`` ``"success"`` and the ``order_id``, or
            ``status`` ``"failed"`` and a ``message``.
        """
        orchestrator = SagaOrchestrator()

        # (action, compensation) pairs in execution order. The lambdas read
        # order_data only when invoked, i.e. while the saga is running.
        step_pairs = (
            # Step 1: reserve inventory.
            (
                lambda: self.reserve_inventory(order_data["items"]),
                lambda: self.release_inventory(order_data["items"]),
            ),
            # Step 2: process payment.
            (
                lambda: self.charge_payment(order_data["user_id"], order_data["total"]),
                lambda: self.refund_payment(order_data["user_id"], order_data["total"]),
            ),
            # Step 3: create the order record.
            (
                lambda: self.create_order_record(order_data),
                lambda: self.delete_order_record(order_data["id"]),
            ),
        )
        for run, undo in step_pairs:
            orchestrator.add_step(action=run, compensation=undo)

        # Execute the saga; bail out early when it was rolled back.
        if not await orchestrator.execute():
            return {"status": "failed", "message": "Order creation failed"}

        await self.send_confirmation(order_data["user_id"])
        return {"status": "success", "order_id": order_data["id"]}
import consul
from typing import List, Optional
import random
class ServiceRegistry:
    """Service registration and discovery through a Consul client."""

    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        """Create the Consul client.

        Args:
            consul_host: Host passed to ``consul.Consul``.
            consul_port: Port passed to ``consul.Consul``.
        """
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register_service(self, service_name: str, service_id: str,
                         host: str, port: int, tags: List[str] = None):
        """Register a service instance together with an HTTP health check.

        Args:
            service_name: Name the instance is registered under.
            service_id: Identifier of this instance.
            host: Address of the instance.
            port: Port of the instance.
            tags: Optional tags; an empty list is sent when omitted.
        """
        # The check polls the instance's /health endpoint.
        health_check = consul.Check.http(
            f"http://{host}:{port}/health",
            interval="10s",
            timeout="5s"
        )
        self.consul.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=host,
            port=port,
            tags=tags or [],
            check=health_check
        )

    def deregister_service(self, service_id: str):
        """Remove the instance identified by ``service_id``."""
        self.consul.agent.service.deregister(service_id)

    def _passing_entries(self, service_name: str):
        """Return the entries Consul reports as passing for ``service_name``."""
        _, entries = self.consul.health.service(service_name, passing=True)
        return entries

    @staticmethod
    def _summarize(entry: dict) -> dict:
        """Project a Consul health entry onto id, address and port."""
        svc = entry["Service"]
        return {"id": svc["ID"], "address": svc["Address"], "port": svc["Port"]}

    def discover_service(self, service_name: str) -> Optional[dict]:
        """Pick one passing instance of ``service_name`` at random.

        Returns:
            A dict with ``id``, ``address``, ``port`` and ``tags``, or None
            when no passing instance exists.
        """
        entries = self._passing_entries(service_name)
        if not entries:
            return None
        # Load balancing: random selection among the passing instances.
        picked = random.choice(entries)
        summary = self._summarize(picked)
        summary["tags"] = picked["Service"]["Tags"]
        return summary

    def get_all_services(self, service_name: str) -> List[dict]:
        """Return id, address and port of every passing instance."""
        return list(map(self._summarize, self._passing_entries(service_name)))
from fastapi import FastAPI, Request, Response
import httpx
from typing import Dict
import jwt
# FastAPI application object for the API Gateway.
app = FastAPI(title="API Gateway")
class APIGateway:
    """API gateway handling routing, authentication and rate limiting."""

    # Headers describing how the upstream encoded its body on the wire. The
    # body forwarded below is httpx's already-decoded ``response.content``,
    # so copying these would mislabel it; the length is recomputed instead.
    _SKIPPED_RESPONSE_HEADERS = frozenset(
        {"content-encoding", "content-length", "transfer-encoding", "connection"}
    )

    def __init__(self, jwt_secret: str = "secret"):
        """Create the service registry and the outbound HTTP client.

        Args:
            jwt_secret: HS256 key used to verify bearer tokens.
                NOTE(review): the default is the placeholder value from the
                original code; supply a real secret in any deployment.
        """
        self.service_registry = ServiceRegistry()
        self.client = httpx.AsyncClient()
        self.jwt_secret = jwt_secret

    async def route_request(self, service: str, path: str,
                            method: str, **kwargs) -> Response:
        """Forward a request to a discovered instance of ``service``.

        Args:
            service: Name looked up in the service registry.
            path: Path (starting with ``/``) appended to the instance URL.
            method: HTTP method to use.
            **kwargs: Passed through to ``httpx.AsyncClient.request``.

        Returns:
            The upstream response, or a 503 JSON response when no instance
            of ``service`` was discovered.
        """
        # Imported locally so this block does not depend on the file header.
        from fastapi.responses import JSONResponse

        # Discover the service.
        service_info = self.service_registry.discover_service(service)
        if not service_info:
            # A plain Response cannot render a dict, so use JSONResponse.
            return JSONResponse(
                content={"error": "Service unavailable"},
                status_code=503
            )

        # Build the URL.
        url = f"http://{service_info['address']}:{service_info['port']}{path}"

        # Forward the request.
        response = await self.client.request(method, url, **kwargs)
        forwarded_headers = {
            name: value
            for name, value in response.headers.items()
            if name.lower() not in self._SKIPPED_RESPONSE_HEADERS
        }
        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=forwarded_headers
        )

    async def authenticate(self, token: str) -> Optional[dict]:
        """Verify an HS256 JWT.

        Args:
            token: Encoded token taken from the request.

        Returns:
            The decoded payload, or None when the token is rejected.
        """
        try:
            return jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            # PyJWT reports decode failures through InvalidTokenError and
            # its subclasses; the module has no ``JWTError`` attribute.
            return None

    async def rate_limit(self, client_id: str) -> bool:
        """Report whether ``client_id`` may proceed.

        Placeholder: no limiter is implemented yet (the original comment
        planned one on Redis), so every client is allowed. Returning None
        here would make callers that test ``if not ...`` reject everyone.
        """
        return True
# Gateway endpoint
@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway_route(service: str, path: str, request: Request):
    """Authenticate, rate-limit and proxy a request to ``service``.

    Args:
        service: First path segment; the target service name.
        path: Remainder of the path, forwarded to the service.
        request: Incoming request.

    Returns:
        401 or 429 JSON errors, otherwise whatever
        ``APIGateway.route_request`` returns.
    """
    # Imported locally so this block does not depend on the file header.
    from fastapi.responses import JSONResponse

    bearer_prefix = "Bearer "
    gateway = APIGateway()
    try:
        # Authentication: strip the scheme only where it is a prefix.
        authorization = request.headers.get("Authorization", "")
        if authorization.startswith(bearer_prefix):
            token = authorization[len(bearer_prefix):]
        else:
            token = authorization
        user = await gateway.authenticate(token)
        if not user:
            # A plain Response cannot render a dict, so use JSONResponse.
            return JSONResponse(content={"error": "Unauthorized"}, status_code=401)

        # Rate limiting.
        # NOTE(review): assumes the token payload carries an "id" claim.
        if not await gateway.rate_limit(user["id"]):
            return JSONResponse(content={"error": "Rate limit exceeded"}, status_code=429)

        # Keep the query string; the ``path`` parameter does not include it.
        target_path = f"/{path}"
        if request.url.query:
            target_path = f"{target_path}?{request.url.query}"

        # Route the request.
        return await gateway.route_request(
            service=service,
            path=target_path,
            method=request.method,
            headers=dict(request.headers),
            content=await request.body()
        )
    finally:
        # A gateway (and HTTP client) is built per request; close the client
        # so its connections are not leaked.
        await gateway.client.aclose()
import pika
import json
from typing import Callable, Dict
import asyncio
class EventBus:
    """Publish and consume JSON events over a durable RabbitMQ topic exchange."""

    _EXCHANGE = "events"

    def __init__(self, rabbitmq_url: str):
        """Open a blocking connection and a channel.

        Args:
            rabbitmq_url: AMQP URL passed to ``pika.URLParameters``.
        """
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.handlers: Dict[str, Callable] = {}

    def _declare_exchange(self) -> None:
        """Declare the durable topic exchange used for publishing and binding."""
        self.channel.exchange_declare(
            exchange=self._EXCHANGE,
            exchange_type="topic",
            durable=True
        )

    def publish_event(self, event_type: str, data: dict):
        """Publish ``data`` as a persistent message routed by ``event_type``.

        Args:
            event_type: Routing key, also stored in the message body.
            data: JSON-serializable event payload.
        """
        # Imported here because the file header does not import datetime,
        # which made the original call fail with NameError.
        from datetime import datetime, timezone

        self._declare_exchange()
        message = json.dumps({
            "event_type": event_type,
            "data": data,
            # Timezone-aware so consumers on other hosts read it unambiguously.
            "timestamp": datetime.now(timezone.utc).isoformat()
        })
        self.channel.basic_publish(
            exchange=self._EXCHANGE,
            routing_key=event_type,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2  # persistent
            )
        )

    def subscribe(self, event_type: str, handler: Callable):
        """Bind a durable queue for ``event_type`` and register ``handler``.

        Args:
            event_type: Routing key to bind; the queue is named
                ``"<event_type>_queue"``.
            handler: Called with the ``data`` field of each decoded message.
        """
        self.handlers[event_type] = handler

        # The exchange must exist before a queue can be bound to it; declare
        # it here so a subscriber can start before any publisher has run.
        self._declare_exchange()

        # Declare the queue.
        queue_name = f"{event_type}_queue"
        self.channel.queue_declare(queue=queue_name, durable=True)

        # Bind the queue to the exchange.
        self.channel.queue_bind(
            queue=queue_name,
            exchange=self._EXCHANGE,
            routing_key=event_type
        )

        def callback(ch, method, properties, body):
            message = json.loads(body)
            handler(message["data"])
            # Acknowledge only after the handler returned; if it raises,
            # the exception propagates and the message stays unacknowledged.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        # Register the consumer; delivery starts with start_consuming().
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback
        )

    def start_consuming(self):
        """Start the channel's consume loop."""
        self.channel.start_consuming()

    def close(self) -> None:
        """Close the broker connection if it is still open."""
        if self.connection.is_open:
            self.connection.close()
# Usage example
event_bus = EventBus("amqp://localhost")

# Service A publishes an "order.created" event.
event_bus.publish_event(
    "order.created",
    dict(order_id="12345", user_id="user_1", total=99.99),
)


# Service B subscribes to the same event type.
def handle_order_created(data):
    """Print the id of the order carried by the event payload."""
    print(f"Processing order: {data['order_id']}")
    # Send email, update inventory, etc.


# Registering the handler does not start delivery; start_consuming() does.
event_bus.subscribe("order.created", handle_order_created)
❌ 分布式单体 ❌ 服务间共享数据库 ❌ 处处使用同步通信 ❌ 无服务版本控制 ❌ 服务间紧耦合 ❌ 无熔断器 ❌ 缺少分布式追踪
每周安装量
56
代码仓库
GitHub 星标数
12
首次出现
2026年1月24日
安全审计
安装于
opencode:47
codex:46
gemini-cli:44
github-copilot:40
cursor:40
claude-code:38
Expert guidance for microservices architecture, design patterns, service communication, and distributed system challenges.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
from typing import List, Optional
from circuitbreaker import circuit
import asyncio
# Individual microservice: FastAPI application object for the Order Service.
app = FastAPI(title="Order Service", version="1.0.0")
class Order(BaseModel):
    """Order payload accepted and returned by the Order Service."""

    # Order identifier.
    id: str
    # User the payment is charged to.
    user_id: str
    # Line items; forwarded as-is to the inventory check.
    items: List[dict]
    # Amount passed to the payment service.
    total: float
    # Lifecycle state; OrderService.create_order sets it to "confirmed".
    status: str
class OrderService:
    """Coordinate order creation across the inventory and payment services.

    Both outbound calls are decorated with ``circuit(failure_threshold=5,
    recovery_timeout=60)`` and share one ``httpx.AsyncClient``.
    """

    def __init__(self, inventory_url: str, payment_url: str):
        """Remember the downstream base URLs and open an HTTP client.

        Args:
            inventory_url: Base URL of the inventory service.
            payment_url: Base URL of the payment service.
        """
        self.inventory_url = inventory_url
        self.payment_url = payment_url
        self.client = httpx.AsyncClient()

    async def aclose(self) -> None:
        """Close the HTTP client so its pooled connections are released."""
        await self.client.aclose()

    @circuit(failure_threshold=5, recovery_timeout=60)
    async def check_inventory(self, items: List[dict]) -> bool:
        """Ask the inventory service whether ``items`` are available.

        Args:
            items: Order line items, sent as the ``items`` field of the JSON
                request body.

        Returns:
            The ``available`` field of the JSON response.

        Raises:
            Exception: Any transport error, HTTP 5xx reply, or malformed
                response is printed and re-raised to the circuit decorator.
        """
        try:
            response = await self.client.post(
                f"{self.inventory_url}/check",
                json={"items": items},
                timeout=5.0,
            )
            # A 5xx reply is a service failure, not an answer; surface it
            # rather than trying to read a result out of the error body.
            if response.status_code >= 500:
                response.raise_for_status()
            return response.json()["available"]
        except Exception as e:
            print(f"Inventory service error: {e}")
            raise

    @circuit(failure_threshold=5, recovery_timeout=60)
    async def process_payment(self, user_id: str, amount: float) -> dict:
        """Charge ``amount`` to ``user_id`` through the payment service.

        Args:
            user_id: Identifier of the user to charge.
            amount: Amount to charge.

        Returns:
            The decoded JSON body of the payment service response.

        Raises:
            Exception: Any transport error, HTTP 5xx reply, or undecodable
                response is printed and re-raised to the circuit decorator.
        """
        try:
            response = await self.client.post(
                f"{self.payment_url}/charge",
                json={"user_id": user_id, "amount": amount},
                timeout=10.0,
            )
            # Only server errors are raised here: a 4xx reply may carry a
            # decline result that create_order() needs to inspect.
            if response.status_code >= 500:
                response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Payment service error: {e}")
            raise

    async def create_order(self, order: Order) -> Order:
        """Create an order by checking stock, charging, reserving and saving.

        Args:
            order: The order to create; its ``status`` is overwritten.

        Returns:
            The same ``order`` object with ``status`` set to ``"confirmed"``.

        Raises:
            HTTPException: 400 when the items are unavailable or the payment
                result is not ``"success"``.
        """
        # 1. Check inventory.
        if not await self.check_inventory(order.items):
            raise HTTPException(400, "Items not available")

        # 2. Process payment. ``.get`` keeps a reply without a "status" key
        # on the "Payment failed" path instead of raising KeyError.
        payment = await self.process_payment(order.user_id, order.total)
        if payment.get("status") != "success":
            raise HTTPException(400, "Payment failed")

        # NOTE(review): reserve_inventory() and save_order() are not defined
        # on this class in the code shown here -- confirm where they come
        # from. The payment is also charged before these two steps and is
        # not refunded if either of them fails.
        # 3. Reserve inventory.
        await self.reserve_inventory(order.items)

        # 4. Create order record.
        order.status = "confirmed"
        await self.save_order(order)
        return order
@app.post("/orders", response_model=Order)
async def create_order(order: Order):
    """Handle ``POST /orders`` by delegating to ``OrderService.create_order``.

    Args:
        order: Order payload parsed from the request body.

    Returns:
        The order returned by the service, serialized as ``Order``.
    """
    service = OrderService(
        inventory_url="http://inventory-service",
        payment_url="http://payment-service",
    )
    try:
        return await service.create_order(order)
    finally:
        # A fresh service (and HTTP client) is built for every request, so
        # the client must be closed here or its connections leak.
        await service.client.aclose()
from enum import Enum
from typing import List, Callable
import asyncio
class SagaStep:
    """Pair a saga action with the compensation that undoes it."""

    def __init__(self, action: Callable, compensation: Callable):
        """Store both callables unchanged.

        Args:
            action: Callable that performs the step.
            compensation: Callable that rolls the step back.
        """
        self.action, self.compensation = action, compensation
class SagaOrchestrator:
    """Run saga steps in order and roll back completed ones on failure."""

    def __init__(self):
        """Start with no registered steps and nothing completed."""
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(self, action: Callable, compensation: Callable):
        """Append a step to the saga.

        Args:
            action: Zero-argument callable returning an awaitable; a falsy
                awaited result marks the step as failed.
            compensation: Zero-argument callable returning an awaitable that
                undoes ``action``.
        """
        self.steps.append(SagaStep(action, compensation))

    async def execute(self) -> bool:
        """Run every step in order, compensating on the first failure.

        A step fails when its action raises or returns a falsy value. In
        both cases only the steps that succeeded before it are compensated.

        Returns:
            True when every step succeeded, False otherwise.
        """
        # Forget any earlier run so a re-executed saga never compensates
        # (or double-counts) steps from a previous execution.
        self.completed_steps.clear()
        try:
            for step in self.steps:
                if not await step.action():
                    # The failing step is not recorded as completed, which
                    # matches what happens when an action raises.
                    await self.compensate()
                    return False
                self.completed_steps.append(step)
        except Exception as e:
            print(f"Saga failed: {e}")
            await self.compensate()
            return False
        return True

    async def compensate(self):
        """Undo the completed steps, most recent first.

        Each step is removed from ``completed_steps`` before its
        compensation runs, so calling this twice cannot undo a step twice.
        A failing compensation is printed and the remaining ones still run.
        """
        while self.completed_steps:
            step = self.completed_steps.pop()
            try:
                await step.compensation()
            except Exception as e:
                print(f"Compensation failed: {e}")
# Example: order saga
class OrderSaga:
    """Example saga that creates an order in three compensable steps."""

    async def create_order_with_saga(self, order_data: dict):
        """Reserve stock, charge the user and record the order as one saga.

        Args:
            order_data: Order fields; the keys ``items``, ``user_id``,
                ``total`` and ``id`` are read.

        Returns:
            A dict with ``status`` ``"success"`` and the ``order_id``, or
            ``status`` ``"failed"`` and a ``message``.
        """
        orchestrator = SagaOrchestrator()

        # (action, compensation) pairs in execution order. The lambdas read
        # order_data only when invoked, i.e. while the saga is running.
        step_pairs = (
            # Step 1: reserve inventory.
            (
                lambda: self.reserve_inventory(order_data["items"]),
                lambda: self.release_inventory(order_data["items"]),
            ),
            # Step 2: process payment.
            (
                lambda: self.charge_payment(order_data["user_id"], order_data["total"]),
                lambda: self.refund_payment(order_data["user_id"], order_data["total"]),
            ),
            # Step 3: create the order record.
            (
                lambda: self.create_order_record(order_data),
                lambda: self.delete_order_record(order_data["id"]),
            ),
        )
        for run, undo in step_pairs:
            orchestrator.add_step(action=run, compensation=undo)

        # Execute the saga; bail out early when it was rolled back.
        if not await orchestrator.execute():
            return {"status": "failed", "message": "Order creation failed"}

        await self.send_confirmation(order_data["user_id"])
        return {"status": "success", "order_id": order_data["id"]}
import consul
from typing import List, Optional
import random
class ServiceRegistry:
    """Service registration and discovery through a Consul client."""

    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        """Create the Consul client.

        Args:
            consul_host: Host passed to ``consul.Consul``.
            consul_port: Port passed to ``consul.Consul``.
        """
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register_service(self, service_name: str, service_id: str,
                         host: str, port: int, tags: List[str] = None):
        """Register a service instance together with an HTTP health check.

        Args:
            service_name: Name the instance is registered under.
            service_id: Identifier of this instance.
            host: Address of the instance.
            port: Port of the instance.
            tags: Optional tags; an empty list is sent when omitted.
        """
        # The check polls the instance's /health endpoint.
        health_check = consul.Check.http(
            f"http://{host}:{port}/health",
            interval="10s",
            timeout="5s"
        )
        self.consul.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=host,
            port=port,
            tags=tags or [],
            check=health_check
        )

    def deregister_service(self, service_id: str):
        """Remove the instance identified by ``service_id``."""
        self.consul.agent.service.deregister(service_id)

    def _passing_entries(self, service_name: str):
        """Return the entries Consul reports as passing for ``service_name``."""
        _, entries = self.consul.health.service(service_name, passing=True)
        return entries

    @staticmethod
    def _summarize(entry: dict) -> dict:
        """Project a Consul health entry onto id, address and port."""
        svc = entry["Service"]
        return {"id": svc["ID"], "address": svc["Address"], "port": svc["Port"]}

    def discover_service(self, service_name: str) -> Optional[dict]:
        """Pick one passing instance of ``service_name`` at random.

        Returns:
            A dict with ``id``, ``address``, ``port`` and ``tags``, or None
            when no passing instance exists.
        """
        entries = self._passing_entries(service_name)
        if not entries:
            return None
        # Load balancing: random selection among the passing instances.
        picked = random.choice(entries)
        summary = self._summarize(picked)
        summary["tags"] = picked["Service"]["Tags"]
        return summary

    def get_all_services(self, service_name: str) -> List[dict]:
        """Return id, address and port of every passing instance."""
        return list(map(self._summarize, self._passing_entries(service_name)))
from fastapi import FastAPI, Request, Response
import httpx
from typing import Dict
import jwt
# FastAPI application object for the API Gateway.
app = FastAPI(title="API Gateway")
class APIGateway:
    """API gateway handling routing, authentication and rate limiting."""

    # Headers describing how the upstream encoded its body on the wire. The
    # body forwarded below is httpx's already-decoded ``response.content``,
    # so copying these would mislabel it; the length is recomputed instead.
    _SKIPPED_RESPONSE_HEADERS = frozenset(
        {"content-encoding", "content-length", "transfer-encoding", "connection"}
    )

    def __init__(self, jwt_secret: str = "secret"):
        """Create the service registry and the outbound HTTP client.

        Args:
            jwt_secret: HS256 key used to verify bearer tokens.
                NOTE(review): the default is the placeholder value from the
                original code; supply a real secret in any deployment.
        """
        self.service_registry = ServiceRegistry()
        self.client = httpx.AsyncClient()
        self.jwt_secret = jwt_secret

    async def route_request(self, service: str, path: str,
                            method: str, **kwargs) -> Response:
        """Forward a request to a discovered instance of ``service``.

        Args:
            service: Name looked up in the service registry.
            path: Path (starting with ``/``) appended to the instance URL.
            method: HTTP method to use.
            **kwargs: Passed through to ``httpx.AsyncClient.request``.

        Returns:
            The upstream response, or a 503 JSON response when no instance
            of ``service`` was discovered.
        """
        # Imported locally so this block does not depend on the file header.
        from fastapi.responses import JSONResponse

        # Discover the service.
        service_info = self.service_registry.discover_service(service)
        if not service_info:
            # A plain Response cannot render a dict, so use JSONResponse.
            return JSONResponse(
                content={"error": "Service unavailable"},
                status_code=503
            )

        # Build the URL.
        url = f"http://{service_info['address']}:{service_info['port']}{path}"

        # Forward the request.
        response = await self.client.request(method, url, **kwargs)
        forwarded_headers = {
            name: value
            for name, value in response.headers.items()
            if name.lower() not in self._SKIPPED_RESPONSE_HEADERS
        }
        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=forwarded_headers
        )

    async def authenticate(self, token: str) -> Optional[dict]:
        """Verify an HS256 JWT.

        Args:
            token: Encoded token taken from the request.

        Returns:
            The decoded payload, or None when the token is rejected.
        """
        try:
            return jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            # PyJWT reports decode failures through InvalidTokenError and
            # its subclasses; the module has no ``JWTError`` attribute.
            return None

    async def rate_limit(self, client_id: str) -> bool:
        """Report whether ``client_id`` may proceed.

        Placeholder: no limiter is implemented yet (the original comment
        planned one on Redis), so every client is allowed. Returning None
        here would make callers that test ``if not ...`` reject everyone.
        """
        return True
# Gateway endpoint
@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway_route(service: str, path: str, request: Request):
    """Authenticate, rate-limit and proxy a request to ``service``.

    Args:
        service: First path segment; the target service name.
        path: Remainder of the path, forwarded to the service.
        request: Incoming request.

    Returns:
        401 or 429 JSON errors, otherwise whatever
        ``APIGateway.route_request`` returns.
    """
    # Imported locally so this block does not depend on the file header.
    from fastapi.responses import JSONResponse

    bearer_prefix = "Bearer "
    gateway = APIGateway()
    try:
        # Authentication: strip the scheme only where it is a prefix.
        authorization = request.headers.get("Authorization", "")
        if authorization.startswith(bearer_prefix):
            token = authorization[len(bearer_prefix):]
        else:
            token = authorization
        user = await gateway.authenticate(token)
        if not user:
            # A plain Response cannot render a dict, so use JSONResponse.
            return JSONResponse(content={"error": "Unauthorized"}, status_code=401)

        # Rate limiting.
        # NOTE(review): assumes the token payload carries an "id" claim.
        if not await gateway.rate_limit(user["id"]):
            return JSONResponse(content={"error": "Rate limit exceeded"}, status_code=429)

        # Keep the query string; the ``path`` parameter does not include it.
        target_path = f"/{path}"
        if request.url.query:
            target_path = f"{target_path}?{request.url.query}"

        # Route the request.
        return await gateway.route_request(
            service=service,
            path=target_path,
            method=request.method,
            headers=dict(request.headers),
            content=await request.body()
        )
    finally:
        # A gateway (and HTTP client) is built per request; close the client
        # so its connections are not leaked.
        await gateway.client.aclose()
import pika
import json
from typing import Callable, Dict
import asyncio
class EventBus:
    """Publish and consume JSON events over a durable RabbitMQ topic exchange."""

    _EXCHANGE = "events"

    def __init__(self, rabbitmq_url: str):
        """Open a blocking connection and a channel.

        Args:
            rabbitmq_url: AMQP URL passed to ``pika.URLParameters``.
        """
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.handlers: Dict[str, Callable] = {}

    def _declare_exchange(self) -> None:
        """Declare the durable topic exchange used for publishing and binding."""
        self.channel.exchange_declare(
            exchange=self._EXCHANGE,
            exchange_type="topic",
            durable=True
        )

    def publish_event(self, event_type: str, data: dict):
        """Publish ``data`` as a persistent message routed by ``event_type``.

        Args:
            event_type: Routing key, also stored in the message body.
            data: JSON-serializable event payload.
        """
        # Imported here because the file header does not import datetime,
        # which made the original call fail with NameError.
        from datetime import datetime, timezone

        self._declare_exchange()
        message = json.dumps({
            "event_type": event_type,
            "data": data,
            # Timezone-aware so consumers on other hosts read it unambiguously.
            "timestamp": datetime.now(timezone.utc).isoformat()
        })
        self.channel.basic_publish(
            exchange=self._EXCHANGE,
            routing_key=event_type,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2  # persistent
            )
        )

    def subscribe(self, event_type: str, handler: Callable):
        """Bind a durable queue for ``event_type`` and register ``handler``.

        Args:
            event_type: Routing key to bind; the queue is named
                ``"<event_type>_queue"``.
            handler: Called with the ``data`` field of each decoded message.
        """
        self.handlers[event_type] = handler

        # The exchange must exist before a queue can be bound to it; declare
        # it here so a subscriber can start before any publisher has run.
        self._declare_exchange()

        # Declare the queue.
        queue_name = f"{event_type}_queue"
        self.channel.queue_declare(queue=queue_name, durable=True)

        # Bind the queue to the exchange.
        self.channel.queue_bind(
            queue=queue_name,
            exchange=self._EXCHANGE,
            routing_key=event_type
        )

        def callback(ch, method, properties, body):
            message = json.loads(body)
            handler(message["data"])
            # Acknowledge only after the handler returned; if it raises,
            # the exception propagates and the message stays unacknowledged.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        # Register the consumer; delivery starts with start_consuming().
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback
        )

    def start_consuming(self):
        """Start the channel's consume loop."""
        self.channel.start_consuming()

    def close(self) -> None:
        """Close the broker connection if it is still open."""
        if self.connection.is_open:
            self.connection.close()
# Usage example
event_bus = EventBus("amqp://localhost")

# Service A publishes an "order.created" event.
event_bus.publish_event(
    "order.created",
    dict(order_id="12345", user_id="user_1", total=99.99),
)


# Service B subscribes to the same event type.
def handle_order_created(data):
    """Print the id of the order carried by the event payload."""
    print(f"Processing order: {data['order_id']}")
    # Send email, update inventory, etc.


# Registering the handler does not start delivery; start_consuming() does.
event_bus.subscribe("order.created", handle_order_created)
❌ Distributed monolith ❌ Shared database between services ❌ Synchronous communication everywhere ❌ No service versioning ❌ Tight coupling between services ❌ No circuit breakers ❌ Missing distributed tracing
Weekly Installs
56
Repository
GitHub Stars
12
First Seen
Jan 24, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Warn
Installed on
opencode: 47
codex: 46
gemini-cli: 44
github-copilot: 40
cursor: 40
claude-code: 38
NoSQL专家指南:Cassandra与DynamoDB分布式数据库设计模式与性能优化
263 周安装