databricks-expert by personamanagmentlayer/pcl
npx skills add https://github.com/personamanagmentlayer/pcl --skill databricks-expert

You are an expert in Databricks with deep knowledge of Apache Spark, Delta Lake, MLflow, notebooks, cluster management, and lakehouse architecture. You design and implement scalable data pipelines and machine learning workflows on the Databricks platform.
Cluster Types and Configuration:
# Databricks CLI - Create cluster
databricks clusters create --json '{
  "cluster_name": "data-engineering-cluster",
  "spark_version": "13.3.x-scala2.12",
  "node_type_id": "i3.xlarge",
  "driver_node_type_id": "i3.2xlarge",
  "autoscale": {
    "min_workers": 2,
    "max_workers": 8
  },
  "autotermination_minutes": 120,
  "spark_conf": {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true"
  },
  "custom_tags": {
    "team": "data-engineering",
    "environment": "production"
  },
  "init_scripts": [
    {
      "dbfs": {
        "destination": "dbfs:/databricks/init-scripts/install-libs.sh"
      }
    }
  ]
}'
# Job cluster configuration (optimized for cost)
job_cluster_config = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 3,
    "spark_conf": {
        "spark.speculation": "true",
        "spark.task.maxFailures": "4"
    }
}
# High-concurrency cluster (for SQL Analytics)
high_concurrency_config = {
    "cluster_name": "sql-analytics-cluster",
    "spark_version": "13.3.x-sql-scala2.12",
    "node_type_id": "i3.2xlarge",
    "autoscale": {
        "min_workers": 1,
        "max_workers": 10
    },
    "enable_elastic_disk": True,
    "data_security_mode": "USER_ISOLATION"
}
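The configurations above differ from the base cluster only in a few fields, so teams often derive per-environment variants from one shared base. A minimal sketch of that pattern (the helper and dict names are illustrative, not part of the Databricks API; the merged dict is what you would pass to `databricks clusters create --json`):

```python
import json

# Shared base config; per-environment overrides are merged on top of it.
base_cluster = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 4,
    "spark_conf": {"spark.sql.adaptive.enabled": "true"},
}

def with_overrides(base, overrides):
    """Copy base and apply overrides; nested dicts are merged one level deep."""
    merged = {**base}
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = {**merged[key], **value}
        else:
            merged[key] = value
    return merged

# A smaller dev variant that keeps the base spark_conf entries.
dev_cluster = with_overrides(base_cluster, {
    "num_workers": 1,
    "spark_conf": {"spark.databricks.delta.autoCompact.enabled": "true"},
})
print(json.dumps(dev_cluster, indent=2))
```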
Instance Pools:
# Create instance pool
instance_pool_config = {
    "instance_pool_name": "production-pool",
    "min_idle_instances": 2,
    "max_capacity": 20,
    "node_type_id": "i3.xlarge",
    "idle_instance_autotermination_minutes": 15,
    "preloaded_spark_versions": [
        "13.3.x-scala2.12"
    ]
}
# Use instance pool in cluster
cluster_with_pool = {
    "cluster_name": "pool-cluster",
    "spark_version": "13.3.x-scala2.12",
    "instance_pool_id": "0101-120000-abc123",
    "autoscale": {
        "min_workers": 2,
        "max_workers": 8
    }
}
Creating and Managing Delta Tables:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, expr
spark = SparkSession.builder.getOrCreate()
# Create Delta table
df = spark.read.json("/mnt/raw/events")
df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("date", "event_type") \
    .save("/mnt/delta/events")
# Create managed table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("production.events")
# Create table with SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS production.orders (
        order_id BIGINT,
        customer_id BIGINT,
        order_date DATE,
        total_amount DECIMAL(10,2),
        status STRING,
        metadata MAP<STRING, STRING>
    )
    USING DELTA
    PARTITIONED BY (order_date)
    LOCATION '/mnt/delta/orders'
    TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")
# Add constraints
spark.sql("""
    ALTER TABLE production.orders
    ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'completed', 'cancelled'))
""")
# Generated columns must be declared when the table is created; Delta does
# not support adding one to an existing table with ALTER TABLE ADD COLUMN.
# (Illustrative table name.)
spark.sql("""
    CREATE TABLE IF NOT EXISTS production.orders_v2 (
        order_id BIGINT,
        order_date DATE,
        month INT GENERATED ALWAYS AS (MONTH(order_date))
    )
    USING DELTA
""")
MERGE Operations (Upserts):
# Upsert with Delta Lake
from delta.tables import DeltaTable
# Load Delta table
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")
# New or updated data
updates_df = spark.read.format("parquet").load("/mnt/staging/order_updates")
# Merge (upsert)
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(
    condition="source.updated_at > target.updated_at",
    set={
        "total_amount": "source.total_amount",
        "status": "source.status",
        "updated_at": "source.updated_at"
    }
).whenNotMatchedInsert(
    values={
        "order_id": "source.order_id",
        "customer_id": "source.customer_id",
        "order_date": "source.order_date",
        "total_amount": "source.total_amount",
        "status": "source.status",
        "created_at": "source.created_at",
        "updated_at": "source.updated_at"
    }
).execute()
# Merge with delete
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(
    condition="source.is_active = true",
    set={"status": "source.status"}
).whenMatchedDelete(
    condition="source.is_active = false"
).whenNotMatchedInsert(
    values={
        "order_id": "source.order_id",
        "status": "source.status"
    }
).execute()
Time Travel and Versioning:
# Query historical versions
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/delta/orders")
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15") \
    .load("/mnt/delta/orders")
# View history
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")
delta_table.history().show()
# Restore to previous version
delta_table.restoreToVersion(5)
delta_table.restoreToTimestamp("2024-01-15")
# Vacuum old files (delete files older than retention period)
delta_table.vacuum(168)  # retention in hours: 168 = 7 days
# View table details
delta_table.detail().show()
Optimization and Maintenance:
# Optimize table (compaction)
spark.sql("OPTIMIZE production.orders")
# Optimize with Z-Ordering
spark.sql("OPTIMIZE production.orders ZORDER BY (customer_id, status)")
# Analyze table for statistics
spark.sql("ANALYZE TABLE production.orders COMPUTE STATISTICS")
# Clone table (zero-copy)
spark.sql("""
    CREATE TABLE production.orders_clone
    SHALLOW CLONE production.orders
""")
# Deep clone (independent copy)
spark.sql("""
    CREATE TABLE production.orders_backup
    DEEP CLONE production.orders
""")
# Change Data Feed (CDC)
spark.sql("""
    ALTER TABLE production.orders
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Read changes
changes_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 5) \
    .table("production.orders")
DataFrame Operations:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Read data
df = spark.read.format("delta").table("production.orders")
# Complex transformations
result = df \
    .filter(col("order_date") >= "2024-01-01") \
    .withColumn("year_month", F.date_format("order_date", "yyyy-MM")) \
    .withColumn("order_rank",
        F.row_number().over(
            Window.partitionBy("customer_id")
                  .orderBy(F.desc("total_amount"))
        )
    ) \
    .groupBy("year_month", "status") \
    .agg(
        F.count("*").alias("order_count"),
        F.sum("total_amount").alias("total_revenue"),
        F.avg("total_amount").alias("avg_order_value"),
        F.percentile_approx("total_amount", 0.5).alias("median_amount")
    ) \
    .orderBy("year_month", "status")
# Write result
result.write.format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", "year_month >= '2024-01'") \
    .saveAsTable("production.monthly_summary")
# JSON operations (assumes `schema` is a StructType matching the metadata JSON)
json_df = df.withColumn("parsed_metadata", F.from_json("metadata", schema))
json_df = json_df.withColumn("tags", F.explode("parsed_metadata.tags"))
# Array and struct operations (assumes an `items` ARRAY<STRUCT> column with a price field)
df.withColumn("first_item", col("items").getItem(0)) \
    .withColumn("item_count", F.size("items")) \
    .withColumn("total_price",
        F.aggregate("items", F.lit(0.0),
                    lambda acc, x: acc + x.price))
Advanced Spark SQL:
# Register temp view
df.createOrReplaceTempView("orders_temp")
# Complex SQL
result = spark.sql("""
    WITH customer_metrics AS (
        SELECT
            customer_id,
            COUNT(*) AS order_count,
            SUM(total_amount) AS lifetime_value,
            DATEDIFF(MAX(order_date), MIN(order_date)) AS customer_age_days,
            COLLECT_LIST(
                STRUCT(order_id, order_date, total_amount)
            ) AS order_history
        FROM orders_temp
        GROUP BY customer_id
    ),
    customer_segments AS (
        SELECT
            *,
            CASE
                WHEN lifetime_value >= 10000 THEN 'VIP'
                WHEN lifetime_value >= 5000 THEN 'Gold'
                WHEN lifetime_value >= 1000 THEN 'Silver'
                ELSE 'Bronze'
            END AS segment,
            NTILE(10) OVER (ORDER BY lifetime_value DESC) AS decile
        FROM customer_metrics
    )
    SELECT * FROM customer_segments
    WHERE segment IN ('VIP', 'Gold')
""")
# Window functions
spark.sql("""
    SELECT
        order_id,
        customer_id,
        order_date,
        total_amount,
        SUM(total_amount) OVER (
            PARTITION BY customer_id
            ORDER BY order_date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_total,
        AVG(total_amount) OVER (
            PARTITION BY customer_id
            ORDER BY order_date
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) AS moving_avg_7_orders
    FROM orders_temp
""")
Structured Streaming:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Define schema
schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("value", DoubleType())
])
# Read stream from Kafka
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")
# Process stream
processed_stream = stream_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "event_type"
    ) \
    .agg(
        F.count("*").alias("event_count"),
        F.sum("value").alias("total_value")
    )
# Write to Delta
query = processed_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/checkpoints/events") \
    .option("mergeSchema", "true") \
    .trigger(processingTime="1 minute") \
    .table("production.event_metrics")
# Monitor streaming query
query.status
query.recentProgress
query.lastProgress
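`query.recentProgress` returns a list of progress dicts whose well-known fields (`batchId`, `numInputRows`, `inputRowsPerSecond`) can drive simple alerting. A sketch of that idea on a fabricated sample (the dicts below are illustrative data, not output from a real query):

```python
# Fabricated progress entries shaped like StreamingQueryProgress fields.
sample_progress = [
    {"batchId": 41, "numInputRows": 12000, "inputRowsPerSecond": 200.0},
    {"batchId": 42, "numInputRows": 300, "inputRowsPerSecond": 5.0},
]

def lagging_batches(progress, min_rows_per_sec=50.0):
    """Return batch ids whose ingest rate fell below the threshold."""
    return [p["batchId"] for p in progress if p["inputRowsPerSecond"] < min_rows_per_sec]

print(lagging_batches(sample_progress))  # → [42]
```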
Experiment Tracking:
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
# Set experiment
mlflow.set_experiment("/Users/data-science/customer-churn")
# Start run
with mlflow.start_run(run_name="rf_model_v1") as run:
    # Parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5
    }
    mlflow.log_params(params)
    # Train model (assumes feature matrix X and labels y are already loaded)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    # Evaluate
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)
    # Log model
    mlflow.sklearn.log_model(model, "model")
    # Log artifacts
    mlflow.log_artifact("feature_importance.png")
    # Add tags
    mlflow.set_tags({
        "team": "data-science",
        "model_type": "classification"
    })
# Load model from run
run_id = run.info.run_id
model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")
# Register model
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, "customer_churn_model")
Model Registry and Deployment:
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Transition model to staging
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Staging"
)
# Add model description
client.update_model_version(
    name="customer_churn_model",
    version=1,
    description="Random Forest model with hyperparameter tuning"
)
# Load production model
model = mlflow.pyfunc.load_model("models:/customer_churn_model/Production")
# Batch inference with Spark
model_udf = mlflow.pyfunc.spark_udf(
    spark,
    model_uri="models:/customer_churn_model/Production",
    result_type="double"
)
# `feature_columns` is the list of input column names the model expects
predictions = df.withColumn(
    "churn_prediction",
    model_udf(*feature_columns)
)
Job Configuration:
# Create job via API
job_config = {
    "name": "daily_etl_pipeline",
    "max_concurrent_runs": 1,
    "timeout_seconds": 3600,
    "schedule": {
        "quartz_cron_expression": "0 0 2 * * ?",
        "timezone_id": "America/New_York",
        "pause_status": "UNPAUSED"
    },
    "tasks": [
        {
            "task_key": "extract_data",
            "notebook_task": {
                "notebook_path": "/Workflows/Extract",
                "base_parameters": {
                    "date": "{{job.start_time.date}}"
                }
            },
            "job_cluster_key": "etl_cluster"
        },
        {
            "task_key": "transform_data",
            "depends_on": [{"task_key": "extract_data"}],
            "notebook_task": {
                "notebook_path": "/Workflows/Transform"
            },
            "job_cluster_key": "etl_cluster"
        },
        {
            "task_key": "load_data",
            "depends_on": [{"task_key": "transform_data"}],
            "spark_python_task": {
                "python_file": "dbfs:/scripts/load.py",
                "parameters": ["--env", "production"]
            },
            "job_cluster_key": "etl_cluster"
        }
    ],
    "job_clusters": [
        {
            "job_cluster_key": "etl_cluster",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 4
            }
        }
    ],
    "email_notifications": {
        "on_failure": ["data-eng@company.com"],
        "on_success": ["data-eng@company.com"]
    }
}
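The `depends_on` entries above define a task DAG that the Jobs service resolves before running anything. A minimal sketch of that resolution with the standard library (pure illustration; the real scheduler also runs independent tasks in parallel):

```python
from graphlib import TopologicalSorter

# Same task shape as the job_config above, trimmed to the dependency fields.
tasks = [
    {"task_key": "extract_data"},
    {"task_key": "transform_data", "depends_on": [{"task_key": "extract_data"}]},
    {"task_key": "load_data", "depends_on": [{"task_key": "transform_data"}]},
]

# Map each task to the set of tasks that must finish first.
graph = {
    t["task_key"]: {d["task_key"] for d in t.get("depends_on", [])}
    for t in tasks
}
order = list(TopologicalSorter(graph).static_order())
print(order)  # → ['extract_data', 'transform_data', 'load_data']
```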
Notebook Utilities:
# Get parameters
date_param = dbutils.widgets.get("date")
# Exit notebook with value
dbutils.notebook.exit("success")
# Run another notebook
result = dbutils.notebook.run(
    "/Shared/ProcessData",
    timeout_seconds=600,
    arguments={"date": "2024-01-15"}
)
# Access secrets
api_key = dbutils.secrets.get(scope="production", key="api_key")
# File system operations
dbutils.fs.ls("/mnt/data")
dbutils.fs.cp("/mnt/source/file.csv", "/mnt/dest/file.csv")
dbutils.fs.rm("/mnt/data/temp", recurse=True)
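`dbutils.notebook.exit` can only hand a single string back to the caller, so a common convention (illustrative, not required by the API) is to round-trip structured results through JSON:

```python
import json

# In the child notebook: serialize the result before exiting.
exit_payload = json.dumps({"status": "success", "rows_written": 1250})
# dbutils.notebook.exit(exit_payload)   # only available inside Databricks

# In the caller: `result` is the string returned by dbutils.notebook.run;
# here we stand it in directly to show just the round-trip.
result = exit_payload
parsed = json.loads(result)
print(parsed["status"], parsed["rows_written"])
```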
Catalog and Schema Management:
# Create catalog
spark.sql("CREATE CATALOG IF NOT EXISTS production")
# Create schema
spark.sql("""
    CREATE SCHEMA IF NOT EXISTS production.sales
    COMMENT 'Sales data'
    LOCATION '/mnt/unity-catalog/sales'
""")
# Grant privileges
spark.sql("GRANT USE CATALOG ON CATALOG production TO `data-engineers`")
spark.sql("GRANT ALL PRIVILEGES ON SCHEMA production.sales TO `data-engineers`")
spark.sql("GRANT SELECT ON TABLE production.sales.orders TO `data-analysts`")
# Three-level namespace: catalog.schema.table
spark.sql("SELECT * FROM production.sales.orders")
# External locations
spark.sql("""
    CREATE EXTERNAL LOCATION my_s3_location
    URL 's3://my-bucket/data/'
    WITH (STORAGE CREDENTIAL my_aws_credential)
""")
# Data lineage is tracked automatically for Unity Catalog tables
spark.sql("SELECT * FROM production.sales.orders").show()
# View lineage in the Unity Catalog UI
Common Anti-Patterns:
# Bad: Collect large dataset to driver
large_df.collect()  # risks OOM on the driver
# Good: Use actions that stay distributed
large_df.write.format("delta").save("/mnt/output")
# Bad: Many small files
for file in files:
    df = spark.read.json(file)
    df.write.format("delta").mode("append").save("/mnt/table")
# Good: Batch writes with optimization
df = spark.read.json("/mnt/source/*")
df.write.format("delta") \
    .option("optimizeWrite", "true") \
    .mode("append") \
    .save("/mnt/table")
# Bad: Join without broadcast hint
large_df.join(small_df, "key")
# Good: Broadcast small table
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")
# Bad: No partitioning on large table
df.write.format("delta").save("/mnt/events")
# Good: Partition by date
df.write.format("delta") \
    .partitionBy("date") \
    .save("/mnt/events")
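The small-files anti-pattern above often pairs with an explicit `df.repartition(n)` before the write. A rough sizing heuristic for choosing `n` (a common rule of thumb, not an official Databricks formula; sizes below are illustrative):

```python
import math

def target_partitions(total_bytes, target_file_bytes=128 * 1024 * 1024):
    """Roughly one output file per partition, aiming near the target file size."""
    return max(1, math.ceil(total_bytes / target_file_bytes))

# ~10 GB of input with 128 MB target files → 80 partitions
print(target_partitions(10 * 1024**3))  # → 80
```

You would then write `df.repartition(target_partitions(estimated_bytes))` before `.write`, where `estimated_bytes` comes from your own size estimate of the DataFrame.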
Weekly Installs: 72
Repository: GitHub
GitHub Stars: 11
First Seen: Jan 24, 2026
Security Audits: Gen Agent Trust Hub: Pass, Socket: Pass, Snyk: Warn
Installed on: codex (61), opencode (61), gemini-cli (56), github-copilot (53), amp (50), kimi-cli (50)