dask by davila7/claude-code-templates
npx skills add https://github.com/davila7/claude-code-templates --skill dask
Dask is a Python library for parallel and distributed computing that enables three critical capabilities:
Dask scales from laptops (processing ~100 GiB) to clusters (processing ~100 TiB) while maintaining familiar Python APIs.
This skill should be used when:
Dask provides five main components, each suited to different use cases:
Purpose : Scale pandas operations to larger datasets through parallel processing.
When to Use :
Reference Documentation : For comprehensive guidance on Dask DataFrames, refer to references/dataframes.md which includes:
Custom operations with map_partitions
Quick Example :
import dask.dataframe as dd
# Read multiple files as single DataFrame
ddf = dd.read_csv('data/2024-*.csv')
# Operations are lazy until compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').mean().compute()
Key Points :
Operations are lazy until .compute() is called
Use map_partitions for efficient custom operations (see the sketch below)
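To make the map_partitions key point concrete, here is a minimal sketch of a custom per-partition transformation. It assumes the same 'value' column as the quick example above; the add_features name and the centering logic are illustrative, not part of the skill.
import dask.dataframe as dd
ddf = dd.read_csv('data/2024-*.csv')
def add_features(pdf):
    # Each call receives one partition as an ordinary pandas DataFrame
    pdf = pdf.copy()
    pdf['value_centered'] = pdf['value'] - pdf['value'].mean()  # per-partition mean, not global
    return pdf
# Dask infers the output schema by running add_features on a tiny dummy partition;
# pass meta= explicitly if that inference fails
ddf2 = ddf.map_partitions(add_features)
result = ddf2.head()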
Purpose : Extend NumPy capabilities to datasets larger than memory using blocked algorithms.
When to Use :
Reference Documentation : For comprehensive guidance on Dask Arrays, refer to references/arrays.md which includes:
Custom operations with map_blocks
Quick Example :
import dask.array as da
# Create large array with chunks
x = da.random.random((100000, 100000), chunks=(10000, 10000))
# Operations are lazy
y = x + 100
z = y.mean(axis=0)
# Compute result
result = z.compute()
Key Points :
Use map_blocks for operations not available in Dask (see the sketch below)
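A minimal sketch of the map_blocks pattern named above. The rescale function is illustrative; note that it runs on each block independently, so the min and max here are per-block rather than global.
import dask.array as da
x = da.random.random((20000, 20000), chunks=(5000, 5000))
def rescale(block):
    # block is a plain NumPy array, so any NumPy-only code works here
    return (block - block.min()) / (block.max() - block.min())
y = x.map_blocks(rescale, dtype=x.dtype)
result = y[:1000, :1000].compute()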
Purpose : Process unstructured or semi-structured data (text, JSON, logs) with functional operations.
When to Use :
Reference Documentation : For comprehensive guidance on Dask Bags, refer to references/bags.md which includes:
Quick Example :
import dask.bag as db
import json
# Read and parse JSON files
bag = db.read_text('logs/*.json').map(json.loads)
# Filter and transform
valid = bag.filter(lambda x: x['status'] == 'valid')
processed = valid.map(lambda x: {'id': x['id'], 'value': x['value']})
# Convert to DataFrame for analysis
ddf = processed.to_dataframe()
Key Points :
Use foldby instead of groupby for better performance (see the sketch below)
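A minimal sketch of foldby as a shuffle-free alternative to groupby, assuming JSON records with 'category' and 'value' fields like those used elsewhere in this guide.
import dask.bag as db
import json
bag = db.read_text('logs/*.json').map(json.loads)
# Sum 'value' per 'category' without the full shuffle a groupby would require
totals = bag.foldby(
    key=lambda rec: rec['category'],
    binop=lambda acc, rec: acc + rec['value'],  # fold records within each partition
    initial=0,
    combine=lambda a, b: a + b,                 # merge the per-partition totals
    combine_initial=0,
)
result = totals.compute()  # list of (category, total) pairs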
Purpose : Build custom parallel workflows with fine-grained control over task execution and dependencies.
When to Use :
Reference Documentation : For comprehensive guidance on Dask Futures, refer to references/futures.md which includes:
Quick Example :
from dask.distributed import Client
client = Client() # Create local cluster
# Submit tasks (executes immediately)
def process(x):
    return x ** 2
futures = client.map(process, range(100))
# Gather results
results = client.gather(futures)
client.close()
Key Points :
Purpose : Control how and where Dask tasks execute (threads, processes, distributed).
When to Choose Scheduler :
Reference Documentation : For comprehensive guidance on Dask Schedulers, refer to references/schedulers.md which includes:
Quick Example :
import dask
import dask.dataframe as dd
# Use threads for DataFrame (default, good for numeric)
ddf = dd.read_csv('data.csv')
result1 = ddf.mean().compute() # Uses threads
# Use processes for Python-heavy work
import dask.bag as db
bag = db.read_text('logs/*.txt')
result2 = bag.map(python_function).compute(scheduler='processes')
# Use synchronous for debugging
dask.config.set(scheduler='synchronous')
result3 = problematic_computation.compute() # Can use pdb
# Use distributed for monitoring and scaling
from dask.distributed import Client
client = Client()
result4 = computation.compute() # Uses distributed with dashboard
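As a supplementary sketch (standard dask.config usage, not part of the original key points), the scheduler can be chosen per call, temporarily, or globally:
import dask
import dask.array as da
x = da.random.random((1000, 1000), chunks=(100, 100))
# Per call
x.sum().compute(scheduler='processes')
# Temporarily, for everything inside the block
with dask.config.set(scheduler='threads'):
    x.sum().compute()
# Globally for the rest of the session
dask.config.set(scheduler='synchronous')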
Key Points :
For comprehensive performance optimization guidance, memory management strategies, and common pitfalls to avoid, refer to references/best-practices.md. Key principles include:
Before using Dask, explore:
1. Don't Load Data Locally Then Hand to Dask
# Wrong: Loads all data in memory first
import pandas as pd
df = pd.read_csv('large.csv')
ddf = dd.from_pandas(df, npartitions=10)
# Correct: Let Dask handle loading
import dask.dataframe as dd
ddf = dd.read_csv('large.csv')
2. Avoid Repeated compute() Calls
# Wrong: Each compute is separate
for item in items:
    result = dask_computation(item).compute()
# Correct: Single compute for all
computations = [dask_computation(item) for item in items]
results = dask.compute(*computations)
3. Don't Build Excessively Large Task Graphs
Use map_partitions/map_blocks to fuse operations
Check graph size with len(ddf.__dask_graph__())
4. Choose Appropriate Chunk Sizes
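A minimal sketch for points 3 and 4, assuming an array workload; the sizes are illustrative targets of roughly 100 MB per chunk, not hard rules.
import dask.array as da
# 4000 x 4000 float64 chunks ≈ 128 MB each, a reasonable starting point
x = da.random.random((40000, 40000), chunks=(4000, 4000))
# Inspect how many tasks the graph contains (point 3)
print(len(x.__dask_graph__()))
# Rechunk if chunks are too small (too many tasks) or too large (memory pressure)
x = x.rechunk((8000, 8000))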
5. Use the Dashboard
from dask.distributed import Client
client = Client()
print(client.dashboard_link) # Monitor performance, identify bottlenecks
import dask.dataframe as dd
# Extract: Read data
ddf = dd.read_csv('raw_data/*.csv')
# Transform: Clean and process
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])
# Load: Aggregate and save
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean']})
summary.to_parquet('output/summary.parquet')
import dask.bag as db
import json
# Start with Bag for unstructured data
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'valid')
# Convert to DataFrame for structured analysis
ddf = bag.to_dataframe()
result = ddf.groupby('category').mean().compute()
import dask.array as da
# Load or create large array
x = da.from_zarr('large_dataset.zarr')
# Process in chunks
normalized = (x - x.mean()) / x.std()
# Save result
da.to_zarr(normalized, 'normalized.zarr')
from dask.distributed import Client
client = Client()
# Scatter large dataset once
data = client.scatter(large_dataset)
# Process in parallel with dependencies
futures = []
for param in parameters:
    future = client.submit(process, data, param)
    futures.append(future)
# Gather results
results = client.gather(futures)
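As a hedged extension of the futures pattern above (reusing client, process, data, and parameters from that snippet), results can also be consumed as soon as each task finishes using as_completed instead of a single gather; handle_result is a placeholder for whatever post-processing is needed.
from dask.distributed import as_completed
futures = [client.submit(process, data, param) for param in parameters]
# Handle each result as soon as its task completes
for future in as_completed(futures):
    handle_result(future.result())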
Use this decision guide to choose the appropriate Dask component:
Data Type :
Tabular data → DataFrames; N-dimensional numeric data → Arrays; unstructured text/JSON/logs → Bags; arbitrary Python objects and tasks → Futures
Operation Type :
pandas-style operations → DataFrames; NumPy-style operations → Arrays; functional map/filter/fold → Bags; custom task submission → Futures
Control Level :
High-level collections (DataFrame, Array, Bag) build task graphs automatically; Futures give fine-grained, immediate control over individual tasks and dependencies
Workflow Type :
Batch ETL and analytics → DataFrames; numerical array processing → Arrays; log and JSON preprocessing → Bags; dynamic, dependency-driven workflows → Futures
# Bag → DataFrame
ddf = bag.to_dataframe()
# DataFrame → Array (for numeric data)
arr = ddf.to_dask_array(lengths=True)
# Array → DataFrame
ddf = dd.from_dask_array(arr, columns=['col1', 'col2'])
1. Test with synchronous scheduler on small data :
import dask
dask.config.set(scheduler='synchronous')
result = computation.compute() # Can use pdb, easy debugging
2. Validate with threads on sample :
sample = ddf.head(1000) # Small sample
# Test logic, then scale to full dataset
3. Scale with distributed for monitoring :
from dask.distributed import Client
client = Client()
print(client.dashboard_link) # Monitor performance
result = computation.compute()
Memory Errors :
Use persist() strategically and delete when done (see the sketch below)
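A minimal sketch of the persist-then-delete advice, assuming a distributed client and the illustrative 'value'/'category' columns used earlier.
from dask.distributed import Client
import dask.dataframe as dd
client = Client()
ddf = dd.read_csv('data/*.csv')
# Materialize the filtered data in cluster memory once, then reuse it
ddf = ddf[ddf['value'] > 100].persist()
summary1 = ddf.groupby('category').mean().compute()
summary2 = ddf['value'].sum().compute()
# Dropping the last reference lets the scheduler release that memory
del ddf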
Slow Start :
Use map_partitions or map_blocks to reduce the number of tasks
Poor Parallelization :
All reference documentation files can be read as needed for detailed information:
references/dataframes.md - Complete Dask DataFrame guide
references/arrays.md - Complete Dask Array guide
references/bags.md - Complete Dask Bag guide
references/futures.md - Complete Dask Futures and distributed computing guide
references/schedulers.md - Complete scheduler selection and configuration guide
references/best-practices.md - Comprehensive performance optimization and troubleshooting guide
Load these files when users need detailed information about specific Dask components, operations, or patterns beyond the quick guidance provided here.
Weekly Installs
116
Repository
GitHub Stars
22.6K
First Seen
Jan 21, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
claude-code: 102
opencode: 91
cursor: 84
gemini-cli: 83
antigravity: 80
codex: 75