重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
microservices-patterns by secondsky/claude-skills
npx skills add https://github.com/secondsky/claude-skills --skill microservices-patterns
掌握微服务架构模式,包括服务边界、服务间通信、数据管理和弹性模式,用于构建分布式系统。
按业务能力划分
按子域划分(DDD)
绞杀者模式
同步(请求/响应)
异步(事件/消息)
每个服务独立数据库
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
Saga 模式
断路器
退避重试
舱壁隔离
# Order Service
class OrderService:
    """Create orders and announce each new order on the event bus."""

    async def create_order(self, order_data: dict) -> Order:
        """Create an order, publish its creation event, and return it.

        Args:
            order_data: Raw order fields, passed straight to ``Order.create``.

        Returns:
            The order produced by ``Order.create``.
        """
        new_order = Order.create(order_data)
        created_event = OrderCreatedEvent(
            order_id=new_order.id,
            customer_id=new_order.customer_id,
        )
        # NOTE(review): self.event_bus is never assigned in the visible code;
        # presumably it is injected elsewhere — confirm.
        await self.event_bus.publish(created_event)
        return new_order
# Payment Service (separate service)
class PaymentService:
    """Charge customers through the payment gateway and report successes."""

    async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult:
        """Charge the customer and publish a completion event on success.

        Args:
            payment_request: Supplies ``amount``, ``customer_id`` and
                ``order_id`` for the charge and the event.

        Returns:
            The gateway's result, whether or not the charge succeeded.
        """
        charge_result = await self.payment_gateway.charge(
            amount=payment_request.amount,
            customer=payment_request.customer_id,
        )
        # A failed charge is handed back untouched; no event is emitted for it.
        if not charge_result.success:
            return charge_result

        completed_event = PaymentCompletedEvent(order_id=payment_request.order_id)
        await self.event_bus.publish(completed_event)
        return charge_result
# Inventory Service (separate service)
class InventoryService:
    """Reserve stock for orders and announce successful reservations."""

    async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult:
        """Reserve all items of an order, or fail before reserving anything.

        Args:
            order_id: Identifier of the order the reservation belongs to.
            items: Order lines; each is read for ``product_id`` and
                ``quantity``.

        Returns:
            A failed result with error ``"Insufficient inventory"`` as soon as
            one item's available quantity is below the requested quantity;
            otherwise a successful result carrying the new reservation.
        """
        # Check every line first so that no reservation is created for an
        # order that cannot be filled completely.
        for item in items:
            available = await self.inventory_repo.get_available(item.product_id)
            if available < item.quantity:
                # Plain literal: the message has no placeholders, so the
                # original f-prefix was redundant.
                return ReservationResult(success=False, error="Insufficient inventory")

        # NOTE(review): the availability check and the reservation are separate
        # awaits; confirm create_reservation re-validates stock atomically.
        reservation = await self.create_reservation(order_id, items)
        await self.event_bus.publish(InventoryReservedEvent(order_id=order_id))
        return ReservationResult(success=True, reservation=reservation)
from fastapi import FastAPI
import httpx
class APIGateway:
    """Central entry point for all client requests."""

    def __init__(self):
        """Set the downstream base URLs and create the shared HTTP client."""
        self.order_service_url = "http://order-service:8000"
        self.payment_service_url = "http://payment-service:8001"
        # Shared client; every request uses a 5 second timeout.
        self.http_client = httpx.AsyncClient(timeout=5.0)

    # NOTE(review): `circuit` is not imported in the visible code; presumably
    # it comes from a circuit-breaker library — confirm the import exists.
    @circuit(failure_threshold=5, recovery_timeout=30)
    async def call_order_service(self, path: str, method: str = "GET", **kwargs):
        """Call the order service through the circuit-breaker decorator.

        Args:
            path: Path appended to the order service base URL.
            method: HTTP method name; defaults to ``"GET"``.
            **kwargs: Forwarded unchanged to ``http_client.request``.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Whatever ``raise_for_status`` raises for an error status.
        """
        response = await self.http_client.request(
            method, f"{self.order_service_url}{path}", **kwargs
        )
        response.raise_for_status()
        return response.json()

    async def create_order_aggregate(self, order_id: str) -> dict:
        """Aggregate order, payment and inventory data for one order.

        The three lookups run concurrently. The order itself is mandatory:
        if that lookup fails, its exception is re-raised. Payment and
        inventory are optional and are left out of the result when their
        lookup fails.

        Args:
            order_id: Identifier interpolated into each service path.

        Returns:
            A dict with key ``"order"`` and, when available, ``"payment"``
            and ``"inventory"``.

        Raises:
            The exception produced by the order lookup, if it failed.
        """
        # Local import: the visible file header imports only fastapi and httpx.
        import asyncio

        # NOTE(review): call_payment_service / call_inventory_service are not
        # defined in the visible class — confirm they exist elsewhere.
        order, payment, inventory = await asyncio.gather(
            self.call_order_service(f"/orders/{order_id}"),
            self.call_payment_service(f"/payments/order/{order_id}"),
            self.call_inventory_service(f"/reservations/order/{order_id}"),
            return_exceptions=True,
        )

        # With return_exceptions=True a failed order lookup arrives here as an
        # exception object; surface it instead of returning it as order data.
        if isinstance(order, BaseException):
            raise order

        result = {"order": order}
        if not isinstance(payment, BaseException):
            result["payment"] = payment
        if not isinstance(inventory, BaseException):
            result["inventory"] = inventory
        return result
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
class ServiceClient:
    """HTTP client with retries and timeout."""

    def __init__(self, base_url: str):
        """Remember the base URL and build the underlying async client.

        Args:
            base_url: Prefix prepended to every request path.
        """
        self.base_url = base_url
        # 5 second default timeout, 2 seconds for establishing the connection.
        request_timeout = httpx.Timeout(5.0, connect=2.0)
        self.client = httpx.AsyncClient(timeout=request_timeout)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
    )
    async def get(self, path: str, **kwargs):
        """Send a GET request, retrying up to three attempts with backoff.

        Args:
            path: Path appended to ``base_url``.
            **kwargs: Forwarded unchanged to the client's ``get``.

        Returns:
            The decoded JSON body of the response.
        """
        # NOTE(review): no retry predicate is given, so presumably every
        # exception (including HTTP error statuses) triggers a retry — confirm.
        url = f"{self.base_url}{path}"
        response = await self.client.get(url, **kwargs)
        response.raise_for_status()
        return response.json()
# Usage example: fetch one payment through the retrying client.
# NOTE(review): `await` at module level only works inside a coroutine or an
# async REPL — wrap these lines in an async function before running them.
payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.get("/payments/123")
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
class EventBus:
    """Event publishing and subscription."""

    # NOTE(review): self.producer and self.bootstrap_servers are not assigned
    # in the visible code; presumably an initialiser elsewhere sets them.

    async def publish(self, event: DomainEvent):
        """Publish an event to the Kafka topic named by its ``event_type``.

        Args:
            event: Event to send; its ``aggregate_id`` becomes the message key.
        """
        # NOTE(review): the value is passed as a dict; presumably the producer
        # is configured with a value serializer elsewhere — confirm.
        await self.producer.send_and_wait(
            event.event_type,
            value=asdict(event),
            key=event.aggregate_id.encode(),
        )

    async def subscribe(self, topic: str, handler: callable):
        """Consume a topic and pass every message value to ``handler``.

        Args:
            topic: Topic to consume from.
            handler: Awaitable callable invoked with each ``message.value``.
        """
        consumer = AIOKafkaConsumer(topic, bootstrap_servers=self.bootstrap_servers)
        await consumer.start()
        try:
            # NOTE(review): no value deserializer is configured here, so the
            # handler presumably receives raw bytes — confirm against handlers.
            async for message in consumer:
                await handler(message.value)
        finally:
            # Always stop the consumer, even when the handler raises or the
            # task is cancelled, so it is not left running.
            await consumer.stop()
# Order Service publishes
# NOTE(review): usage fragment — `event_bus` and `order` are not defined in
# the visible code, and `await` must run inside a coroutine.
await event_bus.publish(OrderCreatedEvent(order_id=order.id))
# Inventory Service subscribes
async def handle_order_created(event_data: dict):
    """Reserve inventory for the order described by an event payload.

    Args:
        event_data: Payload that must contain ``"order_id"`` and ``"items"``.
    """
    order_id = event_data["order_id"]
    items = event_data["items"]
    await reserve_inventory(order_id, items)
class OrderFulfillmentSaga:
    """Orchestrated saga for order fulfillment."""

    def __init__(self):
        """Declare the saga steps, each pairing an action with its compensation."""
        # NOTE(review): the step methods referenced here are not defined in
        # the visible class — presumably provided elsewhere; confirm.
        self.steps = [
            SagaStep("create_order", self.create_order, self.cancel_order),
            SagaStep("reserve_inventory", self.reserve_inventory, self.release_inventory),
            SagaStep("process_payment", self.process_payment, self.refund_payment),
            SagaStep("confirm_order", self.confirm_order, self.cancel_order_confirmation),
        ]

    async def execute(self, order_data: dict) -> SagaResult:
        """Run the steps in order; undo the completed ones on any failure.

        Args:
            order_data: Stored in the shared context under ``"order_data"``.

        Returns:
            A COMPLETED result carrying the accumulated context when every
            step succeeds. Otherwise a FAILED result whose error is the
            failing step's ``error`` or the text of the raised exception.

        Raises:
            Whatever a compensation raises; compensation failures are not
            converted into a result.
        """
        completed_steps = []
        context = {"order_data": order_data}
        failure = None
        try:
            for step in self.steps:
                result = await step.action(context)
                if not result.success:
                    failure = result.error
                    break
                completed_steps.append(step)
                context.update(result.data)
            else:
                return SagaResult(status=SagaStatus.COMPLETED, data=context)
        except Exception as exc:
            failure = str(exc)

        # Compensate exactly once, outside the try block. Previously an error
        # raised by compensation on the failed-step path was caught by the
        # handler above, which ran every compensation a second time and
        # replaced the original error.
        await self.compensate(completed_steps, context)
        return SagaResult(status=SagaStatus.FAILED, error=failure)

    async def compensate(self, completed_steps: List[SagaStep], context: dict):
        """Execute compensating actions in reverse order of completion.

        Args:
            completed_steps: Steps whose actions succeeded, oldest first.
            context: Shared saga context handed to each compensation.
        """
        for step in reversed(completed_steps):
            await step.compensation(context)
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
    """States a circuit breaker can be in."""

    # Normal operation.
    CLOSED = "closed"
    # Failing; requests are rejected.
    OPEN = "open"
    # Testing whether the protected call has recovered.
    HALF_OPEN = "half_open"
class CircuitBreaker:
    """Reject calls to a failing dependency until a recovery timeout passes."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 30):
        """Start in the CLOSED state with no recorded failures.

        Args:
            failure_threshold: Number of counted failures that opens the circuit.
            recovery_timeout: Seconds to stay open before a trial call is allowed.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        # Time the circuit last opened; None until it opens for the first time.
        self.opened_at = None

    async def call(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` under circuit-breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: The circuit is open and the recovery
                timeout has not elapsed yet.
            Exception: Any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                # NOTE(review): CircuitBreakerOpenError is not defined in the
                # visible code — confirm it is declared elsewhere.
                raise CircuitBreakerOpenError("Circuit breaker is open")
            self.state = CircuitState.HALF_OPEN

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        """Return True once ``recovery_timeout`` seconds have passed since opening."""
        if self.opened_at is None:
            # No opening time recorded: nothing to wait for, allow a trial call.
            return True
        elapsed = datetime.now() - self.opened_at
        return elapsed >= timedelta(seconds=self.recovery_timeout)

    def _on_success(self):
        """Clear the failure count and close the circuit after a trial call."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Count a failure and open the circuit when warranted."""
        self.failure_count += 1
        # A failed trial call re-opens immediately; otherwise open once the
        # threshold is reached.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()
# Usage example: route a call through the breaker.
# NOTE(review): the ServiceClient shown earlier in this file defines only
# get(), and payment_data is not defined in the visible code — confirm both.
# `await` at module level must be moved inside a coroutine to run.
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
result = await breaker.call(payment_client.process_payment, payment_data)
每周安装量
67
代码仓库
GitHub 星标数
91
首次出现
Jan 25, 2026
安全审计
已安装于
claude-code: 58
gemini-cli: 54
codex: 54
cursor: 54
opencode: 53
github-copilot: 50
Master microservices architecture patterns including service boundaries, inter-service communication, data management, and resilience patterns for building distributed systems.
By Business Capability
By Subdomain (DDD)
Strangler Fig Pattern
Synchronous (Request/Response)
Asynchronous (Events/Messages)
Database Per Service
Saga Pattern
Circuit Breaker
Retry with Backoff
Bulkhead
# Order Service
class OrderService:
    """Create orders and announce each new order on the event bus."""

    async def create_order(self, order_data: dict) -> Order:
        """Create an order, publish its creation event, and return it.

        Args:
            order_data: Raw order fields, passed straight to ``Order.create``.

        Returns:
            The order produced by ``Order.create``.
        """
        new_order = Order.create(order_data)
        created_event = OrderCreatedEvent(
            order_id=new_order.id,
            customer_id=new_order.customer_id,
        )
        # NOTE(review): self.event_bus is never assigned in the visible code;
        # presumably it is injected elsewhere — confirm.
        await self.event_bus.publish(created_event)
        return new_order
# Payment Service (separate service)
class PaymentService:
    """Charge customers through the payment gateway and report successes."""

    async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult:
        """Charge the customer and publish a completion event on success.

        Args:
            payment_request: Supplies ``amount``, ``customer_id`` and
                ``order_id`` for the charge and the event.

        Returns:
            The gateway's result, whether or not the charge succeeded.
        """
        charge_result = await self.payment_gateway.charge(
            amount=payment_request.amount,
            customer=payment_request.customer_id,
        )
        # A failed charge is handed back untouched; no event is emitted for it.
        if not charge_result.success:
            return charge_result

        completed_event = PaymentCompletedEvent(order_id=payment_request.order_id)
        await self.event_bus.publish(completed_event)
        return charge_result
# Inventory Service (separate service)
class InventoryService:
    """Reserve stock for orders and announce successful reservations."""

    async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult:
        """Reserve all items of an order, or fail before reserving anything.

        Args:
            order_id: Identifier of the order the reservation belongs to.
            items: Order lines; each is read for ``product_id`` and
                ``quantity``.

        Returns:
            A failed result with error ``"Insufficient inventory"`` as soon as
            one item's available quantity is below the requested quantity;
            otherwise a successful result carrying the new reservation.
        """
        # Check every line first so that no reservation is created for an
        # order that cannot be filled completely.
        for item in items:
            available = await self.inventory_repo.get_available(item.product_id)
            if available < item.quantity:
                # Plain literal: the message has no placeholders, so the
                # original f-prefix was redundant.
                return ReservationResult(success=False, error="Insufficient inventory")

        # NOTE(review): the availability check and the reservation are separate
        # awaits; confirm create_reservation re-validates stock atomically.
        reservation = await self.create_reservation(order_id, items)
        await self.event_bus.publish(InventoryReservedEvent(order_id=order_id))
        return ReservationResult(success=True, reservation=reservation)
from fastapi import FastAPI
import httpx
class APIGateway:
    """Central entry point for all client requests."""

    def __init__(self):
        """Set the downstream base URLs and create the shared HTTP client."""
        self.order_service_url = "http://order-service:8000"
        self.payment_service_url = "http://payment-service:8001"
        # Shared client; every request uses a 5 second timeout.
        self.http_client = httpx.AsyncClient(timeout=5.0)

    # NOTE(review): `circuit` is not imported in the visible code; presumably
    # it comes from a circuit-breaker library — confirm the import exists.
    @circuit(failure_threshold=5, recovery_timeout=30)
    async def call_order_service(self, path: str, method: str = "GET", **kwargs):
        """Call the order service through the circuit-breaker decorator.

        Args:
            path: Path appended to the order service base URL.
            method: HTTP method name; defaults to ``"GET"``.
            **kwargs: Forwarded unchanged to ``http_client.request``.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Whatever ``raise_for_status`` raises for an error status.
        """
        response = await self.http_client.request(
            method, f"{self.order_service_url}{path}", **kwargs
        )
        response.raise_for_status()
        return response.json()

    async def create_order_aggregate(self, order_id: str) -> dict:
        """Aggregate order, payment and inventory data for one order.

        The three lookups run concurrently. The order itself is mandatory:
        if that lookup fails, its exception is re-raised. Payment and
        inventory are optional and are left out of the result when their
        lookup fails.

        Args:
            order_id: Identifier interpolated into each service path.

        Returns:
            A dict with key ``"order"`` and, when available, ``"payment"``
            and ``"inventory"``.

        Raises:
            The exception produced by the order lookup, if it failed.
        """
        # Local import: the visible file header imports only fastapi and httpx.
        import asyncio

        # NOTE(review): call_payment_service / call_inventory_service are not
        # defined in the visible class — confirm they exist elsewhere.
        order, payment, inventory = await asyncio.gather(
            self.call_order_service(f"/orders/{order_id}"),
            self.call_payment_service(f"/payments/order/{order_id}"),
            self.call_inventory_service(f"/reservations/order/{order_id}"),
            return_exceptions=True,
        )

        # With return_exceptions=True a failed order lookup arrives here as an
        # exception object; surface it instead of returning it as order data.
        if isinstance(order, BaseException):
            raise order

        result = {"order": order}
        if not isinstance(payment, BaseException):
            result["payment"] = payment
        if not isinstance(inventory, BaseException):
            result["inventory"] = inventory
        return result
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
class ServiceClient:
    """HTTP client with retries and timeout."""

    def __init__(self, base_url: str):
        """Remember the base URL and build the underlying async client.

        Args:
            base_url: Prefix prepended to every request path.
        """
        self.base_url = base_url
        # 5 second default timeout, 2 seconds for establishing the connection.
        request_timeout = httpx.Timeout(5.0, connect=2.0)
        self.client = httpx.AsyncClient(timeout=request_timeout)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
    )
    async def get(self, path: str, **kwargs):
        """Send a GET request, retrying up to three attempts with backoff.

        Args:
            path: Path appended to ``base_url``.
            **kwargs: Forwarded unchanged to the client's ``get``.

        Returns:
            The decoded JSON body of the response.
        """
        # NOTE(review): no retry predicate is given, so presumably every
        # exception (including HTTP error statuses) triggers a retry — confirm.
        url = f"{self.base_url}{path}"
        response = await self.client.get(url, **kwargs)
        response.raise_for_status()
        return response.json()
# Usage example: fetch one payment through the retrying client.
# NOTE(review): `await` at module level only works inside a coroutine or an
# async REPL — wrap these lines in an async function before running them.
payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.get("/payments/123")
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
class EventBus:
    """Event publishing and subscription."""

    # NOTE(review): self.producer and self.bootstrap_servers are not assigned
    # in the visible code; presumably an initialiser elsewhere sets them.

    async def publish(self, event: DomainEvent):
        """Publish an event to the Kafka topic named by its ``event_type``.

        Args:
            event: Event to send; its ``aggregate_id`` becomes the message key.
        """
        # NOTE(review): the value is passed as a dict; presumably the producer
        # is configured with a value serializer elsewhere — confirm.
        await self.producer.send_and_wait(
            event.event_type,
            value=asdict(event),
            key=event.aggregate_id.encode(),
        )

    async def subscribe(self, topic: str, handler: callable):
        """Consume a topic and pass every message value to ``handler``.

        Args:
            topic: Topic to consume from.
            handler: Awaitable callable invoked with each ``message.value``.
        """
        consumer = AIOKafkaConsumer(topic, bootstrap_servers=self.bootstrap_servers)
        await consumer.start()
        try:
            # NOTE(review): no value deserializer is configured here, so the
            # handler presumably receives raw bytes — confirm against handlers.
            async for message in consumer:
                await handler(message.value)
        finally:
            # Always stop the consumer, even when the handler raises or the
            # task is cancelled, so it is not left running.
            await consumer.stop()
# Order Service publishes
# NOTE(review): usage fragment — `event_bus` and `order` are not defined in
# the visible code, and `await` must run inside a coroutine.
await event_bus.publish(OrderCreatedEvent(order_id=order.id))
# Inventory Service subscribes
async def handle_order_created(event_data: dict):
    """Reserve inventory for the order described by an event payload.

    Args:
        event_data: Payload that must contain ``"order_id"`` and ``"items"``.
    """
    order_id = event_data["order_id"]
    items = event_data["items"]
    await reserve_inventory(order_id, items)
class OrderFulfillmentSaga:
    """Orchestrated saga for order fulfillment."""

    def __init__(self):
        """Declare the saga steps, each pairing an action with its compensation."""
        # NOTE(review): the step methods referenced here are not defined in
        # the visible class — presumably provided elsewhere; confirm.
        self.steps = [
            SagaStep("create_order", self.create_order, self.cancel_order),
            SagaStep("reserve_inventory", self.reserve_inventory, self.release_inventory),
            SagaStep("process_payment", self.process_payment, self.refund_payment),
            SagaStep("confirm_order", self.confirm_order, self.cancel_order_confirmation),
        ]

    async def execute(self, order_data: dict) -> SagaResult:
        """Run the steps in order; undo the completed ones on any failure.

        Args:
            order_data: Stored in the shared context under ``"order_data"``.

        Returns:
            A COMPLETED result carrying the accumulated context when every
            step succeeds. Otherwise a FAILED result whose error is the
            failing step's ``error`` or the text of the raised exception.

        Raises:
            Whatever a compensation raises; compensation failures are not
            converted into a result.
        """
        completed_steps = []
        context = {"order_data": order_data}
        failure = None
        try:
            for step in self.steps:
                result = await step.action(context)
                if not result.success:
                    failure = result.error
                    break
                completed_steps.append(step)
                context.update(result.data)
            else:
                return SagaResult(status=SagaStatus.COMPLETED, data=context)
        except Exception as exc:
            failure = str(exc)

        # Compensate exactly once, outside the try block. Previously an error
        # raised by compensation on the failed-step path was caught by the
        # handler above, which ran every compensation a second time and
        # replaced the original error.
        await self.compensate(completed_steps, context)
        return SagaResult(status=SagaStatus.FAILED, error=failure)

    async def compensate(self, completed_steps: List[SagaStep], context: dict):
        """Execute compensating actions in reverse order of completion.

        Args:
            completed_steps: Steps whose actions succeeded, oldest first.
            context: Shared saga context handed to each compensation.
        """
        for step in reversed(completed_steps):
            await step.compensation(context)
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
    """States a circuit breaker can be in."""

    # Normal operation.
    CLOSED = "closed"
    # Failing; requests are rejected.
    OPEN = "open"
    # Testing whether the protected call has recovered.
    HALF_OPEN = "half_open"
class CircuitBreaker:
    """Reject calls to a failing dependency until a recovery timeout passes."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 30):
        """Start in the CLOSED state with no recorded failures.

        Args:
            failure_threshold: Number of counted failures that opens the circuit.
            recovery_timeout: Seconds to stay open before a trial call is allowed.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        # Time the circuit last opened; None until it opens for the first time.
        self.opened_at = None

    async def call(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` under circuit-breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: The circuit is open and the recovery
                timeout has not elapsed yet.
            Exception: Any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                # NOTE(review): CircuitBreakerOpenError is not defined in the
                # visible code — confirm it is declared elsewhere.
                raise CircuitBreakerOpenError("Circuit breaker is open")
            self.state = CircuitState.HALF_OPEN

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        """Return True once ``recovery_timeout`` seconds have passed since opening."""
        if self.opened_at is None:
            # No opening time recorded: nothing to wait for, allow a trial call.
            return True
        elapsed = datetime.now() - self.opened_at
        return elapsed >= timedelta(seconds=self.recovery_timeout)

    def _on_success(self):
        """Clear the failure count and close the circuit after a trial call."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Count a failure and open the circuit when warranted."""
        self.failure_count += 1
        # A failed trial call re-opens immediately; otherwise open once the
        # threshold is reached.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()
# Usage example: route a call through the breaker.
# NOTE(review): the ServiceClient shown earlier in this file defines only
# get(), and payment_data is not defined in the visible code — confirm both.
# `await` at module level must be moved inside a coroutine to run.
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
result = await breaker.call(payment_client.process_payment, payment_data)
Weekly Installs
67
Repository
GitHub Stars
91
First Seen
Jan 25, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
claude-code: 58
gemini-cli: 54
codex: 54
cursor: 54
opencode: 53
github-copilot: 50
Kotlin Ktor 服务器模式指南:构建健壮 HTTP API 的架构与最佳实践
1,200 周安装