ray-data by orchestra-research/ai-research-skills
npx skills add https://github.com/orchestra-research/ai-research-skills --skill ray-data

用于机器学习和人工智能工作负载的分布式数据处理库。
在以下情况下使用 Ray Data:
关键特性:
使用替代方案的情况:
pip install -U 'ray[data]'
import ray
# 读取 Parquet 文件
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# 转换数据(惰性执行)
ds = ds.map_batches(lambda batch: {"processed": batch["text"].str.lower()})
# 消费数据
for batch in ds.iter_batches(batch_size=100):
    print(batch)
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
# 创建数据集
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")
def train_func(config):
    # 在训练中访问数据集
    train_ds = ray.train.get_dataset_shard("train")
    for epoch in range(10):
        for batch in train_ds.iter_batches(batch_size=32):
            # 在批次上训练
            pass
# 使用 Ray 进行训练
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
trainer.fit()
import ray
# Parquet(推荐用于机器学习)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")
# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")
# 图像
ds = ray.data.read_images("s3://bucket/images/")
# 从列表
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])
# 从范围
ds = ray.data.range(1000000) # 合成数据
# 从 pandas
import pandas as pd
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
# 批量转换(快速)
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch
ds = ds.map_batches(process_batch, batch_size=1000)
# 逐行处理(较慢)
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row
ds = ds.map(process_row)
# 过滤行
ds = ds.filter(lambda row: row["value"] > 100)
# 按列分组
ds = ds.groupby("category").count()
# 自定义聚合
ds = ds.groupby("category").map_groups(lambda group: {"sum": group["value"].sum()})
# 使用 GPU 进行预处理
def preprocess_images_gpu(batch):
    import torch
    images = torch.tensor(batch["image"]).cuda()
    # GPU 预处理
    processed = images * 255
    return {"processed": processed.cpu().numpy()}
ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1  # 请求 GPU
)
# 写入 Parquet
ds.write_parquet("s3://bucket/output/")
# 写入 CSV
ds.write_csv("output/")
# 写入 JSON
ds.write_json("output/")
# 控制并行度
ds = ds.repartition(100) # 100 个块用于 100 核集群
# 更大的批次 = 更快的向量化操作
ds.map_batches(process_fn, batch_size=10000) # 对比 batch_size=100
# 处理大于内存的数据
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # 流式处理,不加载到内存
import ray
# 加载模型
def load_model():
    # 每个工作器加载一次
    return MyModel()

# 推理函数
class BatchInference:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}
# 运行分布式推理
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(BatchInference, batch_size=32, num_gpus=1)
predictions.write_parquet("s3://output/")
# 多步骤流水线
ds = (
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
    .write_parquet("s3://processed/")
)
# 转换为 PyTorch
torch_ds = ds.to_torch(label_column="label", batch_size=32)
for batch in torch_ds:
    # batch 是包含张量的字典
    inputs, labels = batch["features"], batch["label"]
# 转换为 TensorFlow
tf_ds = ds.to_tf(feature_columns=["image"], label_column="label", batch_size=32)
for features, labels in tf_ds:
    # 训练模型
    pass
| 格式 | 读取 | 写入 | 使用场景 |
|---|---|---|---|
| Parquet | ✅ | ✅ | 机器学习数据(推荐) |
| CSV | ✅ | ✅ | 表格数据 |
| JSON | ✅ | ✅ | 半结构化数据 |
| 图像 | ✅ | ❌ | 计算机视觉 |
| NumPy | ✅ | ✅ | 数组 |
| Pandas | ✅ | ❌ | 数据框 |
扩展性(处理 100GB 数据):
GPU 加速(图像预处理):
生产部署:
每周安装量
74
代码库
GitHub 星标数
5.6K
首次出现
2026年2月7日
安全审计
安装于
opencode: 65
codex: 64
gemini-cli: 63
cursor: 63
github-copilot: 62
kimi-cli: 58
Distributed data processing library for ML and AI workloads.
Use Ray Data when:
Key features:
Use alternatives instead:
pip install -U 'ray[data]'
import ray
# Read Parquet files
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# Transform data (lazy execution)
ds = ds.map_batches(lambda batch: {"processed": batch["text"].str.lower()})
# Consume data
for batch in ds.iter_batches(batch_size=100):
    print(batch)
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
# Create dataset
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")
def train_func(config):
    # Access dataset in training
    train_ds = ray.train.get_dataset_shard("train")
    for epoch in range(10):
        for batch in train_ds.iter_batches(batch_size=32):
            # Train on batch
            pass
# Train with Ray
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
trainer.fit()
import ray
# Parquet (recommended for ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")
# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")
# Images
ds = ray.data.read_images("s3://bucket/images/")
# From list
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])
# From range
ds = ray.data.range(1000000) # Synthetic data
# From pandas
import pandas as pd
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
# Batch transformation (fast)
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch
ds = ds.map_batches(process_batch, batch_size=1000)
# Row-by-row (slower)
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row
ds = ds.map(process_row)
# Filter rows
ds = ds.filter(lambda row: row["value"] > 100)
# Group by column
ds = ds.groupby("category").count()
# Custom aggregation
ds = ds.groupby("category").map_groups(lambda group: {"sum": group["value"].sum()})
# Use GPU for preprocessing
def preprocess_images_gpu(batch):
    import torch
    images = torch.tensor(batch["image"]).cuda()
    # GPU preprocessing
    processed = images * 255
    return {"processed": processed.cpu().numpy()}
ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1  # Request GPU
)
# Write to Parquet
ds.write_parquet("s3://bucket/output/")
# Write to CSV
ds.write_csv("output/")
# Write to JSON
ds.write_json("output/")
# Control parallelism
ds = ds.repartition(100) # 100 blocks for 100-core cluster
# Larger batches = faster vectorized ops
ds.map_batches(process_fn, batch_size=10000) # vs batch_size=100
# Process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # Streamed, not loaded to memory
import ray
# Load model
def load_model():
    # Load once per worker
    return MyModel()

# Inference function
class BatchInference:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}
# Run distributed inference
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(BatchInference, batch_size=32, num_gpus=1)
predictions.write_parquet("s3://output/")
# Multi-step pipeline
ds = (
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
    .write_parquet("s3://processed/")
)
# Convert to PyTorch
torch_ds = ds.to_torch(label_column="label", batch_size=32)
for batch in torch_ds:
    # batch is dict with tensors
    inputs, labels = batch["features"], batch["label"]
# Convert to TensorFlow
tf_ds = ds.to_tf(feature_columns=["image"], label_column="label", batch_size=32)
for features, labels in tf_ds:
    # Train model
    pass
| Format | Read | Write | Use Case |
|---|---|---|---|
| Parquet | ✅ | ✅ | ML data (recommended) |
| CSV | ✅ | ✅ | Tabular data |
| JSON | ✅ | ✅ | Semi-structured |
| Images | ✅ | ❌ | Computer vision |
| NumPy | ✅ | ✅ | Arrays |
| Pandas | ✅ | ❌ | DataFrames |
Scaling (processing 100GB data):
GPU acceleration (image preprocessing):
Production deployments:
Weekly Installs
74
Repository
GitHub Stars
5.6K
First Seen
Feb 7, 2026
Security Audits
Gen Agent Trust Hub: Warn · Socket: Pass · Snyk: Warn
Installed on
opencode: 65
codex: 64
gemini-cli: 63
cursor: 63
github-copilot: 62
kimi-cli: 58
AI 代码实施计划编写技能 | 自动化开发任务分解与 TDD 流程规划工具
50,900 周安装