Data Cleaning Pipeline by aj-geddes/useful-ai-prompts
npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill 'Data Cleaning Pipeline'

A data cleaning pipeline transforms raw, messy data into a clean, standardized format suitable for analysis and modeling, by systematically handling missing values, outliers, and other data quality issues.
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.impute import SimpleImputer, KNNImputer
# Load the raw data
df = pd.read_csv('raw_data.csv')
# Step 1: identify and handle missing values
print("Missing values:\n", df.isnull().sum())
# Strategy 1: drop rows missing critical fields
df = df.dropna(subset=['customer_id', 'transaction_date'])
# Strategy 2: impute the numeric 'age' column with its median
imputer = SimpleImputer(strategy='median')
df['age'] = imputer.fit_transform(df[['age']])
# Strategy 3: KNN imputation across all numeric features
knn_imputer = KNNImputer(n_neighbors=5)
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = knn_imputer.fit_transform(df[numeric_cols])
# Strategy 4: fill the categorical column with its mode
# NOTE(review): mode()[0] raises IndexError if 'category' is entirely NaN
df['category'] = df['category'].fillna(df['category'].mode()[0])
# Step 2: handle duplicates
print(f"Duplicate rows: {df.duplicated().sum()}")
df = df.drop_duplicates()
# Also drop duplicates on the business key (customer + transaction date)
df = df.drop_duplicates(subset=['customer_id', 'transaction_date'])
# Step 3: outlier detection and handling via the 1.5*IQR rule
Q1 = df['amount'].quantile(0.25)
Q3 = df['amount'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# Remove outliers
df = df[(df['amount'] >= lower_bound) & (df['amount'] <= upper_bound)]
# Alternative: cap (winsorize) outliers instead of dropping them.
# After the filter above this clip is a no-op; in production choose ONE strategy.
df['amount'] = df['amount'].clip(lower=lower_bound, upper=upper_bound)
# Step 4: data type standardization
df['transaction_date'] = pd.to_datetime(df['transaction_date'])
df['customer_id'] = df['customer_id'].astype('int64')
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# Step 5: text cleaning
df['name'] = df['name'].str.strip().str.lower()
# Raw string: '\s' in a plain literal is an invalid escape (SyntaxWarning,
# and a future SyntaxError); regex patterns should always be raw strings.
df['name'] = df['name'].str.replace(r'[^a-z0-9\s]', '', regex=True)
# Step 6: normalization and scaling (z-score for age/income)
scaler = StandardScaler()
df[['age', 'income']] = scaler.fit_transform(df[['age', 'income']])
# MinMax scaling to the bounded range [0, 1]
minmax_scaler = MinMaxScaler()
df[['score']] = minmax_scaler.fit_transform(df[['score']])
# Step 7: create a data quality report
def create_quality_report(df_original, df_cleaned):
    """Summarize row counts and missing-value totals before/after cleaning.

    Parameters
    ----------
    df_original : pd.DataFrame
        The raw data before cleaning.
    df_cleaned : pd.DataFrame
        The data after the cleaning pipeline.

    Returns
    -------
    pd.DataFrame
        A single-row frame with counts, removal percentage, and
        missing-value totals.
    """
    n_orig = len(df_original)
    n_clean = len(df_cleaned)
    removed = n_orig - n_clean
    report = {
        'Original rows': n_orig,
        'Cleaned rows': n_clean,
        'Rows removed': removed,
        # Guard against ZeroDivisionError when the original frame is empty
        'Removal percentage': (removed / n_orig * 100) if n_orig else 0.0,
        'Original missing': df_original.isnull().sum().sum(),
        'Cleaned missing': df_cleaned.isnull().sum().sum(),
    }
    return pd.DataFrame(report, index=[0])
# NOTE(review): the report compares df to itself because no pre-cleaning
# snapshot was kept; capture df.copy() before cleaning for a real comparison
quality = create_quality_report(df, df)
print(quality)
# Step 8: validation checks
# NOTE(review): assert statements are stripped under `python -O`; raise
# explicit exceptions for production-grade validation
assert df['age'].isnull().sum() == 0, "Age has missing values"
assert df['transaction_date'].dtype == 'datetime64[ns]', "Date not datetime"
assert (df['amount'] >= 0).all(), "Negative amounts detected"
print("Data cleaning pipeline completed successfully!")
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
class DataCleaningPipeline:
    """Composable, ordered sequence of cleaning steps applied to a DataFrame."""

    def __init__(self):
        # Ordered list of (callable, human-readable description) pairs
        self.cleaner_steps = []

    def add_step(self, func, description):
        """Register a cleaning step; returns self so calls can be chained."""
        self.cleaner_steps.append((func, description))
        return self

    def execute(self, df):
        """Run every registered step in order, announcing each before it runs."""
        for step_fn, step_desc in self.cleaner_steps:
            print(f"Executing: {step_desc}")
            df = step_fn(df)
        return df
# Usage: build the pipeline fluently, then run it once over the frame
pipeline = DataCleaningPipeline()
pipeline.add_step(
lambda df: df.dropna(subset=['customer_id']),
"Remove rows with missing customer_id"
).add_step(
lambda df: df.drop_duplicates(),
"Remove duplicate rows"
).add_step(
lambda df: df[(df['amount'] > 0) & (df['amount'] < 100000)],
"Filter invalid amount ranges"
)
df_clean = pipeline.execute(df)
# Step 9: feature-specific cleaning
df['phone'] = df['phone'].str.replace(r'\D', '', regex=True)  # strip non-digit characters
# Step 10: datetime handling; unparseable dates become NaT
df['created_date'] = pd.to_datetime(df['created_date'], errors='coerce')
df['days_since_creation'] = (pd.Timestamp.now() - df['created_date']).dt.days
# Step 11: categorical standardization
df['status'] = df['status'].str.lower().str.strip()
# NOTE(review): this mapping is identity-only and has no effect; extend it
# with real synonyms (e.g. 'act' -> 'active') to be useful
df['status'] = df['status'].replace({
'active': 'active',
'inactive': 'inactive',
'pending': 'pending',
})
# Step 12: numeric constraint checks; out-of-range values become NaN
df['age'] = df['age'].where((df['age'] >= 0) & (df['age'] <= 150), np.nan)
df['percentage'] = df['percentage'].where((df['percentage'] >= 0) & (df['percentage'] <= 100), np.nan)
# Step 13: data quality score components (percentages in [0, 100])
quality_score = {
'Missing %': (df.isnull().sum() / len(df) * 100).mean(),
'Duplicates %': (df.duplicated().sum() / len(df) * 100),
'Complete Features': (df.notna().sum() / len(df)).mean() * 100,
}
# Step 14: generate the final cleaning report
cleaning_report = f"""
DATA CLEANING REPORT
====================
Rows removed: {len(df) - len(df_clean)}
Columns: {len(df_clean.columns)}
Remaining rows: {len(df_clean)}
Completeness: {(df_clean.notna().sum().sum() / (len(df_clean) * len(df_clean.columns)) * 100):.1f}%
"""
print(cleaning_report)
每周安装量
0
代码仓库
GitHub 星标数
126
首次出现
1970年1月1日
安全审计
Data cleaning pipelines transform raw, messy data into clean, standardized formats suitable for analysis and modeling through systematic handling of missing values, outliers, and data quality issues.
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.impute import SimpleImputer, KNNImputer
# Load raw data
df = pd.read_csv('raw_data.csv')
# Step 1: Identify and handle missing values
print("Missing values:\n", df.isnull().sum())
# Strategy 1: Delete rows with critical missing values
df = df.dropna(subset=['customer_id', 'transaction_date'])
# Strategy 2: Impute numerical columns with median
imputer = SimpleImputer(strategy='median')
df['age'] = imputer.fit_transform(df[['age']])
# Strategy 3: Use KNN imputation for related features
knn_imputer = KNNImputer(n_neighbors=5)
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = knn_imputer.fit_transform(df[numeric_cols])
# Strategy 4: Fill categorical with mode
# NOTE(review): mode()[0] raises IndexError if 'category' is entirely NaN
df['category'] = df['category'].fillna(df['category'].mode()[0])
# Step 2: Handle duplicates
print(f"Duplicate rows: {df.duplicated().sum()}")
df = df.drop_duplicates()
# Also drop duplicates on the business key (customer + transaction date)
df = df.drop_duplicates(subset=['customer_id', 'transaction_date'])
# Step 3: Outlier detection and handling via the 1.5*IQR rule
Q1 = df['amount'].quantile(0.25)
Q3 = df['amount'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# Remove outliers
df = df[(df['amount'] >= lower_bound) & (df['amount'] <= upper_bound)]
# Alternative: cap (winsorize) outliers instead of dropping them.
# After the filter above this clip is a no-op; in production choose ONE strategy.
df['amount'] = df['amount'].clip(lower=lower_bound, upper=upper_bound)
# Step 4: Data type standardization
df['transaction_date'] = pd.to_datetime(df['transaction_date'])
df['customer_id'] = df['customer_id'].astype('int64')
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# Step 5: Text cleaning
df['name'] = df['name'].str.strip().str.lower()
# Raw string: '\s' in a plain literal is an invalid escape (SyntaxWarning,
# and a future SyntaxError); regex patterns should always be raw strings.
df['name'] = df['name'].str.replace(r'[^a-z0-9\s]', '', regex=True)
# Step 6: Normalization and scaling (z-score for age/income)
scaler = StandardScaler()
df[['age', 'income']] = scaler.fit_transform(df[['age', 'income']])
# MinMax scaling for bounded range [0, 1]
minmax_scaler = MinMaxScaler()
df[['score']] = minmax_scaler.fit_transform(df[['score']])
# Step 7: Create data quality report
def create_quality_report(df_original, df_cleaned):
    """Summarize row counts and missing-value totals before/after cleaning.

    Parameters
    ----------
    df_original : pd.DataFrame
        The raw data before cleaning.
    df_cleaned : pd.DataFrame
        The data after the cleaning pipeline.

    Returns
    -------
    pd.DataFrame
        A single-row frame with counts, removal percentage, and
        missing-value totals.
    """
    n_orig = len(df_original)
    n_clean = len(df_cleaned)
    removed = n_orig - n_clean
    report = {
        'Original rows': n_orig,
        'Cleaned rows': n_clean,
        'Rows removed': removed,
        # Guard against ZeroDivisionError when the original frame is empty
        'Removal percentage': (removed / n_orig * 100) if n_orig else 0.0,
        'Original missing': df_original.isnull().sum().sum(),
        'Cleaned missing': df_cleaned.isnull().sum().sum(),
    }
    return pd.DataFrame(report, index=[0])
# NOTE(review): the report compares df to itself because no pre-cleaning
# snapshot was kept; capture df.copy() before cleaning for a real comparison
quality = create_quality_report(df, df)
print(quality)
# Step 8: Validation checks
# NOTE(review): assert statements are stripped under `python -O`; raise
# explicit exceptions for production-grade validation
assert df['age'].isnull().sum() == 0, "Age has missing values"
assert df['transaction_date'].dtype == 'datetime64[ns]', "Date not datetime"
assert (df['amount'] >= 0).all(), "Negative amounts detected"
print("Data cleaning pipeline completed successfully!")
class DataCleaningPipeline:
    """Composable, ordered sequence of cleaning steps applied to a DataFrame."""

    def __init__(self):
        # Ordered list of (callable, human-readable description) pairs
        self.cleaner_steps = []

    def add_step(self, func, description):
        """Register a cleaning step; returns self so calls can be chained."""
        self.cleaner_steps.append((func, description))
        return self

    def execute(self, df):
        """Run every registered step in order, announcing each before it runs."""
        for step_fn, step_desc in self.cleaner_steps:
            print(f"Executing: {step_desc}")
            df = step_fn(df)
        return df
# Usage: build the pipeline fluently, then run it once over the frame
pipeline = DataCleaningPipeline()
pipeline.add_step(
lambda df: df.dropna(subset=['customer_id']),
"Remove rows with missing customer_id"
).add_step(
lambda df: df.drop_duplicates(),
"Remove duplicate rows"
).add_step(
lambda df: df[(df['amount'] > 0) & (df['amount'] < 100000)],
"Filter invalid amount ranges"
)
df_clean = pipeline.execute(df)
# Step 9: Feature-specific cleaning
df['phone'] = df['phone'].str.replace(r'\D', '', regex=True)  # Remove non-digits
# Step 10: Datetime handling; unparseable dates become NaT
df['created_date'] = pd.to_datetime(df['created_date'], errors='coerce')
df['days_since_creation'] = (pd.Timestamp.now() - df['created_date']).dt.days
# Step 11: Categorical standardization
df['status'] = df['status'].str.lower().str.strip()
# NOTE(review): this mapping is identity-only and has no effect; extend it
# with real synonyms (e.g. 'act' -> 'active') to be useful
df['status'] = df['status'].replace({
'active': 'active',
'inactive': 'inactive',
'pending': 'pending',
})
# Step 12: Numeric constraint checking; out-of-range values become NaN
df['age'] = df['age'].where((df['age'] >= 0) & (df['age'] <= 150), np.nan)
df['percentage'] = df['percentage'].where((df['percentage'] >= 0) & (df['percentage'] <= 100), np.nan)
# Step 13: Create data quality score components (percentages in [0, 100])
quality_score = {
'Missing %': (df.isnull().sum() / len(df) * 100).mean(),
'Duplicates %': (df.duplicated().sum() / len(df) * 100),
'Complete Features': (df.notna().sum() / len(df)).mean() * 100,
}
# Step 14: Generate the final cleaning report
cleaning_report = f"""
DATA CLEANING REPORT
====================
Rows removed: {len(df) - len(df_clean)}
Columns: {len(df_clean.columns)}
Remaining rows: {len(df_clean)}
Completeness: {(df_clean.notna().sum().sum() / (len(df_clean) * len(df_clean.columns)) * 100):.1f}%
"""
print(cleaning_report)
Weekly Installs
0
Repository
GitHub Stars
126
First Seen
Jan 1, 1970
Security Audits
AI Elements:基于shadcn/ui的AI原生应用组件库,快速构建对话界面
58,500 周安装