retail-expert by personamanagmentlayer/pcl
npx skills add https://github.com/personamanagmentlayer/pcl --skill retail-expert
为零售系统、销售点解决方案、库存管理、电子商务平台、客户分析和全渠道零售策略提供专业指导。
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import List, Optional
from enum import Enum
class PaymentMethod(Enum):
    """Payment methods a transaction can be settled with."""

    CASH = "cash"
    CREDIT_CARD = "credit_card"
    DEBIT_CARD = "debit_card"
    MOBILE_PAYMENT = "mobile_payment"
    GIFT_CARD = "gift_card"
class TransactionStatus(Enum):
    """Lifecycle states of a POS transaction."""

    PENDING = "pending"
    COMPLETED = "completed"
    VOIDED = "voided"
    REFUNDED = "refunded"
@dataclass
class Product:
    """Product/SKU information."""

    sku: str  # copied onto each LineItem when the product is scanned
    name: str
    description: str
    price: Decimal  # used as the unit price at scan time
    cost: Decimal
    barcode: str
    category: str
    department: str
    tax_rate: Decimal  # multiplied by the discounted line subtotal when is_taxable
    is_taxable: bool
    stock_quantity: int  # checked when scanning, decremented after payment
    reorder_point: int  # stock at or below this level triggers a reorder alert
@dataclass
class LineItem:
    """Transaction line item."""

    sku: str
    product_name: str
    quantity: int
    unit_price: Decimal
    discount_amount: Decimal
    tax_amount: Decimal
    line_total: Decimal  # quantity * unit_price - discount_amount + tax_amount
@dataclass
class Transaction:
    """POS transaction."""

    transaction_id: str
    store_id: str
    register_id: str
    cashier_id: str
    timestamp: datetime
    items: List[LineItem]
    subtotal: Decimal  # sum of unit_price * quantity over items
    tax_total: Decimal  # sum of per-line tax amounts
    discount_total: Decimal  # transaction-level discounts applied so far
    grand_total: Decimal  # subtotal + tax_total - discount_total
    payment_method: Optional[PaymentMethod]  # None until payment is processed
    status: TransactionStatus
    customer_id: Optional[str]
class POSSystem:
    """Point of Sale system for a single register.

    Tracks one in-progress transaction (``current_transaction``) and an
    in-memory product catalog (``products``). Public methods report failures
    by returning a dict with an ``'error'`` key instead of raising.
    """

    def __init__(self, store_id: str, register_id: str):
        """Bind the register to a store; no transaction is active initially."""
        self.store_id = store_id
        self.register_id = register_id
        self.current_transaction = None
        # Product catalog. Lookups accept either barcode or SKU as the key
        # (see _lookup_product / _find_product_by_sku).
        self.products = {}

    def start_transaction(self, cashier_id: str) -> str:
        """Start a new transaction (replacing any current one) and return its ID."""
        transaction_id = self._generate_transaction_id()
        self.current_transaction = Transaction(
            transaction_id=transaction_id,
            store_id=self.store_id,
            register_id=self.register_id,
            cashier_id=cashier_id,
            timestamp=datetime.now(),
            items=[],
            subtotal=Decimal('0'),
            tax_total=Decimal('0'),
            discount_total=Decimal('0'),
            grand_total=Decimal('0'),
            payment_method=None,
            status=TransactionStatus.PENDING,
            customer_id=None
        )
        return transaction_id

    def scan_item(self, barcode: str, quantity: int = 1) -> dict:
        """Scan an item and add it to the current transaction.

        Returns a dict with ``'success': True`` plus the line and running
        total, or a dict with an ``'error'`` key when there is no active
        transaction, the quantity is not positive, the product is unknown,
        or stock is insufficient.
        """
        txn = self.current_transaction
        if txn is None:
            return {'error': 'No active transaction'}
        if quantity <= 0:
            return {'error': 'Invalid quantity', 'quantity': quantity}

        # Look up product
        product = self._lookup_product(barcode)
        if product is None:
            return {'error': 'Product not found', 'barcode': barcode}

        # Check inventory, counting units of this SKU already in the cart so
        # repeated scans cannot oversell.
        in_cart = sum(
            item.quantity for item in txn.items if item.sku == product.sku
        )
        if product.stock_quantity < in_cart + quantity:
            return {
                'error': 'Insufficient inventory',
                'available': product.stock_quantity - in_cart
            }

        # Calculate line item totals
        unit_price = product.price
        line_subtotal = unit_price * quantity
        discount_amount = Decimal('0')  # Apply promotions here

        # Calculate tax on the discounted line amount
        tax_amount = Decimal('0')
        if product.is_taxable:
            tax_amount = (line_subtotal - discount_amount) * product.tax_rate
        line_total = line_subtotal - discount_amount + tax_amount

        txn.items.append(LineItem(
            sku=product.sku,
            product_name=product.name,
            quantity=quantity,
            unit_price=unit_price,
            discount_amount=discount_amount,
            tax_amount=tax_amount,
            line_total=line_total
        ))
        self._recalculate_totals()

        return {
            'success': True,
            'item': {
                'name': product.name,
                'quantity': quantity,
                'price': float(unit_price),
                'line_total': float(line_total)
            },
            'transaction_total': float(txn.grand_total)
        }

    def apply_discount(self, discount_code: str) -> dict:
        """Apply a discount/promotion code to the current transaction.

        The discount is capped so that accumulated discounts never exceed
        the subtotal. Returns the applied amount and new total, or an
        ``'error'`` dict.
        """
        txn = self.current_transaction
        if txn is None:
            return {'error': 'No active transaction'}

        discount = self._validate_discount(discount_code)
        if not discount:
            return {'error': 'Invalid discount code'}

        discount_amount = self._discount_amount(discount, txn.subtotal)
        if discount_amount is None:
            return {'error': 'Unknown discount type'}

        # Never discount more than the merchandise value still undiscounted.
        remaining = max(txn.subtotal - txn.discount_total, Decimal('0'))
        discount_amount = min(discount_amount, remaining)

        # NOTE(review): per-line taxes were computed at scan time and are not
        # reduced by this transaction-level discount.
        txn.discount_total += discount_amount
        self._recalculate_totals()
        return {
            'success': True,
            'discount_applied': float(discount_amount),
            'new_total': float(txn.grand_total)
        }

    def process_payment(self,
                        payment_method: PaymentMethod,
                        amount: Decimal,
                        payment_details: dict = None) -> dict:
        """Process payment for the current transaction.

        On success the transaction is marked completed, inventory is
        decremented, the current transaction is cleared, and the result
        holds the change due and a receipt. Returns an ``'error'`` dict when
        no transaction is active or ``amount`` is below the grand total, and
        the gateway's own result dict when the gateway reports failure.
        """
        txn = self.current_transaction
        if txn is None:
            return {'error': 'No active transaction'}
        if amount < txn.grand_total:
            return {'error': 'Insufficient payment amount'}

        # Process payment through payment gateway
        payment_result = self._process_payment_gateway(
            payment_method,
            amount,
            payment_details
        )
        if not payment_result['success']:
            return payment_result

        # Complete transaction
        txn.payment_method = payment_method
        txn.status = TransactionStatus.COMPLETED
        self._update_inventory()

        change = amount - txn.grand_total
        # The receipt must be built before the current transaction is cleared.
        receipt = self._generate_receipt()
        transaction_id = txn.transaction_id
        self.current_transaction = None  # Clear current transaction
        return {
            'success': True,
            'transaction_id': transaction_id,
            'amount_paid': float(amount),
            'change': float(change),
            'receipt': receipt
        }

    def void_transaction(self, reason: str) -> dict:
        """Void and clear the current transaction, echoing back ``reason``."""
        txn = self.current_transaction
        if txn is None:
            return {'error': 'No active transaction'}
        txn.status = TransactionStatus.VOIDED
        transaction_id = txn.transaction_id
        self.current_transaction = None
        return {
            'success': True,
            'transaction_id': transaction_id,
            'reason': reason
        }

    def _recalculate_totals(self):
        """Recompute subtotal, tax total and grand total from the line items."""
        txn = self.current_transaction
        zero = Decimal('0')
        # Start from Decimal zero so an empty cart yields Decimal, not int 0.
        txn.subtotal = sum(
            (item.unit_price * item.quantity for item in txn.items), zero
        )
        txn.tax_total = sum((item.tax_amount for item in txn.items), zero)
        txn.grand_total = txn.subtotal + txn.tax_total - txn.discount_total

    def _discount_amount(self, discount: dict, subtotal: Decimal) -> Optional[Decimal]:
        """Return the uncapped discount amount, or None for an unknown type.

        The discount value is converted through ``str`` so int/float values
        can be combined with Decimal amounts without a TypeError.
        """
        kind = discount['type']
        if kind == 'percentage':
            return subtotal * (Decimal(str(discount['value'])) / Decimal('100'))
        if kind == 'fixed':
            return Decimal(str(discount['value']))
        return None

    def _lookup_product(self, barcode: str) -> Optional[Product]:
        """Look up a product by barcode.

        Tries ``barcode`` as a catalog key first, then falls back to matching
        the ``barcode`` attribute so a SKU-keyed catalog also works.
        """
        product = self.products.get(barcode)
        if product is not None:
            return product
        for candidate in self.products.values():
            if candidate.barcode == barcode:
                return candidate
        return None

    def _find_product_by_sku(self, sku: str) -> Optional[Product]:
        """Find a product by SKU, whether the catalog is keyed by SKU or barcode."""
        product = self.products.get(sku)
        if product is not None:
            return product
        for candidate in self.products.values():
            if candidate.sku == sku:
                return candidate
        return None

    def _validate_discount(self, discount_code: str) -> Optional[dict]:
        """Validate and retrieve discount details (placeholder: always None)."""
        # Implementation would check against promotion database
        return None

    def _process_payment_gateway(self,
                                 payment_method: PaymentMethod,
                                 amount: Decimal,
                                 details: dict) -> dict:
        """Process payment through the gateway (placeholder: always succeeds)."""
        # Integration with payment processor
        return {'success': True, 'transaction_id': 'pay_123456'}

    def _update_inventory(self):
        """Decrement stock for every line item of the current transaction."""
        for item in self.current_transaction.items:
            product = self._find_product_by_sku(item.sku)
            if product:
                product.stock_quantity -= item.quantity

    def _generate_receipt(self) -> dict:
        """Build a receipt dict (floats, ISO timestamp) for the current transaction."""
        txn = self.current_transaction
        return {
            'transaction_id': txn.transaction_id,
            'timestamp': txn.timestamp.isoformat(),
            'items': [
                {
                    'name': item.product_name,
                    'qty': item.quantity,
                    'price': float(item.unit_price),
                    'total': float(item.line_total)
                }
                for item in txn.items
            ],
            'subtotal': float(txn.subtotal),
            'tax': float(txn.tax_total),
            'discount': float(txn.discount_total),
            'total': float(txn.grand_total)
        }

    def _generate_transaction_id(self) -> str:
        """Generate a transaction ID: ``TXN-`` plus 12 uppercase hex digits."""
        import uuid
        return f"TXN-{uuid.uuid4().hex[:12].upper()}"
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
import numpy as np
from datetime import datetime, timedelta
class InventoryManagementSystem:
    """Inventory management and optimization."""

    def __init__(self):
        """Start with empty product, warehouse and transfer-order stores."""
        self.products = {}
        self.warehouses = {}
        self.transfer_orders = []

    def calculate_reorder_point(self,
                                average_daily_demand: float,
                                lead_time_days: int,
                                service_level: float = 0.95) -> dict:
        """Calculate the reorder point and safety stock.

        Daily demand standard deviation is assumed to be 20% of the average.

        Raises:
            ValueError: if demand or lead time is negative, or
                ``service_level`` is not strictly between 0 and 1.
        """
        if average_daily_demand < 0:
            raise ValueError("average_daily_demand must be non-negative")
        if lead_time_days < 0:
            raise ValueError("lead_time_days must be non-negative")
        if not 0 < service_level < 1:
            # ppf(0) / ppf(1) are infinite and cannot be converted to int.
            raise ValueError("service_level must be strictly between 0 and 1")

        from scipy import stats

        # Safety stock calculation
        demand_std_dev = average_daily_demand * 0.2  # Assume 20% variation
        z_score = stats.norm.ppf(service_level)  # Z-score for service level
        safety_stock = z_score * demand_std_dev * np.sqrt(lead_time_days)
        reorder_point = (average_daily_demand * lead_time_days) + safety_stock
        return {
            'reorder_point': int(np.ceil(reorder_point)),
            'safety_stock': int(np.ceil(safety_stock)),
            'average_daily_demand': average_daily_demand,
            'lead_time_days': lead_time_days,
            'service_level': service_level
        }

    def calculate_economic_order_quantity(self,
                                          annual_demand: float,
                                          ordering_cost: Decimal,
                                          holding_cost_per_unit: Decimal) -> dict:
        """Calculate the Economic Order Quantity (EOQ) and its annual costs.

        Raises:
            ValueError: if any input is not strictly positive (the formula
                would otherwise divide by zero or produce NaN).
        """
        order_cost = float(ordering_cost)
        holding_cost = float(holding_cost_per_unit)
        if annual_demand <= 0:
            raise ValueError("annual_demand must be positive")
        if order_cost <= 0:
            raise ValueError("ordering_cost must be positive")
        if holding_cost <= 0:
            raise ValueError("holding_cost_per_unit must be positive")

        eoq = float(np.sqrt((2 * annual_demand * order_cost) / holding_cost))

        # Calculate total annual cost
        number_of_orders = annual_demand / eoq
        ordering_cost_total = number_of_orders * order_cost
        holding_cost_total = (eoq / 2) * holding_cost
        return {
            'eoq': int(np.ceil(eoq)),
            'orders_per_year': number_of_orders,
            'order_frequency_days': int(365 / number_of_orders),
            'total_annual_cost': ordering_cost_total + holding_cost_total,
            'ordering_cost': ordering_cost_total,
            'holding_cost': holding_cost_total
        }

    def analyze_abc(self, products: List[dict]) -> dict:
        """ABC analysis for inventory classification.

        Each input dict needs ``unit_cost`` and ``annual_demand``; the dicts
        are updated in place with ``annual_value`` and ``abc_category``.
        Items are ranked by annual value and classified by cumulative share:
        up to 80% is A, up to 95% is B, the rest is C. When the total value
        is zero every item is classified C.
        """
        # Calculate annual value for each product
        for product in products:
            product['annual_value'] = (
                product['unit_cost'] * product['annual_demand']
            )

        # Sort by annual value, highest first
        sorted_products = sorted(
            products,
            key=lambda x: x['annual_value'],
            reverse=True
        )
        total_value = sum(p['annual_value'] for p in sorted_products)
        cumulative_value = 0
        results = {'A': [], 'B': [], 'C': []}
        for product in sorted_products:
            cumulative_value += product['annual_value']
            if total_value:
                percentage = (cumulative_value / total_value) * 100
            else:
                # No value at all: avoid dividing by zero, treat as lowest class.
                percentage = 100
            if percentage <= 80:
                category = 'A'
            elif percentage <= 95:
                category = 'B'
            else:
                category = 'C'
            product['abc_category'] = category
            results[category].append(product)

        return {
            'classification': results,
            'summary': {
                'A_items': len(results['A']),
                'B_items': len(results['B']),
                'C_items': len(results['C']),
                'total_value': total_value
            }
        }

    def forecast_demand(self,
                        historical_sales: List[float],
                        periods_ahead: int = 12) -> dict:
        """Forecast future demand from monthly history.

        The level is the mean of the first 12 observations, the trend is the
        per-period change between the first and second season means, and the
        seasonal indices are the first season divided by the level.

        NOTE(review): although the result is labelled ``'holt_winters'``, no
        smoothing pass over the history is performed; the smoothing constants
        the original declared were never used and have been dropped.

        Raises:
            ValueError: if there are not more than 12 observations (no second
                season to estimate a trend from) or the first-season mean is
                zero.
        """
        season_length = 12  # Monthly seasonality
        if len(historical_sales) <= season_length:
            raise ValueError(
                f"need more than {season_length} observations to estimate a trend"
            )

        first_season = np.asarray(historical_sales[:season_length], dtype=float)
        second_season = np.asarray(
            historical_sales[season_length:2 * season_length], dtype=float
        )

        # Initialize level and trend
        level = float(first_season.mean())
        if level == 0:
            raise ValueError("first-season mean is zero; seasonal indices undefined")
        trend = (float(second_season.mean()) - level) / season_length

        # Initialize seasonal indices
        seasonal = first_season / level

        # Generate forecasts, clamped at zero
        forecast = [
            max(0.0, float((level + trend * (i + 1)) * seasonal[i % season_length]))
            for i in range(periods_ahead)
        ]
        return {
            'forecast': forecast,
            'periods_ahead': periods_ahead,
            'method': 'holt_winters',
            'confidence_interval_95': self._calculate_confidence_interval(
                historical_sales,
                forecast
            )
        }

    def check_stock_levels(self) -> List[dict]:
        """Check stock levels and return reorder/overstock alerts.

        A product at or below its reorder point yields a ``'reorder'`` alert;
        one above three times its reorder point yields an ``'overstock'``
        alert.
        """
        alerts = []
        for sku, product in self.products.items():
            # Check for low stock
            if product.stock_quantity <= product.reorder_point:
                alerts.append({
                    'type': 'reorder',
                    'severity': 'high',
                    'sku': sku,
                    'product_name': product.name,
                    'current_stock': product.stock_quantity,
                    'reorder_point': product.reorder_point,
                    'action': 'Place purchase order'
                })

            # Check for overstock
            max_stock = product.reorder_point * 3
            if product.stock_quantity > max_stock:
                alerts.append({
                    'type': 'overstock',
                    'severity': 'medium',
                    'sku': sku,
                    'product_name': product.name,
                    'current_stock': product.stock_quantity,
                    'max_stock': max_stock,
                    'action': 'Review purchasing strategy'
                })

            # Check for no sales (dead stock)
            # Implementation would check sales history
        return alerts

    def _calculate_confidence_interval(self,
                                       historical: List[float],
                                       forecast: List[float]) -> dict:
        """Return a simplified 95% band around ``forecast``.

        The band half-width is 1.96 times 1.5 times the standard deviation of
        ``historical``; the lower bound is clamped at zero.
        """
        std_error = float(np.std(historical)) * 1.5
        margin = 1.96 * std_error
        return {
            'lower': [max(0, f - margin) for f in forecast],
            'upper': [f + margin for f in forecast]
        }
from sklearn.cluster import KMeans
import pandas as pd
class CustomerAnalytics:
    """Customer segmentation and analytics."""

    def __init__(self):
        """Start with no customers and no transactions."""
        self.customers = {}
        self.transactions = []

    def calculate_rfm(self,
                      customer_transactions: pd.DataFrame,
                      reference_date: Optional[datetime] = None) -> pd.DataFrame:
        """Calculate RFM (Recency, Frequency, Monetary) scores per customer.

        Args:
            customer_transactions: frame with ``customer_id``,
                ``transaction_date``, ``transaction_id`` and ``amount``
                columns.
            reference_date: date recency is measured from; defaults to
                ``datetime.now()``.

        Returns:
            Frame indexed by ``customer_id`` with ``recency`` (days),
            ``frequency``, ``monetary``, the three 1-5 quintile scores and
            their sum ``rfm_score``.
        """
        current_date = datetime.now() if reference_date is None else reference_date
        rfm = customer_transactions.groupby('customer_id').agg({
            'transaction_date': lambda x: (current_date - x.max()).days,  # Recency
            'transaction_id': 'count',  # Frequency
            'amount': 'sum'  # Monetary
        })
        rfm.columns = ['recency', 'frequency', 'monetary']

        # Calculate RFM scores (1-5 scale). Lower recency is better, so its
        # labels run 5..1.
        rfm['r_score'] = self._quintile_score(rfm['recency'], [5, 4, 3, 2, 1])
        rfm['f_score'] = self._quintile_score(rfm['frequency'], [1, 2, 3, 4, 5])
        rfm['m_score'] = self._quintile_score(rfm['monetary'], [1, 2, 3, 4, 5])

        # Combined RFM score
        rfm['rfm_score'] = (
            rfm['r_score'].astype(int) +
            rfm['f_score'].astype(int) +
            rfm['m_score'].astype(int)
        )
        return rfm

    @staticmethod
    def _quintile_score(values: pd.Series, labels: list) -> pd.Series:
        """Bucket ``values`` into five labelled quantile bins.

        Values are ranked with ``method='first'`` before binning so tied
        values cannot produce duplicate bin edges, which ``pd.qcut`` rejects.
        """
        return pd.qcut(values.rank(method='first'), 5, labels=labels)

    def segment_customers(self, rfm_data: pd.DataFrame) -> dict:
        """Segment customers based on their R, F and M scores.

        Rules are evaluated in order and the first match wins. Returns a dict
        mapping each index value of ``rfm_data`` to its segment name and
        scores.
        """
        segments = {}
        for customer_id, row in rfm_data.iterrows():
            r, f, m = int(row['r_score']), int(row['f_score']), int(row['m_score'])
            if r >= 4 and f >= 4 and m >= 4:
                segment = 'Champions'
            elif r >= 3 and f >= 3 and m >= 3:
                segment = 'Loyal Customers'
            elif r >= 4 and f <= 2:
                segment = 'New Customers'
            elif r <= 2 and f >= 3:
                segment = 'At Risk'
            elif r <= 2 and f <= 2:
                segment = 'Lost Customers'
            elif m >= 4:
                segment = 'Big Spenders'
            else:
                segment = 'Regular Customers'
            segments[customer_id] = {
                'segment': segment,
                'rfm_scores': {'r': r, 'f': f, 'm': m}
            }
        return segments

    def calculate_customer_lifetime_value(self,
                                          average_purchase_value: Decimal,
                                          purchase_frequency: float,
                                          customer_lifespan_years: float) -> Decimal:
        """Calculate Customer Lifetime Value (CLV), rounded to two decimals.

        CLV is the product of the three arguments. The multiplication is done
        in Decimal so binary floating-point error cannot shift the rounded
        result.
        """
        clv = (
            Decimal(str(average_purchase_value)) *
            Decimal(str(purchase_frequency)) *
            Decimal(str(customer_lifespan_years))
        )
        return clv.quantize(Decimal('0.01'))

    def predict_churn(self, customer_features: dict) -> dict:
        """Predict customer churn probability (placeholder implementation).

        ``customer_features`` is currently ignored and a fixed score is used;
        a trained ML model is expected to replace it. A score above 0.7 is
        high risk, above 0.4 is medium, otherwise low.
        """
        churn_score = 0.35  # Placeholder
        if churn_score > 0.7:
            risk = 'high'
            action = 'Send personalized offer immediately'
        elif churn_score > 0.4:
            risk = 'medium'
            action = 'Include in next marketing campaign'
        else:
            risk = 'low'
            action = 'Continue regular engagement'
        return {
            'churn_probability': churn_score,
            'risk_level': risk,
            'recommended_action': action
        }

    def recommend_products(self,
                           customer_id: str,
                           top_n: int = 5) -> List[dict]:
        """Return up to ``top_n`` product recommendations (placeholder data).

        ``customer_id`` is currently ignored; a recommendation algorithm is
        expected to replace the hard-coded list.
        """
        recommendations = [
            {
                'sku': 'PROD001',
                'name': 'Recommended Product 1',
                'score': 0.95,
                'reason': 'Frequently bought together'
            }
        ]
        return recommendations[:top_n]
❌ 没有库存追踪或计数不准确
❌ 仅支持单一支付方式
❌ 糟糕的结账体验(缓慢/复杂)
❌ 不收集客户数据
❌ 线上和线下系统孤立
❌ 跨地点手动更新价格
❌ POS 系统没有备份
❌ 忽视购物车放弃
❌ 没有产品推荐
每周安装次数
82
代码仓库
GitHub 星标数
11
首次出现
2026年1月24日
安全审计
安装于
opencode: 71
codex: 71
gemini-cli: 69
github-copilot: 63
cursor: 62
kimi-cli: 60
Expert guidance for retail systems, point-of-sale solutions, inventory management, e-commerce platforms, customer analytics, and omnichannel retail strategies.
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import List, Optional
from enum import Enum
class PaymentMethod(Enum):
    """Payment methods a transaction can be settled with."""

    CASH = "cash"
    CREDIT_CARD = "credit_card"
    DEBIT_CARD = "debit_card"
    MOBILE_PAYMENT = "mobile_payment"
    GIFT_CARD = "gift_card"
class TransactionStatus(Enum):
    """Lifecycle states of a POS transaction."""

    PENDING = "pending"
    COMPLETED = "completed"
    VOIDED = "voided"
    REFUNDED = "refunded"
@dataclass
class Product:
    """Product/SKU information."""

    sku: str  # copied onto each LineItem when the product is scanned
    name: str
    description: str
    price: Decimal  # used as the unit price at scan time
    cost: Decimal
    barcode: str
    category: str
    department: str
    tax_rate: Decimal  # multiplied by the discounted line subtotal when is_taxable
    is_taxable: bool
    stock_quantity: int  # checked when scanning, decremented after payment
    reorder_point: int  # stock at or below this level triggers a reorder alert
@dataclass
class LineItem:
    """Transaction line item."""

    sku: str
    product_name: str
    quantity: int
    unit_price: Decimal
    discount_amount: Decimal
    tax_amount: Decimal
    line_total: Decimal  # quantity * unit_price - discount_amount + tax_amount
@dataclass
class Transaction:
    """POS transaction."""

    transaction_id: str
    store_id: str
    register_id: str
    cashier_id: str
    timestamp: datetime
    items: List[LineItem]
    subtotal: Decimal  # sum of unit_price * quantity over items
    tax_total: Decimal  # sum of per-line tax amounts
    discount_total: Decimal  # transaction-level discounts applied so far
    grand_total: Decimal  # subtotal + tax_total - discount_total
    payment_method: Optional[PaymentMethod]  # None until payment is processed
    status: TransactionStatus
    customer_id: Optional[str]
class POSSystem:
    """Point of Sale system for a single register.

    Tracks one in-progress transaction (``current_transaction``) and an
    in-memory product catalog (``products``). Public methods report failures
    by returning a dict with an ``'error'`` key instead of raising.
    """

    def __init__(self, store_id: str, register_id: str):
        """Bind the register to a store; no transaction is active initially."""
        self.store_id = store_id
        self.register_id = register_id
        self.current_transaction = None
        # Product catalog. Lookups accept either barcode or SKU as the key
        # (see _lookup_product / _find_product_by_sku).
        self.products = {}

    def start_transaction(self, cashier_id: str) -> str:
        """Start a new transaction (replacing any current one) and return its ID."""
        transaction_id = self._generate_transaction_id()
        self.current_transaction = Transaction(
            transaction_id=transaction_id,
            store_id=self.store_id,
            register_id=self.register_id,
            cashier_id=cashier_id,
            timestamp=datetime.now(),
            items=[],
            subtotal=Decimal('0'),
            tax_total=Decimal('0'),
            discount_total=Decimal('0'),
            grand_total=Decimal('0'),
            payment_method=None,
            status=TransactionStatus.PENDING,
            customer_id=None
        )
        return transaction_id

    def scan_item(self, barcode: str, quantity: int = 1) -> dict:
        """Scan an item and add it to the current transaction.

        Returns a dict with ``'success': True`` plus the line and running
        total, or a dict with an ``'error'`` key when there is no active
        transaction, the quantity is not positive, the product is unknown,
        or stock is insufficient.
        """
        txn = self.current_transaction
        if txn is None:
            return {'error': 'No active transaction'}
        if quantity <= 0:
            return {'error': 'Invalid quantity', 'quantity': quantity}

        # Lookup product
        product = self._lookup_product(barcode)
        if product is None:
            return {'error': 'Product not found', 'barcode': barcode}

        # Check inventory, counting units of this SKU already in the cart so
        # repeated scans cannot oversell.
        in_cart = sum(
            item.quantity for item in txn.items if item.sku == product.sku
        )
        if product.stock_quantity < in_cart + quantity:
            return {
                'error': 'Insufficient inventory',
                'available': product.stock_quantity - in_cart
            }

        # Calculate line item totals
        unit_price = product.price
        line_subtotal = unit_price * quantity
        discount_amount = Decimal('0')  # Apply promotions here

        # Calculate tax on the discounted line amount
        tax_amount = Decimal('0')
        if product.is_taxable:
            tax_amount = (line_subtotal - discount_amount) * product.tax_rate
        line_total = line_subtotal - discount_amount + tax_amount

        txn.items.append(LineItem(
            sku=product.sku,
            product_name=product.name,
            quantity=quantity,
            unit_price=unit_price,
            discount_amount=discount_amount,
            tax_amount=tax_amount,
            line_total=line_total
        ))
        self._recalculate_totals()

        return {
            'success': True,
            'item': {
                'name': product.name,
                'quantity': quantity,
                'price': float(unit_price),
                'line_total': float(line_total)
            },
            'transaction_total': float(txn.grand_total)
        }

    def apply_discount(self, discount_code: str) -> dict:
        """Apply a discount/promotion code to the current transaction.

        The discount is capped so that accumulated discounts never exceed
        the subtotal. Returns the applied amount and new total, or an
        ``'error'`` dict.
        """
        txn = self.current_transaction
        if txn is None:
            return {'error': 'No active transaction'}

        discount = self._validate_discount(discount_code)
        if not discount:
            return {'error': 'Invalid discount code'}

        discount_amount = self._discount_amount(discount, txn.subtotal)
        if discount_amount is None:
            return {'error': 'Unknown discount type'}

        # Never discount more than the merchandise value still undiscounted.
        remaining = max(txn.subtotal - txn.discount_total, Decimal('0'))
        discount_amount = min(discount_amount, remaining)

        # NOTE(review): per-line taxes were computed at scan time and are not
        # reduced by this transaction-level discount.
        txn.discount_total += discount_amount
        self._recalculate_totals()
        return {
            'success': True,
            'discount_applied': float(discount_amount),
            'new_total': float(txn.grand_total)
        }

    def process_payment(self,
                        payment_method: PaymentMethod,
                        amount: Decimal,
                        payment_details: dict = None) -> dict:
        """Process payment for the current transaction.

        On success the transaction is marked completed, inventory is
        decremented, the current transaction is cleared, and the result
        holds the change due and a receipt. Returns an ``'error'`` dict when
        no transaction is active or ``amount`` is below the grand total, and
        the gateway's own result dict when the gateway reports failure.
        """
        txn = self.current_transaction
        if txn is None:
            return {'error': 'No active transaction'}
        if amount < txn.grand_total:
            return {'error': 'Insufficient payment amount'}

        # Process payment through payment gateway
        payment_result = self._process_payment_gateway(
            payment_method,
            amount,
            payment_details
        )
        if not payment_result['success']:
            return payment_result

        # Complete transaction
        txn.payment_method = payment_method
        txn.status = TransactionStatus.COMPLETED
        self._update_inventory()

        change = amount - txn.grand_total
        # The receipt must be built before the current transaction is cleared.
        receipt = self._generate_receipt()
        transaction_id = txn.transaction_id
        self.current_transaction = None  # Clear current transaction
        return {
            'success': True,
            'transaction_id': transaction_id,
            'amount_paid': float(amount),
            'change': float(change),
            'receipt': receipt
        }

    def void_transaction(self, reason: str) -> dict:
        """Void and clear the current transaction, echoing back ``reason``."""
        txn = self.current_transaction
        if txn is None:
            return {'error': 'No active transaction'}
        txn.status = TransactionStatus.VOIDED
        transaction_id = txn.transaction_id
        self.current_transaction = None
        return {
            'success': True,
            'transaction_id': transaction_id,
            'reason': reason
        }

    def _recalculate_totals(self):
        """Recompute subtotal, tax total and grand total from the line items."""
        txn = self.current_transaction
        zero = Decimal('0')
        # Start from Decimal zero so an empty cart yields Decimal, not int 0.
        txn.subtotal = sum(
            (item.unit_price * item.quantity for item in txn.items), zero
        )
        txn.tax_total = sum((item.tax_amount for item in txn.items), zero)
        txn.grand_total = txn.subtotal + txn.tax_total - txn.discount_total

    def _discount_amount(self, discount: dict, subtotal: Decimal) -> Optional[Decimal]:
        """Return the uncapped discount amount, or None for an unknown type.

        The discount value is converted through ``str`` so int/float values
        can be combined with Decimal amounts without a TypeError.
        """
        kind = discount['type']
        if kind == 'percentage':
            return subtotal * (Decimal(str(discount['value'])) / Decimal('100'))
        if kind == 'fixed':
            return Decimal(str(discount['value']))
        return None

    def _lookup_product(self, barcode: str) -> Optional[Product]:
        """Look up a product by barcode.

        Tries ``barcode`` as a catalog key first, then falls back to matching
        the ``barcode`` attribute so a SKU-keyed catalog also works.
        """
        product = self.products.get(barcode)
        if product is not None:
            return product
        for candidate in self.products.values():
            if candidate.barcode == barcode:
                return candidate
        return None

    def _find_product_by_sku(self, sku: str) -> Optional[Product]:
        """Find a product by SKU, whether the catalog is keyed by SKU or barcode."""
        product = self.products.get(sku)
        if product is not None:
            return product
        for candidate in self.products.values():
            if candidate.sku == sku:
                return candidate
        return None

    def _validate_discount(self, discount_code: str) -> Optional[dict]:
        """Validate and retrieve discount details (placeholder: always None)."""
        # Implementation would check against promotion database
        return None

    def _process_payment_gateway(self,
                                 payment_method: PaymentMethod,
                                 amount: Decimal,
                                 details: dict) -> dict:
        """Process payment through the gateway (placeholder: always succeeds)."""
        # Integration with payment processor (Stripe, Square, etc.)
        return {'success': True, 'transaction_id': 'pay_123456'}

    def _update_inventory(self):
        """Decrement stock for every line item of the current transaction."""
        for item in self.current_transaction.items:
            product = self._find_product_by_sku(item.sku)
            if product:
                product.stock_quantity -= item.quantity

    def _generate_receipt(self) -> dict:
        """Build a receipt dict (floats, ISO timestamp) for the current transaction."""
        txn = self.current_transaction
        return {
            'transaction_id': txn.transaction_id,
            'timestamp': txn.timestamp.isoformat(),
            'items': [
                {
                    'name': item.product_name,
                    'qty': item.quantity,
                    'price': float(item.unit_price),
                    'total': float(item.line_total)
                }
                for item in txn.items
            ],
            'subtotal': float(txn.subtotal),
            'tax': float(txn.tax_total),
            'discount': float(txn.discount_total),
            'total': float(txn.grand_total)
        }

    def _generate_transaction_id(self) -> str:
        """Generate a transaction ID: ``TXN-`` plus 12 uppercase hex digits."""
        import uuid
        return f"TXN-{uuid.uuid4().hex[:12].upper()}"
import numpy as np
from datetime import datetime, timedelta
class InventoryManagementSystem:
    """Inventory management and optimization."""

    def __init__(self):
        """Start with empty product, warehouse and transfer-order stores."""
        self.products = {}
        self.warehouses = {}
        self.transfer_orders = []

    def calculate_reorder_point(self,
                                average_daily_demand: float,
                                lead_time_days: int,
                                service_level: float = 0.95) -> dict:
        """Calculate the reorder point and safety stock.

        Daily demand standard deviation is assumed to be 20% of the average.

        Raises:
            ValueError: if demand or lead time is negative, or
                ``service_level`` is not strictly between 0 and 1.
        """
        if average_daily_demand < 0:
            raise ValueError("average_daily_demand must be non-negative")
        if lead_time_days < 0:
            raise ValueError("lead_time_days must be non-negative")
        if not 0 < service_level < 1:
            # ppf(0) / ppf(1) are infinite and cannot be converted to int.
            raise ValueError("service_level must be strictly between 0 and 1")

        from scipy import stats

        # Safety stock calculation
        demand_std_dev = average_daily_demand * 0.2  # Assume 20% variation
        z_score = stats.norm.ppf(service_level)  # Z-score for service level
        safety_stock = z_score * demand_std_dev * np.sqrt(lead_time_days)
        reorder_point = (average_daily_demand * lead_time_days) + safety_stock
        return {
            'reorder_point': int(np.ceil(reorder_point)),
            'safety_stock': int(np.ceil(safety_stock)),
            'average_daily_demand': average_daily_demand,
            'lead_time_days': lead_time_days,
            'service_level': service_level
        }

    def calculate_economic_order_quantity(self,
                                          annual_demand: float,
                                          ordering_cost: Decimal,
                                          holding_cost_per_unit: Decimal) -> dict:
        """Calculate the Economic Order Quantity (EOQ) and its annual costs.

        Raises:
            ValueError: if any input is not strictly positive (the formula
                would otherwise divide by zero or produce NaN).
        """
        order_cost = float(ordering_cost)
        holding_cost = float(holding_cost_per_unit)
        if annual_demand <= 0:
            raise ValueError("annual_demand must be positive")
        if order_cost <= 0:
            raise ValueError("ordering_cost must be positive")
        if holding_cost <= 0:
            raise ValueError("holding_cost_per_unit must be positive")

        eoq = float(np.sqrt((2 * annual_demand * order_cost) / holding_cost))

        # Calculate total annual cost
        number_of_orders = annual_demand / eoq
        ordering_cost_total = number_of_orders * order_cost
        holding_cost_total = (eoq / 2) * holding_cost
        return {
            'eoq': int(np.ceil(eoq)),
            'orders_per_year': number_of_orders,
            'order_frequency_days': int(365 / number_of_orders),
            'total_annual_cost': ordering_cost_total + holding_cost_total,
            'ordering_cost': ordering_cost_total,
            'holding_cost': holding_cost_total
        }

    def analyze_abc(self, products: List[dict]) -> dict:
        """ABC analysis for inventory classification.

        Each input dict needs ``unit_cost`` and ``annual_demand``; the dicts
        are updated in place with ``annual_value`` and ``abc_category``.
        Items are ranked by annual value and classified by cumulative share:
        up to 80% is A, up to 95% is B, the rest is C. When the total value
        is zero every item is classified C.
        """
        # Calculate annual value for each product
        for product in products:
            product['annual_value'] = (
                product['unit_cost'] * product['annual_demand']
            )

        # Sort by annual value, highest first
        sorted_products = sorted(
            products,
            key=lambda x: x['annual_value'],
            reverse=True
        )
        total_value = sum(p['annual_value'] for p in sorted_products)
        cumulative_value = 0
        results = {'A': [], 'B': [], 'C': []}
        for product in sorted_products:
            cumulative_value += product['annual_value']
            if total_value:
                percentage = (cumulative_value / total_value) * 100
            else:
                # No value at all: avoid dividing by zero, treat as lowest class.
                percentage = 100
            if percentage <= 80:
                category = 'A'
            elif percentage <= 95:
                category = 'B'
            else:
                category = 'C'
            product['abc_category'] = category
            results[category].append(product)

        return {
            'classification': results,
            'summary': {
                'A_items': len(results['A']),
                'B_items': len(results['B']),
                'C_items': len(results['C']),
                'total_value': total_value
            }
        }

    def forecast_demand(self,
                        historical_sales: List[float],
                        periods_ahead: int = 12) -> dict:
        """Forecast future demand from monthly history.

        The level is the mean of the first 12 observations, the trend is the
        per-period change between the first and second season means, and the
        seasonal indices are the first season divided by the level.

        NOTE(review): although the result is labelled ``'holt_winters'``, no
        smoothing pass over the history is performed; the smoothing constants
        the original declared were never used and have been dropped.

        Raises:
            ValueError: if there are not more than 12 observations (no second
                season to estimate a trend from) or the first-season mean is
                zero.
        """
        season_length = 12  # Monthly seasonality
        if len(historical_sales) <= season_length:
            raise ValueError(
                f"need more than {season_length} observations to estimate a trend"
            )

        first_season = np.asarray(historical_sales[:season_length], dtype=float)
        second_season = np.asarray(
            historical_sales[season_length:2 * season_length], dtype=float
        )

        # Initialize level and trend
        level = float(first_season.mean())
        if level == 0:
            raise ValueError("first-season mean is zero; seasonal indices undefined")
        trend = (float(second_season.mean()) - level) / season_length

        # Initialize seasonal indices
        seasonal = first_season / level

        # Generate forecasts, clamped at zero
        forecast = [
            max(0.0, float((level + trend * (i + 1)) * seasonal[i % season_length]))
            for i in range(periods_ahead)
        ]
        return {
            'forecast': forecast,
            'periods_ahead': periods_ahead,
            'method': 'holt_winters',
            'confidence_interval_95': self._calculate_confidence_interval(
                historical_sales,
                forecast
            )
        }

    def check_stock_levels(self) -> List[dict]:
        """Check stock levels and return reorder/overstock alerts.

        A product at or below its reorder point yields a ``'reorder'`` alert;
        one above three times its reorder point yields an ``'overstock'``
        alert.
        """
        alerts = []
        for sku, product in self.products.items():
            # Check for low stock
            if product.stock_quantity <= product.reorder_point:
                alerts.append({
                    'type': 'reorder',
                    'severity': 'high',
                    'sku': sku,
                    'product_name': product.name,
                    'current_stock': product.stock_quantity,
                    'reorder_point': product.reorder_point,
                    'action': 'Place purchase order'
                })

            # Check for overstock
            max_stock = product.reorder_point * 3
            if product.stock_quantity > max_stock:
                alerts.append({
                    'type': 'overstock',
                    'severity': 'medium',
                    'sku': sku,
                    'product_name': product.name,
                    'current_stock': product.stock_quantity,
                    'max_stock': max_stock,
                    'action': 'Review purchasing strategy'
                })

            # Check for no sales (dead stock)
            # Implementation would check sales history
        return alerts

    def _calculate_confidence_interval(self,
                                       historical: List[float],
                                       forecast: List[float]) -> dict:
        """Return a simplified 95% band around ``forecast``.

        The band half-width is 1.96 times 1.5 times the standard deviation of
        ``historical``; the lower bound is clamped at zero.
        """
        std_error = float(np.std(historical)) * 1.5
        margin = 1.96 * std_error
        return {
            'lower': [max(0, f - margin) for f in forecast],
            'upper': [f + margin for f in forecast]
        }
from sklearn.cluster import KMeans
import pandas as pd
class CustomerAnalytics:
    """Customer segmentation and analytics."""

    def __init__(self):
        """Start with no customers and no transactions."""
        self.customers = {}
        self.transactions = []

    def calculate_rfm(self,
                      customer_transactions: pd.DataFrame,
                      reference_date: Optional[datetime] = None) -> pd.DataFrame:
        """Calculate RFM (Recency, Frequency, Monetary) scores per customer.

        Args:
            customer_transactions: frame with ``customer_id``,
                ``transaction_date``, ``transaction_id`` and ``amount``
                columns.
            reference_date: date recency is measured from; defaults to
                ``datetime.now()``.

        Returns:
            Frame indexed by ``customer_id`` with ``recency`` (days),
            ``frequency``, ``monetary``, the three 1-5 quintile scores and
            their sum ``rfm_score``.
        """
        current_date = datetime.now() if reference_date is None else reference_date
        rfm = customer_transactions.groupby('customer_id').agg({
            'transaction_date': lambda x: (current_date - x.max()).days,  # Recency
            'transaction_id': 'count',  # Frequency
            'amount': 'sum'  # Monetary
        })
        rfm.columns = ['recency', 'frequency', 'monetary']

        # Calculate RFM scores (1-5 scale). Lower recency is better, so its
        # labels run 5..1.
        rfm['r_score'] = self._quintile_score(rfm['recency'], [5, 4, 3, 2, 1])
        rfm['f_score'] = self._quintile_score(rfm['frequency'], [1, 2, 3, 4, 5])
        rfm['m_score'] = self._quintile_score(rfm['monetary'], [1, 2, 3, 4, 5])

        # Combined RFM score
        rfm['rfm_score'] = (
            rfm['r_score'].astype(int) +
            rfm['f_score'].astype(int) +
            rfm['m_score'].astype(int)
        )
        return rfm

    @staticmethod
    def _quintile_score(values: pd.Series, labels: list) -> pd.Series:
        """Bucket ``values`` into five labelled quantile bins.

        Values are ranked with ``method='first'`` before binning so tied
        values cannot produce duplicate bin edges, which ``pd.qcut`` rejects.
        """
        return pd.qcut(values.rank(method='first'), 5, labels=labels)

    def segment_customers(self, rfm_data: pd.DataFrame) -> dict:
        """Segment customers based on their R, F and M scores.

        Rules are evaluated in order and the first match wins. Returns a dict
        mapping each index value of ``rfm_data`` to its segment name and
        scores.
        """
        segments = {}
        for customer_id, row in rfm_data.iterrows():
            r, f, m = int(row['r_score']), int(row['f_score']), int(row['m_score'])
            if r >= 4 and f >= 4 and m >= 4:
                segment = 'Champions'
            elif r >= 3 and f >= 3 and m >= 3:
                segment = 'Loyal Customers'
            elif r >= 4 and f <= 2:
                segment = 'New Customers'
            elif r <= 2 and f >= 3:
                segment = 'At Risk'
            elif r <= 2 and f <= 2:
                segment = 'Lost Customers'
            elif m >= 4:
                segment = 'Big Spenders'
            else:
                segment = 'Regular Customers'
            segments[customer_id] = {
                'segment': segment,
                'rfm_scores': {'r': r, 'f': f, 'm': m}
            }
        return segments

    def calculate_customer_lifetime_value(self,
                                          average_purchase_value: Decimal,
                                          purchase_frequency: float,
                                          customer_lifespan_years: float) -> Decimal:
        """Calculate Customer Lifetime Value (CLV), rounded to two decimals.

        CLV is the product of the three arguments. The multiplication is done
        in Decimal so binary floating-point error cannot shift the rounded
        result.
        """
        clv = (
            Decimal(str(average_purchase_value)) *
            Decimal(str(purchase_frequency)) *
            Decimal(str(customer_lifespan_years))
        )
        return clv.quantize(Decimal('0.01'))

    def predict_churn(self, customer_features: dict) -> dict:
        """Predict customer churn probability (placeholder implementation).

        ``customer_features`` is currently ignored and a fixed score is used;
        a trained ML model is expected to replace it. A score above 0.7 is
        high risk, above 0.4 is medium, otherwise low.
        """
        churn_score = 0.35  # Placeholder
        if churn_score > 0.7:
            risk = 'high'
            action = 'Send personalized offer immediately'
        elif churn_score > 0.4:
            risk = 'medium'
            action = 'Include in next marketing campaign'
        else:
            risk = 'low'
            action = 'Continue regular engagement'
        return {
            'churn_probability': churn_score,
            'risk_level': risk,
            'recommended_action': action
        }

    def recommend_products(self,
                           customer_id: str,
                           top_n: int = 5) -> List[dict]:
        """Return up to ``top_n`` product recommendations (placeholder data).

        ``customer_id`` is currently ignored; a recommendation algorithm is
        expected to replace the hard-coded list.
        """
        recommendations = [
            {
                'sku': 'PROD001',
                'name': 'Recommended Product 1',
                'score': 0.95,
                'reason': 'Frequently bought together'
            }
        ]
        return recommendations[:top_n]
❌ No inventory tracking or inaccurate counts
❌ Single payment method only
❌ Poor checkout experience (slow/complex)
❌ No customer data collection
❌ Siloed online and offline systems
❌ Manual price updates across locations
❌ No backup for POS systems
❌ Ignoring cart abandonment
❌ No product recommendations
Weekly Installs
82
Repository
GitHub Stars
11
First Seen
Jan 24, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Warn
Installed on
opencode: 71
codex: 71
gemini-cli: 69
github-copilot: 63
cursor: 62
kimi-cli: 60
Python Excel自动化:openpyxl库操作XLSX文件教程,创建编辑格式化电子表格
839 周安装