manufacturing-expert by personamanagmentlayer/pcl
npx skills add https://github.com/personamanagmentlayer/pcl --skill manufacturing-expert为制造系统、工业4.0、生产优化、质量控制和智能工厂实施提供专家指导。
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
from enum import Enum
class OrderStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
ON_HOLD = "on_hold"
CANCELLED = "cancelled"
class MachineStatus(Enum):
IDLE = "idle"
RUNNING = "running"
MAINTENANCE = "maintenance"
ERROR = "error"
OFFLINE = "offline"
@dataclass
class WorkOrder:
"""制造工单"""
order_id: str
product_id: str
quantity: int
priority: int # 1 (最高) 到 5 (最低)
due_date: datetime
status: OrderStatus
assigned_line: Optional[str]
started_at: Optional[datetime]
completed_at: Optional[datetime]
actual_quantity: int = 0
defect_quantity: int = 0
@dataclass
class Machine:
"""生产机器/设备"""
machine_id: str
machine_type: str
status: MachineStatus
current_order: Optional[str]
production_rate: float # 每小时产量
uptime_percentage: float
last_maintenance: datetime
next_maintenance: datetime
oee: float # 整体设备效率
@dataclass
class ProductionMetrics:
"""实时生产指标"""
timestamp: datetime
line_id: str
produced_units: int
defective_units: int
downtime_minutes: int
cycle_time_seconds: float
efficiency_percentage: float
class ManufacturingExecutionSystem:
"""用于生产管理的 MES"""
def __init__(self):
self.work_orders = {}
self.machines = {}
self.production_data = []
def create_work_order(self,
product_id: str,
quantity: int,
due_date: datetime,
priority: int = 3) -> WorkOrder:
"""创建新的生产工单"""
order_id = self._generate_order_id()
order = WorkOrder(
order_id=order_id,
product_id=product_id,
quantity=quantity,
priority=priority,
due_date=due_date,
status=OrderStatus.PENDING,
assigned_line=None,
started_at=None,
completed_at=None
)
self.work_orders[order_id] = order
return order
def schedule_production(self) -> List[dict]:
"""将工单安排到生产线"""
# 获取按优先级和到期日排序的待处理订单
pending_orders = [
order for order in self.work_orders.values()
if order.status == OrderStatus.PENDING
]
sorted_orders = sorted(
pending_orders,
key=lambda x: (x.priority, x.due_date)
)
# 获取可用机器
available_machines = [
machine for machine in self.machines.values()
if machine.status in [MachineStatus.IDLE, MachineStatus.RUNNING]
]
schedule = []
for order in sorted_orders:
# 为此订单找到最佳机器
best_machine = self._find_best_machine(order, available_machines)
if best_machine:
# 计算预计完成时间
production_time = order.quantity / best_machine.production_rate
estimated_completion = datetime.now() + timedelta(hours=production_time)
schedule.append({
'order_id': order.order_id,
'machine_id': best_machine.machine_id,
'estimated_start': datetime.now(),
'estimated_completion': estimated_completion,
'estimated_duration_hours': production_time
})
# 更新订单
order.assigned_line = best_machine.machine_id
order.status = OrderStatus.IN_PROGRESS
return schedule
def _find_best_machine(self, order: WorkOrder, machines: List[Machine]) -> Optional[Machine]:
"""为工单找到最优机器"""
if not machines:
return None
# 基于多个因素为机器评分
scored_machines = []
for machine in machines:
score = 0
# 偏好 OEE 更高的机器
score += machine.oee * 50
# 偏好空闲的机器
if machine.status == MachineStatus.IDLE:
score += 30
# 偏好最近维护过的机器
days_since_maintenance = (datetime.now() - machine.last_maintenance).days
score += max(0, 20 - days_since_maintenance)
scored_machines.append((score, machine))
# 返回评分最高的机器
scored_machines.sort(reverse=True, key=lambda x: x[0])
return scored_machines[0][1]
def record_production(self,
order_id: str,
produced: int,
defective: int = 0) -> dict:
"""记录生产产出"""
order = self.work_orders.get(order_id)
if not order:
return {'error': 'Order not found'}
order.actual_quantity += produced
order.defect_quantity += defective
# 检查订单是否完成
if order.actual_quantity >= order.quantity:
order.status = OrderStatus.COMPLETED
order.completed_at = datetime.now()
# 计算指标
duration = order.completed_at - order.started_at
yield_rate = ((order.actual_quantity - order.defect_quantity) /
order.actual_quantity * 100)
return {
'order_id': order_id,
'status': 'completed',
'duration_hours': duration.total_seconds() / 3600,
'yield_rate': yield_rate,
'total_produced': order.actual_quantity,
'total_defective': order.defect_quantity
}
return {
'order_id': order_id,
'status': 'in_progress',
'progress_percentage': (order.actual_quantity / order.quantity) * 100
}
def calculate_oee(self,
machine_id: str,
time_period_hours: int = 24) -> dict:
"""计算整体设备效率"""
machine = self.machines.get(machine_id)
if not machine:
return {'error': 'Machine not found'}
# OEE = 可用率 × 性能 × 质量
# 可用率:(运行时间 / 计划生产时间)
planned_time = time_period_hours * 60 # 分钟
downtime = self._get_downtime(machine_id, time_period_hours)
operating_time = planned_time - downtime
availability = operating_time / planned_time
# 性能:(实际产量 / 理想产量)
actual_production = self._get_production_count(machine_id, time_period_hours)
ideal_production = machine.production_rate * time_period_hours
performance = actual_production / ideal_production if ideal_production > 0 else 0
# 质量:(良品数 / 总产量)
defects = self._get_defect_count(machine_id, time_period_hours)
quality = (actual_production - defects) / actual_production if actual_production > 0 else 0
oee = availability * performance * quality
return {
'machine_id': machine_id,
'period_hours': time_period_hours,
'oee': oee * 100, # 百分比
'availability': availability * 100,
'performance': performance * 100,
'quality': quality * 100,
'world_class_oee': 85.0 # 基准
}
def _get_downtime(self, machine_id: str, hours: int) -> float:
"""获取机器停机时间(分钟)"""
# 查询停机时间的生产数据
# 实现将从时间序列数据中聚合
return 0.0
def _get_production_count(self, machine_id: str, hours: int) -> int:
"""获取机器的生产数量"""
# 实现将查询生产记录
return 0
def _get_defect_count(self, machine_id: str, hours: int) -> int:
"""获取机器的缺陷数量"""
# 实现将查询质量记录
return 0
def _generate_order_id(self) -> str:
"""生成唯一订单 ID"""
import uuid
return f"WO-{uuid.uuid4().hex[:8].upper()}"
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from scipy import stats
import numpy as np
class StatisticalProcessControl:
"""用于质量管理的统计过程控制 (SPC)"""
def __init__(self):
self.measurement_history = {}
def calculate_control_limits(self,
measurements: List[float],
sigma_level: float = 3.0) -> dict:
"""计算控制图的控制限"""
mean = np.mean(measurements)
std_dev = np.std(measurements, ddof=1)
ucl = mean + (sigma_level * std_dev) # 上控制限
lcl = mean - (sigma_level * std_dev) # 下控制限
return {
'mean': mean,
'std_dev': std_dev,
'ucl': ucl,
'lcl': lcl,
'sigma_level': sigma_level
}
def detect_out_of_control(self,
measurements: List[float],
control_limits: dict) -> dict:
"""检测失控状况"""
violations = []
# 规则 1:点超出控制限
for i, value in enumerate(measurements):
if value > control_limits['ucl'] or value < control_limits['lcl']:
violations.append({
'rule': 'beyond_limits',
'index': i,
'value': value,
'severity': 'critical'
})
# 规则 2:连续 3 点中有 2 点超出 2σ
sigma_2 = control_limits['std_dev'] * 2
ucl_2 = control_limits['mean'] + sigma_2
lcl_2 = control_limits['mean'] - sigma_2
for i in range(len(measurements) - 2):
window = measurements[i:i+3]
beyond_2sigma = sum(1 for v in window if v > ucl_2 or v < lcl_2)
if beyond_2sigma >= 2:
violations.append({
'rule': '2_of_3_beyond_2sigma',
'index': i,
'severity': 'warning'
})
# 规则 3:连续 9 点位于均值同侧
for i in range(len(measurements) - 8):
window = measurements[i:i+9]
all_above = all(v > control_limits['mean'] for v in window)
all_below = all(v < control_limits['mean'] for v in window)
if all_above or all_below:
violations.append({
'rule': '9_consecutive_same_side',
'index': i,
'severity': 'warning'
})
return {
'in_control': len(violations) == 0,
'violations': violations,
'total_violations': len(violations)
}
def calculate_cpk(self,
measurements: List[float],
lower_spec_limit: float,
upper_spec_limit: float) -> dict:
"""计算过程能力指数 (Cpk)"""
mean = np.mean(measurements)
std_dev = np.std(measurements, ddof=1)
# Cp: 过程能力
cp = (upper_spec_limit - lower_spec_limit) / (6 * std_dev)
# Cpk: 过程能力指数 (考虑中心偏移)
cpu = (upper_spec_limit - mean) / (3 * std_dev)
cpl = (mean - lower_spec_limit) / (3 * std_dev)
cpk = min(cpu, cpl)
# 解释 Cpk
if cpk >= 2.0:
capability = "优秀"
elif cpk >= 1.33:
capability = "足够"
elif cpk >= 1.0:
capability = "临界"
else:
capability = "不足"
return {
'cp': cp,
'cpk': cpk,
'cpu': cpu,
'cpl': cpl,
'capability': capability,
'sigma_level': cpk * 3 if cpk > 0 else 0
}
def perform_gage_rr(self,
measurements: np.ndarray,
n_parts: int,
n_operators: int,
n_trials: int) -> dict:
"""执行测量系统重复性与再现性研究"""
# 重塑数据:(零件 × 操作员 × 试验)
data = measurements.reshape(n_parts, n_operators, n_trials)
# 计算方差分量
part_means = data.mean(axis=(1, 2))
operator_means = data.mean(axis=(0, 2))
overall_mean = data.mean()
# 零件变异
part_variance = np.var(part_means, ddof=1)
# 重复性 (设备变异)
within_operator_variance = np.mean([
np.var(data[:, op, :], ddof=1)
for op in range(n_operators)
])
# 再现性 (操作员变异)
operator_variance = np.var(operator_means, ddof=1)
# 总变异
total_variance = np.var(data, ddof=1)
# 测量系统 R&R
gage_rr = within_operator_variance + operator_variance
gage_rr_percentage = (gage_rr / total_variance) * 100
# 解释
if gage_rr_percentage < 10:
assessment = "可接受"
elif gage_rr_percentage < 30:
assessment = "临界"
else:
assessment = "不可接受"
return {
'gage_rr_percentage': gage_rr_percentage,
'repeatability_percentage': (within_operator_variance / total_variance) * 100,
'reproducibility_percentage': (operator_variance / total_variance) * 100,
'part_variation_percentage': (part_variance / total_variance) * 100,
'assessment': assessment
}
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import pandas as pd
class PredictiveMaintenanceSystem:
"""使用机器学习的预测性维护"""
def __init__(self):
self.model = RandomForestClassifier(n_estimators=100)
self.scaler = StandardScaler()
self.trained = False
def extract_features(self, sensor_data: dict) -> np.ndarray:
"""从传感器数据中提取特征"""
features = [
sensor_data['vibration_rms'],
sensor_data['vibration_peak'],
sensor_data['temperature_c'],
sensor_data['current_a'],
sensor_data['pressure_bar'],
sensor_data['speed_rpm'],
sensor_data['operating_hours'],
sensor_data['cycles_completed']
]
return np.array(features).reshape(1, -1)
def train_model(self, historical_data: pd.DataFrame):
"""训练预测性维护模型"""
# 提取特征和标签
X = historical_data.drop(['machine_id', 'timestamp', 'failure'], axis=1)
y = historical_data['failure']
# 缩放特征
X_scaled = self.scaler.fit_transform(X)
# 训练模型
self.model.fit(X_scaled, y)
self.trained = True
def predict_failure(self, sensor_data: dict) -> dict:
"""预测设备故障概率"""
if not self.trained:
return {'error': 'Model not trained'}
features = self.extract_features(sensor_data)
features_scaled = self.scaler.transform(features)
# 获取故障概率
failure_probability = self.model.predict_proba(features_scaled)[0][1]
# 计算剩余使用寿命 (简化版)
rul_hours = self._estimate_rul(failure_probability)
# 生成建议
if failure_probability > 0.8:
recommendation = "安排立即维护"
priority = "critical"
elif failure_probability > 0.5:
recommendation = "一周内安排维护"
priority = "high"
elif failure_probability > 0.3:
recommendation = "密切监控,安排维护"
priority = "medium"
else:
recommendation = "继续正常操作"
priority = "low"
return {
'failure_probability': failure_probability,
'remaining_useful_life_hours': rul_hours,
'recommendation': recommendation,
'priority': priority,
'timestamp': datetime.now().isoformat()
}
def _estimate_rul(self, failure_probability: float) -> float:
"""估算剩余使用寿命"""
# 简化的 RUL 估算
# 在生产中,使用更复杂的模型 (LSTM, CNN)
if failure_probability < 0.1:
return 720.0 # 30 天
elif failure_probability < 0.3:
return 360.0 # 15 天
elif failure_probability < 0.5:
return 168.0 # 7 天
elif failure_probability < 0.8:
return 48.0 # 2 天
else:
return 12.0 # 12 小时
def analyze_failure_modes(self, sensor_data: dict) -> List[dict]:
"""识别潜在的故障模式"""
failure_modes = []
# 检查轴承故障指标
if sensor_data['vibration_rms'] > 10.0:
failure_modes.append({
'mode': 'bearing_failure',
'indicator': 'high_vibration',
'severity': 'high'
})
# 检查过热
if sensor_data['temperature_c'] > 80.0:
failure_modes.append({
'mode': 'thermal_failure',
'indicator': 'high_temperature',
'severity': 'high'
})
# 检查电气问题
if sensor_data['current_a'] > sensor_data.get('rated_current', 100) * 1.2:
failure_modes.append({
'mode': 'electrical_failure',
'indicator': 'overcurrent',
'severity': 'medium'
})
return failure_modes
class DigitalTwin:
"""制造设备的数字孪生"""
def __init__(self, physical_asset_id: str):
self.asset_id = physical_asset_id
self.virtual_state = {}
self.historical_data = []
self.simulation_model = None
def sync_with_physical(self, sensor_data: dict):
"""将数字孪生与物理资产同步"""
self.virtual_state.update({
'timestamp': datetime.now(),
'sensors': sensor_data,
'calculated_metrics': self._calculate_metrics(sensor_data)
})
self.historical_data.append(self.virtual_state.copy())
def _calculate_metrics(self, sensor_data: dict) -> dict:
"""根据传感器数据计算衍生指标"""
return {
'efficiency': self._calculate_efficiency(sensor_data),
'health_score': self._calculate_health_score(sensor_data),
'energy_consumption': self._calculate_energy(sensor_data)
}
def simulate_scenario(self, scenario_params: dict) -> dict:
"""模拟假设情景"""
# 模拟不同的操作条件
simulated_state = self.virtual_state.copy()
# 应用情景参数
for param, value in scenario_params.items():
if param in simulated_state['sensors']:
simulated_state['sensors'][param] = value
# 重新计算指标
simulated_state['calculated_metrics'] = self._calculate_metrics(
simulated_state['sensors']
)
return {
'scenario': scenario_params,
'predicted_state': simulated_state,
'impact_analysis': self._analyze_impact(simulated_state)
}
def optimize_parameters(self, optimization_goal: str) -> dict:
"""优化操作参数"""
# 使用数字孪生寻找最优设置
# 这将使用优化算法
best_params = {}
best_score = 0
return {
'optimization_goal': optimization_goal,
'recommended_parameters': best_params,
'expected_improvement': best_score
}
def _calculate_efficiency(self, sensor_data: dict) -> float:
"""计算设备效率"""
return 85.0 # 简化版
def _calculate_health_score(self, sensor_data: dict) -> float:
"""计算设备健康评分 (0-100)"""
return 90.0 # 简化版
def _calculate_energy(self, sensor_data: dict) -> float:
"""计算能耗"""
return sensor_data.get('current_a', 0) * sensor_data.get('voltage_v', 0)
def _analyze_impact(self, state: dict) -> dict:
"""分析状态变化的影响"""
return {'impact': 'positive'}
❌ 生产记录手动录入 ❌ 没有预防性维护计划 ❌ 忽视质量控制数据 ❌ 系统孤岛 (无集成) ❌ 没有标准作业程序 ❌ 操作员培训不足 ❌ 关键设备无备用系统 ❌ 库存管理不善
每周安装数
141
仓库
GitHub 星标数
11
首次出现
2026年1月24日
安全审计
安装于
opencode126
gemini-cli125
codex124
cursor118
github-copilot114
amp108
Expert guidance for manufacturing systems, Industry 4.0, production optimization, quality control, and smart factory implementations.
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
from enum import Enum
class OrderStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
ON_HOLD = "on_hold"
CANCELLED = "cancelled"
class MachineStatus(Enum):
IDLE = "idle"
RUNNING = "running"
MAINTENANCE = "maintenance"
ERROR = "error"
OFFLINE = "offline"
@dataclass
class WorkOrder:
"""Manufacturing work order"""
order_id: str
product_id: str
quantity: int
priority: int # 1 (highest) to 5 (lowest)
due_date: datetime
status: OrderStatus
assigned_line: Optional[str]
started_at: Optional[datetime]
completed_at: Optional[datetime]
actual_quantity: int = 0
defect_quantity: int = 0
@dataclass
class Machine:
"""Production machine/equipment"""
machine_id: str
machine_type: str
status: MachineStatus
current_order: Optional[str]
production_rate: float # units per hour
uptime_percentage: float
last_maintenance: datetime
next_maintenance: datetime
oee: float # Overall Equipment Effectiveness
@dataclass
class ProductionMetrics:
"""Real-time production metrics"""
timestamp: datetime
line_id: str
produced_units: int
defective_units: int
downtime_minutes: int
cycle_time_seconds: float
efficiency_percentage: float
class ManufacturingExecutionSystem:
"""MES for production management"""
def __init__(self):
self.work_orders = {}
self.machines = {}
self.production_data = []
def create_work_order(self,
product_id: str,
quantity: int,
due_date: datetime,
priority: int = 3) -> WorkOrder:
"""Create new production work order"""
order_id = self._generate_order_id()
order = WorkOrder(
order_id=order_id,
product_id=product_id,
quantity=quantity,
priority=priority,
due_date=due_date,
status=OrderStatus.PENDING,
assigned_line=None,
started_at=None,
completed_at=None
)
self.work_orders[order_id] = order
return order
def schedule_production(self) -> List[dict]:
"""Schedule work orders to production lines"""
# Get pending orders sorted by priority and due date
pending_orders = [
order for order in self.work_orders.values()
if order.status == OrderStatus.PENDING
]
sorted_orders = sorted(
pending_orders,
key=lambda x: (x.priority, x.due_date)
)
# Get available machines
available_machines = [
machine for machine in self.machines.values()
if machine.status in [MachineStatus.IDLE, MachineStatus.RUNNING]
]
schedule = []
for order in sorted_orders:
# Find best machine for this order
best_machine = self._find_best_machine(order, available_machines)
if best_machine:
# Calculate estimated completion time
production_time = order.quantity / best_machine.production_rate
estimated_completion = datetime.now() + timedelta(hours=production_time)
schedule.append({
'order_id': order.order_id,
'machine_id': best_machine.machine_id,
'estimated_start': datetime.now(),
'estimated_completion': estimated_completion,
'estimated_duration_hours': production_time
})
# Update order
order.assigned_line = best_machine.machine_id
order.status = OrderStatus.IN_PROGRESS
return schedule
def _find_best_machine(self, order: WorkOrder, machines: List[Machine]) -> Optional[Machine]:
"""Find optimal machine for work order"""
if not machines:
return None
# Score machines based on multiple factors
scored_machines = []
for machine in machines:
score = 0
# Prefer machines with higher OEE
score += machine.oee * 50
# Prefer machines that are idle
if machine.status == MachineStatus.IDLE:
score += 30
# Prefer machines with recent maintenance
days_since_maintenance = (datetime.now() - machine.last_maintenance).days
score += max(0, 20 - days_since_maintenance)
scored_machines.append((score, machine))
# Return highest scoring machine
scored_machines.sort(reverse=True, key=lambda x: x[0])
return scored_machines[0][1]
def record_production(self,
order_id: str,
produced: int,
defective: int = 0) -> dict:
"""Record production output"""
order = self.work_orders.get(order_id)
if not order:
return {'error': 'Order not found'}
order.actual_quantity += produced
order.defect_quantity += defective
# Check if order is complete
if order.actual_quantity >= order.quantity:
order.status = OrderStatus.COMPLETED
order.completed_at = datetime.now()
# Calculate metrics
duration = order.completed_at - order.started_at
yield_rate = ((order.actual_quantity - order.defect_quantity) /
order.actual_quantity * 100)
return {
'order_id': order_id,
'status': 'completed',
'duration_hours': duration.total_seconds() / 3600,
'yield_rate': yield_rate,
'total_produced': order.actual_quantity,
'total_defective': order.defect_quantity
}
return {
'order_id': order_id,
'status': 'in_progress',
'progress_percentage': (order.actual_quantity / order.quantity) * 100
}
def calculate_oee(self,
machine_id: str,
time_period_hours: int = 24) -> dict:
"""Calculate Overall Equipment Effectiveness"""
machine = self.machines.get(machine_id)
if not machine:
return {'error': 'Machine not found'}
# OEE = Availability × Performance × Quality
# Availability: (Operating Time / Planned Production Time)
planned_time = time_period_hours * 60 # minutes
downtime = self._get_downtime(machine_id, time_period_hours)
operating_time = planned_time - downtime
availability = operating_time / planned_time
# Performance: (Actual Production / Ideal Production)
actual_production = self._get_production_count(machine_id, time_period_hours)
ideal_production = machine.production_rate * time_period_hours
performance = actual_production / ideal_production if ideal_production > 0 else 0
# Quality: (Good Units / Total Units)
defects = self._get_defect_count(machine_id, time_period_hours)
quality = (actual_production - defects) / actual_production if actual_production > 0 else 0
oee = availability * performance * quality
return {
'machine_id': machine_id,
'period_hours': time_period_hours,
'oee': oee * 100, # Percentage
'availability': availability * 100,
'performance': performance * 100,
'quality': quality * 100,
'world_class_oee': 85.0 # Benchmark
}
def _get_downtime(self, machine_id: str, hours: int) -> float:
"""Get machine downtime in minutes"""
# Query production data for downtime
# Implementation would aggregate from time-series data
return 0.0
def _get_production_count(self, machine_id: str, hours: int) -> int:
"""Get production count for machine"""
# Implementation would query production records
return 0
def _get_defect_count(self, machine_id: str, hours: int) -> int:
"""Get defect count for machine"""
# Implementation would query quality records
return 0
def _generate_order_id(self) -> str:
"""Generate unique order ID"""
import uuid
return f"WO-{uuid.uuid4().hex[:8].upper()}"
from scipy import stats
import numpy as np
class StatisticalProcessControl:
"""Statistical Process Control (SPC) for quality management"""
def __init__(self):
self.measurement_history = {}
def calculate_control_limits(self,
measurements: List[float],
sigma_level: float = 3.0) -> dict:
"""Calculate control limits for control charts"""
mean = np.mean(measurements)
std_dev = np.std(measurements, ddof=1)
ucl = mean + (sigma_level * std_dev) # Upper Control Limit
lcl = mean - (sigma_level * std_dev) # Lower Control Limit
return {
'mean': mean,
'std_dev': std_dev,
'ucl': ucl,
'lcl': lcl,
'sigma_level': sigma_level
}
def detect_out_of_control(self,
measurements: List[float],
control_limits: dict) -> dict:
"""Detect out-of-control conditions"""
violations = []
# Rule 1: Point beyond control limits
for i, value in enumerate(measurements):
if value > control_limits['ucl'] or value < control_limits['lcl']:
violations.append({
'rule': 'beyond_limits',
'index': i,
'value': value,
'severity': 'critical'
})
# Rule 2: 2 out of 3 consecutive points beyond 2σ
sigma_2 = control_limits['std_dev'] * 2
ucl_2 = control_limits['mean'] + sigma_2
lcl_2 = control_limits['mean'] - sigma_2
for i in range(len(measurements) - 2):
window = measurements[i:i+3]
beyond_2sigma = sum(1 for v in window if v > ucl_2 or v < lcl_2)
if beyond_2sigma >= 2:
violations.append({
'rule': '2_of_3_beyond_2sigma',
'index': i,
'severity': 'warning'
})
# Rule 3: 9 consecutive points on same side of mean
for i in range(len(measurements) - 8):
window = measurements[i:i+9]
all_above = all(v > control_limits['mean'] for v in window)
all_below = all(v < control_limits['mean'] for v in window)
if all_above or all_below:
violations.append({
'rule': '9_consecutive_same_side',
'index': i,
'severity': 'warning'
})
return {
'in_control': len(violations) == 0,
'violations': violations,
'total_violations': len(violations)
}
def calculate_cpk(self,
measurements: List[float],
lower_spec_limit: float,
upper_spec_limit: float) -> dict:
"""Calculate Process Capability Index (Cpk)"""
mean = np.mean(measurements)
std_dev = np.std(measurements, ddof=1)
# Cp: Process Capability
cp = (upper_spec_limit - lower_spec_limit) / (6 * std_dev)
# Cpk: Process Capability Index (accounts for centering)
cpu = (upper_spec_limit - mean) / (3 * std_dev)
cpl = (mean - lower_spec_limit) / (3 * std_dev)
cpk = min(cpu, cpl)
# Interpret Cpk
if cpk >= 2.0:
capability = "Excellent"
elif cpk >= 1.33:
capability = "Adequate"
elif cpk >= 1.0:
capability = "Marginal"
else:
capability = "Inadequate"
return {
'cp': cp,
'cpk': cpk,
'cpu': cpu,
'cpl': cpl,
'capability': capability,
'sigma_level': cpk * 3 if cpk > 0 else 0
}
def perform_gage_rr(self,
measurements: np.ndarray,
n_parts: int,
n_operators: int,
n_trials: int) -> dict:
"""Perform Gage Repeatability and Reproducibility study"""
# Reshape data: (parts × operators × trials)
data = measurements.reshape(n_parts, n_operators, n_trials)
# Calculate variance components
part_means = data.mean(axis=(1, 2))
operator_means = data.mean(axis=(0, 2))
overall_mean = data.mean()
# Part variation
part_variance = np.var(part_means, ddof=1)
# Repeatability (equipment variation)
within_operator_variance = np.mean([
np.var(data[:, op, :], ddof=1)
for op in range(n_operators)
])
# Reproducibility (operator variation)
operator_variance = np.var(operator_means, ddof=1)
# Total variation
total_variance = np.var(data, ddof=1)
# Gage R&R
gage_rr = within_operator_variance + operator_variance
gage_rr_percentage = (gage_rr / total_variance) * 100
# Interpretation
if gage_rr_percentage < 10:
assessment = "Acceptable"
elif gage_rr_percentage < 30:
assessment = "Marginal"
else:
assessment = "Unacceptable"
return {
'gage_rr_percentage': gage_rr_percentage,
'repeatability_percentage': (within_operator_variance / total_variance) * 100,
'reproducibility_percentage': (operator_variance / total_variance) * 100,
'part_variation_percentage': (part_variance / total_variance) * 100,
'assessment': assessment
}
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import pandas as pd
class PredictiveMaintenanceSystem:
"""Predictive maintenance using machine learning"""
def __init__(self):
self.model = RandomForestClassifier(n_estimators=100)
self.scaler = StandardScaler()
self.trained = False
def extract_features(self, sensor_data: dict) -> np.ndarray:
"""Extract features from sensor data"""
features = [
sensor_data['vibration_rms'],
sensor_data['vibration_peak'],
sensor_data['temperature_c'],
sensor_data['current_a'],
sensor_data['pressure_bar'],
sensor_data['speed_rpm'],
sensor_data['operating_hours'],
sensor_data['cycles_completed']
]
return np.array(features).reshape(1, -1)
def train_model(self, historical_data: pd.DataFrame):
"""Train predictive maintenance model"""
# Extract features and labels
X = historical_data.drop(['machine_id', 'timestamp', 'failure'], axis=1)
y = historical_data['failure']
# Scale features
X_scaled = self.scaler.fit_transform(X)
# Train model
self.model.fit(X_scaled, y)
self.trained = True
def predict_failure(self, sensor_data: dict) -> dict:
"""Predict equipment failure probability"""
if not self.trained:
return {'error': 'Model not trained'}
features = self.extract_features(sensor_data)
features_scaled = self.scaler.transform(features)
# Get failure probability
failure_probability = self.model.predict_proba(features_scaled)[0][1]
# Calculate remaining useful life (simplified)
rul_hours = self._estimate_rul(failure_probability)
# Generate recommendation
if failure_probability > 0.8:
recommendation = "Schedule immediate maintenance"
priority = "critical"
elif failure_probability > 0.5:
recommendation = "Schedule maintenance within 1 week"
priority = "high"
elif failure_probability > 0.3:
recommendation = "Monitor closely, schedule maintenance"
priority = "medium"
else:
recommendation = "Continue normal operation"
priority = "low"
return {
'failure_probability': failure_probability,
'remaining_useful_life_hours': rul_hours,
'recommendation': recommendation,
'priority': priority,
'timestamp': datetime.now().isoformat()
}
def _estimate_rul(self, failure_probability: float) -> float:
"""Estimate Remaining Useful Life"""
# Simplified RUL estimation
# In production, use more sophisticated models (LSTM, CNN)
if failure_probability < 0.1:
return 720.0 # 30 days
elif failure_probability < 0.3:
return 360.0 # 15 days
elif failure_probability < 0.5:
return 168.0 # 7 days
elif failure_probability < 0.8:
return 48.0 # 2 days
else:
return 12.0 # 12 hours
def analyze_failure_modes(self, sensor_data: dict) -> List[dict]:
"""Identify potential failure modes"""
failure_modes = []
# Check for bearing failure indicators
if sensor_data['vibration_rms'] > 10.0:
failure_modes.append({
'mode': 'bearing_failure',
'indicator': 'high_vibration',
'severity': 'high'
})
# Check for overheating
if sensor_data['temperature_c'] > 80.0:
failure_modes.append({
'mode': 'thermal_failure',
'indicator': 'high_temperature',
'severity': 'high'
})
# Check for electrical issues
if sensor_data['current_a'] > sensor_data.get('rated_current', 100) * 1.2:
failure_modes.append({
'mode': 'electrical_failure',
'indicator': 'overcurrent',
'severity': 'medium'
})
return failure_modes
class DigitalTwin:
"""Digital twin for manufacturing equipment"""
def __init__(self, physical_asset_id: str):
self.asset_id = physical_asset_id
self.virtual_state = {}
self.historical_data = []
self.simulation_model = None
def sync_with_physical(self, sensor_data: dict):
"""Synchronize digital twin with physical asset"""
self.virtual_state.update({
'timestamp': datetime.now(),
'sensors': sensor_data,
'calculated_metrics': self._calculate_metrics(sensor_data)
})
self.historical_data.append(self.virtual_state.copy())
def _calculate_metrics(self, sensor_data: dict) -> dict:
"""Calculate derived metrics from sensor data"""
return {
'efficiency': self._calculate_efficiency(sensor_data),
'health_score': self._calculate_health_score(sensor_data),
'energy_consumption': self._calculate_energy(sensor_data)
}
def simulate_scenario(self, scenario_params: dict) -> dict:
"""Simulate what-if scenarios"""
# Simulate different operating conditions
simulated_state = self.virtual_state.copy()
# Apply scenario parameters
for param, value in scenario_params.items():
if param in simulated_state['sensors']:
simulated_state['sensors'][param] = value
# Recalculate metrics
simulated_state['calculated_metrics'] = self._calculate_metrics(
simulated_state['sensors']
)
return {
'scenario': scenario_params,
'predicted_state': simulated_state,
'impact_analysis': self._analyze_impact(simulated_state)
}
def optimize_parameters(self, optimization_goal: str) -> dict:
"""Optimize operating parameters"""
# Use digital twin to find optimal settings
# This would use optimization algorithms
best_params = {}
best_score = 0
return {
'optimization_goal': optimization_goal,
'recommended_parameters': best_params,
'expected_improvement': best_score
}
def _calculate_efficiency(self, sensor_data: dict) -> float:
"""Calculate equipment efficiency"""
return 85.0 # Simplified
def _calculate_health_score(self, sensor_data: dict) -> float:
"""Calculate equipment health score (0-100)"""
return 90.0 # Simplified
def _calculate_energy(self, sensor_data: dict) -> float:
"""Calculate energy consumption"""
return sensor_data.get('current_a', 0) * sensor_data.get('voltage_v', 0)
def _analyze_impact(self, state: dict) -> dict:
"""Analyze impact of state change"""
return {'impact': 'positive'}
❌ Manual data entry for production records ❌ No preventive maintenance program ❌ Ignoring quality control data ❌ Siloed systems (no integration) ❌ No standard operating procedures ❌ Inadequate operator training ❌ No backup systems for critical equipment ❌ Poor inventory management
Weekly Installs
141
Repository
GitHub Stars
11
First Seen
Jan 24, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
opencode126
gemini-cli125
codex124
cursor118
github-copilot114
amp108
Sonos CLI 命令行工具:本地网络控制 Sonos 音箱,支持分组、收藏、Spotify 搜索
708 周安装
GMGN Portfolio:命令行工具查询Solana/BSC/Base链钱包投资组合与交易统计
1,100 周安装
UI/UX Pro Max - 50+样式与97配色方案,网页移动端设计指南与无障碍访问规范
1,000 周安装
Laravel架构模式指南:生产级开发模式与最佳实践
1,100 周安装
Kotlin Exposed ORM 模式指南:DSL查询、DAO、事务管理与生产配置
1,100 周安装
免费AI数据抓取智能体:自动化收集、丰富与存储网站/API数据
1,100 周安装
GraphQL 操作最佳实践指南:查询、变更、订阅编写与优化技巧
1,100 周安装