apache-spark-data-processing by manutej/luxor-claude-marketplace
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill apache-spark-data-processing

A comprehensive skill for mastering Apache Spark data processing, from basic RDD operations to advanced streaming, SQL, and machine learning workflows. Learn to build scalable, distributed data pipelines and analytics systems.
Use Apache Spark when you need to:
Not Ideal For:
RDDs are Spark's fundamental data abstraction - immutable, distributed collections of objects that can be processed in parallel.
Key Characteristics:
RDD Operations:
When to Use RDDs:
Prefer DataFrames/Datasets when possible - they provide automatic optimization via the Catalyst optimizer.
DataFrames are distributed collections of data organized into named columns - similar to a database table or pandas DataFrame, but with powerful optimizations.
DataFrames:
Datasets (Scala/Java only):
Key Advantages Over RDDs:
Spark uses lazy evaluation to optimize execution:
Benefits:
Data is divided into partitions for parallel processing:
Partition Count Considerations:
Use repartition() or coalesce() to adjust the partition count.
Cache frequently accessed data in memory for performance:
# Cache a DataFrame in memory
df.cache()  # Shorthand for persist(StorageLevel.MEMORY_AND_DISK)
# Different storage levels
df.persist(StorageLevel.MEMORY_ONLY)      # Fast, but data may be lost if evicted
df.persist(StorageLevel.MEMORY_AND_DISK)  # Spill to disk if memory is full
df.persist(StorageLevel.DISK_ONLY)        # Store only on disk
df.persist(StorageLevel.MEMORY_ONLY_SER)  # Serialized in memory (more compact)
# Unpersist when done
df.unpersist()
When to Cache:
When Not to Cache:
Spark SQL allows you to query structured data using SQL or the DataFrame API:
Performance Features:
Shared variables for efficient distributed computing:
Broadcast Variables:
Read-only variables cached on each node
Efficient for sharing large read-only data (lookup tables, ML models)
Avoid sending large data with every task
lookup_table = {"key1": "value1", "key2": "value2"}
broadcast_lookup = sc.broadcast(lookup_table)
rdd.map(lambda x: broadcast_lookup.value.get(x, "default"))
Accumulators:
Add-only variables for aggregating values across tasks
Used for counters and sums in distributed operations
Only the driver can read the final accumulated value
error_count = sc.accumulator(0)
rdd.foreach(lambda x: error_count.add(1) if is_error(x) else None)
print(f"Total errors: {error_count.value}")
Create DataFrames from various sources:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()
# From structured data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
columns = ["name", "id"]
df = spark.createDataFrame(data, columns)
# From files
df_json = spark.read.json("path/to/file.json")
df_parquet = spark.read.parquet("path/to/file.parquet")
df_csv = spark.read.option("header", "true").csv("path/to/file.csv")
# From JDBC sources
df_jdbc = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:port/database") \
.option("dbtable", "table_name") \
.option("user", "username") \
.option("password", "password") \
.load()
Common DataFrame transformations:
# Select columns
df.select("name", "age").show()
# Filter rows
df.filter(df.age > 21).show()
df.where(df["age"] > 21).show()  # Alternative syntax
# Add/modify columns
from pyspark.sql.functions import col, lit
df.withColumn("age_plus_10", col("age") + 10).show()
df.withColumn("country", lit("USA")).show()
# Aggregations
df.groupBy("department").count().show()
df.groupBy("department").agg({"salary": "avg", "age": "max"}).show()
# Sorting
df.orderBy("age").show()
df.orderBy(col("age").desc()).show()
# Joins
df1.join(df2, df1.id == df2.user_id, "inner").show()
df1.join(df2, "id", "left_outer").show()
# Unions
df1.union(df2).show()
Execute SQL on DataFrames:
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("people")
# Run SQL queries
sql_result = spark.sql("SELECT name FROM people WHERE age > 21")
sql_result.show()
# Complex queries
result = spark.sql("""
SELECT
department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MAX(age) as max_age
FROM people
WHERE age > 25
GROUP BY department
HAVING COUNT(*) > 5
ORDER BY avg_salary DESC
""")
result.show()
Spark SQL supports multiple data formats:
Parquet (Recommended for Analytics):
Columnar storage format
Excellent compression and query performance
Schema embedded in the file
Supports predicate pushdown and column pruning
df.write.parquet("output/path", mode="overwrite", compression="snappy")
df = spark.read.parquet("output/path").filter(col("date") == "2025-01-01")
ORC (Optimized Row Columnar):
Similar to Parquet with slightly better compression
Preferred for Hive integration
Built-in indexes for faster queries
df.write.orc("output/path", mode="overwrite")
df = spark.read.orc("output/path")
JSON (Semi-Structured Data):
Human-readable but less efficient
Schema inference on read
Good for nested/complex data
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.read.schema(schema).json("data.json")
CSV (Legacy/Simple Data):
Widely compatible but slow
Requires header inference or an explicit schema
Minimal compression benefits
df.write.csv("output.csv", header=True, mode="overwrite")
df = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")
Advanced analytics with window functions:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, sum, avg
# Define the window specification
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
# Ranking functions
df.withColumn("rank", rank().over(window_spec)).show()
df.withColumn("row_num", row_number().over(window_spec)).show()
df.withColumn("dense_rank", dense_rank().over(window_spec)).show()
# Aggregate functions over a window
df.withColumn("dept_avg_salary", avg("salary").over(window_spec)).show()
df.withColumn("running_total", sum("salary").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show()
# Offset functions
df.withColumn("prev_salary", lag("salary", 1).over(window_spec)).show()
df.withColumn("next_salary", lead("salary", 1).over(window_spec)).show()
Create custom transformations:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
# Python UDF (slower due to serialization overhead)
def categorize_age(age):
if age < 18:
return "Minor"
elif age < 65:
return "Adult"
else:
return "Senior"
categorize_udf = udf(categorize_age, StringType())
df.withColumn("age_category", categorize_udf(col("age"))).show()
# Pandas UDF (vectorized, faster for large datasets)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(IntegerType())
def square(series: pd.Series) -> pd.Series:
return series ** 2
df.withColumn("age_squared", square(col("age"))).show()
UDF Performance Tips:
map: apply a function to each element
# RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
doubled = rdd.map(lambda x: x * 2)  # [2, 4, 6, 8, 10]
# DataFrame (use select with functions)
from pyspark.sql.functions import col
df.select(col("value") * 2).show()
filter: select elements matching a predicate
# RDD
rdd.filter(lambda x: x > 2).collect()  # [3, 4, 5]
# DataFrame
df.filter(col("age") > 25).show()
flatMap: map and flatten results
# RDD - split text into words
lines = sc.parallelize(["hello world", "apache spark"])
words = lines.flatMap(lambda line: line.split(" ")) # ["hello", "world", "apache", "spark"]
reduceByKey: aggregate values by key
# Word count example
words = sc.parallelize(["apple", "banana", "apple", "cherry", "banana", "apple"])
word_pairs = words.map(lambda word: (word, 1))
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
# Result: [("apple", 3), ("banana", 2), ("cherry", 1)]
groupByKey: group values by key (avoid when possible - use reduceByKey instead)
# Less efficient than reduceByKey
word_pairs.groupByKey().mapValues(list).collect()
# Result: [("apple", [1, 1, 1]), ("banana", [1, 1]), ("cherry", [1])]
join: combine datasets by key
# RDD join
users = sc.parallelize([("user1", "Alice"), ("user2", "Bob")])
orders = sc.parallelize([("user1", 100), ("user2", 200), ("user1", 150)])
users.join(orders).collect()
# Result: [("user1", ("Alice", 100)), ("user1", ("Alice", 150)), ("user2", ("Bob", 200))]
# DataFrame join (usually more efficient)
df_users.join(df_orders, "user_id", "inner").show()
distinct: remove duplicates
# RDD
rdd.distinct().collect()
# DataFrame
df.distinct().show()
df.dropDuplicates(["user_id"]).show()  # Drop duplicates based on specific columns
coalesce/repartition: change the partition count
# Reduce partitions (no shuffle, more efficient)
df.coalesce(1).write.parquet("output")
# Increase/decrease partitions (involves a shuffle)
df.repartition(10).write.parquet("output")
df.repartition(10, "user_id").write.parquet("output")  # Partition by column
collect: retrieve all data to the driver
results = rdd.collect()  # Returns a list
# WARNING: only use on small datasets that fit in driver memory
count: count elements
total = df.count()  # Number of rows
first/take: get the first N elements
first_elem = rdd.first()
first_five = rdd.take(5)
reduce: aggregate all elements
total_sum = rdd.reduce(lambda a, b: a + b)
foreach: execute a function on each element
# Side effects only (no return value)
rdd.foreach(lambda x: print(x))
saveAsTextFile: write to a file system
rdd.saveAsTextFile("hdfs://path/to/output")
show: display DataFrame rows (action)
df.show(20, truncate=False)  # Show 20 rows without truncating columns
Process continuous data streams using the DataFrame API.
Streaming DataFrame:
Input Sources:
Output Modes:
Output Sinks:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("StreamingExample").getOrCreate()
# Read a stream from JSON files
input_stream = spark.readStream \
.format("json") \
.schema(schema) \
.option("maxFilesPerTrigger", 1) \
.load("input/directory")
# Transform the streaming data
processed = input_stream \
.filter(col("value") > 10) \
.select("id", "value", "timestamp")
# Write the stream to Parquet
query = processed.writeStream \
.format("parquet") \
.option("path", "output/directory") \
.option("checkpointLocation", "checkpoint/directory") \
.outputMode("append") \
.start()
# Wait for termination
query.awaitTermination()
Join streaming data with static reference data:
# Static DataFrame (loaded once)
static_df = spark.read.parquet("reference/data")
# Streaming DataFrame
streaming_df = spark.readStream.format("kafka").load()
# Inner join (supported)
joined = streaming_df.join(static_df, "type")
# Left outer join (supported)
joined = streaming_df.join(static_df, "type", "left_outer")
# Write the result
joined.writeStream \
.format("parquet") \
.option("path", "output") \
.option("checkpointLocation", "checkpoint") \
.start()
Aggregate data over time windows:
from pyspark.sql.functions import window, col, count
# 10-minute tumbling window
windowed_counts = streaming_df \
.groupBy(
window(col("timestamp"), "10 minutes"),
col("word")
) \
.count()
# 10-minute sliding window with a 5-minute slide
windowed_counts = streaming_df \
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("word")
) \
.count()
# Write to console for debugging
query = windowed_counts.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", "false") \
.start()
Handle late-arriving data with watermarks:
from pyspark.sql.functions import window
# Define a watermark (10-minute tolerance for late data)
windowed_counts = streaming_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(col("timestamp"), "10 minutes"),
col("word")
) \
.count()
# Data arriving more than 10 minutes late will be dropped
Watermark Benefits:
Group events into sessions based on inactivity gaps:
from pyspark.sql.functions import session_window, when
# Dynamic session window based on user
session_window_spec = session_window(
col("timestamp"),
when(col("userId") == "user1", "5 seconds")
.when(col("userId") == "user2", "20 seconds")
.otherwise("5 minutes")
)
sessionized_counts = streaming_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(session_window_spec, col("userId")) \
.count()
Maintain state across micro-batches:
from pyspark.sql.functions import expr
# Deduplication using state
deduplicated = streaming_df \
.withWatermark("timestamp", "1 hour") \
.dropDuplicates(["user_id", "event_id"])
# Stream-stream joins (stateful) - alias each side so the join condition can reference it
stream1 = spark.readStream.format("kafka").option("subscribe", "topic1").load()
stream2 = spark.readStream.format("kafka").option("subscribe", "topic2").load()
joined = stream1 \
    .withWatermark("timestamp", "10 minutes") \
    .alias("stream1") \
    .join(
        stream2.withWatermark("timestamp", "20 minutes").alias("stream2"),
        expr("stream1.user_id = stream2.user_id AND stream1.timestamp >= stream2.timestamp AND stream1.timestamp <= stream2.timestamp + interval 15 minutes"),
        "inner"
    )
Ensure fault tolerance with checkpoints:
# The checkpoint location stores:
# - Stream metadata (offsets, configuration)
# - State information (for stateful operations)
# - Write-ahead logs
query = streaming_df.writeStream \
    .format("parquet") \
    .option("path", "output") \
    .option("checkpointLocation", "checkpoint/dir") \
    .start()  # checkpointLocation is REQUIRED for production
# Recovery: restart the query with the same checkpoint location;
# Spark will resume from the last committed offset
Checkpoint Best Practices:
Spark's scalable machine learning library.
MLlib Features:
Chain transformers and estimators:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
# Load data
df = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")
# Define pipeline stages
assembler = VectorAssembler(
inputCols=["feature1", "feature2", "feature3"],
outputCol="features"
)
scaler = StandardScaler(
inputCol="features",
outputCol="scaled_features",
withStd=True,
withMean=True
)
lr = LogisticRegression(
featuresCol="scaled_features",
labelCol="label",
maxIter=10,
regParam=0.01
)
# Create the pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Split the data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Train the model
model = pipeline.fit(train_df)
# Make predictions
predictions = model.transform(test_df)
predictions.select("label", "prediction", "probability").show()
Transform raw data into features:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
# Categorical encoding
indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")
# Numerical scaling
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
# Assemble features
assembler = VectorAssembler(
inputCols=["category_vec", "numeric_feature1", "numeric_feature2"],
outputCol="features"
)
# Text processing
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")
Train models on streaming data:
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
from pyspark.streaming import StreamingContext
# Create a StreamingContext with a 1-second batch interval
ssc = StreamingContext(sc, batchDuration=1)
# Define data streams
training_stream = ssc.textFileStream("training/data/path")
testing_stream = ssc.textFileStream("testing/data/path")
# Parse streams into LabeledPoint objects
def parse_point(line):
values = [float(x) for x in line.strip().split(',')]
return LabeledPoint(values[0], values[1:])
parsed_training = training_stream.map(parse_point)
parsed_testing = testing_stream.map(parse_point)
# Initialize the model; initial weights are set via setInitialWeights
num_features = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0] * num_features)
# Train and predict
model.trainOn(parsed_training)
predictions = model.predictOnValues(parsed_testing.map(lambda lp: (lp.label, lp.features)))
# Print predictions
predictions.pprint()
# Start the streaming computation
ssc.start()
ssc.awaitTermination()
Evaluate model performance:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
# Binary classification
binary_evaluator = BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC"
)
auc = binary_evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# Multiclass classification
multi_evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="accuracy"
)
accuracy = multi_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")
# Regression
regression_evaluator = RegressionEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="rmse"
)
rmse = regression_evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
Optimize model parameters with cross-validation:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Define the model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
# Build the parameter grid
param_grid = ParamGridBuilder() \
.addGrid(rf.numTrees, [10, 20, 50]) \
.addGrid(rf.maxDepth, [5, 10, 15]) \
.addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
.build()
# Define the evaluator
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
# Cross-validation
cv = CrossValidator(
estimator=rf,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=5,
parallelism=4
)
# Train
cv_model = cv.fit(train_df)
# Best model
best_model = cv_model.bestModel
print(f"Best numTrees: {best_model.getNumTrees}")
print(f"Best maxDepth: {best_model.getMaxDepth()}")
# Evaluate on the test set
predictions = cv_model.transform(test_df)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")
MLlib provides distributed matrix representations:
from pyspark.mllib.linalg.distributed import RowMatrix, IndexedRowMatrix, CoordinateMatrix
from pyspark.mllib.linalg import Vectors
# RowMatrix: a distributed matrix without row indices
rows = sc.parallelize([
Vectors.dense([1.0, 2.0, 3.0]),
Vectors.dense([4.0, 5.0, 6.0]),
Vectors.dense([7.0, 8.0, 9.0])
])
row_matrix = RowMatrix(rows)
# Compute statistics
print(f"Rows: {row_matrix.numRows()}")
print(f"Cols: {row_matrix.numCols()}")
print(f"Column means: {row_matrix.computeColumnSummaryStatistics().mean()}")
# IndexedRowMatrix: a matrix with row indices
from pyspark.mllib.linalg.distributed import IndexedRow
indexed_rows = sc.parallelize([
IndexedRow(0, Vectors.dense([1.0, 2.0, 3.0])),
IndexedRow(1, Vectors.dense([4.0, 5.0, 6.0]))
])
indexed_matrix = IndexedRowMatrix(indexed_rows)
# CoordinateMatrix: a sparse matrix of (row, column, value) entries
from pyspark.mllib.linalg.distributed import MatrixEntry
entries = sc.parallelize([
MatrixEntry(0, 0, 1.0),
MatrixEntry(0, 2, 3.0),
MatrixEntry(1, 1, 5.0)
])
coord_matrix = CoordinateMatrix(entries)
Sample data while preserving the per-key class distribution:
data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("c", 6)]
rdd = sc.parallelize(data)
# Define the sampling fraction for each key
fractions = {"a": 0.5, "b": 0.5, "c": 0.5}
# Approximate sampling
sampled_rdd = rdd.sampleByKey(withReplacement=False, fractions=fractions)
# Exact sampling (sampleByKeyExact is available in the Scala/Java API only)
# exact_sampled = rdd.sampleByKeyExact(withReplacement=False, fractions=fractions)
print(sampled_rdd.collect())
Memory breakdown:
Configuration:
spark = SparkSession.builder \
    .appName("MemoryTuning") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()
# spark.memory.fraction: fraction of heap used for execution + storage
# spark.memory.storageFraction: portion of that reserved for storage
Memory Best Practices:
Shuffles are expensive operations - minimize them:
Operations that cause shuffles:
Optimization strategies:
# 1. Use reduceByKey instead of groupByKey
# Bad: groupByKey shuffles all the data
word_pairs.groupByKey().mapValues(sum)
# Good: reduceByKey combines locally before the shuffle
word_pairs.reduceByKey(lambda a, b: a + b)
# 2. Broadcast small tables in joins
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")
# 3. Partition data appropriately
df.repartition(200, "user_id")  # Partition by key for subsequent aggregations
# 4. Use coalesce instead of repartition when reducing partitions
df.coalesce(10)  # No shuffle, just merges partitions
# 5. Tune shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 200)  # Default is 200
Shuffle Configuration:
# spark.sql.adaptive.enabled turns on Adaptive Query Execution (AQE)
spark = SparkSession.builder \
    .config("spark.sql.shuffle.partitions", 200) \
    .config("spark.default.parallelism", 200) \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark
A comprehensive skill for mastering Apache Spark data processing, from basic RDD operations to advanced streaming, SQL, and machine learning workflows. Learn to build scalable, distributed data pipelines and analytics systems.
Use Apache Spark when you need to:
Not Ideal For:
RDDs are Spark's fundamental data abstraction - immutable, distributed collections of objects that can be processed in parallel.
Key Characteristics:
RDD Operations:
When to Use RDDs:
Prefer DataFrames/Datasets when possible - they provide automatic optimization via Catalyst optimizer.
DataFrames are distributed collections of data organized into named columns - similar to a database table or pandas DataFrame, but with powerful optimizations.
DataFrames:
Datasets (Scala/Java only):
Key Advantages Over RDDs:
Spark uses lazy evaluation to optimize execution:
Benefits:
Data is divided into partitions for parallel processing:
Partition Count Considerations:
repartition() or coalesce() to adjust partition countCache frequently accessed data in memory for performance:
# Cache DataFrame in memory
df.cache() # Shorthand for persist(StorageLevel.MEMORY_AND_DISK)
# Different storage levels
df.persist(StorageLevel.MEMORY_ONLY) # Fast but may lose data if evicted
df.persist(StorageLevel.MEMORY_AND_DISK) # Spill to disk if memory full
df.persist(StorageLevel.DISK_ONLY) # Store only on disk
df.persist(StorageLevel.MEMORY_ONLY_SER) # Serialized in memory (more compact)
# Unpersist when done
df.unpersist()
When to Cache:
When Not to Cache:
Spark SQL allows you to query structured data using SQL or DataFrame API:
Performance Features:
Shared variables for efficient distributed computing:
Broadcast Variables:
Read-only variables cached on each node
Efficient for sharing large read-only data (lookup tables, ML models)
Avoid sending large data with every task
lookup_table = {"key1": "value1", "key2": "value2"} broadcast_lookup = sc.broadcast(lookup_table)
rdd.map(lambda x: broadcast_lookup.value.get(x, "default"))
Accumulators:
Write-only variables for aggregating values across tasks
Used for counters and sums in distributed operations
Only driver can read final accumulated value
error_count = sc.accumulator(0)
rdd.foreach(lambda x: error_count.add(1) if is_error(x) else None)
print(f"Total errors: {error_count.value}")
Create DataFrames from various sources:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()
# From structured data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
columns = ["name", "id"]
df = spark.createDataFrame(data, columns)
# From files
df_json = spark.read.json("path/to/file.json")
df_parquet = spark.read.parquet("path/to/file.parquet")
df_csv = spark.read.option("header", "true").csv("path/to/file.csv")
# From JDBC sources
df_jdbc = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:port/database") \
.option("dbtable", "table_name") \
.option("user", "username") \
.option("password", "password") \
.load()
Common DataFrame transformations:
# Select columns
df.select("name", "age").show()
# Filter rows
df.filter(df.age > 21).show()
df.where(df["age"] > 21).show() # Alternative syntax
# Add/modify columns
from pyspark.sql.functions import col, lit
df.withColumn("age_plus_10", col("age") + 10).show()
df.withColumn("country", lit("USA")).show()
# Aggregations
df.groupBy("department").count().show()
df.groupBy("department").agg({"salary": "avg", "age": "max"}).show()
# Sorting
df.orderBy("age").show()
df.orderBy(col("age").desc()).show()
# Joins
df1.join(df2, df1.id == df2.user_id, "inner").show()
df1.join(df2, "id", "left_outer").show()
# Unions
df1.union(df2).show()
Execute SQL on DataFrames:
# Register DataFrame as temporary view
df.createOrReplaceTempView("people")
# Run SQL queries
sql_result = spark.sql("SELECT name FROM people WHERE age > 21")
sql_result.show()
# Complex queries
result = spark.sql("""
SELECT
department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MAX(age) as max_age
FROM people
WHERE age > 25
GROUP BY department
HAVING COUNT(*) > 5
ORDER BY avg_salary DESC
""")
result.show()
Spark SQL supports multiple data formats:
Parquet (Recommended for Analytics):
Columnar storage format
Excellent compression and query performance
Schema embedded in file
Supports predicate pushdown and column pruning
df.write.parquet("output/path", mode="overwrite", compression="snappy")
df = spark.read.parquet("output/path").filter(col("date") == "2025-01-01")
ORC (Optimized Row Columnar):
Similar to Parquet with slightly better compression
Preferred for Hive integration
Built-in indexes for faster queries
df.write.orc("output/path", mode="overwrite") df = spark.read.orc("output/path")
JSON (Semi-Structured Data):
Human-readable but less efficient
Schema inference on read
Good for nested/complex data
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True) ])
df = spark.read.schema(schema).json("data.json")
CSV (Legacy/Simple Data):
Widely compatible but slow
Requires header inference or explicit schema
Minimal compression benefits
df.write.csv("output.csv", header=True, mode="overwrite") df = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")
Advanced analytics with window functions:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, sum, avg
# Define window specification
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
# Ranking functions
df.withColumn("rank", rank().over(window_spec)).show()
df.withColumn("row_num", row_number().over(window_spec)).show()
df.withColumn("dense_rank", dense_rank().over(window_spec)).show()
# Aggregate functions over window
df.withColumn("dept_avg_salary", avg("salary").over(window_spec)).show()
df.withColumn("running_total", sum("salary").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show()
# Offset functions
df.withColumn("prev_salary", lag("salary", 1).over(window_spec)).show()
df.withColumn("next_salary", lead("salary", 1).over(window_spec)).show()
Create custom transformations:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
# Python UDF (slower due to serialization overhead)
def categorize_age(age):
if age < 18:
return "Minor"
elif age < 65:
return "Adult"
else:
return "Senior"
categorize_udf = udf(categorize_age, StringType())
df.withColumn("age_category", categorize_udf(col("age"))).show()
# Pandas UDF (vectorized, faster for large datasets)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(IntegerType())
def square(series: pd.Series) -> pd.Series:
return series ** 2
df.withColumn("age_squared", square(col("age"))).show()
UDF Performance Tips:
map : Apply function to each element
# RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
squared = rdd.map(lambda x: x * 2) # [2, 4, 6, 8, 10]
# DataFrame (use select with functions)
from pyspark.sql.functions import col
df.select(col("value") * 2).show()
filter : Select elements matching predicate
# RDD
rdd.filter(lambda x: x > 2).collect() # [3, 4, 5]
# DataFrame
df.filter(col("age") > 25).show()
flatMap : Map and flatten results
# RDD - Split text into words
lines = sc.parallelize(["hello world", "apache spark"])
words = lines.flatMap(lambda line: line.split(" ")) # ["hello", "world", "apache", "spark"]
reduceByKey : Aggregate values by key
# Word count example
words = sc.parallelize(["apple", "banana", "apple", "cherry", "banana", "apple"])
word_pairs = words.map(lambda word: (word, 1))
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
# Result: [("apple", 3), ("banana", 2), ("cherry", 1)]
groupByKey : Group values by key (avoid when possible - use reduceByKey instead)
# Less efficient than reduceByKey
word_pairs.groupByKey().mapValues(list).collect()
# Result: [("apple", [1, 1, 1]), ("banana", [1, 1]), ("cherry", [1])]
join : Combine datasets by key
# RDD join
users = sc.parallelize([("user1", "Alice"), ("user2", "Bob")])
orders = sc.parallelize([("user1", 100), ("user2", 200), ("user1", 150)])
users.join(orders).collect()
# Result: [("user1", ("Alice", 100)), ("user1", ("Alice", 150)), ("user2", ("Bob", 200))]
# DataFrame join (more efficient)
df_users.join(df_orders, "user_id", "inner").show()
distinct : Remove duplicates
# RDD
rdd.distinct().collect()
# DataFrame
df.distinct().show()
df.dropDuplicates(["user_id"]).show() # Drop based on specific columns
coalesce/repartition : Change partition count
# Reduce partitions (no shuffle, more efficient)
df.coalesce(1).write.parquet("output")
# Increase/decrease partitions (involves shuffle)
df.repartition(10).write.parquet("output")
df.repartition(10, "user_id").write.parquet("output") # Partition by column
collect : Retrieve all data to driver
results = rdd.collect() # Returns list
# WARNING: Only use on small datasets that fit in driver memory
count : Count elements
total = df.count() # Number of rows
first/take : Get first N elements
first_elem = rdd.first()
first_five = rdd.take(5)
reduce : Aggregate all elements
total_sum = rdd.reduce(lambda a, b: a + b)
foreach : Execute function on each element
# Side effects only (no return value)
rdd.foreach(lambda x: print(x))
saveAsTextFile : Write to file system
rdd.saveAsTextFile("hdfs://path/to/output")
show : Display DataFrame rows (action)
df.show(20, truncate=False) # Show 20 rows, don't truncate columns
Process continuous data streams using DataFrame API.
Streaming DataFrame:
Input Sources:
Output Modes:
Output Sinks:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("StreamingExample").getOrCreate()
# Read stream from JSON files
input_stream = spark.readStream \
.format("json") \
.schema(schema) \
.option("maxFilesPerTrigger", 1) \
.load("input/directory")
# Transform streaming data
processed = input_stream \
.filter(col("value") > 10) \
.select("id", "value", "timestamp")
# Write stream to Parquet
query = processed.writeStream \
.format("parquet") \
.option("path", "output/directory") \
.option("checkpointLocation", "checkpoint/directory") \
.outputMode("append") \
.start()
# Wait for termination
query.awaitTermination()
Join streaming data with static reference data:
# Static DataFrame (loaded once)
static_df = spark.read.parquet("reference/data")
# Streaming DataFrame
streaming_df = spark.readStream.format("kafka").load()
# Inner join (supported)
joined = streaming_df.join(static_df, "type")
# Left outer join (supported)
joined = streaming_df.join(static_df, "type", "left_outer")
# Write result
joined.writeStream \
.format("parquet") \
.option("path", "output") \
.option("checkpointLocation", "checkpoint") \
.start()
Aggregate data over time windows:
from pyspark.sql.functions import window, col, count
# 10-minute tumbling window
windowed_counts = streaming_df \
.groupBy(
window(col("timestamp"), "10 minutes"),
col("word")
) \
.count()
# 10-minute sliding window with 5-minute slide
windowed_counts = streaming_df \
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("word")
) \
.count()
# Write to console for debugging
query = windowed_counts.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", "false") \
.start()
Handle late-arriving data with watermarks:
from pyspark.sql.functions import window
# Define watermark (10 minutes tolerance for late data)
windowed_counts = streaming_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(col("timestamp"), "10 minutes"),
col("word")
) \
.count()
# Data arriving more than 10 minutes late will be dropped
Watermark Benefits:
Group events into sessions based on inactivity gaps:
from pyspark.sql.functions import session_window, when
# Dynamic session window based on user
session_window_spec = session_window(
col("timestamp"),
when(col("userId") == "user1", "5 seconds")
.when(col("userId") == "user2", "20 seconds")
.otherwise("5 minutes")
)
sessionized_counts = streaming_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(session_window_spec, col("userId")) \
.count()
Maintain state across micro-batches:
from pyspark.sql.functions import expr
# Deduplication using state
deduplicated = streaming_df \
.withWatermark("timestamp", "1 hour") \
.dropDuplicates(["user_id", "event_id"])
# Stream-stream joins (stateful)
stream1 = spark.readStream.format("kafka").option("subscribe", "topic1").load()
stream2 = spark.readStream.format("kafka").option("subscribe", "topic2").load()
joined = stream1 \
.withWatermark("timestamp", "10 minutes") \
.join(
stream2.withWatermark("timestamp", "20 minutes"),
expr("stream1.user_id = stream2.user_id AND stream1.timestamp >= stream2.timestamp AND stream1.timestamp <= stream2.timestamp + interval 15 minutes"),
"inner"
)
Ensure fault tolerance with checkpoints:
# Checkpoint location stores:
# - Stream metadata (offsets, configuration)
# - State information (for stateful operations)
# - Write-ahead logs
query = streaming_df.writeStream \
.format("parquet") \
.option("path", "output") \
.option("checkpointLocation", "checkpoint/dir") # REQUIRED for production \
.start()
# Recovery: Restart query with same checkpoint location
# Spark will resume from last committed offset
Checkpoint Best Practices:
Spark's scalable machine learning library.
MLlib Features:
Chain transformations and estimators:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
# Load data
df = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")
# Define pipeline stages
assembler = VectorAssembler(
inputCols=["feature1", "feature2", "feature3"],
outputCol="features"
)
scaler = StandardScaler(
inputCol="features",
outputCol="scaled_features",
withStd=True,
withMean=True
)
lr = LogisticRegression(
featuresCol="scaled_features",
labelCol="label",
maxIter=10,
regParam=0.01
)
# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Train model
model = pipeline.fit(train_df)
# Make predictions
predictions = model.transform(test_df)
predictions.select("label", "prediction", "probability").show()
Transform raw data into features:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
# Categorical encoding
indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")
# Numerical scaling
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
# Assemble features
assembler = VectorAssembler(
inputCols=["category_vec", "numeric_feature1", "numeric_feature2"],
outputCol="features"
)
# Text processing
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")
Train models on streaming data (uses the older DStream API, superseded by Structured Streaming but still available):
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
from pyspark.streaming import StreamingContext
# Create StreamingContext
ssc = StreamingContext(sc, batchDuration=1)
# Define data streams
training_stream = ssc.textFileStream("training/data/path")
testing_stream = ssc.textFileStream("testing/data/path")
# Parse streams into LabeledPoint objects
def parse_point(line):
values = [float(x) for x in line.strip().split(',')]
return LabeledPoint(values[0], values[1:])
parsed_training = training_stream.map(parse_point)
parsed_testing = testing_stream.map(parse_point)
# Initialize model (weights must be set before training starts)
num_features = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0] * num_features)
# Train and predict
model.trainOn(parsed_training)
predictions = model.predictOnValues(parsed_testing.map(lambda lp: (lp.label, lp.features)))
# Print predictions
predictions.pprint()
# Start streaming
ssc.start()
ssc.awaitTermination()
Evaluate model performance:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
# Binary classification
binary_evaluator = BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC"
)
auc = binary_evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# Multiclass classification
multi_evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="accuracy"
)
accuracy = multi_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")
# Regression
regression_evaluator = RegressionEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="rmse"
)
rmse = regression_evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
Optimize model parameters with cross-validation:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Define model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
# Build parameter grid
param_grid = ParamGridBuilder() \
.addGrid(rf.numTrees, [10, 20, 50]) \
.addGrid(rf.maxDepth, [5, 10, 15]) \
.addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
.build()
# Define evaluator
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
# Cross-validation
cv = CrossValidator(
estimator=rf,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=5,
parallelism=4
)
# Train
cv_model = cv.fit(train_df)
# Best model
best_model = cv_model.bestModel
print(f"Best numTrees: {best_model.getNumTrees}")
print(f"Best maxDepth: {best_model.getMaxDepth()}")
# Evaluate on test set
predictions = cv_model.transform(test_df)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")
MLlib provides distributed matrix representations:
from pyspark.mllib.linalg.distributed import RowMatrix, IndexedRowMatrix, CoordinateMatrix
from pyspark.mllib.linalg import Vectors
# RowMatrix: Distributed matrix without row indices
rows = sc.parallelize([
Vectors.dense([1.0, 2.0, 3.0]),
Vectors.dense([4.0, 5.0, 6.0]),
Vectors.dense([7.0, 8.0, 9.0])
])
row_matrix = RowMatrix(rows)
# Compute statistics
print(f"Rows: {row_matrix.numRows()}")
print(f"Cols: {row_matrix.numCols()}")
print(f"Column means: {row_matrix.computeColumnSummaryStatistics().mean()}")
# IndexedRowMatrix: Matrix with row indices
from pyspark.mllib.linalg.distributed import IndexedRow
indexed_rows = sc.parallelize([
IndexedRow(0, Vectors.dense([1.0, 2.0, 3.0])),
IndexedRow(1, Vectors.dense([4.0, 5.0, 6.0]))
])
indexed_matrix = IndexedRowMatrix(indexed_rows)
# CoordinateMatrix: Sparse matrix using (row, col, value) entries
from pyspark.mllib.linalg.distributed import MatrixEntry
entries = sc.parallelize([
MatrixEntry(0, 0, 1.0),
MatrixEntry(0, 2, 3.0),
MatrixEntry(1, 1, 5.0)
])
coord_matrix = CoordinateMatrix(entries)
Sample data while preserving class distribution:
# Stratified sampling on a pair RDD
data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("c", 6)]
rdd = sc.parallelize(data)
# Define sampling fractions per key
fractions = {"a": 0.5, "b": 0.5, "c": 0.5}
# Approximate sample (faster, one pass)
sampled_rdd = rdd.sampleByKey(withReplacement=False, fractions=fractions)
# Exact sample (slower, guaranteed exact counts) is Scala/Java only:
# rdd.sampleByKeyExact(withReplacement = false, fractions = fractions)
print(sampled_rdd.collect())
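Under the hood, sampleByKey makes one pass of per-key Bernoulli sampling. A pure-Python sketch of those semantics (illustrative only, not Spark's implementation):

```python
import random

def sample_by_key(pairs, fractions, seed=42):
    # Keep each (key, value) pair with probability fractions[key]
    # (Bernoulli sampling -- the same semantics as RDD.sampleByKey)
    rng = random.Random(seed)
    return [(k, v) for (k, v) in pairs if rng.random() < fractions.get(k, 0.0)]

data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("c", 6)]
fractions = {"a": 0.5, "b": 0.5, "c": 0.5}
sampled = sample_by_key(data, fractions)
print(sampled)  # a subset of data; per-key counts are approximate, not exact
```

This is why per-key counts from sampleByKey are only approximate: each record is kept or dropped independently.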
Memory Breakdown:
Configuration:
spark = SparkSession.builder \
.appName("MemoryTuning") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "2g") \
.config("spark.memory.fraction", "0.6") \
.config("spark.memory.storageFraction", "0.5") \
.getOrCreate()
# spark.memory.fraction: fraction of heap used for execution + storage
# spark.memory.storageFraction: fraction of that region reserved for storage
Memory Best Practices:
Shuffles are expensive operations - minimize them:
Causes of Shuffles:
Optimization Strategies:
# 1. Use reduceByKey instead of groupByKey
# Bad: groupByKey shuffles all data
word_pairs.groupByKey().mapValues(sum)
# Good: reduceByKey combines locally before shuffle
word_pairs.reduceByKey(lambda a, b: a + b)
# 2. Broadcast small tables in joins
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")
# 3. Partition data appropriately
df.repartition(200, "user_id") # Partition by key for subsequent aggregations
# 4. Coalesce instead of repartition when reducing partitions
df.coalesce(10) # No shuffle, just merge partitions
# 5. Tune shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 200) # Default is 200
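To see why map-side combining matters, here is a pure-Python back-of-the-envelope simulation (hypothetical data) of how many records cross the network with groupByKey versus reduceByKey:

```python
from collections import Counter

# Simulated word-count records in 3 partitions before the shuffle
partitions = [
    [("spark", 1)] * 5 + [("rdd", 1)] * 3,
    [("spark", 1)] * 4 + [("sql", 1)] * 2,
    [("rdd", 1)] * 6,
]

# groupByKey: every record crosses the network
shuffled_without_combine = sum(len(p) for p in partitions)

# reduceByKey: each partition combines locally first, so at most one
# record per distinct key per partition is shuffled
shuffled_with_combine = sum(len(Counter(k for k, _ in p)) for p in partitions)

print(shuffled_without_combine, shuffled_with_combine)  # 20 5
```

The gap grows with the number of duplicate keys per partition, which is why reduceByKey is the default recommendation for associative aggregations.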
Shuffle Configuration:
spark = SparkSession.builder \
.config("spark.sql.shuffle.partitions", 200) \
.config("spark.default.parallelism", 200) \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.getOrCreate()
# spark.sql.adaptive.enabled turns on Adaptive Query Execution (AQE)
Partition Count Guidelines:
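A rough rule-of-thumb calculation for choosing a partition count. The multipliers below (2-4 tasks per core, ~128 MB per partition) are common heuristics, not Spark defaults:

```python
def recommended_partitions(total_cores, data_size_mb,
                           tasks_per_core=3, target_partition_mb=128):
    """Heuristic partition count: enough partitions to keep every core
    busy with a few task waves, and no partition much larger than ~128 MB."""
    by_cores = total_cores * tasks_per_core
    by_size = max(1, data_size_mb // target_partition_mb)
    return max(by_cores, by_size)

# Example: 80-core cluster reading a 50 GB dataset
print(recommended_partitions(total_cores=80, data_size_mb=50 * 1024))  # 400
```

Here the data-size bound (400 partitions of ~128 MB) dominates the core-count bound (240), so the size heuristic wins.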
Partition by Column:
# Partition writes by date for easy filtering
df.write.partitionBy("date", "country").parquet("output")
# Read with partition pruning (only reads relevant partitions)
spark.read.parquet("output").filter(col("date") == "2025-01-15").show()
Custom Partitioning:
from pyspark.rdd import portable_hash
# Custom partitioner for RDD
def custom_partitioner(key):
return portable_hash(key) % 100
rdd.partitionBy(100, custom_partitioner)
When to Cache:
# Iterative algorithms (ML)
training_data.cache()
for i in range(num_iterations):
model = train_model(training_data)
# Multiple aggregations on same data
base_df.cache()
result1 = base_df.groupBy("country").count()
result2 = base_df.groupBy("city").avg("sales")
# Interactive analysis
df.cache()
df.filter(condition1).show()
df.filter(condition2).show()
df.groupBy("category").count().show()
Storage Levels:
from pyspark import StorageLevel
# Memory only (fastest, but may lose data)
df.persist(StorageLevel.MEMORY_ONLY)
# Memory and disk (spill to disk if needed)
df.persist(StorageLevel.MEMORY_AND_DISK)
# Serialized in memory (more compact, slower access)
df.persist(StorageLevel.MEMORY_ONLY_SER)
# Disk only (slowest, but always available)
df.persist(StorageLevel.DISK_ONLY)
# Replicated (fault tolerance)
df.persist(StorageLevel.MEMORY_AND_DISK_2) # 2 replicas
Optimize joins with small tables:
from pyspark.sql.functions import broadcast
# Automatic broadcast (tables < spark.sql.autoBroadcastJoinThreshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024) # 10 MB
# Explicit broadcast hint
large_df.join(broadcast(small_df), "key")
# Benefits:
# - No shuffle of large table
# - Small table sent to all executors once
# - Much faster for small dimension tables
Enable runtime query optimization:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE Benefits:
# - Dynamically coalesce partitions after shuffle
# - Handle skewed joins by splitting large partitions
# - Optimize join strategy at runtime
Performance Comparison:
Recommendation:
Understand query optimization:
# View physical plan
df.explain(mode="extended")
# Optimizations include:
# - Predicate pushdown: Push filters to data source
# - Column pruning: Read only required columns
# - Constant folding: Evaluate constants at compile time
# - Join reordering: Optimize join order
# - Partition pruning: Skip irrelevant partitions
Standalone:
Simple, built-in cluster manager
Easy setup for development and small clusters
No resource sharing with other frameworks
$SPARK_HOME/sbin/start-master.sh
$SPARK_HOME/sbin/start-worker.sh spark://master:7077
spark-submit --master spark://master:7077 app.py
YARN:
Hadoop's resource manager
Share cluster resources with MapReduce, Hive, etc.
Two modes: cluster (driver on YARN) and client (driver on local machine)
spark-submit --master yarn --deploy-mode cluster app.py
spark-submit --master yarn --deploy-mode client app.py
Kubernetes:
Modern container orchestration
Dynamic resource allocation
Cloud-native deployments
spark-submit \
--master k8s://https://k8s-master:443 \
--deploy-mode cluster \
--name spark-app \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=spark:latest \
app.py
Mesos:
Basic spark-submit:
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--conf spark.sql.shuffle.partitions=200 \
--py-files dependencies.zip \
--files config.json \
application.py
Configuration Options:
--master: Cluster manager URL
--deploy-mode: Where to run driver (client or cluster)
--driver-memory: Memory for driver process
--executor-memory: Memory per executor
--executor-cores: Cores per executor
--num-executors: Number of executors
--conf: Spark configuration properties
--py-files: Python dependencies
--files: Additional files to distribute
General Guidelines:
Example Calculations:
Cluster: 10 nodes, 32 cores each, 128 GB RAM each
Option 1: Fewer large executors
- 30 executors (3 per node)
- 10 cores per executor
- 40 GB memory per executor
- Total: 300 cores
Option 2: More smaller executors (RECOMMENDED)
- 50 executors (5 per node)
- 5 cores per executor
- 24 GB memory per executor
- Total: 250 cores
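The arithmetic behind this kind of sizing can be sketched as a small helper. The reservations below (1 core and 8 GB per node for the OS/daemons, ~5 cores per executor, 10% memory overhead) are common heuristics and differ slightly from the worked example above:

```python
def size_executors(nodes, cores_per_node, ram_gb_per_node,
                   cores_per_executor=5, os_reserved_cores=1,
                   os_reserved_gb=8, overhead_fraction=0.10):
    """Heuristic executor sizing: reserve resources for the OS, target
    ~5 cores per executor, subtract one executor for the driver/AM, and
    leave ~10% of each executor's memory as off-heap overhead."""
    usable_cores = cores_per_node - os_reserved_cores
    executors_per_node = usable_cores // cores_per_executor
    total_executors = nodes * executors_per_node - 1  # one slot for the driver
    mem_per_executor = (ram_gb_per_node - os_reserved_gb) / executors_per_node
    heap_gb = int(mem_per_executor * (1 - overhead_fraction))
    return total_executors, cores_per_executor, heap_gb

# The 10-node / 32-core / 128 GB cluster from the example above
print(size_executors(nodes=10, cores_per_node=32, ram_gb_per_node=128))
```

Any such formula is a starting point; measured shuffle and GC behavior should drive the final numbers.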
Automatically scale executors based on workload:
spark = SparkSession.builder \
.appName("DynamicAllocation") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", 2) \
.config("spark.dynamicAllocation.maxExecutors", 100) \
.config("spark.dynamicAllocation.initialExecutors", 10) \
.config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
.getOrCreate()
Benefits:
Spark UI:
History Server:
# Start history server
$SPARK_HOME/sbin/start-history-server.sh
# Configure event logging
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.eventLog.dir", "hdfs://namenode/spark-logs")
Metrics:
# Enable metrics collection
spark.conf.set("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
spark.conf.set("spark.metrics.conf.*.sink.console.period", 10)
Logging:
# Configure log level
spark.sparkContext.setLogLevel("WARN") # ERROR, WARN, INFO, DEBUG
# Custom logging
import logging
logger = logging.getLogger(__name__)
logger.info("Custom log message")
Automatic Recovery:
Checkpointing:
# Set checkpoint directory
spark.sparkContext.setCheckpointDir("hdfs://namenode/checkpoints")
# Checkpoint RDD (breaks lineage for very long chains)
rdd.checkpoint()
# Streaming checkpoint (required for production)
query = streaming_df.writeStream \
.option("checkpointLocation", "hdfs://namenode/streaming-checkpoint") \
.start()
Speculative Execution:
# Enable speculative execution for slow tasks
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.multiplier", 1.5)
spark.conf.set("spark.speculation.quantile", 0.75)
Optimize data placement for performance:
Locality Levels:
Improve Locality:
# Increase locality wait time
spark.conf.set("spark.locality.wait", "10s")
spark.conf.set("spark.locality.wait.node", "5s")
spark.conf.set("spark.locality.wait.rack", "3s")
# Partition data to match cluster topology
df.repartition(num_nodes * cores_per_node)
from pyspark.sql.functions import col, current_date, count, sum, avg, broadcast
def etl_pipeline(spark, input_path, output_path, reference_df):
# Extract
raw_df = spark.read.parquet(input_path)
# Transform
cleaned_df = raw_df \
.dropDuplicates(["id"]) \
.filter(col("value").isNotNull()) \
.withColumn("processed_date", current_date())
# Enrich
enriched_df = cleaned_df.join(broadcast(reference_df), "key")
# Aggregate
aggregated_df = enriched_df \
.groupBy("category", "date") \
.agg(
count("*").alias("count"),
sum("amount").alias("total_amount"),
avg("value").alias("avg_value")
)
# Load
aggregated_df.write \
.partitionBy("date") \
.mode("overwrite") \
.parquet(output_path)
from pyspark.sql.functions import col, max as spark_max
def incremental_process(spark, input_path, output_path, checkpoint_path):
# Read last processed timestamp
last_timestamp = read_checkpoint(checkpoint_path)
# Read new data
new_data = spark.read.parquet(input_path) \
.filter(col("timestamp") > last_timestamp)
# Process
processed = transform(new_data)
# Write
processed.write.mode("append").parquet(output_path)
# Update checkpoint (aliased import avoids shadowing Python's builtin max)
max_timestamp = new_data.agg(spark_max("timestamp")).collect()[0][0]
write_checkpoint(checkpoint_path, max_timestamp)
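The read_checkpoint / write_checkpoint helpers above are left undefined; a minimal file-based sketch is below. This is hypothetical glue code for illustration: production jobs typically keep this watermark in HDFS/S3 or a metadata table instead of a local file.

```python
import os
import tempfile

def read_checkpoint(checkpoint_path, default="1970-01-01 00:00:00"):
    # First run: no checkpoint file yet, so process everything
    if not os.path.exists(checkpoint_path):
        return default
    with open(checkpoint_path) as f:
        return f.read().strip()

def write_checkpoint(checkpoint_path, max_timestamp):
    # No new rows this run: keep the old watermark
    if max_timestamp is None:
        return
    tmp = checkpoint_path + ".tmp"
    with open(tmp, "w") as f:
        f.write(str(max_timestamp))
    os.replace(tmp, checkpoint_path)  # atomic rename avoids torn writes

ckpt = os.path.join(tempfile.gettempdir(), "incr_ckpt.txt")
write_checkpoint(ckpt, "2025-01-15 10:30:00")
print(read_checkpoint(ckpt))  # 2025-01-15 10:30:00
```

The write-to-temp-then-rename pattern matters: a crash mid-write must never leave a half-written watermark, or the next run would reprocess or skip data.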
def scd_type2_upsert(spark, dimension_df, updates_df):
# Mark existing records as inactive if updated
inactive_records = dimension_df \
.join(updates_df, "business_key") \
.select(
dimension_df["*"],
lit(False).alias("is_active"),
current_date().alias("end_date")
)
# Add new records
new_records = updates_df \
.withColumn("is_active", lit(True)) \
.withColumn("start_date", current_date()) \
.withColumn("end_date", lit(None).cast("date"))  # typed null; bare lit(None) yields a NullType column
# Union unchanged, inactive, and new records
result = dimension_df \
.join(updates_df, "business_key", "left_anti") \
.union(inactive_records) \
.union(new_records)
return result
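The upsert above unions three record sets; a pure-Python illustration of that set logic on hypothetical data (simplified to business_key only):

```python
# dimension: current records; updates: incoming changes, keyed by business_key
dimension = {"K1": "v1", "K2": "v2", "K3": "v3"}
updates = {"K2": "v2-new", "K4": "v4"}

unchanged = {k for k in dimension if k not in updates}   # the left_anti join
made_inactive = {k for k in dimension if k in updates}   # rows closed out with end_date
new_active = set(updates)                                # freshly opened current rows

print(sorted(unchanged), sorted(made_inactive), sorted(new_active))
# ['K1', 'K3'] ['K2'] ['K2', 'K4']
```

Note that an updated key (K2 here) appears twice in the result, once closed and once active, which is exactly the history SCD Type 2 is meant to preserve.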
def calculate_running_metrics(df):
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lag, sum, avg
# Define window
window_spec = Window.partitionBy("user_id").orderBy("timestamp")
# Calculate metrics
result = df \
.withColumn("row_num", row_number().over(window_spec)) \
.withColumn("prev_value", lag("value", 1).over(window_spec)) \
.withColumn("running_total", sum("value").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))) \
.withColumn("moving_avg", avg("value").over(window_spec.rowsBetween(-2, 0)))
return result
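A pure-Python check of what those window frames compute, for a single user's values already ordered by timestamp (hypothetical data):

```python
values = [10, 20, 30, 40]

# running_total: rowsBetween(unboundedPreceding, currentRow)
running_total = [sum(values[:i + 1]) for i in range(len(values))]

# moving_avg: rowsBetween(-2, 0) -- current row plus up to 2 preceding
moving_avg = [sum(values[max(0, i - 2):i + 1]) / (i + 1 - max(0, i - 2))
              for i in range(len(values))]

# prev_value: lag(value, 1) -- None for the first row
prev_value = [None] + values[:-1]

print(running_total)  # [10, 30, 60, 100]
print(moving_avg)     # [10.0, 15.0, 20.0, 30.0]
print(prev_value)     # [None, 10, 20, 30]
```

The key detail is the frame boundary: rowsBetween(-2, 0) shrinks at the start of each partition, so early rows average over fewer values rather than producing nulls.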
Symptoms:
java.lang.OutOfMemoryError
Solutions:
# Increase executor memory
spark.conf.set("spark.executor.memory", "8g")
# Increase driver memory (if collecting data)
spark.conf.set("spark.driver.memory", "4g")
# Reduce memory pressure
df.persist(StorageLevel.MEMORY_AND_DISK) # Spill to disk
df.coalesce(100) # Reduce partition count
spark.conf.set("spark.sql.shuffle.partitions", 400) # Increase shuffle partitions
# Avoid collect() on large datasets
# Use take() or limit() instead
df.take(100)
Symptoms:
Solutions:
# Increase shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 400)
# Handle skew with salting (rand comes from pyspark.sql.functions)
from pyspark.sql.functions import rand
df_salted = df.withColumn("salt", (rand() * 10).cast("int"))
result = df_salted.groupBy("key", "salt").agg(...)
# Use broadcast for small tables
large_df.join(broadcast(small_df), "key")
# Enable AQE for automatic optimization
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Symptoms:
Solutions:
# Increase executor memory for stateful operations
spark.conf.set("spark.executor.memory", "8g")
# Tune watermark for late data
.withWatermark("timestamp", "15 minutes")
# Increase trigger interval to reduce micro-batch overhead
.trigger(processingTime="30 seconds")
# Monitor lag and adjust parallelism
spark.conf.set("spark.sql.shuffle.partitions", 200)
# Recover from checkpoint corruption
# Delete checkpoint directory and restart (data loss possible)
# Or implement custom state recovery logic
Symptoms:
Solutions:
# 1. Salting technique (add random prefix to keys)
from pyspark.sql.functions import concat, lit, rand
df_salted = df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 10).cast("int")))
result = df_salted.groupBy("salted_key").agg(...)
# 2. Repartition by skewed column
df.repartition(200, "skewed_column")
# 3. Isolate skewed keys
skewed_keys = df.groupBy("key").count().filter(col("count") > threshold).select("key")
skewed_df = df.join(broadcast(skewed_keys), "key")
normal_df = df.join(broadcast(skewed_keys), "key", "left_anti")
# Process separately
skewed_result = process_with_salting(skewed_df)
normal_result = process_normally(normal_df)
final = skewed_result.union(normal_result)
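The salting technique is a two-stage aggregation: stage 1 aggregates by (key, salt) so the hot key's load spreads across many reducers, and stage 2 strips the salt and merges the partials. A pure-Python sketch on hypothetical skewed data showing the counts come out identical:

```python
import random
from collections import Counter

rng = random.Random(0)
records = ["hot_key"] * 1000 + ["cold_key"] * 10  # heavily skewed toward hot_key

# Stage 1: aggregate by (key, salt) -- hot_key is spread over up to 10 buckets
stage1 = Counter((k, rng.randrange(10)) for k in records)

# Stage 2: drop the salt and combine the partial counts
stage2 = Counter()
for (k, _salt), partial in stage1.items():
    stage2[k] += partial

print(stage2["hot_key"], stage2["cold_key"])  # 1000 10
```

This only works for associative aggregations (counts, sums); operations like distinct counts need a different skew strategy.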
This skill integrates real-world code examples from Apache Spark's official repository. All code snippets in the EXAMPLES.md file are sourced from Context7's Apache Spark library documentation, ensuring production-ready patterns and best practices.
Skill Version : 1.0.0 Last Updated : October 2025 Skill Category : Big Data, Distributed Computing, Data Engineering, Machine Learning Context7 Integration : /apache/spark with 8000 tokens of documentation