ray-train by orchestra-research/ai-research-skills
npx skills add https://github.com/orchestra-research/ai-research-skills --skill ray-train
Ray Train scales machine learning training from single GPU to multi-node clusters with minimal code changes.
Installation:
pip install -U "ray[train]"
Basic PyTorch training (single node):
import ray
import torch
import torch.nn as nn
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Define training function
def train_func(config):
    # Your normal PyTorch code
    model = nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # Prepare for distributed training (Ray handles device placement)
    model = train.torch.prepare_model(model)
    device = train.torch.get_device()

    for epoch in range(10):
        # Your training loop
        output = model(torch.randn(32, 10).to(device))
        loss = output.sum()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Report metrics (logged automatically)
        train.report({"loss": loss.item(), "epoch": epoch})

# Run distributed training
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=4,  # 4 GPUs/workers
        use_gpu=True
    )
)
result = trainer.fit()
print(f"Final loss: {result.metrics['loss']}")
That's it! Ray handles spinning up the workers, setting up the distributed process group, placing models and data on the right devices, and collecting the reported metrics.
Original single-GPU code:
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())

for epoch in range(epochs):
    for batch in dataloader:
        loss = model(batch)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
Ray Train version (scales to multi-GPU/multi-node):
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters())

    # Prepare for distributed training (automatic device placement);
    # the DataLoader is built the same way as in the single-GPU version
    model = train.torch.prepare_model(model)
    dataloader = train.torch.prepare_data_loader(dataloader)

    for epoch in range(epochs):
        for batch in dataloader:
            loss = model(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        # Report metrics once per epoch
        train.report({"loss": loss.item()})

# Scale to 8 GPUs
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)
trainer.fit()
Benefits: the same code runs on 1 GPU or 1,000 GPUs; only the ScalingConfig changes.
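For example, the difference between a laptop run and a cluster run is just the scaling configuration (the worker counts below are illustrative):

from ray.train import ScalingConfig

# Single GPU
scaling_config = ScalingConfig(num_workers=1, use_gpu=True)

# Large cluster, e.g. 125 nodes × 8 GPUs
scaling_config = ScalingConfig(num_workers=1000, use_gpu=True)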
from ray.train import ScalingConfig
from ray.train.huggingface import TransformersTrainer
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments

def train_func(config):
    # Load model and tokenizer
    model = AutoModelForCausalLM.from_pretrained("gpt2")
    tokenizer = AutoTokenizer.from_pretrained("gpt2")

    # Training arguments (HuggingFace API)
    training_args = TrainingArguments(
        output_dir="./output",
        num_train_epochs=3,
        per_device_train_batch_size=8,
        learning_rate=2e-5,
    )

    # Ray automatically handles distributed training
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,  # your tokenized dataset
    )
    trainer.train()

# Scale to multi-node (2 nodes × 8 GPUs = 16 workers)
trainer = TransformersTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=16,
        use_gpu=True,
        resources_per_worker={"GPU": 1}
    )
)
result = trainer.fit()
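Note: in recent Ray releases (roughly 2.7 onward), TransformersTrainer is deprecated in favor of plain TorchTrainer plus the helpers in ray.train.huggingface.transformers. A minimal sketch of that pattern, assuming a recent Ray version and the model, args, and dataset built exactly as above:

import ray.train.huggingface.transformers as ray_transformers
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from transformers import Trainer

def train_func(config):
    # Build model, tokenizer, training_args, and train_dataset as above
    trainer = Trainer(model=model, args=training_args, train_dataset=train_dataset)
    # Report HuggingFace metrics and checkpoints back to Ray Train
    trainer.add_callback(ray_transformers.RayTrainReportCallback())
    # Configure the Trainer for Ray's distributed environment
    trainer = ray_transformers.prepare_trainer(trainer)
    trainer.train()

ray_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=16, use_gpu=True),
)
result = ray_trainer.fit()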
import torch
from ray import train, tune
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from ray.tune.schedulers import ASHAScheduler

def train_func(config):
    # Use hyperparameters from config
    lr = config["lr"]
    batch_size = config["batch_size"]

    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    model = train.torch.prepare_model(model)

    for epoch in range(10):
        # Training loop
        loss = train_epoch(model, optimizer, batch_size)
        train.report({"loss": loss, "epoch": epoch})

# Define search space (nested under train_loop_config so the sampled
# values are passed to train_func as `config`)
param_space = {
    "train_loop_config": {
        "lr": tune.loguniform(1e-5, 1e-2),
        "batch_size": tune.choice([16, 32, 64, 128]),
    }
}

# Run 20 trials with early stopping
tuner = tune.Tuner(
    TorchTrainer(
        train_func,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
    ),
    param_space=param_space,
    tune_config=tune.TuneConfig(
        num_samples=20,
        scheduler=ASHAScheduler(metric="loss", mode="min")
    )
)
results = tuner.fit()

best = results.get_best_result(metric="loss", mode="min")
print(f"Best hyperparameters: {best.config['train_loop_config']}")
Result: distributed hyperparameter search across the cluster.
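Each trial's metrics, config, and any reported checkpoint are available on the returned results; a small usage sketch:

# Inspect the best trial
print(best.metrics["loss"])               # final reported loss
print(best.config["train_loop_config"])   # winning hyperparameters
best_checkpoint = best.checkpoint         # None here, since no checkpoint was reported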
import os
import tempfile

import torch
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters())

    # Try to resume from a checkpoint
    checkpoint = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            state = torch.load(os.path.join(checkpoint_dir, "model.pt"))
            model.load_state_dict(state["model"])
            optimizer.load_state_dict(state["optimizer"])
            start_epoch = state["epoch"]
    else:
        start_epoch = 0

    model = train.torch.prepare_model(model)

    for epoch in range(start_epoch, 100):
        loss = train_epoch(model, optimizer)

        # Save a checkpoint every 10 epochs
        if epoch % 10 == 0:
            with tempfile.TemporaryDirectory() as tmp_dir:
                # Unwrap DDP before saving so the checkpoint loads cleanly on resume
                raw_model = model.module if hasattr(model, "module") else model
                torch.save({
                    "model": raw_model.state_dict(),
                    "optimizer": optimizer.state_dict(),
                    "epoch": epoch,
                }, os.path.join(tmp_dir, "model.pt"))
                train.report({"loss": loss}, checkpoint=Checkpoint.from_directory(tmp_dir))
        else:
            train.report({"loss": loss})

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)
# Resumes from the latest checkpoint if training fails (see failure config below)
result = trainer.fit()
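Automatic restore after a worker or node failure is governed by the run's failure configuration; a minimal sketch (the run name and retry count below are illustrative):

from ray.train import FailureConfig, RunConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
    run_config=RunConfig(
        name="resumable-run",
        # Retry up to 3 times; each retry restarts train_func with the
        # latest reported checkpoint available via train.get_checkpoint()
        failure_config=FailureConfig(max_failures=3),
    ),
)
result = trainer.fit()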
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Connect to the Ray cluster
ray.init(address="auto")  # Or ray.init("ray://head-node:10001")

# Train across 4 nodes × 8 GPUs = 32 workers
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=32,
        use_gpu=True,
        resources_per_worker={"GPU": 1, "CPU": 4},
        placement_strategy="SPREAD"  # Spread workers across nodes
    )
)
result = trainer.fit()
Launch a Ray cluster:
# On head node
ray start --head --port=6379
# On worker nodes
ray start --address=<head-node-ip>:6379
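Before submitting the job, you can confirm from Python that all nodes have joined (a quick check, assuming it is run from a machine attached to the cluster):

import ray

ray.init(address="auto")
# Aggregate resources contributed by every node in the cluster
print(ray.cluster_resources())   # e.g. {'CPU': 128.0, 'GPU': 32.0, ...}
print(f"Nodes: {len(ray.nodes())}")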
Use Ray Train when:
Key advantages:
Use alternatives instead:
Issue: Ray cluster not connecting
Check ray status:
ray status
# Should show:
# - Nodes: 4
# - GPUs: 32
# - Workers: Ready
If not connected:
# Restart head node
ray stop
ray start --head --port=6379 --dashboard-host=0.0.0.0
# Restart worker nodes
ray stop
ray start --address=<head-ip>:6379
Issue: Out of memory
Reduce workers or use gradient accumulation:
scaling_config=ScalingConfig(
    num_workers=4,  # Reduce from 8
    use_gpu=True
)

# In train_func, accumulate gradients
for i, batch in enumerate(dataloader):
    loss = model(batch) / accumulation_steps
    loss.backward()
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
Issue: Slow training
Check whether data loading is the bottleneck:
import time

def train_func(config):
    for epoch in range(epochs):
        start = time.time()
        for batch in dataloader:
            data_time = time.time() - start
            # Train...
            start = time.time()
            print(f"Data loading: {data_time:.3f}s")
If data loading is slow, increase the number of DataLoader workers:
dataloader = DataLoader(dataset, num_workers=8)
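A slightly fuller version inside train_func, combining the usual PyTorch DataLoader knobs with Ray's wrapper (the parameter values are illustrative):

from torch.utils.data import DataLoader
from ray.train.torch import prepare_data_loader

def train_func(config):
    dataloader = DataLoader(
        dataset,                  # your torch Dataset
        batch_size=64,
        num_workers=8,            # parallel CPU workers for loading/augmentation
        pin_memory=True,          # faster host-to-GPU copies
        persistent_workers=True,  # avoid re-forking workers every epoch
    )
    # Adds a DistributedSampler and moves batches to the assigned device
    dataloader = prepare_data_loader(dataloader)
    for batch in dataloader:
        ...  # training step as in the examples above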
Multi-node setup: See references/multi-node.md for Ray cluster deployment on AWS, GCP, Kubernetes, and SLURM.
Hyperparameter tuning: See references/hyperparameter-tuning.md for Ray Tune integration, search algorithms (Optuna, HyperOpt), and population-based training.
Custom training loops: See references/custom-loops.md for advanced Ray Train usage, custom backends, and integration with other frameworks.
Supported accelerators: