重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
recommendation-system by secondsky/claude-skills
npx skills add https://github.com/secondsky/claude-skills --skill recommendation-system
适用于可扩展推荐系统的生产就绪架构,包含特征存储、多层缓存、A/B测试和全面监控。
在以下场景加载此技能:
# 1. 安装依赖
pip install fastapi==0.109.0 redis==5.0.0 prometheus-client==0.19.0
# 2. 启动 Redis(用于缓存和特征存储)
docker run -d -p 6379:6379 redis:alpine
# 3. 创建推荐服务:app.py
cat > app.py << 'EOF'
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import redis
import json
# FastAPI application plus the Redis connection used as the recommendation cache.
app = FastAPI()
# decode_responses=True: Redis returns str instead of bytes, so json.loads works directly.
cache = redis.Redis(host='localhost', port=6379, decode_responses=True)
class RecommendationResponse(BaseModel):
    """Response payload for the /recommendations endpoint."""

    user_id: str      # id of the user the recommendations are for
    items: List[str]  # recommended item identifiers
    cached: bool      # True when the list was served from the Redis cache
@app.post("/recommendations", response_model=RecommendationResponse)
async def get_recommendations(user_id: str, n: int = 10):
    """Return n recommendations for user_id, serving from Redis when possible."""
    cache_key = f"recs:{user_id}:{n}"

    # Cache hit: deserialize the stored list and flag the response as cached.
    hit = cache.get(cache_key)
    if hit:
        return RecommendationResponse(
            user_id=user_id, items=json.loads(hit), cached=True
        )

    # Cache miss: produce placeholder recommendations (simplified demo logic)
    # and keep them for 5 minutes.
    items = [f"item_{i}" for i in range(n)]
    cache.setex(cache_key, 300, json.dumps(items))
    return RecommendationResponse(user_id=user_id, items=items, cached=False)
@app.get("/health")
async def health():
    """Liveness probe: returns a static payload so load balancers can poll it."""
    return {"status": "healthy"}
EOF
# 4. 运行 API
uvicorn app:app --host 0.0.0.0 --port 8000
# 5. 测试
curl -X POST "http://localhost:8000/recommendations?user_id=user_123&n=10"
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
结果:在5分钟内构建一个带缓存的工作推荐API。
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 用户事件 │────▶│ 特征 │────▶│ 模型 │
│ (点击, │ │ 存储 │ │ 服务 │
│ 购买) │ │ (Redis) │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ 训练 │ │ API │
│ 管道 │ │ (FastAPI) │
└─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ 监控 │
│ (Prometheus)│
└─────────────┘
用于用户和物品特征的集中存储:
import redis
import json
class FeatureStore:
    """Feature access layer backed by a Redis read-through cache."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600  # cached entries live for one hour

    def get_user_features(self, user_id: str) -> dict:
        """Return the feature dict for user_id, hitting the database only on a miss."""
        cache_key = f"user_features:{user_id}"
        hit = self.redis.get(cache_key)
        if hit:
            return json.loads(hit)
        # Miss: load from the database, then populate the cache for next time.
        features = fetch_from_db(user_id)
        self.redis.setex(cache_key, self.ttl, json.dumps(features))
        return features
为 A/B 测试提供多个模型:
class ModelServing:
    """Registry that serves one of several recommendation models (for A/B tests)."""

    def __init__(self):
        self.models = {}  # model name -> model object

    def register_model(self, name: str, model, is_default: bool = False):
        """Register a model under `name`; optionally make it the default."""
        self.models[name] = model
        if is_default:
            # NOTE: default_model only exists once a default has been registered.
            self.default_model = name

    def predict(self, user_features: dict, item_features: list, model_name: str = None):
        """Score items with the named model, falling back to the default."""
        chosen = model_name or self.default_model
        return self.models.get(chosen).predict(user_features, item_features)
用于低延迟的多层缓存:
class TieredCache:
    """Layered lookup: L1 (in-process dict) -> L2 (Redis) -> L3 (database, caller-side)."""

    def __init__(self, redis_client):
        self.l1_cache = {}         # process-local, fastest tier
        self.redis = redis_client  # shared L2 tier

    def get(self, key: str):
        """Return the cached value for key, or None on a full miss."""
        # L1: in-process memory.
        if key in self.l1_cache:
            return self.l1_cache[key]

        # L2: Redis; on a hit, promote the decoded value into L1.
        raw = self.redis.get(key)
        if raw:
            value = json.loads(raw)
            self.l1_cache[key] = value
            return value

        # L3 would be the database — the caller handles the miss.
        return None
| 指标 | 描述 | 目标 |
|---|---|---|
| 点击率 | 点击率 | >5% |
| 转化率 | 推荐带来的购买转化 | >2% |
| P95延迟 | 95分位响应时间 | <200ms |
| 缓存命中率 | 从缓存提供的百分比 | >80% |
| 覆盖率 | 被推荐的目录百分比 | >50% |
| 多样性 | 推荐内容的多样性 | >0.7 |
问题:没有历史记录的用户无法获得推荐,初始体验差。
解决方案:使用基于流行度的后备方案:
def get_recommendations(user_id: str, n: int = 10):
    """Return n recommendations, falling back to popular items for new users."""
    user_features = feature_store.get_user_features(user_id)

    # Cold start: a user with no purchase history has nothing to
    # personalize on, so serve popularity-based results instead.
    is_new_user = user_features.get('total_purchases', 0) == 0
    if is_new_user:
        return get_popular_items(n)

    return generate_personalized_recs(user_id, n)
问题:用户购买后,缓存仍显示已购买的物品在推荐列表中。
解决方案:在相关操作时使缓存失效:
INVALIDATING_ACTIONS = {'purchase', 'rating', 'add_to_cart'}

def on_user_action(user_id: str, action: str):
    """Invalidate this user's cached recommendation lists after actions
    that change what should be recommended.
    """
    if action not in INVALIDATING_ACTIONS:
        return
    # BUG FIX: Redis DEL takes literal key names and does NOT expand glob
    # patterns, so delete("recs:<id>:*") silently removed nothing. Scan
    # for matching keys and delete them explicitly.
    keys = list(redis_client.scan_iter(match=f"recs:{user_id}:*"))
    if keys:
        redis_client.delete(*keys)
    logger.info(f"Invalidated cache for {user_id} due to {action}")
问题:大量用户的缓存同时过期,导致数据库/模型过载。
解决方案:为 TTL 添加随机抖动:
import random
def set_cache(key: str, value: dict, base_ttl: int = 300):
    """Cache `value` under `key` with a TTL jittered by up to ±10% of base_ttl.

    The jitter spreads expirations out so entries written together do not
    all expire at the same instant (thundering-herd prevention).
    """
    jitter = base_ttl * random.uniform(-0.1, 0.1)
    redis_client.setex(key, int(base_ttl + jitter), json.dumps(value))
问题:推荐内容过于相似,用户只看到同一类别。
解决方案:实现多样性约束:
def rank_with_diversity(items: list, scores: list, n: int = 10):
    """Pick up to n items by descending score, allowing at most 3 per category."""
    ranked = sorted(zip(items, scores), key=lambda pair: pair[1], reverse=True)
    selected = []
    per_category = {}
    for item, _score in ranked:
        category = item['category']
        if per_category.get(category, 0) >= 3:
            continue  # category quota reached; skip this item
        selected.append(item)
        per_category[category] = per_category.get(category, 0) + 1
        if len(selected) >= n:
            break
    return selected
问题:推荐质量下降,直到用户投诉才被发现。
解决方案:持续监控并设置告警:
import time

from prometheus_client import Counter, Histogram

# BUG FIX: prometheus_client metrics require a documentation string as the
# second argument; Counter('name') alone raises TypeError at definition time.
# `time` was also used below without being imported.
recommendation_clicks = Counter(
    'recommendation_clicks_total',
    'Total clicks on served recommendations',
)
recommendation_latency = Histogram(
    'recommendation_latency_seconds',
    'Latency of recommendation generation in seconds',
)

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    """Generate recommendations and record the request latency."""
    start = time.time()
    recs = generate_recs(user_id)
    recommendation_latency.observe(time.time() - start)
    return recs

@app.post("/track/click")
async def track_click(user_id: str, item_id: str):
    """Record a click on a recommended item."""
    recommendation_clicks.inc()
    # Alerting (e.g. CTR below 3%) is configured in Prometheus rules, not here.
# 如果点击率低于3%则告警
问题:用户偏好改变但特征未更新,推荐内容不相关。
解决方案:设置适当的 TTL 和更新触发器:
class FeatureStore:
    """Feature cache with per-entity TTLs and event-driven invalidation."""

    def __init__(self, redis_client):
        self.redis = redis_client
        # User features change often, so they get a much shorter TTL than
        # item features.
        self.user_ttl = 300   # 5 minutes
        self.item_ttl = 3600  # 1 hour

    def update_on_event(self, user_id: str, event: str):
        """Drop the cached user features when an important event occurs."""
        if event not in ['purchase', 'rating']:
            return
        self.redis.delete(f"user_features:{user_id}")
        logger.info(f"Refreshed features for {user_id}")
问题:过早宣布胜出者,结果不具备统计显著性。
解决方案:首先计算所需样本量:
def calculate_sample_size(
    baseline_rate: float,
    min_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8
) -> int:
    """Return the required sample size per variant for a two-proportion test.

    Args:
        baseline_rate: Conversion/click rate of the control (e.g. 0.05).
        min_detectable_effect: Relative lift to detect (e.g. 0.10 for +10%).
        alpha: Two-sided significance level (type I error rate).
        power: Desired statistical power (1 - type II error rate).
    """
    import math

    from scipy import stats
    z_alpha = stats.norm.ppf(1 - alpha / 2)
    z_beta = stats.norm.ppf(power)
    p1 = baseline_rate
    p2 = baseline_rate * (1 + min_detectable_effect)
    p_avg = (p1 + p2) / 2
    n = (
        (z_alpha + z_beta)**2 * 2 * p_avg * (1 - p_avg) /
        (p2 - p1)**2
    )
    # BUG FIX: round UP — int() truncates toward zero, which yields an
    # underpowered experiment whenever n is fractional.
    return math.ceil(n)
# Example: detect a 10% lift over a 5% baseline CTR
# (uses the default alpha=0.05, power=0.8)
n_required = calculate_sample_size(
    baseline_rate=0.05,
    min_detectable_effect=0.10
)
print(f"Required sample size: {n_required} per variant")
# Wait until both variants reach this sample size before concluding
为详细的生产实现加载参考文件:
references/production-architecture.md 以获取完整的 FeatureStore、ModelServing 和 RecommendationService 实现,包括批量获取、缓存集成和 FastAPI 部署模式。references/caching-strategies.md。references/ab-testing-framework.md 以获取确定性变体分配、汤普森采样(多臂老虎机)、贝叶斯和频率论显著性测试以及实验跟踪。references/monitoring-alerting.md 以获取 Prometheus 指标集成、仪表板端点、告警规则和质量监控(多样性、覆盖率)。
class RecommendationService:
def __init__(self, feature_store, model_serving, cache):
    """Wire together the feature store, model-serving layer, and cache."""
    self.feature_store = feature_store
    self.model_serving = model_serving
    self.cache = cache
def get_recommendations(self, user_id: str, n: int = 10):
    """Return n recommendations for user_id: cache -> features -> score -> rank."""
    cache_key = f"recs:{user_id}:{n}"

    # 1. Serve straight from cache when warm.
    hit = self.cache.get(cache_key)
    if hit:
        return hit

    # 2. Gather user features and candidate items.
    user_features = self.feature_store.get_user_features(user_id)
    candidates = self.get_candidates(user_id)

    # 3. Score the candidates, then 4. rank them under the diversity constraint.
    scores = self.model_serving.predict(user_features, candidates)
    recommendations = self.rank_with_diversity(candidates, scores, n)

    # 5. Cache the final list for 5 minutes.
    self.cache.set(cache_key, recommendations, ttl=300)
    return recommendations
def assign_variant(user_id: str, experiment_id: str) -> str:
    """Deterministically bucket a user into 'control' or 'treatment' (50/50 split).

    Hashing (user, experiment) means the same user always lands in the same
    variant for a given experiment, with no assignment state to store.
    """
    import hashlib
    digest = hashlib.md5(f"{user_id}:{experiment_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 2
    return 'control' if bucket == 0 else 'treatment'
# Usage: route the user to a model based on their experiment variant.
variant = assign_variant('user_123', 'rec_algo_v2')
model_name = 'main' if variant == 'control' else 'experimental'
# NOTE(review): `user_id` is presumably defined by the surrounding code — not shown here.
recs = get_recommendations(user_id, model_name=model_name)
from prometheus_client import Counter, Histogram

# BUG FIX: Counter's signature is (name, documentation, labelnames). The
# original passed ['status'] where the documentation string belongs, and
# Histogram was missing its documentation entirely — both fail at
# metric-definition time.
requests_total = Counter(
    'recommendation_requests_total',
    'Recommendation requests by outcome',
    ['status'],
)
latency_seconds = Histogram(
    'recommendation_latency_seconds',
    'Recommendation request latency in seconds',
)

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    """Serve recommendations while recording latency and success/error counts."""
    with latency_seconds.time():
        try:
            recs = generate_recs(user_id)
            requests_total.labels(status='success').inc()
            return recs
        except Exception:
            requests_total.labels(status='error').inc()
            raise  # re-raise so the framework still returns an error response
每周安装数
66
仓库
GitHub 星标数
93
首次出现
2026年1月25日
安全审计
安装于
claude-code58
gemini-cli55
codex52
opencode52
cursor52
github-copilot50
Production-ready architecture for scalable recommendation systems with feature stores, multi-tier caching, A/B testing, and comprehensive monitoring.
Load this skill when:
# 1. Install dependencies
pip install fastapi==0.109.0 redis==5.0.0 prometheus-client==0.19.0
# 2. Start Redis (for caching and feature store)
docker run -d -p 6379:6379 redis:alpine
# 3. Create recommendation service: app.py
cat > app.py << 'EOF'
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import redis
import json
# FastAPI application plus the Redis connection used as the recommendation cache.
app = FastAPI()
# decode_responses=True: Redis returns str instead of bytes, so json.loads works directly.
cache = redis.Redis(host='localhost', port=6379, decode_responses=True)
class RecommendationResponse(BaseModel):
    """Response payload for the /recommendations endpoint."""

    user_id: str      # id of the user the recommendations are for
    items: List[str]  # recommended item identifiers
    cached: bool      # True when the list was served from the Redis cache
@app.post("/recommendations", response_model=RecommendationResponse)
async def get_recommendations(user_id: str, n: int = 10):
    """Return n recommendations for user_id, serving from Redis when possible."""
    cache_key = f"recs:{user_id}:{n}"

    # Cache hit: deserialize the stored list and flag the response as cached.
    hit = cache.get(cache_key)
    if hit:
        return RecommendationResponse(
            user_id=user_id, items=json.loads(hit), cached=True
        )

    # Cache miss: produce placeholder recommendations (simplified demo logic)
    # and keep them for 5 minutes.
    items = [f"item_{i}" for i in range(n)]
    cache.setex(cache_key, 300, json.dumps(items))
    return RecommendationResponse(user_id=user_id, items=items, cached=False)
@app.get("/health")
async def health():
    """Liveness probe: returns a static payload so load balancers can poll it."""
    return {"status": "healthy"}
EOF
# 4. Run API
uvicorn app:app --host 0.0.0.0 --port 8000
# 5. Test
curl -X POST "http://localhost:8000/recommendations?user_id=user_123&n=10"
Result : Working recommendation API with caching in under 5 minutes.
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ User Events │────▶│ Feature │────▶│ Model │
│ (clicks, │ │ Store │ │ Serving │
│ purchases) │ │ (Redis) │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Training │ │ API │
│ Pipeline │ │ (FastAPI) │
└─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Monitoring │
│ (Prometheus)│
└─────────────┘
Centralized storage for user and item features:
import redis
import json
class FeatureStore:
    """Feature access layer backed by a Redis read-through cache."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600  # cached entries live for one hour

    def get_user_features(self, user_id: str) -> dict:
        """Return the feature dict for user_id, hitting the database only on a miss."""
        cache_key = f"user_features:{user_id}"
        hit = self.redis.get(cache_key)
        if hit:
            return json.loads(hit)
        # Miss: load from the database, then populate the cache for next time.
        features = fetch_from_db(user_id)
        self.redis.setex(cache_key, self.ttl, json.dumps(features))
        return features
Serve multiple models for A/B testing:
class ModelServing:
    """Registry that serves one of several recommendation models (for A/B tests)."""

    def __init__(self):
        self.models = {}  # model name -> model object

    def register_model(self, name: str, model, is_default: bool = False):
        """Register a model under `name`; optionally make it the default."""
        self.models[name] = model
        if is_default:
            # NOTE: default_model only exists once a default has been registered.
            self.default_model = name

    def predict(self, user_features: dict, item_features: list, model_name: str = None):
        """Score items with the named model, falling back to the default."""
        chosen = model_name or self.default_model
        return self.models.get(chosen).predict(user_features, item_features)
Multi-tier caching for low latency:
class TieredCache:
    """Layered lookup: L1 (in-process dict) -> L2 (Redis) -> L3 (database, caller-side)."""

    def __init__(self, redis_client):
        self.l1_cache = {}         # process-local, fastest tier
        self.redis = redis_client  # shared L2 tier

    def get(self, key: str):
        """Return the cached value for key, or None on a full miss."""
        # L1: in-process memory.
        if key in self.l1_cache:
            return self.l1_cache[key]

        # L2: Redis; on a hit, promote the decoded value into L1.
        raw = self.redis.get(key)
        if raw:
            value = json.loads(raw)
            self.l1_cache[key] = value
            return value

        # L3 would be the database — the caller handles the miss.
        return None
| Metric | Description | Target |
|---|---|---|
| CTR | Click-through rate | >5% |
| Conversion Rate | Purchases from recs | >2% |
| P95 Latency | 95th percentile response time | <200ms |
| Cache Hit Rate | % served from cache | >80% |
| Coverage | % of catalog recommended | >50% |
| Diversity | Variety in recommendations | >0.7 |
Problem : No recommendations for users without history, poor initial experience.
Solution : Use popularity-based fallback:
def get_recommendations(user_id: str, n: int = 10):
    """Return n recommendations, falling back to popular items for new users."""
    user_features = feature_store.get_user_features(user_id)

    # Cold start: a user with no purchase history has nothing to
    # personalize on, so serve popularity-based results instead.
    is_new_user = user_features.get('total_purchases', 0) == 0
    if is_new_user:
        return get_popular_items(n)

    return generate_personalized_recs(user_id, n)
Problem : User makes purchase, cache still shows purchased item in recommendations.
Solution : Invalidate cache on relevant actions:
INVALIDATING_ACTIONS = {'purchase', 'rating', 'add_to_cart'}

def on_user_action(user_id: str, action: str):
    """Invalidate this user's cached recommendation lists after actions
    that change what should be recommended.
    """
    if action not in INVALIDATING_ACTIONS:
        return
    # BUG FIX: Redis DEL takes literal key names and does NOT expand glob
    # patterns, so delete("recs:<id>:*") silently removed nothing. Scan
    # for matching keys and delete them explicitly.
    keys = list(redis_client.scan_iter(match=f"recs:{user_id}:*"))
    if keys:
        redis_client.delete(*keys)
    logger.info(f"Invalidated cache for {user_id} due to {action}")
Problem : Many users' caches expire simultaneously, overload database/model.
Solution : Add random jitter to TTL:
import random
def set_cache(key: str, value: dict, base_ttl: int = 300):
    """Cache `value` under `key` with a TTL jittered by up to ±10% of base_ttl.

    The jitter spreads expirations out so entries written together do not
    all expire at the same instant (thundering-herd prevention).
    """
    jitter = base_ttl * random.uniform(-0.1, 0.1)
    redis_client.setex(key, int(base_ttl + jitter), json.dumps(value))
Problem : Recommendations too similar, users only see same category.
Solution : Implement diversity constraint:
def rank_with_diversity(items: list, scores: list, n: int = 10):
    """Pick up to n items by descending score, allowing at most 3 per category."""
    ranked = sorted(zip(items, scores), key=lambda pair: pair[1], reverse=True)
    selected = []
    per_category = {}
    for item, _score in ranked:
        category = item['category']
        if per_category.get(category, 0) >= 3:
            continue  # category quota reached; skip this item
        selected.append(item)
        per_category[category] = per_category.get(category, 0) + 1
        if len(selected) >= n:
            break
    return selected
Problem : Recommendation quality drops, nobody notices until users complain.
Solution : Continuous monitoring with alerts:
import time

from prometheus_client import Counter, Histogram

# BUG FIX: prometheus_client metrics require a documentation string as the
# second argument; Counter('name') alone raises TypeError at definition time.
# `time` was also used below without being imported.
recommendation_clicks = Counter(
    'recommendation_clicks_total',
    'Total clicks on served recommendations',
)
recommendation_latency = Histogram(
    'recommendation_latency_seconds',
    'Latency of recommendation generation in seconds',
)

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    """Generate recommendations and record the request latency."""
    start = time.time()
    recs = generate_recs(user_id)
    recommendation_latency.observe(time.time() - start)
    return recs

@app.post("/track/click")
async def track_click(user_id: str, item_id: str):
    """Record a click on a recommended item."""
    recommendation_clicks.inc()
    # Alerting (e.g. CTR below 3%) is configured in Prometheus rules, not here.
# Alert if CTR drops below 3%
Problem : User preferences change but features don't update, recommendations irrelevant.
Solution : Set appropriate TTLs and update triggers:
class FeatureStore:
    """Feature cache with per-entity TTLs and event-driven invalidation."""

    def __init__(self, redis_client):
        self.redis = redis_client
        # User features change often, so they get a much shorter TTL than
        # item features.
        self.user_ttl = 300   # 5 minutes
        self.item_ttl = 3600  # 1 hour

    def update_on_event(self, user_id: str, event: str):
        """Drop the cached user features when an important event occurs."""
        if event not in ['purchase', 'rating']:
            return
        self.redis.delete(f"user_features:{user_id}")
        logger.info(f"Refreshed features for {user_id}")
Problem : Declare winner too early, results not statistically significant.
Solution : Calculate required sample size first:
def calculate_sample_size(
    baseline_rate: float,
    min_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8
) -> int:
    """Return the required sample size per variant for a two-proportion test.

    Args:
        baseline_rate: Conversion/click rate of the control (e.g. 0.05).
        min_detectable_effect: Relative lift to detect (e.g. 0.10 for +10%).
        alpha: Two-sided significance level (type I error rate).
        power: Desired statistical power (1 - type II error rate).
    """
    import math

    from scipy import stats
    z_alpha = stats.norm.ppf(1 - alpha / 2)
    z_beta = stats.norm.ppf(power)
    p1 = baseline_rate
    p2 = baseline_rate * (1 + min_detectable_effect)
    p_avg = (p1 + p2) / 2
    n = (
        (z_alpha + z_beta)**2 * 2 * p_avg * (1 - p_avg) /
        (p2 - p1)**2
    )
    # BUG FIX: round UP — int() truncates toward zero, which yields an
    # underpowered experiment whenever n is fractional.
    return math.ceil(n)
# Example: detect 10% lift with baseline CTR=5%
# (uses the default alpha=0.05, power=0.8)
n_required = calculate_sample_size(
    baseline_rate=0.05,
    min_detectable_effect=0.10
)
print(f"Required sample size: {n_required} per variant")
# Wait until both variants reach this size before concluding
Load reference files for detailed production implementations:
Production Architecture : Load references/production-architecture.md for complete FeatureStore, ModelServing, and RecommendationService implementations with batch fetching, caching integration, and FastAPI deployment patterns.
Caching Strategies : Load references/caching-strategies.md when implementing multi-tier caching (L1/L2/L3), cache warming, invalidation strategies, probabilistic refresh, or thundering herd prevention.
A/B Testing Framework : Load references/ab-testing-framework.md for deterministic variant assignment, Thompson sampling (multi-armed bandits), Bayesian and frequentist significance testing, and experiment tracking.
Monitoring & Alerting: Load references/monitoring-alerting.md for Prometheus metrics integration, dashboard endpoints, alert rules, and quality monitoring (diversity, coverage).
class RecommendationService:
    """End-to-end recommendation flow: cache -> features -> scoring -> ranking."""

    def __init__(self, feature_store, model_serving, cache):
        self.feature_store = feature_store
        self.model_serving = model_serving
        self.cache = cache

    def get_recommendations(self, user_id: str, n: int = 10):
        """Return n recommendations for user_id, using the cache when warm."""
        cache_key = f"recs:{user_id}:{n}"

        # 1. Serve straight from cache when possible.
        hit = self.cache.get(cache_key)
        if hit:
            return hit

        # 2. Gather user features and candidate items.
        user_features = self.feature_store.get_user_features(user_id)
        candidates = self.get_candidates(user_id)

        # 3. Score the candidates, then 4. rank them under the diversity constraint.
        scores = self.model_serving.predict(user_features, candidates)
        recommendations = self.rank_with_diversity(candidates, scores, n)

        # 5. Cache the final list for 5 minutes.
        self.cache.set(cache_key, recommendations, ttl=300)
        return recommendations
def assign_variant(user_id: str, experiment_id: str) -> str:
    """Deterministically bucket a user into 'control' or 'treatment' (50/50 split).

    Hashing (user, experiment) means the same user always lands in the same
    variant for a given experiment, with no assignment state to store.
    """
    import hashlib
    digest = hashlib.md5(f"{user_id}:{experiment_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 2
    return 'control' if bucket == 0 else 'treatment'
# Usage: route the user to a model based on their experiment variant.
variant = assign_variant('user_123', 'rec_algo_v2')
model_name = 'main' if variant == 'control' else 'experimental'
# NOTE(review): `user_id` is presumably defined by the surrounding code — not shown here.
recs = get_recommendations(user_id, model_name=model_name)
from prometheus_client import Counter, Histogram

# BUG FIX: Counter's signature is (name, documentation, labelnames). The
# original passed ['status'] where the documentation string belongs, and
# Histogram was missing its documentation entirely — both fail at
# metric-definition time.
requests_total = Counter(
    'recommendation_requests_total',
    'Recommendation requests by outcome',
    ['status'],
)
latency_seconds = Histogram(
    'recommendation_latency_seconds',
    'Recommendation request latency in seconds',
)

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    """Serve recommendations while recording latency and success/error counts."""
    with latency_seconds.time():
        try:
            recs = generate_recs(user_id)
            requests_total.labels(status='success').inc()
            return recs
        except Exception:
            requests_total.labels(status='error').inc()
            raise  # re-raise so the framework still returns an error response
Weekly Installs
66
Repository
GitHub Stars
93
First Seen
Jan 25, 2026
Security Audits
Gen Agent Trust Hub: Fail · Socket: Pass · Snyk: Pass
Installed on
claude-code58
gemini-cli55
codex52
opencode52
cursor52
github-copilot50
超能力技能使用指南:AI助手技能调用优先级与工作流程详解
53,700 周安装