The Agent Skills Directory
npx skills add https://smithery.ai/skills/aj-geddes/data-cleaning-pipeline

数据清洗流程通过系统性地处理缺失值、异常值和数据质量问题,将原始、杂乱的数据转换为适用于分析和建模的干净、标准化格式。
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.impute import SimpleImputer, KNNImputer
# Load the raw data set to be cleaned.
df = pd.read_csv('raw_data.csv')
# Step 1: identify and handle missing values.
print("缺失值:\n", df.isnull().sum())
# Strategy 1: drop rows missing critical identifier fields.
df = df.dropna(subset=['customer_id', 'transaction_date'])
# Strategy 2: impute numeric columns with the median (robust to outliers).
imputer = SimpleImputer(strategy='median')
df['age'] = imputer.fit_transform(df[['age']])
# Strategy 3: KNN imputation across correlated numeric features.
# NOTE(review): this re-imputes 'age' as well, making the median fill
# above largely redundant — kept here for illustration.
knn_imputer = KNNImputer(n_neighbors=5)
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = knn_imputer.fit_transform(df[numeric_cols])
# Strategy 4: fill categorical gaps with the mode (most frequent value).
df['category'] = df['category'].fillna(df['category'].mode()[0])
# Step 2: handle duplicates.
print(f"重复行数: {df.duplicated().sum()}")
df = df.drop_duplicates()
# Also drop rows that duplicate the business key.
df = df.drop_duplicates(subset=['customer_id', 'transaction_date'])
# Step 3: outlier detection via the 1.5 * IQR rule.
Q1 = df['amount'].quantile(0.25)
Q3 = df['amount'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# Remove rows outside the IQR fences.
df = df[(df['amount'] >= lower_bound) & (df['amount'] <= upper_bound)]
# Alternative: clip (winsorize) instead of dropping.
# NOTE(review): after the filter above this clip is a no-op; normally one
# of the two strategies is chosen, not both.
df['amount'] = df['amount'].clip(lower=lower_bound, upper=upper_bound)
# Step 4: standardize data types.
df['transaction_date'] = pd.to_datetime(df['transaction_date'])
df['customer_id'] = df['customer_id'].astype('int64')
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# Step 5: text cleaning — normalize case/whitespace, strip punctuation.
df['name'] = df['name'].str.strip().str.lower()
# BUG FIX: the regex must be a raw string; '\s' inside a plain string is
# an invalid escape sequence (SyntaxWarning on modern Python).
df['name'] = df['name'].str.replace(r'[^a-z0-9\s]', '', regex=True)
# Step 6: normalization and scaling.
scaler = StandardScaler()
df[['age', 'income']] = scaler.fit_transform(df[['age', 'income']])
# MinMax scaling for a bounded [0, 1] range.
minmax_scaler = MinMaxScaler()
df[['score']] = minmax_scaler.fit_transform(df[['score']])
# Step 7: create a data quality report.
def create_quality_report(df_original, df_cleaned):
    """Summarize the effect of cleaning as a one-row DataFrame.

    Parameters
    ----------
    df_original : pd.DataFrame
        The data before cleaning.
    df_cleaned : pd.DataFrame
        The data after cleaning.

    Returns
    -------
    pd.DataFrame
        Single-row report with row counts, removal statistics and
        missing-value totals (labels are Chinese, matching the script).
    """
    n_before = len(df_original)
    n_after = len(df_cleaned)
    removed = n_before - n_after
    report = {
        '原始行数': n_before,
        '清洗后行数': n_after,
        '移除行数': removed,
        # Guard against division by zero when the original frame is empty.
        '移除百分比': (removed / n_before * 100) if n_before else 0.0,
        '原始缺失值数': df_original.isnull().sum().sum(),
        '清洗后缺失值数': df_cleaned.isnull().sum().sum(),
    }
    return pd.DataFrame(report, index=[0])
# Build the quality report.
# NOTE(review): both arguments are the already-cleaned frame, so the report
# always shows zero rows removed — a pre-cleaning copy of df was presumably
# intended as the first argument; verify against the intent of step 7.
quality = create_quality_report(df, df)
print(quality)
# Step 8: validation checks.
# NOTE(review): assert statements are stripped under `python -O`; raising
# ValueError would keep these checks active in optimized runs.
assert df['age'].isnull().sum() == 0, "Age 存在缺失值"
assert df['transaction_date'].dtype == 'datetime64[ns]', "日期不是 datetime 类型"
assert (df['amount'] >= 0).all(), "检测到负值金额"
print("数据清洗流程成功完成!")
class DataCleaningPipeline:
    """Composable pipeline of data-cleaning steps with a fluent API.

    Steps are registered as (callable, description) pairs and applied
    in registration order when execute() is called.
    """

    def __init__(self):
        # Ordered list of (callable, description) pairs.
        self.cleaner_steps = []

    def add_step(self, func, description):
        """Register a cleaning step; returns self so calls chain."""
        self.cleaner_steps.append((func, description))
        return self

    def execute(self, df):
        """Run every registered step on *df* in order and return the result."""
        result = df
        for step in self.cleaner_steps:
            transform, label = step
            print(f"正在执行: {label}")
            result = transform(result)
        return result
# Example usage of the fluent pipeline API.
pipeline = DataCleaningPipeline()
pipeline.add_step(
    lambda df: df.dropna(subset=['customer_id']),
    "移除 customer_id 缺失的行"
).add_step(
    lambda df: df.drop_duplicates(),
    "移除重复行"
).add_step(
    lambda df: df[(df['amount'] > 0) & (df['amount'] < 100000)],
    "过滤无效金额范围"
)
df_clean = pipeline.execute(df)
# Step 9: feature-specific cleaning.
df['phone'] = df['phone'].str.replace(r'\D', '', regex=True) # strip non-digit characters
# Step 10: datetime handling — unparseable dates become NaT.
df['created_date'] = pd.to_datetime(df['created_date'], errors='coerce')
# NOTE(review): pd.Timestamp.now() is timezone-naive; subtracting it from a
# timezone-aware created_date column would raise — confirm the column is naive.
df['days_since_creation'] = (pd.Timestamp.now() - df['created_date']).dt.days
# Step 11: standardize categorical values to lowercase, trimmed form.
df['status'] = df['status'].str.lower().str.strip()
# NOTE(review): this mapping is an identity (each key maps to itself), so the
# replace is currently a no-op — presumably meant to fold variant spellings
# (e.g. 'act.' -> 'active'); verify the intended mappings.
df['status'] = df['status'].replace({
    'active': 'active',
    'inactive': 'inactive',
    'pending': 'pending',
})
# Step 12: enforce numeric range constraints (out-of-range values -> NaN).
df['age'] = df['age'].where((df['age'] >= 0) & (df['age'] <= 150), np.nan)
df['percentage'] = df['percentage'].where((df['percentage'] >= 0) & (df['percentage'] <= 100), np.nan)
# Step 13: simple data-quality score components (all percentages).
quality_score = {
    '缺失百分比': (df.isnull().sum() / len(df) * 100).mean(),
    '重复百分比': (df.duplicated().sum() / len(df) * 100),
    '完整特征百分比': (df.notna().sum() / len(df)).mean() * 100,
}
# Step 14: generate the final cleaning report.
# NOTE(review): compares the script-level df with the pipeline output, so
# the row delta reflects only the pipeline's three steps.
cleaning_report = f"""
数据清洗报告
====================
移除行数: {len(df) - len(df_clean)}
列数: {len(df_clean.columns)}
剩余行数: {len(df_clean)}
数据完整性: {(df_clean.notna().sum().sum() / (len(df_clean) * len(df_clean.columns)) * 100):.1f}%
"""
print(cleaning_report)
每周安装次数
–
来源
首次出现
–
Data cleaning pipelines transform raw, messy data into clean, standardized formats suitable for analysis and modeling through systematic handling of missing values, outliers, and data quality issues.
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.impute import SimpleImputer, KNNImputer
# Load the raw data set to be cleaned.
df = pd.read_csv('raw_data.csv')
# Step 1: identify and handle missing values.
print("Missing values:\n", df.isnull().sum())
# Strategy 1: drop rows missing critical identifier fields.
df = df.dropna(subset=['customer_id', 'transaction_date'])
# Strategy 2: impute numeric columns with the median (robust to outliers).
imputer = SimpleImputer(strategy='median')
df['age'] = imputer.fit_transform(df[['age']])
# Strategy 3: KNN imputation across correlated numeric features.
# NOTE(review): this re-imputes 'age' as well, making the median fill
# above largely redundant — kept here for illustration.
knn_imputer = KNNImputer(n_neighbors=5)
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = knn_imputer.fit_transform(df[numeric_cols])
# Strategy 4: fill categorical gaps with the mode (most frequent value).
df['category'] = df['category'].fillna(df['category'].mode()[0])
# Step 2: handle duplicates.
print(f"Duplicate rows: {df.duplicated().sum()}")
df = df.drop_duplicates()
# Also drop rows that duplicate the business key.
df = df.drop_duplicates(subset=['customer_id', 'transaction_date'])
# Step 3: outlier detection via the 1.5 * IQR rule.
Q1 = df['amount'].quantile(0.25)
Q3 = df['amount'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# Remove rows outside the IQR fences.
df = df[(df['amount'] >= lower_bound) & (df['amount'] <= upper_bound)]
# Alternative: clip (winsorize) instead of dropping.
# NOTE(review): after the filter above this clip is a no-op; normally one
# of the two strategies is chosen, not both.
df['amount'] = df['amount'].clip(lower=lower_bound, upper=upper_bound)
# Step 4: standardize data types.
df['transaction_date'] = pd.to_datetime(df['transaction_date'])
df['customer_id'] = df['customer_id'].astype('int64')
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# Step 5: text cleaning — normalize case/whitespace, strip punctuation.
df['name'] = df['name'].str.strip().str.lower()
# BUG FIX: the regex must be a raw string; '\s' inside a plain string is
# an invalid escape sequence (SyntaxWarning on modern Python).
df['name'] = df['name'].str.replace(r'[^a-z0-9\s]', '', regex=True)
# Step 6: normalization and scaling.
scaler = StandardScaler()
df[['age', 'income']] = scaler.fit_transform(df[['age', 'income']])
# MinMax scaling for a bounded [0, 1] range.
minmax_scaler = MinMaxScaler()
df[['score']] = minmax_scaler.fit_transform(df[['score']])
# Step 7: Create data quality report
def create_quality_report(df_original, df_cleaned):
    """Summarize the effect of cleaning as a one-row DataFrame.

    Parameters
    ----------
    df_original : pd.DataFrame
        The data before cleaning.
    df_cleaned : pd.DataFrame
        The data after cleaning.

    Returns
    -------
    pd.DataFrame
        Single-row report with row counts, removal statistics and
        missing-value totals.
    """
    n_before = len(df_original)
    n_after = len(df_cleaned)
    removed = n_before - n_after
    report = {
        'Original rows': n_before,
        'Cleaned rows': n_after,
        'Rows removed': removed,
        # Guard against division by zero when the original frame is empty.
        'Removal percentage': (removed / n_before * 100) if n_before else 0.0,
        'Original missing': df_original.isnull().sum().sum(),
        'Cleaned missing': df_cleaned.isnull().sum().sum(),
    }
    return pd.DataFrame(report, index=[0])
# Build the quality report.
# NOTE(review): both arguments are the already-cleaned frame, so the report
# always shows zero rows removed — a pre-cleaning copy of df was presumably
# intended as the first argument; verify against the intent of step 7.
quality = create_quality_report(df, df)
print(quality)
# Step 8: Validation checks
# NOTE(review): assert statements are stripped under `python -O`; raising
# ValueError would keep these checks active in optimized runs.
assert df['age'].isnull().sum() == 0, "Age has missing values"
assert df['transaction_date'].dtype == 'datetime64[ns]', "Date not datetime"
assert (df['amount'] >= 0).all(), "Negative amounts detected"
print("Data cleaning pipeline completed successfully!")
class DataCleaningPipeline:
    """Composable pipeline of data-cleaning steps with a fluent API.

    Steps are registered as (callable, description) pairs and applied
    in registration order when execute() is called.
    """

    def __init__(self):
        # Ordered list of (callable, description) pairs.
        self.cleaner_steps = []

    def add_step(self, func, description):
        """Register a cleaning step; returns self so calls chain."""
        self.cleaner_steps.append((func, description))
        return self

    def execute(self, df):
        """Run every registered step on *df* in order and return the result."""
        result = df
        for step in self.cleaner_steps:
            transform, label = step
            print(f"Executing: {label}")
            result = transform(result)
        return result
# Example usage of the fluent pipeline API.
pipeline = DataCleaningPipeline()
pipeline.add_step(
    lambda df: df.dropna(subset=['customer_id']),
    "Remove rows with missing customer_id"
).add_step(
    lambda df: df.drop_duplicates(),
    "Remove duplicate rows"
).add_step(
    lambda df: df[(df['amount'] > 0) & (df['amount'] < 100000)],
    "Filter invalid amount ranges"
)
df_clean = pipeline.execute(df)
# Step 9: feature-specific cleaning.
df['phone'] = df['phone'].str.replace(r'\D', '', regex=True) # Remove non-digits
# Step 10: datetime handling — unparseable dates become NaT.
df['created_date'] = pd.to_datetime(df['created_date'], errors='coerce')
# NOTE(review): pd.Timestamp.now() is timezone-naive; subtracting it from a
# timezone-aware created_date column would raise — confirm the column is naive.
df['days_since_creation'] = (pd.Timestamp.now() - df['created_date']).dt.days
# Step 11: standardize categorical values to lowercase, trimmed form.
df['status'] = df['status'].str.lower().str.strip()
# NOTE(review): this mapping is an identity (each key maps to itself), so the
# replace is currently a no-op — presumably meant to fold variant spellings
# (e.g. 'act.' -> 'active'); verify the intended mappings.
df['status'] = df['status'].replace({
    'active': 'active',
    'inactive': 'inactive',
    'pending': 'pending',
})
# Step 12: enforce numeric range constraints (out-of-range values -> NaN).
df['age'] = df['age'].where((df['age'] >= 0) & (df['age'] <= 150), np.nan)
df['percentage'] = df['percentage'].where((df['percentage'] >= 0) & (df['percentage'] <= 100), np.nan)
# Step 13: simple data-quality score components (all percentages).
quality_score = {
    'Missing %': (df.isnull().sum() / len(df) * 100).mean(),
    'Duplicates %': (df.duplicated().sum() / len(df) * 100),
    'Complete Features': (df.notna().sum() / len(df)).mean() * 100,
}
# Step 14: generate the final cleaning report.
# NOTE(review): compares the script-level df with the pipeline output, so
# the row delta reflects only the pipeline's three steps.
cleaning_report = f"""
DATA CLEANING REPORT
====================
Rows removed: {len(df) - len(df_clean)}
Columns: {len(df_clean.columns)}
Remaining rows: {len(df_clean)}
Completeness: {(df_clean.notna().sum().sum() / (len(df_clean) * len(df_clean.columns)) * 100):.1f}%
"""
print(cleaning_report)
Weekly Installs
–
Source
First Seen
–
DOCX文件创建、编辑与分析完整指南 - 使用docx-js、Pandoc和Python脚本
46,400 周安装
技能升级器:使用决策理论v5和RAG将任何技能升级为元技能
211 周安装
Claude技能开发指南:创建自定义MCP管道技能与元技能开发教程
211 周安装
色彩可访问性指南:WCAG对比度标准、色盲模拟与最佳实践
212 周安装
AgentOps技能转换器 - 一键将技能转换为Codex、Cursor等AI平台格式
212 周安装
Agile Skill Build:快速创建和扩展ace-skills的自动化工具,提升AI技能开发效率
1 周安装
LLM评估工具lm-evaluation-harness使用指南:HuggingFace模型基准测试与性能分析
212 周安装