ML Pipeline Automation by aj-geddes/useful-ai-prompts
npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill 'ML Pipeline Automation'ML 管道自动化编排从数据接收到模型部署的整个机器学习工作流程,确保可重复性、可扩展性和可靠性。
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import joblib
import logging
from datetime import datetime
import json
import os
# Airflow imports
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# MLflow for tracking
import mlflow
import mlflow.sklearn
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
print("=== 1. Modular Pipeline Functions ===")
# Data ingestion
def ingest_data(**context):
"""Ingest and load data"""
logger.info("Starting data ingestion...")
X, y = make_classification(n_samples=2000, n_features=30,
n_informative=20, random_state=42)
data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
data['target'] = y
# Save to disk
data_path = '/tmp/raw_data.csv'
data.to_csv(data_path, index=False)
context['task_instance'].xcom_push(key='data_path', value=data_path)
logger.info(f"Data ingested: {len(data)} rows")
return {'status': 'success', 'samples': len(data)}
# Data processing
def process_data(**context):
"""Clean and preprocess data"""
logger.info("Starting data processing...")
# Get data path from previous task
task_instance = context['task_instance']
data_path = task_instance.xcom_pull(key='data_path', task_ids='ingest_data')
data = pd.read_csv(data_path)
# Handle missing values
data = data.fillna(data.mean())
# Remove duplicates
data = data.drop_duplicates()
# Remove outliers (simple approach)
numeric_cols = data.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
Q1 = data[col].quantile(0.25)
Q3 = data[col].quantile(0.75)
IQR = Q3 - Q1
data = data[(data[col] >= Q1 - 1.5 * IQR) & (data[col] <= Q3 + 1.5 * IQR)]
processed_path = '/tmp/processed_data.csv'
data.to_csv(processed_path, index=False)
task_instance.xcom_push(key='processed_path', value=processed_path)
logger.info(f"Data processed: {len(data)} rows after cleaning")
return {'status': 'success', 'rows_remaining': len(data)}
# Feature engineering
def engineer_features(**context):
"""Create new features"""
logger.info("Starting feature engineering...")
task_instance = context['task_instance']
processed_path = task_instance.xcom_pull(key='processed_path', task_ids='process_data')
data = pd.read_csv(processed_path)
# Create interaction features
feature_cols = [col for col in data.columns if col.startswith('feature_')]
for i in range(min(5, len(feature_cols))):
for j in range(i+1, min(6, len(feature_cols))):
data[f'interaction_{i}_{j}'] = data[feature_cols[i]] * data[feature_cols[j]]
# Create polynomial features
for col in feature_cols[:5]:
data[f'{col}_squared'] = data[col] ** 2
engineered_path = '/tmp/engineered_data.csv'
data.to_csv(engineered_path, index=False)
task_instance.xcom_push(key='engineered_path', value=engineered_path)
logger.info(f"Features engineered: {len(data.columns)} total features")
return {'status': 'success', 'features': len(data.columns)}
# Train model
def train_model(**context):
"""Train ML model"""
logger.info("Starting model training...")
task_instance = context['task_instance']
engineered_path = task_instance.xcom_pull(key='engineered_path', task_ids='engineer_features')
data = pd.read_csv(engineered_path)
X = data.drop('target', axis=1)
y = data['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=15, random_state=42)
model.fit(X_train_scaled, y_train)
# Evaluate
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# Save model
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'
joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)
task_instance.xcom_push(key='model_path', value=model_path)
task_instance.xcom_push(key='scaler_path', value=scaler_path)
# Log to MLflow
with mlflow.start_run():
mlflow.log_param('n_estimators', 100)
mlflow.log_param('max_depth', 15)
mlflow.log_metric('accuracy', accuracy)
mlflow.log_metric('f1_score', f1)
mlflow.sklearn.log_model(model, 'model')
logger.info(f"Model trained: Accuracy={accuracy:.4f}, F1={f1:.4f}")
return {'status': 'success', 'accuracy': accuracy, 'f1_score': f1}
# Validate model
def validate_model(**context):
"""Validate model performance"""
logger.info("Starting model validation...")
task_instance = context['task_instance']
model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
engineered_path = task_instance.xcom_pull(key='engineered_path', task_ids='engineer_features')
model = joblib.load(model_path)
data = pd.read_csv(engineered_path)
X = data.drop('target', axis=1)
y = data['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler_path = task_instance.xcom_pull(key='scaler_path', task_ids='train_model')
scaler = joblib.load(scaler_path)
X_test_scaled = scaler.transform(X_test)
# Validate
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
validation_result = {
'status': 'success' if accuracy > 0.85 else 'failed',
'accuracy': accuracy,
'threshold': 0.85,
'timestamp': datetime.now().isoformat()
}
task_instance.xcom_push(key='validation_result', value=json.dumps(validation_result))
logger.info(f"Validation result: {validation_result}")
return validation_result
# Deploy model
def deploy_model(**context):
"""Deploy validated model"""
logger.info("Starting model deployment...")
task_instance = context['task_instance']
validation_result = json.loads(task_instance.xcom_pull(
key='validation_result', task_ids='validate_model'))
if validation_result['status'] != 'success':
logger.warning("Validation failed, deployment skipped")
return {'status': 'skipped', 'reason': 'validation_failed'}
model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
scaler_path = task_instance.xcom_pull(key='scaler_path', task_ids='train_model')
# Simulate deployment
deploy_path = '/tmp/deployed_model/'
os.makedirs(deploy_path, exist_ok=True)
import shutil
shutil.copy(model_path, os.path.join(deploy_path, 'model.pkl'))
shutil.copy(scaler_path, os.path.join(deploy_path, 'scaler.pkl'))
logger.info(f"Model deployed to {deploy_path}")
return {'status': 'success', 'deploy_path': deploy_path}
# 2. Airflow DAG Definition
print("\n=== 2. Airflow DAG ===")
dag_definition = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml-team',
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'ml_pipeline_dag',
default_args=default_args,
description='End-to-end ML pipeline',
schedule_interval='0 2 * * *', # Daily at 2 AM
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Task 1: Ingest Data
ingest = PythonOperator(
task_id='ingest_data',
python_callable=ingest_data,
)
# Task 2: Process Data
process = PythonOperator(
task_id='process_data',
python_callable=process_data,
)
# Task 3: Engineer Features
engineer = PythonOperator(
task_id='engineer_features',
python_callable=engineer_features,
)
# Task 4: Train Model
train = PythonOperator(
task_id='train_model',
python_callable=train_model,
)
# Task 5: Validate Model
validate = PythonOperator(
task_id='validate_model',
python_callable=validate_model,
)
# Task 6: Deploy Model
deploy = PythonOperator(
task_id='deploy_model',
python_callable=deploy_model,
)
# Define dependencies
ingest >> process >> engineer >> train >> validate >> deploy
'''
print("Airflow DAG defined with 6 tasks")
# 3. Pipeline execution summary
print("\n=== 3. Pipeline Execution ===")
class PipelineOrchestrator:
def __init__(self):
self.execution_log = []
self.start_time = None
self.end_time = None
def run_pipeline(self):
self.start_time = datetime.now()
logger.info("Starting ML pipeline execution")
try:
# Execute pipeline tasks
result1 = ingest_data(task_instance=self)
self.execution_log.append(('ingest_data', result1))
result2 = process_data(task_instance=self)
self.execution_log.append(('process_data', result2))
result3 = engineer_features(task_instance=self)
self.execution_log.append(('engineer_features', result3))
result4 = train_model(task_instance=self)
self.execution_log.append(('train_model', result4))
result5 = validate_model(task_instance=self)
self.execution_log.append(('validate_model', result5))
result6 = deploy_model(task_instance=self)
self.execution_log.append(('deploy_model', result6))
self.end_time = datetime.now()
logger.info("Pipeline execution completed successfully")
except Exception as e:
logger.error(f"Pipeline execution failed: {str(e)}")
def xcom_push(self, key, value):
if not hasattr(self, 'xcom_storage'):
self.xcom_storage = {}
self.xcom_storage[key] = value
def xcom_pull(self, key, task_ids):
if hasattr(self, 'xcom_storage') and key in self.xcom_storage:
return self.xcom_storage[key]
return None
def get_summary(self):
duration = (self.end_time - self.start_time).total_seconds() if self.end_time else 0
return {
'start_time': self.start_time.isoformat() if self.start_time else None,
'end_time': self.end_time.isoformat() if self.end_time else None,
'duration_seconds': duration,
'tasks_executed': len(self.execution_log),
'execution_log': self.execution_log
}
# Execute pipeline
orchestrator = PipelineOrchestrator()
orchestrator.run_pipeline()
summary = orchestrator.get_summary()
print("\n=== Pipeline Summary ===")
for key, value in summary.items():
if key != 'execution_log':
print(f"{key}: {value}")
print("\nTask Execution Log:")
for task_name, result in summary['execution_log']:
print(f" {task_name}: {result}")
print("\nML pipeline automation setup completed!")
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
每周安装数
0
代码仓库
GitHub 星标数
121
首次出现
1970年1月1日
安全审计
ML pipeline automation orchestrates the entire machine learning workflow from data ingestion through model deployment, ensuring reproducibility, scalability, and reliability.
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import joblib
import logging
from datetime import datetime
import json
import os
# Airflow imports
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# MLflow for tracking
import mlflow
import mlflow.sklearn
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
print("=== 1. Modular Pipeline Functions ===")
# Data ingestion
def ingest_data(**context):
"""Ingest and load data"""
logger.info("Starting data ingestion...")
X, y = make_classification(n_samples=2000, n_features=30,
n_informative=20, random_state=42)
data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
data['target'] = y
# Save to disk
data_path = '/tmp/raw_data.csv'
data.to_csv(data_path, index=False)
context['task_instance'].xcom_push(key='data_path', value=data_path)
logger.info(f"Data ingested: {len(data)} rows")
return {'status': 'success', 'samples': len(data)}
# Data processing
def process_data(**context):
"""Clean and preprocess data"""
logger.info("Starting data processing...")
# Get data path from previous task
task_instance = context['task_instance']
data_path = task_instance.xcom_pull(key='data_path', task_ids='ingest_data')
data = pd.read_csv(data_path)
# Handle missing values
data = data.fillna(data.mean())
# Remove duplicates
data = data.drop_duplicates()
# Remove outliers (simple approach)
numeric_cols = data.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
Q1 = data[col].quantile(0.25)
Q3 = data[col].quantile(0.75)
IQR = Q3 - Q1
data = data[(data[col] >= Q1 - 1.5 * IQR) & (data[col] <= Q3 + 1.5 * IQR)]
processed_path = '/tmp/processed_data.csv'
data.to_csv(processed_path, index=False)
task_instance.xcom_push(key='processed_path', value=processed_path)
logger.info(f"Data processed: {len(data)} rows after cleaning")
return {'status': 'success', 'rows_remaining': len(data)}
# Feature engineering
def engineer_features(**context):
"""Create new features"""
logger.info("Starting feature engineering...")
task_instance = context['task_instance']
processed_path = task_instance.xcom_pull(key='processed_path', task_ids='process_data')
data = pd.read_csv(processed_path)
# Create interaction features
feature_cols = [col for col in data.columns if col.startswith('feature_')]
for i in range(min(5, len(feature_cols))):
for j in range(i+1, min(6, len(feature_cols))):
data[f'interaction_{i}_{j}'] = data[feature_cols[i]] * data[feature_cols[j]]
# Create polynomial features
for col in feature_cols[:5]:
data[f'{col}_squared'] = data[col] ** 2
engineered_path = '/tmp/engineered_data.csv'
data.to_csv(engineered_path, index=False)
task_instance.xcom_push(key='engineered_path', value=engineered_path)
logger.info(f"Features engineered: {len(data.columns)} total features")
return {'status': 'success', 'features': len(data.columns)}
# Train model
def train_model(**context):
"""Train ML model"""
logger.info("Starting model training...")
task_instance = context['task_instance']
engineered_path = task_instance.xcom_pull(key='engineered_path', task_ids='engineer_features')
data = pd.read_csv(engineered_path)
X = data.drop('target', axis=1)
y = data['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=15, random_state=42)
model.fit(X_train_scaled, y_train)
# Evaluate
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# Save model
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'
joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)
task_instance.xcom_push(key='model_path', value=model_path)
task_instance.xcom_push(key='scaler_path', value=scaler_path)
# Log to MLflow
with mlflow.start_run():
mlflow.log_param('n_estimators', 100)
mlflow.log_param('max_depth', 15)
mlflow.log_metric('accuracy', accuracy)
mlflow.log_metric('f1_score', f1)
mlflow.sklearn.log_model(model, 'model')
logger.info(f"Model trained: Accuracy={accuracy:.4f}, F1={f1:.4f}")
return {'status': 'success', 'accuracy': accuracy, 'f1_score': f1}
# Validate model
def validate_model(**context):
"""Validate model performance"""
logger.info("Starting model validation...")
task_instance = context['task_instance']
model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
engineered_path = task_instance.xcom_pull(key='engineered_path', task_ids='engineer_features')
model = joblib.load(model_path)
data = pd.read_csv(engineered_path)
X = data.drop('target', axis=1)
y = data['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler_path = task_instance.xcom_pull(key='scaler_path', task_ids='train_model')
scaler = joblib.load(scaler_path)
X_test_scaled = scaler.transform(X_test)
# Validate
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
validation_result = {
'status': 'success' if accuracy > 0.85 else 'failed',
'accuracy': accuracy,
'threshold': 0.85,
'timestamp': datetime.now().isoformat()
}
task_instance.xcom_push(key='validation_result', value=json.dumps(validation_result))
logger.info(f"Validation result: {validation_result}")
return validation_result
# Deploy model
def deploy_model(**context):
"""Deploy validated model"""
logger.info("Starting model deployment...")
task_instance = context['task_instance']
validation_result = json.loads(task_instance.xcom_pull(
key='validation_result', task_ids='validate_model'))
if validation_result['status'] != 'success':
logger.warning("Validation failed, deployment skipped")
return {'status': 'skipped', 'reason': 'validation_failed'}
model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
scaler_path = task_instance.xcom_pull(key='scaler_path', task_ids='train_model')
# Simulate deployment
deploy_path = '/tmp/deployed_model/'
os.makedirs(deploy_path, exist_ok=True)
import shutil
shutil.copy(model_path, os.path.join(deploy_path, 'model.pkl'))
shutil.copy(scaler_path, os.path.join(deploy_path, 'scaler.pkl'))
logger.info(f"Model deployed to {deploy_path}")
return {'status': 'success', 'deploy_path': deploy_path}
# 2. Airflow DAG Definition
print("\n=== 2. Airflow DAG ===")
dag_definition = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml-team',
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'ml_pipeline_dag',
default_args=default_args,
description='End-to-end ML pipeline',
schedule_interval='0 2 * * *', # Daily at 2 AM
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Task 1: Ingest Data
ingest = PythonOperator(
task_id='ingest_data',
python_callable=ingest_data,
)
# Task 2: Process Data
process = PythonOperator(
task_id='process_data',
python_callable=process_data,
)
# Task 3: Engineer Features
engineer = PythonOperator(
task_id='engineer_features',
python_callable=engineer_features,
)
# Task 4: Train Model
train = PythonOperator(
task_id='train_model',
python_callable=train_model,
)
# Task 5: Validate Model
validate = PythonOperator(
task_id='validate_model',
python_callable=validate_model,
)
# Task 6: Deploy Model
deploy = PythonOperator(
task_id='deploy_model',
python_callable=deploy_model,
)
# Define dependencies
ingest >> process >> engineer >> train >> validate >> deploy
'''
print("Airflow DAG defined with 6 tasks")
# 3. Pipeline execution summary
print("\n=== 3. Pipeline Execution ===")
class PipelineOrchestrator:
def __init__(self):
self.execution_log = []
self.start_time = None
self.end_time = None
def run_pipeline(self):
self.start_time = datetime.now()
logger.info("Starting ML pipeline execution")
try:
# Execute pipeline tasks
result1 = ingest_data(task_instance=self)
self.execution_log.append(('ingest_data', result1))
result2 = process_data(task_instance=self)
self.execution_log.append(('process_data', result2))
result3 = engineer_features(task_instance=self)
self.execution_log.append(('engineer_features', result3))
result4 = train_model(task_instance=self)
self.execution_log.append(('train_model', result4))
result5 = validate_model(task_instance=self)
self.execution_log.append(('validate_model', result5))
result6 = deploy_model(task_instance=self)
self.execution_log.append(('deploy_model', result6))
self.end_time = datetime.now()
logger.info("Pipeline execution completed successfully")
except Exception as e:
logger.error(f"Pipeline execution failed: {str(e)}")
def xcom_push(self, key, value):
if not hasattr(self, 'xcom_storage'):
self.xcom_storage = {}
self.xcom_storage[key] = value
def xcom_pull(self, key, task_ids):
if hasattr(self, 'xcom_storage') and key in self.xcom_storage:
return self.xcom_storage[key]
return None
def get_summary(self):
duration = (self.end_time - self.start_time).total_seconds() if self.end_time else 0
return {
'start_time': self.start_time.isoformat() if self.start_time else None,
'end_time': self.end_time.isoformat() if self.end_time else None,
'duration_seconds': duration,
'tasks_executed': len(self.execution_log),
'execution_log': self.execution_log
}
# Execute pipeline
orchestrator = PipelineOrchestrator()
orchestrator.run_pipeline()
summary = orchestrator.get_summary()
print("\n=== Pipeline Summary ===")
for key, value in summary.items():
if key != 'execution_log':
print(f"{key}: {value}")
print("\nTask Execution Log:")
for task_name, result in summary['execution_log']:
print(f" {task_name}: {result}")
print("\nML pipeline automation setup completed!")
Weekly Installs
0
Repository
GitHub Stars
121
First Seen
Jan 1, 1970
Security Audits
Azure RBAC 权限管理工具:查找最小角色、创建自定义角色与自动化分配
117,000 周安装