Model Deployment by aj-geddes/useful-ai-prompts
npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill 'Model Deployment'

Model deployment is the process of taking a trained machine learning model and making it available for production use through APIs, web services, or batch processing systems.
import numpy as np
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
# FastAPI for REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
# For model registry
import mlflow.sklearn
# Logging and utilities
import logging
import time
from typing import List, Dict
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
print("=== 1. Train and Save Model ===")
# Create dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)
# Save model and preprocessing
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'
joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)
print(f"Model saved to {model_path}")
print(f"Scaler saved to {scaler_path}")
# 2. Model Serving Class
print("\n=== 2. Model Serving Class ===")
class ModelPredictor:
    def __init__(self, model_path, scaler_path):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.load_time = time.time()
        self.predictions_count = 0
        logger.info("Model loaded successfully")

    def predict(self, features: List[List[float]]) -> Dict:
        try:
            X = np.array(features)
            X_scaled = self.scaler.transform(X)
            predictions = self.model.predict(X_scaled)
            probabilities = self.model.predict_proba(X_scaled)
            self.predictions_count += len(X)
            return {
                'predictions': predictions.tolist(),
                'probabilities': probabilities.tolist(),
                'count': len(X),
                'timestamp': time.time()
            }
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            raise

    def health_check(self) -> Dict:
        return {
            'status': 'healthy',
            'uptime': time.time() - self.load_time,
            'predictions': self.predictions_count
        }
# Initialize predictor
predictor = ModelPredictor(model_path, scaler_path)
# 3. FastAPI Application
print("\n=== 3. FastAPI Application ===")
app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0"
)
class PredictionRequest(BaseModel):
    features: List[List[float]] = Field(..., examples=[[[1.0, 2.0, 3.0]]])

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]
    count: int
    timestamp: float

class HealthResponse(BaseModel):
    status: str
    uptime: float
    predictions: int

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    return predictor.health_check()

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make predictions"""
    try:
        result = predictor.predict(request.features)
        return result
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest], background_tasks: BackgroundTasks):
    """Batch prediction with background processing"""
    all_features = []
    for req in requests:
        all_features.extend(req.features)
    result = predictor.predict(all_features)
    background_tasks.add_task(logger.info, f"Batch prediction processed: {result['count']} samples")
    return result

@app.get("/stats")
async def get_stats():
    """Get model statistics"""
    return {
        'model_type': type(predictor.model).__name__,
        'n_estimators': predictor.model.n_estimators,
        'max_depth': predictor.model.max_depth,
        'feature_importance': predictor.model.feature_importances_.tolist(),
        'total_predictions': predictor.predictions_count
    }
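# Example HTTP client for the endpoints above. A minimal smoke-test sketch,
# defined but not invoked here, since it assumes the server is already running
# locally (e.g. `uvicorn app:app --port 8000`) and that the `requests` package
# is installed (an extra dependency not pinned in requirements.txt below).
def example_client_call():
    import requests
    payload = {"features": [[0.1] * 20]}  # one sample, 20 features as trained above
    resp = requests.post("http://localhost:8000/predict", json=payload)
    resp.raise_for_status()
    print(resp.json())
    print(requests.get("http://localhost:8000/health").json())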
# 4. Dockerfile template
print("\n=== 4. Dockerfile Template ===")
dockerfile_content = '''FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model.pkl .
COPY scaler.pkl .
COPY app.py .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''
print("Dockerfile content:")
print(dockerfile_content)
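# Building and running the image, sketched in the same printed-template style.
# Assumption: the templates in this script are first written to disk, and the
# /tmp model artifacts are copied into the build context, since the Dockerfile
# COPYs model.pkl and scaler.pkl from the current directory.
build_commands = '''cp /tmp/model.pkl /tmp/scaler.pkl .
docker build -t ml-api .
docker run -p 8000:8000 ml-api
'''
print("Build and run commands:")
print(build_commands)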
# 5. Requirements file
print("\n=== 5. Requirements.txt ===")
requirements = """fastapi==0.104.1
uvicorn[standard]==0.24.0
numpy==1.24.0
pandas==2.1.0
scikit-learn==1.3.2
joblib==1.3.2
pydantic==2.5.0
mlflow==2.8.1
"""
print("Requirements:")
print(requirements)
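# The Dockerfile and requirements templates above exist only as in-memory
# strings; a small sketch to persist them so `docker build` can pick them up:
from pathlib import Path
Path('Dockerfile').write_text(dockerfile_content)
Path('requirements.txt').write_text(requirements)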
# 6. Docker Compose for deployment
print("\n=== 6. Docker Compose Template ===")
docker_compose = '''version: '3.8'
services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=info
      - WORKERS=4
    restart: unless-stopped
    healthcheck:
      # python-based check: curl is not installed in python:3.9-slim by default
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 10s
      timeout: 5s
      retries: 3
  ml-monitor:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
  ml-dashboard:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
'''
print("Docker Compose content:")
print(docker_compose)
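# The compose file mounts a ./prometheus.yml that is not defined above; a
# minimal example scrape config is sketched here. Assumption: the API would
# need to expose a /metrics endpoint (e.g. via an exporter such as
# prometheus-fastapi-instrumentator), since the FastAPI app above does not
# publish Prometheus metrics by itself.
prometheus_config = '''global:
  scrape_interval: 15s
scrape_configs:
  - job_name: ml-api
    static_configs:
      - targets: ["ml-api:8000"]
'''
print("Prometheus config example:")
print(prometheus_config)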
# 7. Testing the API
print("\n=== 7. Testing the API ===")
def test_predictor():
    # Test single prediction
    test_features = [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
                      1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1]]
    result = predictor.predict(test_features)
    print(f"Prediction result: {result}")
    # Health check
    health = predictor.health_check()
    print(f"Health status: {health}")
    # Batch predictions
    batch_features = [
        [1.0] * 20,
        [2.0] * 20,
        [3.0] * 20,
    ]
    batch_result = predictor.predict(batch_features)
    print(f"Batch prediction: {batch_result['count']} samples processed")
test_predictor()
# 8. Model versioning and registry
print("\n=== 8. Model Registry with MLflow ===")
# Log model to MLflow
with mlflow.start_run() as run:
    mlflow.sklearn.log_model(model, "model")
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("accuracy", 0.95)  # placeholder metric for illustration
    # Capture the run ID inside the context, while the run is still active
    model_uri = f"runs:/{run.info.run_id}/model"
print(f"Model logged to MLflow: {model_uri}")
# 9. Deployment monitoring code
print("\n=== 9. Monitoring Setup ===")
class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []

    def log_prediction(self, features, prediction, latency):
        # Track latency separately so get_stats() has data to aggregate
        self.latencies.append(latency)
        self.predictions.append({
            'timestamp': time.time(),
            'features_mean': np.mean(features),
            'prediction': prediction,
            'latency_ms': latency * 1000
        })

    def check_model_drift(self):
        # Require both a full historical and a full recent window, so the
        # historical slice below is never empty
        if len(self.predictions) < 200:
            return {'drift_detected': False}
        recent_predictions = [p['prediction'] for p in self.predictions[-100:]]
        historical_mean = np.mean([p['prediction'] for p in self.predictions[:-100]])
        recent_mean = np.mean(recent_predictions)
        drift = abs(recent_mean - historical_mean) > 0.1
        return {
            'drift_detected': drift,
            'historical_mean': float(historical_mean),
            'recent_mean': float(recent_mean),
            'threshold': 0.1
        }

    def get_stats(self):
        if not self.latencies:
            return {}
        return {
            'avg_latency_ms': np.mean(self.latencies) * 1000,
            'p95_latency_ms': np.percentile(self.latencies, 95) * 1000,
            'p99_latency_ms': np.percentile(self.latencies, 99) * 1000,
            'total_predictions': len(self.predictions)
        }
monitor = ModelMonitor()
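# The monitor is instantiated but never wired into the prediction path above;
# a short sketch of how a timed prediction could feed it:
start = time.time()
timed_result = predictor.predict([[0.5] * 20])
monitor.log_prediction([0.5] * 20, timed_result['predictions'][0], time.time() - start)
print(f"Monitor stats: {monitor.get_stats()}")
print(f"Drift check: {monitor.check_model_drift()}")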
print("\nDeployment setup completed!")
print("To run FastAPI server: uvicorn app:app --reload")