Important prerequisite
Installing AI Skills requires a working VPN/proxy connection with TUN mode enabled; this directly determines whether the installation completes successfully. See the full installation guide →
pandas by manutej/luxor-claude-marketplace
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill pandas
You are an expert in pandas, the powerful Python library for data analysis and manipulation, with specialized knowledge in customer support analytics, ticket management, SLA tracking, and performance reporting. Your expertise covers DataFrame operations, data transformation, time series analysis, database integration, and production-ready data pipelines for support operations.
DataFrame Creation and Initialization
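A minimal, illustrative sketch of creating a tickets DataFrame from a dict of columns and from a list of records; the column names here are assumptions rather than a fixed schema:
import pandas as pd

# From a dict of columns (illustrative schema)
tickets = pd.DataFrame({
    'ticket_id': [101, 102, 103],
    'created_at': pd.to_datetime(['2024-01-01 09:00', '2024-01-01 10:30', '2024-01-02 08:15']),
    'priority': ['high', 'medium', 'low'],
    'status': ['open', 'resolved', 'open'],
})

# From a list of record dicts (e.g., rows returned by a helpdesk API)
records = [
    {'ticket_id': 104, 'priority': 'high', 'status': 'open'},
    {'ticket_id': 105, 'priority': 'low', 'status': 'resolved'},
]
more_tickets = pd.DataFrame.from_records(records)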
Data Selection and Indexing
.loc[] for label-based indexing (rows and columns by name)
.iloc[] for position-based indexing (integer positions)
df.query('priority == "high" and status == "open"')
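A minimal sketch of these selection idioms on a small illustrative frame:
import pandas as pd

tickets = pd.DataFrame({
    'ticket_id': [101, 102, 103],
    'priority': ['high', 'low', 'high'],
    'status': ['open', 'resolved', 'open'],
}).set_index('ticket_id')

# Label-based: rows by index label, columns by name
subset_by_label = tickets.loc[[101, 103], ['priority', 'status']]

# Position-based: first two rows, first column
subset_by_position = tickets.iloc[:2, :1]

# Expression-based filtering
open_high = tickets.query('priority == "high" and status == "open"')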
Column Operations
.apply(), .map(), .transform()
.assign() for method chaining and creating new columns
.astype() for column data type conversion
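A minimal sketch of these column operations, again on an illustrative frame:
import pandas as pd

tickets = pd.DataFrame({
    'priority': ['high', 'low', 'medium'],
    'created_at': ['2024-01-01', '2024-01-02', '2024-01-03'],
    'resolution_minutes': [90, 240, 60],
})

# Type conversion with to_datetime / .astype()
tickets['created_at'] = pd.to_datetime(tickets['created_at'])

# .map() for value-by-value lookups
priority_rank = {'high': 1, 'medium': 2, 'low': 3}
tickets['priority_rank'] = tickets['priority'].map(priority_rank)

# .assign() for method chaining and creating new columns
tickets = tickets.assign(resolution_hours=lambda d: d['resolution_minutes'] / 60)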
SLA Tracking and Compliance
# Calculate SLA compliance for support tickets
def analyze_sla_compliance(tickets_df):
"""
Analyze SLA compliance for customer support tickets.
Args:
tickets_df: DataFrame with columns [ticket_id, created_at, first_response_at,
resolved_at, priority, sla_target_hours]
Returns:
DataFrame with SLA metrics and compliance flags
"""
# Calculate response and resolution times
tickets_df['first_response_time'] = (
tickets_df['first_response_at'] - tickets_df['created_at']
).dt.total_seconds() / 3600 # Convert to hours
tickets_df['resolution_time'] = (
tickets_df['resolved_at'] - tickets_df['created_at']
).dt.total_seconds() / 3600
# Determine SLA compliance
tickets_df['response_sla_met'] = (
tickets_df['first_response_time'] <= tickets_df['sla_target_hours']
)
tickets_df['resolution_sla_met'] = (
tickets_df['resolution_time'] <= tickets_df['sla_target_hours'] * 2
)
# Calculate compliance rate by priority
compliance_by_priority = tickets_df.groupby('priority').agg({
'response_sla_met': ['sum', 'count', 'mean'],
'resolution_sla_met': ['sum', 'count', 'mean'],
'first_response_time': ['mean', 'median', 'std'],
'resolution_time': ['mean', 'median', 'std']
})
return tickets_df, compliance_by_priority
Ticket Volume and Trend Analysis
# Time series analysis of ticket volume
def analyze_ticket_trends(tickets_df, frequency='D'):
"""
Analyze ticket volume trends over time.
Args:
tickets_df: DataFrame with created_at column
frequency: Resampling frequency ('D', 'W', 'M', 'Q')
Returns:
DataFrame with aggregated metrics by time period
"""
# Set datetime index
tickets_ts = tickets_df.set_index('created_at').sort_index()
# Resample and aggregate
volume_trends = tickets_ts.resample(frequency).agg({
'ticket_id': 'count',
'priority': lambda x: (x == 'high').sum(),
'channel': lambda x: x.value_counts().to_dict(),
'customer_id': 'nunique'
}).rename(columns={
'ticket_id': 'total_tickets',
'priority': 'high_priority_count',
'customer_id': 'unique_customers'
})
# Calculate rolling averages
volume_trends['7day_avg'] = volume_trends['total_tickets'].rolling(7).mean()
volume_trends['30day_avg'] = volume_trends['total_tickets'].rolling(30).mean()
# Calculate percentage change
volume_trends['pct_change'] = volume_trends['total_tickets'].pct_change()
return volume_trends
Agent Performance Metrics
# Calculate comprehensive agent performance metrics
def calculate_agent_metrics(tickets_df, agents_df):
"""
Calculate detailed performance metrics for support agents.
Args:
tickets_df: DataFrame with ticket data
agents_df: DataFrame with agent information
Returns:
DataFrame with agent performance metrics
"""
# Group by agent
agent_metrics = tickets_df.groupby('agent_id').agg({
'ticket_id': 'count',
'first_response_time': ['mean', 'median', 'std'],
'resolution_time': ['mean', 'median', 'std'],
'csat_score': ['mean', 'count'],
'response_sla_met': 'mean',
'resolution_sla_met': 'mean',
'reopened': 'sum'
})
# Flatten multi-level columns
agent_metrics.columns = ['_'.join(col).strip() for col in agent_metrics.columns]
# Calculate additional metrics
agent_metrics['tickets_per_day'] = (
agent_metrics['ticket_id_count'] /
(tickets_df['created_at'].max() - tickets_df['created_at'].min()).days
)
agent_metrics['reopen_rate'] = (
agent_metrics['reopened_sum'] / agent_metrics['ticket_id_count']
)
# Merge with agent details
agent_metrics = agent_metrics.merge(
agents_df[['agent_id', 'name', 'team', 'hire_date']],
left_index=True,
right_on='agent_id'
)
return agent_metrics
PostgreSQL Integration with SQLAlchemy
# Load and save data to PostgreSQL
from sqlalchemy import create_engine, text
import pandas as pd
def create_db_connection(host, database, user, password, port=5432):
"""Create SQLAlchemy engine for PostgreSQL."""
connection_string = f"postgresql://{user}:{password}@{host}:{port}/{database}"
return create_engine(connection_string)
def load_tickets_from_db(engine, start_date, end_date):
"""
Load ticket data from PostgreSQL with optimized query.
Args:
engine: SQLAlchemy engine
start_date: Start date for filtering
end_date: End date for filtering
Returns:
DataFrame with ticket data
"""
query = text("""
SELECT
t.ticket_id,
t.created_at,
t.updated_at,
t.resolved_at,
t.first_response_at,
t.priority,
t.status,
t.channel,
t.category,
t.agent_id,
t.customer_id,
t.subject,
c.name as customer_name,
c.tier as customer_tier,
a.name as agent_name,
a.team as agent_team
FROM tickets t
LEFT JOIN customers c ON t.customer_id = c.customer_id
LEFT JOIN agents a ON t.agent_id = a.agent_id
WHERE t.created_at >= :start_date
AND t.created_at < :end_date
ORDER BY t.created_at DESC
""")
# Load with proper data types
df = pd.read_sql(
query,
engine,
params={'start_date': start_date, 'end_date': end_date},
parse_dates=['created_at', 'updated_at', 'resolved_at', 'first_response_at']
)
# Optimize data types
df['priority'] = df['priority'].astype('category')
df['status'] = df['status'].astype('category')
df['channel'] = df['channel'].astype('category')
df['customer_tier'] = df['customer_tier'].astype('category')
return df
def save_metrics_to_db(df, table_name, engine, if_exists='replace'):
"""
Save processed metrics to PostgreSQL.
Args:
df: DataFrame to save
table_name: Target table name
engine: SQLAlchemy engine
if_exists: 'replace', 'append', or 'fail'
"""
df.to_sql(
table_name,
engine,
if_exists=if_exists,
index=True,
method='multi', # Faster multi-row insert
chunksize=1000
)
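A short usage sketch of these helpers; the host, database name, and credentials below are placeholders, not real values:
# Placeholder connection details for illustration only
engine = create_db_connection('localhost', 'support_db', 'analyst', 'secret')

tickets = load_tickets_from_db(engine, start_date='2024-01-01', end_date='2024-02-01')

# Aggregate and persist a small metrics table
counts_by_priority = (
    tickets.groupby('priority', observed=True)['ticket_id'].count().to_frame('ticket_count')
)
save_metrics_to_db(counts_by_priority, 'ticket_counts_by_priority', engine, if_exists='replace')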
Data Cleaning and Validation
# Comprehensive data cleaning for support data
def clean_ticket_data(df):
"""
Clean and validate ticket data.
Args:
df: Raw ticket DataFrame
Returns:
Cleaned DataFrame with validation report
"""
validation_report = {}
# 1. Handle missing values
validation_report['missing_before'] = df.isnull().sum().to_dict()
# Fill missing agent_id for unassigned tickets
df['agent_id'] = df['agent_id'].fillna('UNASSIGNED')
# Fill missing categories
df['category'] = df['category'].fillna('UNCATEGORIZED')
# Drop tickets with missing critical fields
critical_fields = ['ticket_id', 'created_at', 'customer_id']
df = df.dropna(subset=critical_fields)
validation_report['missing_after'] = df.isnull().sum().to_dict()
# 2. Remove duplicates
validation_report['duplicates_found'] = df.duplicated(subset=['ticket_id']).sum()
df = df.drop_duplicates(subset=['ticket_id'], keep='first')
# 3. Validate data types and ranges
df['created_at'] = pd.to_datetime(df['created_at'], errors='coerce')
df['resolved_at'] = pd.to_datetime(df['resolved_at'], errors='coerce')
# 4. Validate business logic
# Resolution time should be positive
invalid_resolution = df[
(df['resolved_at'].notna()) &
(df['resolved_at'] < df['created_at'])
]
validation_report['invalid_resolution_times'] = len(invalid_resolution)
# Fix by setting to None
df.loc[df['resolved_at'] < df['created_at'], 'resolved_at'] = None
# 5. Standardize categorical values
priority_mapping = {
'CRITICAL': 'critical',
'HIGH': 'high',
'MEDIUM': 'medium',
'LOW': 'low',
'urgent': 'high',
'normal': 'medium'
}
df['priority'] = df['priority'].replace(priority_mapping)
# 6. Outlier detection for response times
if 'first_response_time' in df.columns:
q1 = df['first_response_time'].quantile(0.25)
q3 = df['first_response_time'].quantile(0.75)
iqr = q3 - q1
outlier_threshold = q3 + (3 * iqr)
validation_report['response_time_outliers'] = (
df['first_response_time'] > outlier_threshold
).sum()
validation_report['final_row_count'] = len(df)
return df, validation_report
Multi-level Grouping for Team Analytics
# Complex groupby operations for team performance
def analyze_team_performance(tickets_df):
"""
Perform multi-level grouping for team and agent analytics.
Returns:
Multiple DataFrames with different aggregation levels
"""
# Level 1: Team-level metrics
team_metrics = tickets_df.groupby('agent_team').agg({
'ticket_id': 'count',
'resolution_time': ['mean', 'median', 'std', 'min', 'max'],
'csat_score': ['mean', 'count'],
'resolution_sla_met': 'mean',
'reopened': 'sum'
})
# Level 2: Team + Priority breakdown
team_priority_metrics = tickets_df.groupby(
['agent_team', 'priority']
)['ticket_id'].count().unstack(fill_value=0)
# Level 3: Team + Agent detailed metrics
team_agent_metrics = tickets_df.groupby(
['agent_team', 'agent_id', 'agent_name']
).agg({
'ticket_id': 'count',
'resolution_time': 'mean',
'csat_score': 'mean',
'resolution_sla_met': 'mean'
})
# Calculate team rankings
team_metrics['rank_by_volume'] = team_metrics['ticket_id']['count'].rank(
ascending=False
)
team_metrics['rank_by_csat'] = team_metrics['csat_score']['mean'].rank(
ascending=False
)
return team_metrics, team_priority_metrics, team_agent_metrics
# Custom aggregation functions
def calculate_p95(series):
"""Calculate 95th percentile."""
return series.quantile(0.95)
def calculate_p99(series):
"""Calculate 99th percentile."""
return series.quantile(0.99)
# Advanced groupby with custom aggregations
def detailed_response_time_analysis(tickets_df):
"""Calculate detailed response time statistics."""
return tickets_df.groupby('priority').agg({
'first_response_time': [
'count',
'mean',
'median',
'std',
'min',
'max',
calculate_p95,
calculate_p99
]
})
Complex Join Operations
# Merge ticket, customer, and agent data
def create_comprehensive_dataset(tickets_df, customers_df, agents_df, csat_df):
"""
Merge multiple data sources into comprehensive dataset.
Args:
tickets_df: Ticket information
customers_df: Customer information
agents_df: Agent information
csat_df: Customer satisfaction scores
Returns:
Merged DataFrame with all relevant information
"""
# Step 1: Merge tickets with customers (left join - keep all tickets)
data = tickets_df.merge(
customers_df[['customer_id', 'name', 'tier', 'industry', 'contract_value']],
on='customer_id',
how='left',
suffixes=('', '_customer')
)
# Step 2: Merge with agents (left join)
data = data.merge(
agents_df[['agent_id', 'name', 'team', 'hire_date', 'specialization']],
on='agent_id',
how='left',
suffixes=('', '_agent')
)
# Step 3: Merge with CSAT scores (left join)
data = data.merge(
csat_df[['ticket_id', 'csat_score', 'csat_comment']],
on='ticket_id',
how='left'
)
# Validate merge results
print(f"Original tickets: {len(tickets_df)}")
print(f"After merges: {len(data)}")
print(f"Customers matched: {data['name_customer'].notna().sum()}")
print(f"Agents matched: {data['name_agent'].notna().sum()}")
print(f"CSAT scores available: {data['csat_score'].notna().sum()}")
return data
# Concat operations for combining time periods
def combine_historical_data(data_sources):
"""
Combine data from multiple time periods or sources.
Args:
data_sources: List of DataFrames to combine
Returns:
Combined DataFrame with source tracking
"""
# Add source identifier to each DataFrame
for i, df in enumerate(data_sources):
df['source_batch'] = f'batch_{i+1}'
# Concatenate vertically
combined = pd.concat(data_sources, ignore_index=True)
# Remove duplicates (prefer newer data)
combined = combined.sort_values('updated_at', ascending=False)
combined = combined.drop_duplicates(subset=['ticket_id'], keep='first')
return combined
Resampling and Rolling Windows
# Time series operations for support metrics
def calculate_rolling_metrics(tickets_df, window_days=7):
"""
Calculate rolling window metrics for trend analysis.
Args:
tickets_df: Ticket DataFrame with datetime index
window_days: Window size in days
Returns:
DataFrame with rolling metrics
"""
# Prepare time series
ts_data = tickets_df.set_index('created_at').sort_index()
# Daily aggregation
daily_metrics = ts_data.resample('D').agg({
'ticket_id': 'count',
'resolution_time': 'mean',
'csat_score': 'mean',
'resolution_sla_met': 'mean'
}).rename(columns={'ticket_id': 'daily_tickets'})
# Rolling window calculations
window = window_days
daily_metrics['tickets_rolling_avg'] = (
daily_metrics['daily_tickets'].rolling(window).mean()
)
daily_metrics['tickets_rolling_std'] = (
daily_metrics['daily_tickets'].rolling(window).std()
)
# Calculate control limits for anomaly detection
daily_metrics['upper_control_limit'] = (
daily_metrics['tickets_rolling_avg'] +
(2 * daily_metrics['tickets_rolling_std'])
)
daily_metrics['lower_control_limit'] = (
daily_metrics['tickets_rolling_avg'] -
(2 * daily_metrics['tickets_rolling_std'])
).clip(lower=0)
# Flag anomalies
daily_metrics['is_anomaly'] = (
(daily_metrics['daily_tickets'] > daily_metrics['upper_control_limit']) |
(daily_metrics['daily_tickets'] < daily_metrics['lower_control_limit'])
)
return daily_metrics
# Business day calculations
def calculate_business_day_metrics(tickets_df):
"""Calculate metrics excluding weekends and holidays."""
from pandas.tseries.offsets import CustomBusinessDay
# Define US holidays (customize as needed)
us_bd = CustomBusinessDay()
# Filter to business days only
tickets_df['is_business_day'] = tickets_df['created_at'].dt.dayofweek < 5
business_tickets = tickets_df[tickets_df['is_business_day']]
# Calculate business day metrics
bd_metrics = business_tickets.groupby(
business_tickets['created_at'].dt.date
).agg({
'ticket_id': 'count',
'resolution_time': 'mean'
})
return bd_metrics
Creating Management Reports
# Pivot tables for executive reporting
def create_executive_dashboard_data(tickets_df):
"""
Create pivot tables for executive dashboard.
Returns:
Dictionary of pivot tables for different views
"""
dashboards = {}
# 1. Tickets by Team and Priority
dashboards['team_priority'] = pd.pivot_table(
tickets_df,
values='ticket_id',
index='agent_team',
columns='priority',
aggfunc='count',
fill_value=0,
margins=True,
margins_name='Total'
)
# 2. Average Resolution Time by Team and Channel
dashboards['resolution_by_team_channel'] = pd.pivot_table(
tickets_df,
values='resolution_time',
index='agent_team',
columns='channel',
aggfunc='mean',
fill_value=0
)
# 3. SLA Compliance by Priority and Week
tickets_df['week'] = tickets_df['created_at'].dt.to_period('W')
dashboards['sla_compliance_weekly'] = pd.pivot_table(
tickets_df,
values='resolution_sla_met',
index='week',
columns='priority',
aggfunc='mean',
fill_value=0
)
# 4. CSAT by Agent and Customer Tier
dashboards['csat_by_agent_tier'] = pd.pivot_table(
tickets_df,
values='csat_score',
index='agent_name',
columns='customer_tier',
aggfunc=['mean', 'count'],
fill_value=0
)
# 5. Ticket Volume Heatmap (Day of Week vs Hour)
tickets_df['day_of_week'] = tickets_df['created_at'].dt.day_name()
tickets_df['hour'] = tickets_df['created_at'].dt.hour
dashboards['volume_heatmap'] = pd.pivot_table(
tickets_df,
values='ticket_id',
index='day_of_week',
columns='hour',
aggfunc='count',
fill_value=0
)
return dashboards
# Cross-tabulation for category analysis
def analyze_category_distribution(tickets_df):
"""Create cross-tabs for ticket category analysis."""
# Category vs Priority
category_priority = pd.crosstab(
tickets_df['category'],
tickets_df['priority'],
normalize='index', # Row percentages
margins=True
)
# Category vs Team (with counts)
category_team = pd.crosstab(
tickets_df['category'],
tickets_df['agent_team'],
margins=True
)
return category_priority, category_team
Export to Multiple Formats
# Export data for stakeholder reporting
def export_monthly_report(tickets_df, output_dir, month):
"""
Export comprehensive monthly report in multiple formats.
Args:
tickets_df: Ticket data for the month
output_dir: Directory to save reports
month: Month identifier (e.g., '2024-01')
"""
import os
from datetime import datetime
# 1. Export to Excel with multiple sheets
excel_path = os.path.join(output_dir, f'support_report_{month}.xlsx')
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
# Summary sheet
summary = tickets_df.groupby('priority').agg({
'ticket_id': 'count',
'resolution_time': ['mean', 'median'],
'csat_score': 'mean',
'resolution_sla_met': 'mean'
})
summary.to_excel(writer, sheet_name='Summary')
# Team metrics sheet
team_metrics = tickets_df.groupby('agent_team').agg({
'ticket_id': 'count',
'resolution_time': 'mean',
'csat_score': 'mean'
})
team_metrics.to_excel(writer, sheet_name='Team Metrics')
# Raw data sheet (limited to first 10000 rows)
tickets_df.head(10000).to_excel(
writer,
sheet_name='Raw Data',
index=False
)
# 2. Export to CSV for data analysis
csv_path = os.path.join(output_dir, f'tickets_{month}.csv')
tickets_df.to_csv(csv_path, index=False, encoding='utf-8')
# 3. Export to JSON for API consumption
json_path = os.path.join(output_dir, f'metrics_{month}.json')
metrics = {
'total_tickets': int(tickets_df['ticket_id'].count()),
'avg_resolution_time': float(tickets_df['resolution_time'].mean()),
'sla_compliance': float(tickets_df['resolution_sla_met'].mean()),
'avg_csat': float(tickets_df['csat_score'].mean()),
'by_priority': tickets_df.groupby('priority')['ticket_id'].count().to_dict()
}
with open(json_path, 'w') as f:
import json
json.dump(metrics, f, indent=2, default=str)
# 4. Export to Parquet for efficient storage
parquet_path = os.path.join(output_dir, f'tickets_{month}.parquet')
tickets_df.to_parquet(parquet_path, compression='snappy', index=False)
print(f"Reports exported to {output_dir}")
print(f" - Excel: {excel_path}")
print(f" - CSV: {csv_path}")
print(f" - JSON: {json_path}")
print(f" - Parquet: {parquet_path}")
# Format DataFrames for presentation
def format_for_presentation(df):
"""Format DataFrame for stakeholder presentation."""
# Round numeric columns
numeric_cols = df.select_dtypes(include=['float64', 'float32']).columns
df[numeric_cols] = df[numeric_cols].round(2)
# Format percentages
percentage_cols = [col for col in df.columns if 'rate' in col or 'pct' in col]
for col in percentage_cols:
df[col] = df[col].apply(lambda x: f"{x*100:.1f}%")
# Format currency if applicable
currency_cols = [col for col in df.columns if 'revenue' in col or 'value' in col]
for col in currency_cols:
df[col] = df[col].apply(lambda x: f"${x:,.2f}")
return df
Memory Optimization Techniques
# Optimize DataFrame memory usage
def optimize_dataframe_memory(df):
"""
Reduce DataFrame memory footprint.
Args:
df: DataFrame to optimize
Returns:
Optimized DataFrame with memory usage report
"""
initial_memory = df.memory_usage(deep=True).sum() / 1024**2
# Optimize integer columns
int_cols = df.select_dtypes(include=['int64']).columns
for col in int_cols:
col_min = df[col].min()
col_max = df[col].max()
if col_min >= 0:
if col_max < 255:
df[col] = df[col].astype('uint8')
elif col_max < 65535:
df[col] = df[col].astype('uint16')
elif col_max < 4294967295:
df[col] = df[col].astype('uint32')
else:
if col_min > -128 and col_max < 127:
df[col] = df[col].astype('int8')
elif col_min > -32768 and col_max < 32767:
df[col] = df[col].astype('int16')
elif col_min > -2147483648 and col_max < 2147483647:
df[col] = df[col].astype('int32')
# Optimize float columns
float_cols = df.select_dtypes(include=['float64']).columns
df[float_cols] = df[float_cols].astype('float32')
# Convert object columns to category if cardinality is low
object_cols = df.select_dtypes(include=['object']).columns
for col in object_cols:
num_unique = df[col].nunique()
num_total = len(df[col])
if num_unique / num_total < 0.5: # Less than 50% unique values
df[col] = df[col].astype('category')
final_memory = df.memory_usage(deep=True).sum() / 1024**2
reduction = (1 - final_memory/initial_memory) * 100
print(f"Memory usage reduced from {initial_memory:.2f} MB to {final_memory:.2f} MB")
print(f"Reduction: {reduction:.1f}%")
return df
# Chunked processing for large datasets
def process_large_dataset_in_chunks(file_path, chunk_size=10000):
"""
Process large CSV files in chunks to avoid memory issues.
Args:
file_path: Path to large CSV file
chunk_size: Number of rows per chunk
Returns:
Aggregated results from all chunks
"""
# Initialize aggregation containers
total_tickets = 0
priority_counts = {}
# Process in chunks
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# Process each chunk
chunk = clean_ticket_data(chunk)[0]
# Aggregate metrics
total_tickets += len(chunk)
chunk_priority = chunk['priority'].value_counts().to_dict()
for priority, count in chunk_priority.items():
priority_counts[priority] = priority_counts.get(priority, 0) + count
return {
'total_tickets': total_tickets,
'priority_distribution': priority_counts
}
Validation Framework
# Comprehensive data quality checks
class DataQualityValidator:
"""Validate data quality for support ticket datasets."""
def __init__(self, df):
self.df = df
self.issues = []
def check_required_columns(self, required_cols):
"""Ensure all required columns are present."""
missing = set(required_cols) - set(self.df.columns)
if missing:
self.issues.append(f"Missing required columns: {missing}")
return len(missing) == 0
def check_null_percentages(self, max_null_pct=0.1):
"""Check if null percentage exceeds threshold."""
null_pct = self.df.isnull().sum() / len(self.df)
excessive_nulls = null_pct[null_pct > max_null_pct]
if not excessive_nulls.empty:
self.issues.append(
f"Columns with >{max_null_pct*100}% nulls: {excessive_nulls.to_dict()}"
)
return excessive_nulls.empty
def check_duplicate_ids(self, id_column='ticket_id'):
"""Check for duplicate ticket IDs."""
duplicates = self.df[id_column].duplicated().sum()
if duplicates > 0:
self.issues.append(f"Found {duplicates} duplicate ticket IDs")
return duplicates == 0
def check_date_logic(self):
"""Validate date field logic."""
issues_found = 0
# Created date should be before resolved date
if 'created_at' in self.df.columns and 'resolved_at' in self.df.columns:
invalid = (
self.df['resolved_at'].notna() &
(self.df['resolved_at'] < self.df['created_at'])
).sum()
if invalid > 0:
self.issues.append(
f"Found {invalid} tickets with resolved_at before created_at"
)
issues_found += invalid
# Check for future dates
now = pd.Timestamp.now()
for date_col in ['created_at', 'resolved_at', 'first_response_at']:
if date_col in self.df.columns:
future_dates = (self.df[date_col] > now).sum()
if future_dates > 0:
self.issues.append(
f"Found {future_dates} future dates in {date_col}"
)
issues_found += future_dates
return issues_found == 0
def check_value_ranges(self, range_checks):
"""
Check if values are within expected ranges.
Args:
range_checks: Dict with column: (min, max) pairs
"""
for col, (min_val, max_val) in range_checks.items():
if col in self.df.columns:
out_of_range = (
(self.df[col] < min_val) | (self.df[col] > max_val)
).sum()
if out_of_range > 0:
self.issues.append(
f"{col}: {out_of_range} values outside range [{min_val}, {max_val}]"
)
def generate_report(self):
"""Generate comprehensive validation report."""
return {
'total_rows': len(self.df),
'total_columns': len(self.df.columns),
'issues_found': len(self.issues),
'issues': self.issues,
'memory_usage_mb': self.df.memory_usage(deep=True).sum() / 1024**2,
'null_summary': self.df.isnull().sum().to_dict()
}
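A brief usage sketch of the validator, assuming a tickets_df shaped like the frames used throughout; the required columns, thresholds, and ranges are illustrative:
# Illustrative thresholds and ranges
validator = DataQualityValidator(tickets_df)
validator.check_required_columns(['ticket_id', 'created_at', 'customer_id', 'priority'])
validator.check_null_percentages(max_null_pct=0.05)
validator.check_duplicate_ids()
validator.check_date_logic()
validator.check_value_ranges({'csat_score': (1, 5), 'first_response_time': (0, 720)})
report = validator.generate_report()
print(report['issues'])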
Unit Testing with pytest
# pytest fixtures and tests for data operations
import pytest
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
@pytest.fixture
def sample_ticket_data():
"""Create sample ticket data for testing."""
np.random.seed(42)
n_tickets = 100
return pd.DataFrame({
'ticket_id': range(1, n_tickets + 1),
'created_at': pd.date_range('2024-01-01', periods=n_tickets, freq='H'),
'priority': np.random.choice(['low', 'medium', 'high'], n_tickets),
'status': np.random.choice(['open', 'in_progress', 'resolved'], n_tickets),
'agent_id': np.random.choice(['A001', 'A002', 'A003'], n_tickets),
'customer_id': np.random.choice(['C001', 'C002', 'C003'], n_tickets)
})
def test_ticket_data_shape(sample_ticket_data):
"""Test that sample data has expected shape."""
assert sample_ticket_data.shape == (100, 6)
assert 'ticket_id' in sample_ticket_data.columns
def test_sla_calculation():
"""Test SLA calculation logic."""
df = pd.DataFrame({
'ticket_id': [1, 2],
'created_at': pd.to_datetime(['2024-01-01 10:00', '2024-01-01 11:00']),
'first_response_at': pd.to_datetime(['2024-01-01 11:00', '2024-01-01 14:00']),
'sla_target_hours': [2, 2]
})
df['response_time_hours'] = (
df['first_response_at'] - df['created_at']
).dt.total_seconds() / 3600
df['sla_met'] = df['response_time_hours'] <= df['sla_target_hours']
assert df.loc[0, 'sla_met'] == True
assert df.loc[1, 'sla_met'] == False
def test_data_cleaning_removes_nulls(sample_ticket_data):
"""Test that data cleaning handles null values."""
# Add some null values
df = sample_ticket_data.copy()
df.loc[0, 'agent_id'] = None
df.loc[1, 'customer_id'] = None
# Apply cleaning
cleaned, report = clean_ticket_data(df)
# Verify nulls were handled
assert 'UNASSIGNED' in cleaned['agent_id'].values
assert report['missing_before']['agent_id'] == 1
def test_groupby_aggregation(sample_ticket_data):
"""Test groupby aggregation produces correct results."""
result = sample_ticket_data.groupby('priority')['ticket_id'].count()
assert result.sum() == 100
assert all(priority in result.index for priority in ['low', 'medium', 'high'])
Best Practices
Avoid Python loops when working with pandas. Use vectorized operations for better performance:
# Bad - slow loop
for idx, row in df.iterrows():
df.at[idx, 'new_col'] = row['col1'] * row['col2']
# Good - vectorized operation
df['new_col'] = df['col1'] * df['col2']
Prefer method chaining for readable, multi-step transformations:
result = (
df
.query('status == "resolved"')
.groupby('agent_id')
.agg({'resolution_time': 'mean'})
.sort_values('resolution_time')
.head(10)
)
Convert to appropriate data types immediately after loading to save memory and improve performance.
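For example, immediately after reading a CSV (the inline CSV text here stands in for a real file):
import io
import pandas as pd

csv_text = "ticket_id,created_at,priority\n1,2024-01-01 10:00,high\n2,2024-01-02 11:00,low\n"
df = pd.read_csv(io.StringIO(csv_text), parse_dates=['created_at'])

# Convert low-cardinality strings to category and shrink integer IDs
df['priority'] = df['priority'].astype('category')
df['ticket_id'] = df['ticket_id'].astype('int32')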
Use .loc[] and .iloc[] explicitly. Avoid chained indexing, which can lead to SettingWithCopyWarning and unexpected behavior.
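A small illustration of the difference, using a throwaway frame:
import pandas as pd

df = pd.DataFrame({'priority': ['high', 'low'], 'status': ['open', 'open']})

# Chained indexing: the first [] may return a copy, so the assignment can be silently
# lost and pandas may emit SettingWithCopyWarning
df[df['priority'] == 'high']['status'] = 'escalated'

# Explicit .loc: a single indexing operation, so the assignment reliably modifies df
df.loc[df['priority'] == 'high', 'status'] = 'escalated'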
Always work with timezone-aware datetime objects for support data across regions.
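One way to do this, assuming naive timestamps were recorded in UTC (the target timezone is illustrative):
import pandas as pd

created = pd.to_datetime(['2024-01-01 10:00', '2024-01-01 23:30'])

# Attach a timezone to naive timestamps (assumed to be UTC)
created_utc = created.tz_localize('UTC')

# Convert for reporting in a regional support center
created_local = created_utc.tz_convert('America/New_York')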
Add comments explaining business logic in complex transformations.
Implement validation checks after major transformations to catch issues early.
Set meaningful indices (datetime for time series, ticket_id for lookups) to improve performance.
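For example (illustrative frame), a datetime index supports resampling and date slicing, while a ticket_id index speeds up label lookups:
import pandas as pd

tickets = pd.DataFrame({
    'ticket_id': [101, 102],
    'created_at': pd.to_datetime(['2024-01-01', '2024-01-08']),
    'status': ['open', 'resolved'],
})

# Datetime index for time series work
ts = tickets.set_index('created_at').sort_index()
weekly_counts = ts.resample('W')['ticket_id'].count()

# ticket_id index for fast lookups
by_id = tickets.set_index('ticket_id')
row = by_id.loc[102]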
Use .loc[] for setting values. Use .any() or .all() when evaluating a Series in a boolean context. Always write tests for data transformation functions using pytest fixtures and parametrize decorators.
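A minimal parametrized test for the SLA comparison used earlier (threshold values are illustrative):
import pytest

@pytest.mark.parametrize(
    "response_hours, target_hours, expected",
    [(1.0, 2.0, True), (3.0, 2.0, False), (2.0, 2.0, True)],
)
def test_response_sla_met(response_hours, target_hours, expected):
    assert (response_hours <= target_hours) == expected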
Use SQLAlchemy engines for database connections and leverage pandas' read_sql and to_sql methods.
Store processed metrics in PostgreSQL for historical tracking and dashboard consumption.
Use pd.ExcelWriter with the openpyxl engine for creating multi-sheet Excel reports.
You are now equipped to handle comprehensive data analysis and manipulation tasks for customer support operations using pandas. Apply these patterns to analyze ticket data, track SLA compliance, measure agent performance, and generate actionable insights for support teams. Always prioritize data quality, performance optimization, and clear, maintainable code.
Weekly Installs: 59
Repository: github.com/manutej/luxor-claude-marketplace
GitHub Stars: 44
First Seen: Jan 22, 2026
Security Audits: Gen Agent Trust Hub: Pass, Socket: Pass, Snyk: Pass
Installed on: gemini-cli (45), opencode (45), codex (44), claude-code (42), cursor (42), github-copilot (38)