spark-engineer by jeffallan/claude-skills
npx skills add https://github.com/jeffallan/claude-skills --skill spark-engineer
Senior Apache Spark engineer specializing in high-performance distributed data processing, optimizing large-scale ETL pipelines, and building production-grade Spark applications.
Verify partition counts with `df.rdd.getNumPartitions()`. If spill or skew is detected, return to step 4. Test with production-scale data, monitor resource usage, and verify performance targets.

Load detailed guidance based on context:
| Topic | Reference | Load When |
|---|---|---|
| Spark SQL & DataFrames | references/spark-sql-dataframes.md | DataFrame API, Spark SQL, schemas, joins, aggregations |
| RDD Operations | references/rdd-operations.md | Transformations, actions, pair RDDs, custom partitioners |
| Partitioning & Caching | references/partitioning-caching.md | Data partitioning, persistence levels, broadcast variables |
| Performance Tuning | references/performance-tuning.md | Configuration, memory tuning, shuffle optimization, skew handling |
| Streaming Patterns | references/streaming-patterns.md | Structured Streaming, watermarks, stateful operations, sinks |
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

spark = SparkSession.builder \
    .appName("example-pipeline") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Always define explicit schemas in production
schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("event_ts", LongType(), False),
    StructField("amount", DoubleType(), True),
])

df = spark.read.schema(schema).parquet("s3://bucket/events/")

result = df \
    .filter(F.col("amount").isNotNull()) \
    .groupBy("user_id") \
    .agg(F.sum("amount").alias("total_amount"), F.count("*").alias("event_count"))

# Verify partition count before writing
print(f"Partition count: {result.rdd.getNumPartitions()}")
result.write.mode("overwrite").parquet("s3://bucket/output/")
```
```python
from pyspark.sql.functions import broadcast

# Spark may broadcast dim_df automatically if it is small enough;
# the hint makes the intent explicit
enriched = large_fact_df.join(broadcast(dim_df), on="product_id", how="left")
```
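Automatic broadcasting is governed by `spark.sql.autoBroadcastJoinThreshold`. A minimal configuration sketch (the value shown is illustrative; 10 MB is Spark's default, and an existing `SparkSession` named `spark` is assumed):

```python
# Tables smaller than this byte threshold are broadcast automatically
# in equi-joins; setting it to "-1" disables automatic broadcasting.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))
```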
```python
import pyspark.sql.functions as F

SALT_BUCKETS = 50

# Add salt to the skewed key on both sides
skewed_df = skewed_df.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int")) \
    .withColumn("salted_key", F.concat(F.col("skewed_key"), F.lit("_"), F.col("salt")))

other_df = other_df.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(SALT_BUCKETS)]))) \
    .withColumn("salted_key", F.concat(F.col("skewed_key"), F.lit("_"), F.col("salt")))

result = skewed_df.join(other_df, on="salted_key", how="inner") \
    .drop("salt", "salted_key")
```
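The matching guarantee behind salting can be sketched in plain Python (a simplified model, not PySpark): the skewed side assigns each row one random bucket, while the other side is replicated across every bucket, so each salted key always finds its partner.

```python
import random

SALT_BUCKETS = 50

def salt_skewed(key: str) -> str:
    # Skewed side: one random salt per row spreads the hot key
    # across SALT_BUCKETS shuffle partitions.
    return f"{key}_{random.randrange(SALT_BUCKETS)}"

def explode_other(key: str) -> list[str]:
    # Other side: replicate the row once per bucket so every
    # possible salted key has a match.
    return [f"{key}_{i}" for i in range(SALT_BUCKETS)]

right_keys = set(explode_other("hot_key"))
# Every salted left-side key lands in the replicated right side.
assert all(salt_skewed("hot_key") in right_keys for _ in range(1000))
```

On Spark 3.x, adaptive query execution can also split skewed partitions automatically via `spark.sql.adaptive.skewJoin.enabled`, which is often worth trying before manual salting.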
```python
# Cache ONLY when the DataFrame is reused multiple times
df_cleaned = df.filter(...).withColumn(...).cache()
df_cleaned.count()  # Materialize immediately; check Spark UI for spill

report_a = df_cleaned.groupBy("region").agg(...)
report_b = df_cleaned.groupBy("product").agg(...)

df_cleaned.unpersist()  # Release when done
```
When implementing Spark solutions, provide:
Spark DataFrame API, Spark SQL, RDD transformations/actions, Catalyst optimizer, Tungsten execution engine, partitioning strategies, broadcast variables, accumulators, Structured Streaming, watermarks, checkpointing, Spark UI analysis, memory management, shuffle optimization
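For the watermarking item above, the core rule can be sketched in plain Python (a simplified model of Structured Streaming's semantics, not the actual API): state for event times older than the maximum observed event time minus the declared delay becomes eligible to be dropped.

```python
from datetime import datetime, timedelta

def is_late(event_time: datetime, max_event_time_seen: datetime,
            delay: timedelta = timedelta(minutes=10)) -> bool:
    # Mirrors withWatermark("event_ts", "10 minutes"): events older than
    # (max observed event time - delay) may be dropped from state.
    return event_time < max_event_time_seen - delay

max_seen = datetime(2024, 1, 1, 12, 0)
assert not is_late(datetime(2024, 1, 1, 11, 55), max_seen)  # within watermark
assert is_late(datetime(2024, 1, 1, 11, 45), max_seen)      # past watermark
```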
Weekly Installs
702
Repository
GitHub Stars
7.2K
First Seen
Jan 21, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
opencode: 583
gemini-cli: 566
claude-code: 560
codex: 554
cursor: 525
github-copilot: 521