重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
ai-architect-expert by personamanagmentlayer/pcl
npx skills add https://github.com/personamanagmentlayer/pcl --skill ai-architect-expert为设计 AI 系统、MLOps 架构、可扩展的 ML 基础设施和 AI 平台工程提供专家指导。
import hashlib
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
class ModelStage(Enum):
    """Lifecycle stage of a registered model version."""

    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
    ARCHIVED = "archived"
@dataclass
class ModelMetadata:
    """Metadata describing one registered version of a model."""

    name: str                  # model identifier within the registry
    version: str               # version label, unique per model name
    framework: str             # e.g. training framework that produced it
    stage: ModelStage          # current lifecycle stage
    metrics: Dict[str, float]  # evaluation metrics recorded at registration
    created_at: str            # timestamps kept as strings by the caller
    updated_at: str
class ModelRegistry:
    """Central model registry for the ML platform.

    Versions are stored per model name in registration order; stage
    promotion mutates the stored metadata in place.
    """

    def __init__(self):
        # name -> all registered versions, oldest first
        self.models: Dict[str, List[ModelMetadata]] = {}

    def register_model(self, model: ModelMetadata) -> str:
        """Register a new model version and return its "name:version" id."""
        self.models.setdefault(model.name, []).append(model)
        return f"{model.name}:{model.version}"

    def get_model(self, name: str, version: str) -> Optional[ModelMetadata]:
        """Return a specific version of a model, or None if absent.

        Fix: ModelServer.load_model calls registry.get_model() for
        non-production versions, but this method did not exist.
        """
        for model in self.models.get(name, []):
            if model.version == version:
                return model
        return None

    def promote_model(self, name: str, version: str, stage: ModelStage) -> bool:
        """Promote one version to *stage*; return True if the version exists.

        NOTE(review): promoting to PRODUCTION does not demote any version
        already in PRODUCTION, so several versions can hold that stage at
        once; get_production_model then returns the oldest of them.
        """
        model = self.get_model(name, version)
        if model is None:
            return False
        model.stage = stage
        return True

    def get_production_model(self, name: str) -> Optional[ModelMetadata]:
        """Return the first (oldest) version currently in PRODUCTION."""
        for model in self.models.get(name, []):
            if model.stage == ModelStage.PRODUCTION:
                return model
        return None
class FeatureStore:
    """In-memory feature store for ML features.

    Holds feature definitions, reusable feature groups, and (as a
    stand-in for a real online store) feature values keyed by
    (entity_id, feature_name).
    """

    def __init__(self):
        self.features: Dict[str, Dict] = {}             # name -> definition
        self.feature_groups: Dict[str, List[str]] = {}  # group -> feature names
        self._values = {}                               # (entity_id, name) -> value

    def register_feature(self, name: str, dtype: str, description: str,
                         transformation: Optional[str] = None):
        """Register a feature definition."""
        self.features[name] = {
            "dtype": dtype,
            "description": description,
            "transformation": transformation
        }

    def create_feature_group(self, group_name: str, feature_names: List[str]):
        """Create a named feature group for reuse."""
        self.feature_groups[group_name] = feature_names

    def set_feature_value(self, entity_id: str, name: str, value) -> None:
        """Store a feature value for an entity (demo online-store write)."""
        self._values[(entity_id, name)] = value

    def get_features(self, entity_id: str, feature_names: List[str]) -> Dict:
        """Retrieve feature values for an entity; unknown features map to None.

        Fix: this method called self._fetch_feature, which was never
        defined, so every call raised AttributeError.
        """
        return {name: self._fetch_feature(entity_id, name)
                for name in feature_names}

    def _fetch_feature(self, entity_id: str, name: str):
        """Fetch one feature value; a production build would query the
        online/offline store instead of the in-memory dict."""
        return self._values.get((entity_id, name))
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from abc import ABC, abstractmethod
import torch.distributed as dist
class TrainingPipeline(ABC):
    """Template-method base class for training pipelines.

    Subclasses implement prepare_data/train/evaluate; run() drives them.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.experiment_tracker = None            # optional, attached by caller
        self.checkpointer = None                  # optional, attached by caller
        self.last_metrics: Optional[Dict] = None  # filled in by run()

    @abstractmethod
    def prepare_data(self):
        """Data preparation step."""

    @abstractmethod
    def train(self):
        """Training step."""

    @abstractmethod
    def evaluate(self):
        """Evaluation step; should return a metrics mapping."""

    def log_metrics(self, metrics):
        """Record evaluation metrics.

        Fix: run() called this method but it was never defined, so every
        pipeline crashed with AttributeError right after evaluation. The
        default stores the metrics and forwards them to the experiment
        tracker when one is attached.
        """
        self.last_metrics = metrics
        if self.experiment_tracker is not None:
            self.experiment_tracker.log_metrics(metrics)

    def run(self):
        """Execute the full pipeline and return the evaluation metrics."""
        self.prepare_data()
        self.train()
        metrics = self.evaluate()
        self.log_metrics(metrics)
        return metrics
class DistributedTrainingPipeline(TrainingPipeline):
    """Distributed data-parallel training via torch DDP.

    NOTE(review): this class does not implement the abstract evaluate()
    from TrainingPipeline, so it cannot be instantiated as written; it
    also reads self.dataset, self.dataloader, self.model, self.train_step
    and self.log_loss, none of which are defined here — presumably
    supplied by a subclass. Confirm before use.
    """

    def __init__(self, config: Dict, world_size: int, rank: int):
        super().__init__(config)
        self.world_size = world_size
        self.rank = rank
        # NOTE(review): initializing the process group inside __init__
        # makes construction fail outside a configured distributed env.
        self.setup_distributed()

    def setup_distributed(self):
        """Initialize the distributed process group (NCCL: GPU-only backend)."""
        dist.init_process_group(
            backend='nccl',
            world_size=self.world_size,
            rank=self.rank
        )

    def prepare_data(self):
        """Shard the dataset across workers with a DistributedSampler."""
        from torch.utils.data.distributed import DistributedSampler
        self.sampler = DistributedSampler(
            self.dataset,  # assumes self.dataset is set elsewhere — TODO confirm
            num_replicas=self.world_size,
            rank=self.rank
        )

    def train(self):
        """Distributed training loop; only rank 0 logs the loss."""
        from torch.nn.parallel import DistributedDataParallel as DDP
        model = DDP(self.model, device_ids=[self.rank])
        for epoch in range(self.config['epochs']):
            # Reseed the sampler so each epoch shuffles differently
            # and identically across all replicas.
            self.sampler.set_epoch(epoch)
            for batch in self.dataloader:
                loss = self.train_step(model, batch)
                if self.rank == 0:
                    self.log_loss(loss)
from fastapi import FastAPI, BackgroundTasks
from prometheus_client import Counter, Histogram
import asyncio
# Process-wide Prometheus metrics, shared by every ModelServer instance.
prediction_counter = Counter('predictions_total', 'Total predictions')
prediction_latency = Histogram('prediction_latency_seconds', 'Prediction latency')
class ModelServer:
    """Production model serving with per-process model and result caches.

    NOTE(review): _load_from_storage and _run_inference are not defined
    anywhere in this file — they must be provided elsewhere (subclass or
    mixin). Confirm before deploying.
    """

    def __init__(self, model_registry: ModelRegistry):
        self.registry = model_registry
        self.loaded_models = {}      # "name:version" -> loaded model object
        self.prediction_cache = {}   # cache key -> prediction result

    def _generate_cache_key(self, model_name: str, features: Dict) -> str:
        """Deterministic cache key from the model name and feature mapping.

        Fix: predict() called this method but it was never defined, so the
        first prediction raised AttributeError. Features are serialized
        with sorted keys so logically equal dicts share one key.
        """
        payload = json.dumps(features, sort_keys=True, default=str)
        digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        return f"{model_name}:{digest}"

    async def load_model(self, name: str, version: str = "production"):
        """Load a model into memory and cache it under "name:version"."""
        if version == "production":
            model_metadata = self.registry.get_production_model(name)
        else:
            model_metadata = self.registry.get_model(name, version)
        if not model_metadata:
            raise ValueError(f"Model {name}:{version} not found")
        # Materialize the model from storage (implementation lives elsewhere).
        model = await self._load_from_storage(model_metadata)
        self.loaded_models[f"{name}:{version}"] = model
        return model

    @prediction_latency.time()
    async def predict(self, model_name: str, features: Dict) -> Dict:
        """Make a prediction, serving repeated inputs from the cache.

        NOTE(review): prediction_cache is unbounded — acceptable for a
        demo, a memory leak in production.
        """
        prediction_counter.inc()
        cache_key = self._generate_cache_key(model_name, features)
        if cache_key in self.prediction_cache:
            return self.prediction_cache[cache_key]
        # Fix: load_model stores models under "name:version", but the old
        # lookup used the bare name, so the in-memory model was never found
        # and was re-loaded from storage on every single request.
        model = self.loaded_models.get(f"{model_name}:production")
        if not model:
            model = await self.load_model(model_name)
        result = await self._run_inference(model, features)
        self.prediction_cache[cache_key] = result
        return result

    async def predict_batch(self, model_name: str,
                            batch_features: List[Dict]) -> List[Dict]:
        """Run predictions for a batch of feature dicts concurrently."""
        tasks = [self.predict(model_name, features)
                 for features in batch_features]
        return await asyncio.gather(*tasks)
# Module-level FastAPI app wired to an empty registry; a real deployment
# would inject a populated ModelRegistry here.
app = FastAPI()
model_server = ModelServer(model_registry=ModelRegistry())

@app.post("/predict/{model_name}")
async def predict_endpoint(model_name: str, features: Dict):
    """HTTP prediction endpoint: POST /predict/{model_name} with a feature JSON body."""
    return await model_server.predict(model_name, features)
from dataclasses import dataclass
from datetime import datetime
import numpy as np
@dataclass
class PredictionLog:
    """One served prediction, captured for monitoring and drift analysis."""

    timestamp: datetime
    model_name: str
    model_version: str
    features: Dict
    # Fix: was annotated `any` — the builtin function, not a type.
    # typing.Any is the intended annotation.
    prediction: Any
    latency_ms: float
    input_hash: str
class ModelMonitor:
    """Monitor model performance in production."""

    def __init__(self):
        self.logs: List[PredictionLog] = []
        # "name:version" -> {"latencies": [...], "predictions": int}
        self.metrics = {}

    def log_prediction(self, log: PredictionLog):
        """Record a prediction and refresh derived metrics."""
        self.logs.append(log)
        self.update_latency_metrics(log)
        self.check_data_drift(log)

    def update_latency_metrics(self, log: PredictionLog):
        """Track prediction latency per model version."""
        model_key = f"{log.model_name}:{log.model_version}"
        entry = self.metrics.setdefault(model_key,
                                        {"latencies": [], "predictions": 0})
        entry["latencies"].append(log.latency_ms)
        entry["predictions"] += 1

    def check_data_drift(self, log: PredictionLog):
        """Detect data drift in input features.

        TODO: compare current feature distributions against training data
        and alert on significant drift; currently a stub.
        """
        pass

    def get_model_health(self, model_name: str) -> Dict:
        """Aggregate health metrics for *model_name* across all versions.

        Fix: metrics are stored under "name:version" keys, but this method
        used the bare model name for lookup, so it always reported zeros.
        It now aggregates every version of the model; passing a full
        "name:version" key still works.
        """
        latencies: List[float] = []
        total = 0
        for key, entry in self.metrics.items():
            if key == model_name or key.split(":", 1)[0] == model_name:
                latencies.extend(entry["latencies"])
                total += entry["predictions"]
        return {
            "total_predictions": total,
            "avg_latency_ms": float(np.mean(latencies)) if latencies else 0,
            "p95_latency_ms": float(np.percentile(latencies, 95)) if latencies else 0,
            "p99_latency_ms": float(np.percentile(latencies, 99)) if latencies else 0
        }
❌ 没有模型版本控制或注册中心 ❌ 训练和服务环境不匹配 ❌ 没有监控或告警 ❌ 手动模型部署流程 ❌ 忽视数据漂移 ❌ 没有回滚策略 ❌ 为初始 MVP 过度设计
每周安装次数
49
代码仓库
GitHub 星标数
11
首次出现
2026年1月23日
安全审计
安装于
opencode: 41
codex: 40
gemini-cli: 38
cursor: 36
github-copilot: 35
amp: 31
Expert guidance for designing AI systems, MLOps architecture, scalable ML infrastructure, and AI platform engineering.
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
class ModelStage(Enum):
    """Lifecycle stage of a registered model version."""

    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
    ARCHIVED = "archived"
@dataclass
class ModelMetadata:
    """Metadata describing one registered version of a model."""

    name: str                  # model identifier within the registry
    version: str               # version label, unique per model name
    framework: str             # e.g. training framework that produced it
    stage: ModelStage          # current lifecycle stage
    metrics: Dict[str, float]  # evaluation metrics recorded at registration
    created_at: str            # timestamps kept as strings by the caller
    updated_at: str
class ModelRegistry:
    """Central model registry for the ML platform.

    Versions are stored per model name in registration order; stage
    promotion mutates the stored metadata in place.
    """

    def __init__(self):
        # name -> all registered versions, oldest first
        self.models: Dict[str, List[ModelMetadata]] = {}

    def register_model(self, model: ModelMetadata) -> str:
        """Register a new model version and return its "name:version" id."""
        self.models.setdefault(model.name, []).append(model)
        return f"{model.name}:{model.version}"

    def get_model(self, name: str, version: str) -> Optional[ModelMetadata]:
        """Return a specific version of a model, or None if absent.

        Fix: ModelServer.load_model calls registry.get_model() for
        non-production versions, but this method did not exist.
        """
        for model in self.models.get(name, []):
            if model.version == version:
                return model
        return None

    def promote_model(self, name: str, version: str, stage: ModelStage) -> bool:
        """Promote one version to *stage*; return True if the version exists.

        NOTE(review): promoting to PRODUCTION does not demote any version
        already in PRODUCTION, so several versions can hold that stage at
        once; get_production_model then returns the oldest of them.
        """
        model = self.get_model(name, version)
        if model is None:
            return False
        model.stage = stage
        return True

    def get_production_model(self, name: str) -> Optional[ModelMetadata]:
        """Return the first (oldest) version currently in PRODUCTION."""
        for model in self.models.get(name, []):
            if model.stage == ModelStage.PRODUCTION:
                return model
        return None
class FeatureStore:
    """In-memory feature store for ML features.

    Holds feature definitions, reusable feature groups, and (as a
    stand-in for a real online store) feature values keyed by
    (entity_id, feature_name).
    """

    def __init__(self):
        self.features: Dict[str, Dict] = {}             # name -> definition
        self.feature_groups: Dict[str, List[str]] = {}  # group -> feature names
        self._values = {}                               # (entity_id, name) -> value

    def register_feature(self, name: str, dtype: str, description: str,
                         transformation: Optional[str] = None):
        """Register a feature definition."""
        self.features[name] = {
            "dtype": dtype,
            "description": description,
            "transformation": transformation
        }

    def create_feature_group(self, group_name: str, feature_names: List[str]):
        """Create a named feature group for reuse."""
        self.feature_groups[group_name] = feature_names

    def set_feature_value(self, entity_id: str, name: str, value) -> None:
        """Store a feature value for an entity (demo online-store write)."""
        self._values[(entity_id, name)] = value

    def get_features(self, entity_id: str, feature_names: List[str]) -> Dict:
        """Retrieve feature values for an entity; unknown features map to None.

        Fix: this method called self._fetch_feature, which was never
        defined, so every call raised AttributeError.
        """
        return {name: self._fetch_feature(entity_id, name)
                for name in feature_names}

    def _fetch_feature(self, entity_id: str, name: str):
        """Fetch one feature value; a production build would query the
        online/offline store instead of the in-memory dict."""
        return self._values.get((entity_id, name))
from abc import ABC, abstractmethod
import torch.distributed as dist
class TrainingPipeline(ABC):
    """Template-method base class for training pipelines.

    Subclasses implement prepare_data/train/evaluate; run() drives them.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.experiment_tracker = None            # optional, attached by caller
        self.checkpointer = None                  # optional, attached by caller
        self.last_metrics: Optional[Dict] = None  # filled in by run()

    @abstractmethod
    def prepare_data(self):
        """Data preparation step."""

    @abstractmethod
    def train(self):
        """Training step."""

    @abstractmethod
    def evaluate(self):
        """Evaluation step; should return a metrics mapping."""

    def log_metrics(self, metrics):
        """Record evaluation metrics.

        Fix: run() called this method but it was never defined, so every
        pipeline crashed with AttributeError right after evaluation. The
        default stores the metrics and forwards them to the experiment
        tracker when one is attached.
        """
        self.last_metrics = metrics
        if self.experiment_tracker is not None:
            self.experiment_tracker.log_metrics(metrics)

    def run(self):
        """Execute the full pipeline and return the evaluation metrics."""
        self.prepare_data()
        self.train()
        metrics = self.evaluate()
        self.log_metrics(metrics)
        return metrics
class DistributedTrainingPipeline(TrainingPipeline):
    """Distributed data-parallel training via torch DDP.

    NOTE(review): this class does not implement the abstract evaluate()
    from TrainingPipeline, so it cannot be instantiated as written; it
    also reads self.dataset, self.dataloader, self.model, self.train_step
    and self.log_loss, none of which are defined here — presumably
    supplied by a subclass. Confirm before use.
    """

    def __init__(self, config: Dict, world_size: int, rank: int):
        super().__init__(config)
        self.world_size = world_size
        self.rank = rank
        # NOTE(review): initializing the process group inside __init__
        # makes construction fail outside a configured distributed env.
        self.setup_distributed()

    def setup_distributed(self):
        """Initialize the distributed process group (NCCL: GPU-only backend)."""
        dist.init_process_group(
            backend='nccl',
            world_size=self.world_size,
            rank=self.rank
        )

    def prepare_data(self):
        """Shard the dataset across workers with a DistributedSampler."""
        from torch.utils.data.distributed import DistributedSampler
        self.sampler = DistributedSampler(
            self.dataset,  # assumes self.dataset is set elsewhere — TODO confirm
            num_replicas=self.world_size,
            rank=self.rank
        )

    def train(self):
        """Distributed training loop; only rank 0 logs the loss."""
        from torch.nn.parallel import DistributedDataParallel as DDP
        model = DDP(self.model, device_ids=[self.rank])
        for epoch in range(self.config['epochs']):
            # Reseed the sampler so each epoch shuffles differently
            # and identically across all replicas.
            self.sampler.set_epoch(epoch)
            for batch in self.dataloader:
                loss = self.train_step(model, batch)
                if self.rank == 0:
                    self.log_loss(loss)
from fastapi import FastAPI, BackgroundTasks
from prometheus_client import Counter, Histogram
import asyncio
# Process-wide Prometheus metrics, shared by every ModelServer instance.
prediction_counter = Counter('predictions_total', 'Total predictions')
prediction_latency = Histogram('prediction_latency_seconds', 'Prediction latency')
class ModelServer:
    """Production model serving with per-process model and result caches.

    NOTE(review): _load_from_storage and _run_inference are not defined
    anywhere in this file — they must be provided elsewhere (subclass or
    mixin). Confirm before deploying.
    """

    def __init__(self, model_registry: ModelRegistry):
        self.registry = model_registry
        self.loaded_models = {}      # "name:version" -> loaded model object
        self.prediction_cache = {}   # cache key -> prediction result

    def _generate_cache_key(self, model_name: str, features: Dict) -> str:
        """Deterministic cache key from the model name and feature mapping.

        Fix: predict() called this method but it was never defined, so the
        first prediction raised AttributeError. Features are serialized
        with sorted keys so logically equal dicts share one key.
        """
        payload = json.dumps(features, sort_keys=True, default=str)
        digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        return f"{model_name}:{digest}"

    async def load_model(self, name: str, version: str = "production"):
        """Load a model into memory and cache it under "name:version"."""
        if version == "production":
            model_metadata = self.registry.get_production_model(name)
        else:
            model_metadata = self.registry.get_model(name, version)
        if not model_metadata:
            raise ValueError(f"Model {name}:{version} not found")
        # Materialize the model from storage (implementation lives elsewhere).
        model = await self._load_from_storage(model_metadata)
        self.loaded_models[f"{name}:{version}"] = model
        return model

    @prediction_latency.time()
    async def predict(self, model_name: str, features: Dict) -> Dict:
        """Make a prediction, serving repeated inputs from the cache.

        NOTE(review): prediction_cache is unbounded — acceptable for a
        demo, a memory leak in production.
        """
        prediction_counter.inc()
        cache_key = self._generate_cache_key(model_name, features)
        if cache_key in self.prediction_cache:
            return self.prediction_cache[cache_key]
        # Fix: load_model stores models under "name:version", but the old
        # lookup used the bare name, so the in-memory model was never found
        # and was re-loaded from storage on every single request.
        model = self.loaded_models.get(f"{model_name}:production")
        if not model:
            model = await self.load_model(model_name)
        result = await self._run_inference(model, features)
        self.prediction_cache[cache_key] = result
        return result

    async def predict_batch(self, model_name: str,
                            batch_features: List[Dict]) -> List[Dict]:
        """Run predictions for a batch of feature dicts concurrently."""
        tasks = [self.predict(model_name, features)
                 for features in batch_features]
        return await asyncio.gather(*tasks)
# Module-level FastAPI app wired to an empty registry; a real deployment
# would inject a populated ModelRegistry here.
app = FastAPI()
model_server = ModelServer(model_registry=ModelRegistry())

@app.post("/predict/{model_name}")
async def predict_endpoint(model_name: str, features: Dict):
    """HTTP prediction endpoint: POST /predict/{model_name} with a feature JSON body."""
    return await model_server.predict(model_name, features)
from dataclasses import dataclass
from datetime import datetime
import numpy as np
@dataclass
class PredictionLog:
    """One served prediction, captured for monitoring and drift analysis."""

    timestamp: datetime
    model_name: str
    model_version: str
    features: Dict
    # Fix: was annotated `any` — the builtin function, not a type.
    # typing.Any is the intended annotation.
    prediction: Any
    latency_ms: float
    input_hash: str
class ModelMonitor:
    """Monitor model performance in production."""

    def __init__(self):
        self.logs: List[PredictionLog] = []
        # "name:version" -> {"latencies": [...], "predictions": int}
        self.metrics = {}

    def log_prediction(self, log: PredictionLog):
        """Record a prediction and refresh derived metrics."""
        self.logs.append(log)
        self.update_latency_metrics(log)
        self.check_data_drift(log)

    def update_latency_metrics(self, log: PredictionLog):
        """Track prediction latency per model version."""
        model_key = f"{log.model_name}:{log.model_version}"
        entry = self.metrics.setdefault(model_key,
                                        {"latencies": [], "predictions": 0})
        entry["latencies"].append(log.latency_ms)
        entry["predictions"] += 1

    def check_data_drift(self, log: PredictionLog):
        """Detect data drift in input features.

        TODO: compare current feature distributions against training data
        and alert on significant drift; currently a stub.
        """
        pass

    def get_model_health(self, model_name: str) -> Dict:
        """Aggregate health metrics for *model_name* across all versions.

        Fix: metrics are stored under "name:version" keys, but this method
        used the bare model name for lookup, so it always reported zeros.
        It now aggregates every version of the model; passing a full
        "name:version" key still works.
        """
        latencies: List[float] = []
        total = 0
        for key, entry in self.metrics.items():
            if key == model_name or key.split(":", 1)[0] == model_name:
                latencies.extend(entry["latencies"])
                total += entry["predictions"]
        return {
            "total_predictions": total,
            "avg_latency_ms": float(np.mean(latencies)) if latencies else 0,
            "p95_latency_ms": float(np.percentile(latencies, 95)) if latencies else 0,
            "p99_latency_ms": float(np.percentile(latencies, 99)) if latencies else 0
        }
❌ No model versioning or registry ❌ Training and serving environment mismatch ❌ No monitoring or alerting ❌ Manual model deployment process ❌ Ignoring data drift ❌ No rollback strategy ❌ Over-engineering for initial MVP
Weekly Installs
49
Repository
GitHub Stars
11
First Seen
Jan 23, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
opencode: 41
codex: 40
gemini-cli: 38
cursor: 36
github-copilot: 35
amp: 31
AI界面设计评审工具 - 全面评估UI/UX设计质量、检测AI生成痕迹与优化用户体验
58,500 周安装
Supabase 管理专家:行级安全策略、数据库迁移与性能优化指南
55 周安装
AWS云服务技能:构建部署管理云基础设施,掌握S3、Lambda、DynamoDB、EC2、RDS等核心服务
55 周安装
UX研究员技能:运用迪士尼12条动画原则进行用户体验研究与优化
60 周安装
迪士尼12项动画原则完整诊断框架 - 通用动画解决方案,修复UI/UX动画问题
60 周安装
Popmotion动画教程:用JavaScript实现12条迪士尼动画原则,打造流畅交互体验
62 周安装
iOS Swift开发技能:SwiftUI、UIKit、Xcode配置、App Store提交完整指南
55 周安装