Causal Inference by aj-geddes/useful-ai-prompts
npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill 'Causal Inference'因果推断旨在确定因果关系并估计处理效应,它超越了相关性分析,致力于理解事物之间的因果机制。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.preprocessing import StandardScaler
from scipy import stats
# 生成带有混杂的观测数据
np.random.seed(42)
n = 1000
# 混杂变量:年龄(同时影响处理和结果)
age = np.random.uniform(25, 75, n)
# 处理:培训项目(年轻人更可能参加)
treatment_prob = 0.3 + 0.3 * (75 - age) / 50 # 与年龄呈负相关
treatment = (np.random.uniform(0, 1, n) < treatment_prob).astype(int)
# 结果:薪资(同时受处理和年龄影响)
# 处理的真实因果效应:+$5000
salary = 40000 + 500 * age + 5000 * treatment + np.random.normal(0, 10000, n)
df = pd.DataFrame({
'age': age,
'treatment': treatment,
'salary': salary,
})
print("观测数据摘要:")
print(df.describe())
print(f"\n处理率: {df['treatment'].mean():.1%}")
print(f"平均薪资 (对照组): ${df[df['treatment']==0]['salary'].mean():.0f}")
print(f"平均薪资 (处理组): ${df[df['treatment']==1]['salary'].mean():.0f}")
# 1. 简单比较 (有偏 - 忽略混杂)
naive_effect = df[df['treatment']==1]['salary'].mean() - df[df['treatment']==0]['salary'].mean()
print(f"\n1. 简单比较: ${naive_effect:.0f} (有偏)")
# 2. 回归调整 (协变量调整)
X = df[['treatment', 'age']]
y = df['salary']
model = LinearRegression()
model.fit(X, y)
regression_effect = model.coef_[0]
print(f"\n2. 回归调整: ${regression_effect:.0f}")
# 3. 倾向得分匹配
# 估计给定协变量下的处理概率
ps_model = LogisticRegression()
ps_model.fit(df[['age']], df['treatment'])
df['propensity_score'] = ps_model.predict_proba(df[['age']])[:, 1]
print(f"\n3. 倾向得分匹配:")
print(f"倾向得分范围: [{df['propensity_score'].min():.3f}, {df['propensity_score'].max():.3f}]")
# 匹配:为每个处理单元寻找对照单元
matched_pairs = []
treated_units = df[df['treatment'] == 1].index
for treated_idx in treated_units:
treated_ps = df.loc[treated_idx, 'propensity_score']
treated_age = df.loc[treated_idx, 'age']
# 寻找最接近的对照单元
control_units = df[(df['treatment'] == 0) &
(df['propensity_score'] >= treated_ps - 0.1) &
(df['propensity_score'] <= treated_ps + 0.1)].index
if len(control_units) > 0:
closest_control = min(control_units,
key=lambda x: abs(df.loc[x, 'propensity_score'] - treated_ps))
matched_pairs.append({
'treated_idx': treated_idx,
'control_idx': closest_control,
'treated_salary': df.loc[treated_idx, 'salary'],
'control_salary': df.loc[closest_control, 'salary'],
})
matched_df = pd.DataFrame(matched_pairs)
psm_effect = (matched_df['treated_salary'] - matched_df['control_salary']).mean()
print(f"PSM 效应: ${psm_effect:.0f}")
print(f"匹配对数: {len(matched_df)}")
# 4. 按倾向得分分层
df['ps_stratum'] = pd.qcut(df['propensity_score'], q=5, labels=False, duplicates='drop')
stratified_effects = []
for stratum in df['ps_stratum'].unique():
stratum_data = df[df['ps_stratum'] == stratum]
if (stratum_data['treatment'] == 0).sum() > 0 and (stratum_data['treatment'] == 1).sum() > 0:
treated_mean = stratum_data[stratum_data['treatment'] == 1]['salary'].mean()
control_mean = stratum_data[stratum_data['treatment'] == 0]['salary'].mean()
effect = treated_mean - control_mean
stratified_effects.append(effect)
stratified_effect = np.mean(stratified_effects)
print(f"\n4. 倾向得分分层: ${stratified_effect:.0f}")
# 5. 可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 按年龄的处理分布
ax = axes[0, 0]
treated = df[df['treatment'] == 1]
control = df[df['treatment'] == 0]
ax.hist(control['age'], bins=20, alpha=0.6, label='对照组', color='blue')
ax.hist(treated['age'], bins=20, alpha=0.6, label='处理组', color='red')
ax.set_xlabel('年龄')
ax.set_ylabel('频数')
ax.set_title('按处理分组的年龄分布')
ax.legend()
ax.grid(True, alpha=0.3, axis='y')
# 薪资 vs 年龄 (按处理着色)
ax = axes[0, 1]
ax.scatter(control['age'], control['salary'], alpha=0.5, label='对照组', s=30)
ax.scatter(treated['age'], treated['salary'], alpha=0.5, label='处理组', s=30, color='red')
ax.set_xlabel('年龄')
ax.set_ylabel('薪资')
ax.set_title('按处理分组的薪资与年龄关系')
ax.legend()
ax.grid(True, alpha=0.3)
# 倾向得分分布
ax = axes[1, 0]
ax.hist(df[df['treatment'] == 0]['propensity_score'], bins=20, alpha=0.6, label='对照组', color='blue')
ax.hist(df[df['treatment'] == 1]['propensity_score'], bins=20, alpha=0.6, label='处理组', color='red')
ax.set_xlabel('倾向得分')
ax.set_ylabel('频数')
ax.set_title('倾向得分分布')
ax.legend()
ax.grid(True, alpha=0.3, axis='y')
# 处理效应比较
ax = axes[1, 1]
methods = ['简单比较', '回归调整', 'PSM', '分层']
effects = [naive_effect, regression_effect, psm_effect, stratified_effect]
true_effect = 5000
ax.bar(methods, effects, color=['red', 'orange', 'yellow', 'lightgreen'], alpha=0.7, edgecolor='black')
ax.axhline(y=true_effect, color='green', linestyle='--', linewidth=2, label=f'真实效应 (${true_effect:.0f})')
ax.set_ylabel('处理效应 ($)')
ax.set_title('不同方法的处理效应估计')
ax.legend()
ax.grid(True, alpha=0.3, axis='y')
for i, effect in enumerate(effects):
ax.text(i, effect + 200, f'${effect:.0f}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
# 6. 双重稳健估计
from sklearn.ensemble import RandomForestRegressor
# 倾向得分模型
ps_model_dr = LogisticRegression().fit(df[['age']], df['treatment'])
ps_scores = ps_model_dr.predict_proba(df[['age']])[:, 1]
# 结果模型
outcome_model = RandomForestRegressor(n_estimators=50, random_state=42)
outcome_model.fit(df[['treatment', 'age']], df['salary'])
# 双重稳健估计量
treated_mask = df['treatment'] == 1
control_mask = df['treatment'] == 0
# 根据倾向得分调整
treated_adjusted = (treated_mask.astype(int) * df['salary']) / (ps_scores + 0.01)
control_adjusted = (control_mask.astype(int) * df['salary']) / (1 - ps_scores + 0.01)
# 结果预测
pred_treated = outcome_model.predict(df[['treatment', 'age']].replace({'treatment': 0, 1: 1}))
pred_control = outcome_model.predict(df[['treatment', 'age']].replace({'treatment': 1, 0: 0}))
dr_effect = treated_adjusted.sum() / treated_mask.sum() - control_adjusted.sum() / control_mask.sum()
print(f"\n6. 双重稳健估计: ${dr_effect:.0f}")
# 7. 异质性处理效应
print(f"\n7. 异质性处理效应 (按年龄四分位数):")
for age_q in pd.qcut(df['age'], q=4, duplicates='drop').unique():
mask = (df['age'] >= age_q.left) & (df['age'] < age_q.right)
stratum_data = df[mask]
if (stratum_data['treatment'] == 0).sum() > 0 and (stratum_data['treatment'] == 1).sum() > 0:
treated_mean = stratum_data[stratum_data['treatment'] == 1]['salary'].mean()
control_mean = stratum_data[stratum_data['treatment'] == 0]['salary'].mean()
effect = treated_mean - control_mean
print(f" 年龄 {age_q.left:.0f}-{age_q.right:.0f}: ${effect:.0f}")
# 8. 敏感性分析
print(f"\n8. 敏感性分析 (隐藏混杂变量的影响):")
# 改变隐藏混杂变量与结果的相关性
for hidden_effect in [1000, 2000, 5000, 10000]:
adjusted_effect = regression_effect - hidden_effect * 0.1
print(f" 如果隐藏混杂变量价值 ${hidden_effect}: 效应 = ${adjusted_effect:.0f}")
# 9. 摘要表格
print(f"\n" + "="*60)
print("因果推断摘要")
print("="*60)
print(f"真实处理效应: ${true_effect:,.0f}")
print(f"\n估计值:")
print(f" 简单比较 (有偏): ${naive_effect:,.0f}")
print(f" 回归调整: ${regression_effect:,.0f}")
print(f" 倾向得分匹配: ${psm_effect:,.0f}")
print(f" 分层: ${stratified_effect:,.0f}")
print(f" 双重稳健: ${dr_effect:,.0f}")
print("="*60)
# 10. 因果图 (文本表示)
print(f"\n10. 因果图 (DAG):")
print(f"""
年龄 → 处理 ← (选择偏误)
↓ ↓
└─→ 薪资
解读:
- 年龄是混杂变量
- 处理对薪资有因果影响
- 年龄直接影响薪资
- 年龄影响接受处理的概率
""")
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
每周安装量
0
代码仓库
GitHub 星标数
116
首次出现
1970年1月1日
安全审计
Causal inference determines cause-and-effect relationships and estimates treatment effects, going beyond correlation to understand what causes what.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.preprocessing import StandardScaler
from scipy import stats
# Generate observational data with confounding
np.random.seed(42)
n = 1000
# Confounder: Age (affects both treatment and outcome)
age = np.random.uniform(25, 75, n)
# Treatment: Training program (more likely for younger people)
treatment_prob = 0.3 + 0.3 * (75 - age) / 50 # Inverse relationship with age
treatment = (np.random.uniform(0, 1, n) < treatment_prob).astype(int)
# Outcome: Salary (affected by both treatment and age)
# True causal effect of treatment: +$5000
salary = 40000 + 500 * age + 5000 * treatment + np.random.normal(0, 10000, n)
df = pd.DataFrame({
'age': age,
'treatment': treatment,
'salary': salary,
})
print("Observational Data Summary:")
print(df.describe())
print(f"\nTreatment Rate: {df['treatment'].mean():.1%}")
print(f"Average Salary (Control): ${df[df['treatment']==0]['salary'].mean():.0f}")
print(f"Average Salary (Treatment): ${df[df['treatment']==1]['salary'].mean():.0f}")
# 1. Naive Comparison (BIASED - ignores confounding)
naive_effect = df[df['treatment']==1]['salary'].mean() - df[df['treatment']==0]['salary'].mean()
print(f"\n1. Naive Comparison: ${naive_effect:.0f} (BIASED)")
# 2. Regression Adjustment (Covariate Adjustment)
X = df[['treatment', 'age']]
y = df['salary']
model = LinearRegression()
model.fit(X, y)
regression_effect = model.coef_[0]
print(f"\n2. Regression Adjustment: ${regression_effect:.0f}")
# 3. Propensity Score Matching
# Estimate probability of treatment given covariates
ps_model = LogisticRegression()
ps_model.fit(df[['age']], df['treatment'])
df['propensity_score'] = ps_model.predict_proba(df[['age']])[:, 1]
print(f"\n3. Propensity Score Matching:")
print(f"PS range: [{df['propensity_score'].min():.3f}, {df['propensity_score'].max():.3f}]")
# Matching: find control for each treated unit
matched_pairs = []
treated_units = df[df['treatment'] == 1].index
for treated_idx in treated_units:
treated_ps = df.loc[treated_idx, 'propensity_score']
treated_age = df.loc[treated_idx, 'age']
# Find closest control unit
control_units = df[(df['treatment'] == 0) &
(df['propensity_score'] >= treated_ps - 0.1) &
(df['propensity_score'] <= treated_ps + 0.1)].index
if len(control_units) > 0:
closest_control = min(control_units,
key=lambda x: abs(df.loc[x, 'propensity_score'] - treated_ps))
matched_pairs.append({
'treated_idx': treated_idx,
'control_idx': closest_control,
'treated_salary': df.loc[treated_idx, 'salary'],
'control_salary': df.loc[closest_control, 'salary'],
})
matched_df = pd.DataFrame(matched_pairs)
psm_effect = (matched_df['treated_salary'] - matched_df['control_salary']).mean()
print(f"PSM Effect: ${psm_effect:.0f}")
print(f"Matched pairs: {len(matched_df)}")
# 4. Stratification by Propensity Score
df['ps_stratum'] = pd.qcut(df['propensity_score'], q=5, labels=False, duplicates='drop')
stratified_effects = []
for stratum in df['ps_stratum'].unique():
stratum_data = df[df['ps_stratum'] == stratum]
if (stratum_data['treatment'] == 0).sum() > 0 and (stratum_data['treatment'] == 1).sum() > 0:
treated_mean = stratum_data[stratum_data['treatment'] == 1]['salary'].mean()
control_mean = stratum_data[stratum_data['treatment'] == 0]['salary'].mean()
effect = treated_mean - control_mean
stratified_effects.append(effect)
stratified_effect = np.mean(stratified_effects)
print(f"\n4. Stratification by PS: ${stratified_effect:.0f}")
# 5. Visualization
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# Treatment distribution by age
ax = axes[0, 0]
treated = df[df['treatment'] == 1]
control = df[df['treatment'] == 0]
ax.hist(control['age'], bins=20, alpha=0.6, label='Control', color='blue')
ax.hist(treated['age'], bins=20, alpha=0.6, label='Treated', color='red')
ax.set_xlabel('Age')
ax.set_ylabel('Frequency')
ax.set_title('Age Distribution by Treatment')
ax.legend()
ax.grid(True, alpha=0.3, axis='y')
# Salary vs Age (colored by treatment)
ax = axes[0, 1]
ax.scatter(control['age'], control['salary'], alpha=0.5, label='Control', s=30)
ax.scatter(treated['age'], treated['salary'], alpha=0.5, label='Treated', s=30, color='red')
ax.set_xlabel('Age')
ax.set_ylabel('Salary')
ax.set_title('Salary vs Age by Treatment')
ax.legend()
ax.grid(True, alpha=0.3)
# Propensity Score Distribution
ax = axes[1, 0]
ax.hist(df[df['treatment'] == 0]['propensity_score'], bins=20, alpha=0.6, label='Control', color='blue')
ax.hist(df[df['treatment'] == 1]['propensity_score'], bins=20, alpha=0.6, label='Treated', color='red')
ax.set_xlabel('Propensity Score')
ax.set_ylabel('Frequency')
ax.set_title('Propensity Score Distribution')
ax.legend()
ax.grid(True, alpha=0.3, axis='y')
# Treatment Effect Comparison
ax = axes[1, 1]
methods = ['Naive', 'Regression', 'PSM', 'Stratified']
effects = [naive_effect, regression_effect, psm_effect, stratified_effect]
true_effect = 5000
ax.bar(methods, effects, color=['red', 'orange', 'yellow', 'lightgreen'], alpha=0.7, edgecolor='black')
ax.axhline(y=true_effect, color='green', linestyle='--', linewidth=2, label=f'True Effect (${true_effect:.0f})')
ax.set_ylabel('Treatment Effect ($)')
ax.set_title('Treatment Effect Estimates by Method')
ax.legend()
ax.grid(True, alpha=0.3, axis='y')
for i, effect in enumerate(effects):
ax.text(i, effect + 200, f'${effect:.0f}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
# 6. Doubly Robust Estimation
from sklearn.ensemble import RandomForestRegressor
# Propensity score model
ps_model_dr = LogisticRegression().fit(df[['age']], df['treatment'])
ps_scores = ps_model_dr.predict_proba(df[['age']])[:, 1]
# Outcome model
outcome_model = RandomForestRegressor(n_estimators=50, random_state=42)
outcome_model.fit(df[['treatment', 'age']], df['salary'])
# Doubly robust estimator
treated_mask = df['treatment'] == 1
control_mask = df['treatment'] == 0
# Adjust for propensity score
treated_adjusted = (treated_mask.astype(int) * df['salary']) / (ps_scores + 0.01)
control_adjusted = (control_mask.astype(int) * df['salary']) / (1 - ps_scores + 0.01)
# Outcome predictions
pred_treated = outcome_model.predict(df[['treatment', 'age']].replace({'treatment': 0, 1: 1}))
pred_control = outcome_model.predict(df[['treatment', 'age']].replace({'treatment': 1, 0: 0}))
dr_effect = treated_adjusted.sum() / treated_mask.sum() - control_adjusted.sum() / control_mask.sum()
print(f"\n6. Doubly Robust Estimation: ${dr_effect:.0f}")
# 7. Heterogeneous Treatment Effects
print(f"\n7. Heterogeneous Treatment Effects (by Age Quartile):")
for age_q in pd.qcut(df['age'], q=4, duplicates='drop').unique():
mask = (df['age'] >= age_q.left) & (df['age'] < age_q.right)
stratum_data = df[mask]
if (stratum_data['treatment'] == 0).sum() > 0 and (stratum_data['treatment'] == 1).sum() > 0:
treated_mean = stratum_data[stratum_data['treatment'] == 1]['salary'].mean()
control_mean = stratum_data[stratum_data['treatment'] == 0]['salary'].mean()
effect = treated_mean - control_mean
print(f" Age {age_q.left:.0f}-{age_q.right:.0f}: ${effect:.0f}")
# 8. Sensitivity Analysis
print(f"\n8. Sensitivity Analysis (Hidden Confounder Impact):")
# Vary hidden confounder correlation with outcome
for hidden_effect in [1000, 2000, 5000, 10000]:
adjusted_effect = regression_effect - hidden_effect * 0.1
print(f" If hidden confounder worth ${hidden_effect}: Effect = ${adjusted_effect:.0f}")
# 9. Summary Table
print(f"\n" + "="*60)
print("CAUSAL INFERENCE SUMMARY")
print("="*60)
print(f"True Treatment Effect: ${true_effect:,.0f}")
print(f"\nEstimates:")
print(f" Naive (BIASED): ${naive_effect:,.0f}")
print(f" Regression Adjustment: ${regression_effect:,.0f}")
print(f" Propensity Score Matching: ${psm_effect:,.0f}")
print(f" Stratification: ${stratified_effect:,.0f}")
print(f" Doubly Robust: ${dr_effect:,.0f}")
print("="*60)
# 10. Causal Graph (Text representation)
print(f"\n10. Causal Graph (DAG):")
print(f"""
Age → Treatment ← (Selection Bias)
↓ ↓
└─→ Salary
Interpretation:
- Age is a confounder
- Treatment causally affects Salary
- Age directly affects Salary
- Age affects probability of Treatment
""")
Weekly Installs
0
Repository
GitHub Stars
116
First Seen
Jan 1, 1970
Security Audits
专业SEO审计工具:全面网站诊断、技术SEO优化与页面分析指南
57,600 周安装