pytorch-fsdp by davila7/claude-code-templates
npx skills add https://github.com/davila7/claude-code-templates --skill pytorch-fsdp提供关于 pytorch-fsdp 开发的全面帮助,内容源自官方文档。
在以下情况时应触发此技能:
模式 1: 通用 Join 上下文管理器# 创建于:2025年6月6日 | 最后更新于:2025年6月6日 通用 Join 上下文管理器为不均匀输入上的分布式训练提供了便利。本页概述了相关类的 API:Join、Joinable 和 JoinHook。有关教程,请参阅使用 Join 上下文管理器进行不均匀输入的分布式训练。 class torch.distributed.algorithms.Join(joinables, enable=True, throw_on_early_termination=False, **kwargs)[source]# 此类定义了通用的 Join 上下文管理器,允许在进程加入后调用自定义钩子。这些钩子应屏蔽未加入进程的集体通信,以防止挂起和出错,并确保算法正确性。有关钩子定义的详细信息,请参阅 JoinHook。 警告 上下文管理器要求每个参与的 Joinable 在其自身的每次迭代集体通信之前调用方法 notify_join_context() 以确保正确性。 警告 上下文管理器要求 JoinHook 对象中的所有 process_group 属性都相同。如果有多个 JoinHook 对象,则使用第一个对象的设备。进程组和设备信息用于检查未加入的进程,并在启用 throw_on_early_termination 时通知进程抛出异常,这两者都使用 all-reduce 操作。 参数 joinables (List[Joinable]) – 参与的 Joinable 列表;它们的钩子按给定顺序迭代。 enable (bool) – 启用不均匀输入检测的标志;设置为 False 将禁用上下文管理器的功能,应仅在用户知道输入不会不均匀时设置(默认值:True)。 throw_on_early_termination (bool) – 控制检测到不均匀输入时是否抛出异常的标志(默认值:False)。 示例: >>> import os >>> import torch >>> import torch.distributed as dist >>> import torch.multiprocessing as mp >>> import torch.nn.parallel.DistributedDataParallel as DDP >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO >>> from torch.distributed.algorithms.join import Join >>> >>> # 在每个生成的 worker 上 >>> def worker(rank): >>> dist.init_process_group("nccl", rank=rank, world_size=2) >>> model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank]) >>> optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01) >>> # Rank 1 比 rank 0 多获得一个输入 >>> inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)] >>> with Join([model, optim]): >>> for input in inputs: >>> loss = model(input).sum() >>> loss.backward() >>> optim.step() >>> # 所有 rank 都能到达此处,不会挂起/出错 static notify_join_context(joinable)[source]# 通知 Join 上下文管理器调用进程尚未加入。然后,如果 throw_on_early_termination=True,则检查是否已检测到不均匀输入(即是否已有进程加入),如果是则抛出异常。此方法应在 Joinable 对象执行其每次迭代的集体通信之前调用。例如,应在 DistributedDataParallel 的前向传播开始时调用。只有传递给上下文管理器的第一个 Joinable 对象会在此方法中执行集体通信,对于其他对象,此方法是空操作。 参数 joinable (Joinable) – 调用此方法的 Joinable 对象。 返回 一个异步工作句柄,用于 all-reduce 操作,旨在通知上下文管理器该进程尚未加入(如果 joinable 是传递给上下文管理器的第一个对象);否则返回 None。 class torch.distributed.algorithms.Joinable[source]# 这为可加入类定义了一个抽象基类。一个可加入类(继承自 Joinable)应实现 join_hook() 方法,该方法返回一个 JoinHook 实例,此外还应实现 join_device() 和 join_process_group() 方法,分别返回设备和进程组信息。 abstract property join_device: device# 返回用于执行 Join 上下文管理器所需的集体通信的设备。 abstract join_hook(**kwargs)[source]# 返回给定 Joinable 的 JoinHook 实例。 参数 kwargs (dict) – 一个字典,包含用于在运行时修改 Join 钩子行为的任何关键字参数;共享同一 Join 上下文管理器的所有 Joinable 实例都会收到相同的 kwargs 值。 返回类型 JoinHook abstract property join_process_group: Any# 返回 Join 上下文管理器本身所需的集体通信的进程组。 class torch.distributed.algorithms.JoinHook[source]# 这定义了一个 Join 钩子,它在 Join 上下文管理器中提供两个入口点。 入口点:一个主钩子,在存在未加入进程时重复调用;一个后钩子,在所有进程都加入后调用一次。要为通用 Join 上下文管理器实现一个 Join 钩子,请定义一个继承自 JoinHook 的类,并根据需要重写 main_hook() 和 post_hook()。 main_hook()[source]# 当存在未加入进程时调用此钩子,以屏蔽训练迭代中的集体通信。 训练迭代 即,一次前向传播、反向传播和优化器步骤。 post_hook(is_last_joiner)[source]# 在所有进程都加入后调用钩子。它被传递一个额外的布尔参数 is_last_joiner,指示该 rank 是否是最后加入的进程之一。 参数 is_last_joiner (bool) – 如果该 rank 是最后加入的进程之一,则为 True;否则为 False。
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
Join
模式 2: 分布式通信包 - torch.distributed# 创建于:2017年7月12日 | 最后更新于:2025年9月4日 注意 有关分布式训练所有特性的简要介绍,请参阅 PyTorch 分布式概述。 后端# torch.distributed 支持四个内置后端,每个后端具有不同的功能。下表显示了每个后端可用于 CPU 或 GPU 的功能。对于 NCCL,GPU 指的是 CUDA GPU,而对于 XCCL 则指 XPU GPU。MPI 仅在用于构建 PyTorch 的实现支持 CUDA 时才支持 CUDA。 后端 gloo mpi nccl xccl 设备 CPU GPU CPU GPU CPU GPU CPU GPU send ✓ ✘ ✓ ? ✘ ✓ ✘ ✓ recv ✓ ✘ ✓ ? ✘ ✓ ✘ ✓ broadcast ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ all_reduce ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ reduce ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ all_gather ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ gather ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ scatter ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ reduce_scatter ✓ ✓ ✘ ✘ ✘ ✓ ✘ ✓ all_to_all ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ barrier ✓ ✘ ✓ ? ✘ ✓ ✘ ✓ 随 PyTorch 提供的后端# PyTorch 分布式包支持 Linux(稳定)、MacOS(稳定)和 Windows(原型)。对于 Linux,默认情况下会构建 Gloo 和 NCCL 后端并包含在 PyTorch 分布式中(NCCL 仅在用 CUDA 构建时包含)。MPI 是一个可选后端,只有在从源代码构建 PyTorch 时才能包含。(例如,在安装了 MPI 的主机上构建 PyTorch。) 注意 从 PyTorch v1.8 开始,Windows 支持除 NCCL 之外的所有集体通信后端,如果 init_process_group() 的 init_method 参数指向一个文件,它必须遵循以下模式: 本地文件系统,init_method="file:///d:/tmp/some_file" 共享文件系统,init_method="file://////{machine_name}/{share_folder_name}/some_file" 与 Linux 平台相同,您可以通过设置环境变量 MASTER_ADDR 和 MASTER_PORT 来启用 TcpStore。 使用哪个后端?# 过去,我们经常被问到:“我应该使用哪个后端?”。 经验法则 对于使用 CUDA GPU 的分布式训练,使用 NCCL 后端。 对于使用 XPU GPU 的分布式训练,使用 XCCL 后端。 对于使用 CPU 的分布式训练,使用 Gloo 后端。 具有 InfiniBand 互连的 GPU 主机 使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端。 具有以太网互连的 GPU 主机 使用 NCCL,因为它目前提供了最佳的分布式 GPU 训练性能,特别是对于多进程单节点或多节点分布式训练。如果遇到任何 NCCL 问题,请使用 Gloo 作为备选方案。(请注意,对于 GPU,Gloo 目前运行速度比 NCCL 慢。) 具有 InfiniBand 互连的 CPU 主机 如果您的 InfiniBand 启用了 IP over IB,请使用 Gloo,否则请使用 MPI。我们计划在即将发布的版本中为 Gloo 添加 InfiniBand 支持。 具有以太网互连的 CPU 主机 使用 Gloo,除非您有特定理由使用 MPI。 常用环境变量# 选择要使用的网络接口# 默认情况下,NCCL 和 Gloo 后端都会尝试找到要使用的正确网络接口。如果自动检测到的接口不正确,可以使用以下环境变量覆盖它(适用于各自的后端): NCCL_SOCKET_IFNAME,例如 export NCCL_SOCKET_IFNAME=eth0 GLOO_SOCKET_IFNAME,例如 export GLOO_SOCKET_IFNAME=eth0 如果使用 Gloo 后端,可以通过逗号分隔指定多个接口,像这样:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3。后端将以轮询方式在这些接口上分发操作。所有进程必须在此变量中指定相同数量的接口。 其他 NCCL 环境变量# 调试 - 如果 NCCL 失败,可以设置 NCCL_DEBUG=INFO 来打印明确的警告消息以及基本的 NCCL 初始化信息。您还可以使用 NCCL_DEBUG_SUBSYS 获取有关 NCCL 特定方面的更多详细信息。例如,NCCL_DEBUG_SUBSYS=COLL 将打印集体调用的日志,这在调试挂起时可能很有帮助,特别是那些由集体类型或消息大小不匹配引起的挂起。在拓扑检测失败的情况下,设置 NCCL_DEBUG_SUBSYS=GRAPH 有助于检查详细的检测结果,并在需要 NCCL 团队进一步帮助时保存为参考。 性能调优 - NCCL 基于其拓扑检测执行自动调优,以节省用户的调优工作。在一些基于套接字的系统上,用户仍可以尝试调优 NCCL_SOCKET_NTHREADS 和 NCCL_NSOCKS_PERTHREAD 以增加套接字网络带宽。这两个环境变量已由 NCCL 为一些云提供商(如 AWS 或 GCP)进行了预调优。有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 官方文档 您可以使用 torch.distributed.ProcessGroupNCCL.NCCLConfig 和 torch.distributed.ProcessGroupNCCL.Options 进一步调优 NCCL 通信器。在解释器中使用 help(例如 help(torch.distributed.ProcessGroupNCCL.NCCLConfig))了解更多信息。 基础# torch.distributed 包为运行在一台或多台机器上的多个计算节点之间的多进程并行提供了 PyTorch 支持和通信原语。类 torch.nn.parallel.DistributedDataParallel() 基于此功能,作为任何 PyTorch 模型的包装器提供同步分布式训练。这与 Multiprocessing 包 - torch.multiprocessing 和 torch.nn.DataParallel() 提供的并行类型不同,因为它支持多个网络连接的机器,并且用户必须为每个进程显式启动主训练脚本的单独副本。在单机同步情况下,torch.distributed 或 torch.nn.parallel.DistributedDataParallel() 包装器相对于其他数据并行方法(包括 torch.nn.DataParallel())可能仍有优势: 每个进程维护自己的优化器,并在每次迭代中执行完整的优化步骤。虽然这看起来是冗余的,因为梯度已经在进程间收集并平均,因此对每个进程都是相同的,但这意味着不需要参数广播步骤,减少了在节点之间传输张量所花费的时间。 每个进程包含一个独立的 Python 解释器,消除了从一个 Python 进程驱动多个执行线程、模型副本或 GPU 所带来的额外解释器开销和“GIL 抖动”。这对于大量使用 Python 运行时的模型尤其重要,包括具有循环层或许多小组件的模型。 初始化# 在调用任何其他方法之前,需要使用 torch.distributed.init_process_group() 或 torch.distributed.device_mesh.init_device_mesh() 函数初始化该包。两者都会阻塞直到所有进程加入。 警告 初始化不是线程安全的。进程组创建应在单个线程中执行,以防止跨 rank 的“UUID”分配不一致,并防止初始化期间的竞争导致挂起。 torch.distributed.is_available()[source]# 如果分布式包可用,则返回 True。否则,torch.distributed 不会公开任何其他 API。目前,torch.distributed 在 Linux、MacOS 和 Windows 上可用。从源代码构建 PyTorch 时,设置 USE_DISTRIBUTED=1 以启用它。目前,Linux 和 Windows 的默认值为 USE_DISTRIBUTED=1,MacOS 的默认值为 USE_DISTRIBUTED=0。 返回类型 bool torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]# 初始化默认的分布式进程组。这也会初始化分布式包。 初始化进程组有两种主要方式: 显式指定 store、rank 和 world_size。 指定 init_method(一个 URL 字符串),指示在哪里/如何发现对等节点。可选地指定 rank 和 world_size,或将所有必需参数编码在 URL 中并省略它们。 如果两者都未指定,则假定 init_method 为 "env://"。 参数 backend (str 或 Backend, 可选) – 要使用的后端。根据构建时配置,有效值包括 mpi、gloo、nccl、ucc、xccl 或由第三方插件注册的后端。从 2.6 开始,如果未提供 backend,c10d 将使用为 device_id 关键字参数(如果提供)指示的设备类型注册的后端。目前已知的默认注册是:cuda 对应 nccl,cpu 对应 gloo,xpu 对应 xccl。如果既未提供 backend 也未提供 device_id,c10d 将检测运行时机器上的加速器,并使用为该检测到的加速器(或 cpu)注册的后端。此字段可以作为小写字符串给出(例如,"gloo"),也可以通过 Backend 属性访问(例如,Backend.GLOO)。如果使用 nccl 后端且每台机器有多个进程,则每个进程必须对其使用的每个 GPU 具有独占访问权限,因为在进程之间共享 GPU 可能导致死锁或 NCCL 无效使用。ucc 后端是实验性的。设备的默认后端可以使用 get_default_backend_for_device() 查询。 init_method (str, 可选) – 指定如何初始化进程组的 URL。如果未指定 init_method 或 store,则默认为 "env://"。与 store 互斥。 world_size (int, 可选) – 参与作业的进程数。如果指定了 store,则为必需。 rank (int, 可选) – 当前进程的 rank(应是一个介于 0 和 world_size-1 之间的数字)。如果指定了 store,则为必需。 store (Store, 可选) – 所有 worker 可访问的键/值存储,用于交换连接/地址信息。与 init_method 互斥。 timeout (timedelta, 可选) – 针对进程组执行的操作的超时时间。NCCL 的默认值为 10 分钟,其他后端的默认值为 30 分钟。这是在此持续时间后,集体操作将异步中止并且进程将崩溃。这样做是因为 CUDA 执行是异步的,继续执行用户代码不再安全,因为失败的异步 NCCL 操作可能导致后续 CUDA 操作在损坏的数据上运行。当设置 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时。 group_name (str, 可选, 已弃用) – 组名。此参数被忽略 pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在特定进程组构造期间需要传递哪些额外选项。截至目前,我们支持的唯一选项是用于 nccl 后端的 ProcessGroupNCCL.Options,可以指定 is_high_priority_stream,以便在有计算内核等待时,nccl 后端可以选择高优先级 cuda 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t device_id (torch.device | int, 可选) – 此进程将使用的单个特定设备,允许进行后端特定的优化。目前这在 NCCL 下有两个效果:立即形成通信器(立即调用 ncclCommInit* 而不是正常的惰性调用),并且子组将在可能时使用 ncclCommSplit 以避免不必要的组创建开销。如果您想尽早知道 NCCL 初始化错误,也可以使用此字段。如果提供 int,API 将假定使用编译时的加速器类型。 注意 要启用 backend == Backend.MPI,需要在支持 MPI 的系统上从源代码构建 PyTorch。 注意 对多个后端的支持是实验性的。目前当未指定后端时,将同时创建 gloo 和 nccl 后端。gloo 后端将用于 CPU 张量的集体操作,nccl 后端将用于 CUDA 张量的集体操作。可以通过传递格式为 "<device_type>:<backend_name>,<device_type>:<backend_name>" 的字符串来指定自定义后端,例如 "cpu:gloo,cuda:custom_backend"。 torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None, backend_override=None)[source]# 基于 device_type、mesh_shape 和 mesh_dim_names 参数初始化 DeviceMesh。这将创建一个具有 n 维数组布局的 DeviceMesh,其中 n 是 mesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度标记为 mesh_dim_names[i]。 注意 init_device_mesh 遵循 SPMD 编程模型,意味着相同的 PyTorch Python 程序在集群中的所有进程/rank 上运行。确保 mesh_shape(描述设备布局的 nD 数组的维度)在所有 rank 上相同。不一致的 mesh_shape 可能导致挂起。 注意 如果未找到进程组,init_device_mesh 将在后台初始化分布式通信所需的分布式进程组/组。 参数 device_type (str) – 网格的设备类型。目前支持:"cpu"、"cuda/cuda-like"、"xpu"。不允许传入带有 GPU 索引的设备类型,例如 "cuda:0"。 mesh_shape (Tuple[int]) – 一个元组,定义描述设备布局的多维数组的维度。 mesh_dim_names (Tuple[str], 可选) – 一个网格维度名称的元组,分配给描述设备布局的多维数组的每个维度。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须是唯一的。 backend_override (Dict[int | str, tuple[str, Options] | str | Options], 可选) – 为每个网格维度将创建的 ProcessGroup 的部分或全部覆盖。每个键可以是维度的索引或其名称(如果提供了 mesh_dim_names)。每个值可以是一个包含后端名称及其选项的元组,或者只是这两个组件之一(在这种情况下,另一个将设置为其默认值)。 返回 一个表示设备布局的 DeviceMesh 对象。 返回类型 DeviceMesh 示例: >>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp")) torch.distributed.is_initialized()[source]# 检查默认进程组是否已初始化。 返回类型 bool torch.distributed.is_mpi_available()[source]# 检查 MPI 后端是否可用。 返回类型 bool torch.distributed.is_nccl_available()[source]# 检查 NCCL 后端是否可用。 返回类型 bool torch.distributed.is_gloo_available()[source]# 检查 Gloo 后端是否可用。 返回类型 bool torch.distributed.distributed_c10d.is_xccl_available()[source]# 检查 XCCL 后端是否可用。 返回类型 bool torch.distributed.is_torchelastic_launched()[source]# 检查此进程是否由 torch.distributed.elastic(又名 torchelastic)启动。TORCHELASTIC_RUN_ID 环境变量的存在被用作代理来确定当前进程是否由 torchelastic 启动。这是一个合理的代理,因为 TORCHELASTIC_RUN_ID 映射到 rendezvous id,该 id 始终是一个非空值,表示用于对等节点发现的作业 id。 返回类型 bool torch.distributed.get_default_backend_for_device(device)[source]# 返回给定设备的默认后端。 参数 device (Union[str, torch.device]) – 要获取默认后端的设备。 返回 给定设备的默认后端,作为小写字符串。 返回类型 str 目前支持三种初始化方法: TCP 初始化# 有两种使用 TCP 初始化的方式,两者都需要一个所有进程都可访问的网络地址和所需的 world_size。第一种方式需要指定一个属于 rank 0 进程的地址。此初始化方法要求所有进程手动指定 rank。注意,最新的分布式包中不再支持多播地址。group_name 也已弃用。 import torch.distributed as dist # 使用其中一台机器的地址 dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4) 共享文件系统初始化# 另一种初始化方法利用组中所有机器共享且可见的文件系统,以及所需的 world_size。URL 应以 file:// 开头,并包含共享文件系统上(现有目录中的)一个不存在的文件的路径。文件系统初始化将自动创建该文件(如果不存在),但不会删除该文件。因此,您有责任确保在下一次对同一文件路径/名称调用 init_process_group() 之前清理该文件。注意,最新的分布式包中不再支持自动 rank 分配,group_name 也已弃用。 警告 此方法假定文件系统支持使用 fcntl 进行锁定 - 大多数本地系统和 NFS 都支持它。 警告 此方法将始终创建文件,并尽力在程序结束时清理和删除文件。换句话说,每次使用文件初始化方法进行初始化都需要一个全新的空文件才能使初始化成功。如果再次使用先前初始化使用的同一文件(碰巧没有被清理),这是意外行为,通常会导致死锁和失败。因此,即使此方法会尽力清理文件,如果自动删除不成功,您有责任确保在训练结束时删除文件,以防止下次再次重用同一文件。如果您计划在同一文件名上多次调用 init_process_group(),这一点尤其重要。换句话说,如果文件未被移除/清理,并且您再次对该文件调用 init_process_group(),预计会出现失败。这里的经验法则是,确保每次调用 init_process_group() 时文件都不存在或为空。 import torch.distributed as dist # 应始终指定 rank dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank) 环境变量初始化# 此方法将从环境变量读取配置,允许完全自定义信息的获取方式。要设置的变量是: MASTER_PORT - 必需;必须是 rank 0 机器上的空闲端口 MASTER_ADDR - 必需(rank 0 除外);rank 0 节点的地址 WORLD_SIZE - 必需;可以在此处设置,也可以在初始化函数调用中设置 RANK - 必需;可以在此处设置,也可以在初始化函数调用中设置 rank 0 的机器将用于建立所有连接。这是默认方法,意味着不必指定 init_method(或者可以是 env://)。 改进初始化时间# TORCH_GLOO_LAZY_INIT - 按需建立连接,而不是使用完整的网格,这可以极大地改善非 all2all 操作的初始化时间。 初始化后# 一旦运行了 torch.distributed.init_process_group(),就可以使用以下函数。要检查进程组是否已初始化,请使用 torch.distributed.is_initialized()。 class torch.distributed.Backend(name)[source]# 一个类似枚举的后端类。可用的后端:GLOO、NCCL、UCC、MPI、XCCL 和其他已注册的后端。此类的值为小写字符串,例如 "gloo"。它们可以作为属性访问,例如 Backend.NCCL。此类可以直接调用来解析字符串,例如 Backend(backend_str) 将检查 backend_str 是否有效,如果有效则返回解析后的小写字符串。它也接受大写字符串,例如 Backend("GLOO") 返回 "gloo"。 注意 条目 Backend.UNDEFINED 存在,但仅用作某些字段的初始值。用户不应直接使用它,也不应假定其存在。 classmethod register_backend(name, func, extended_api=False, devices=None)[source]# 使用给定的名称和实例化函数注册一个新的后端。此方法由第三方 ProcessGroup 扩展用于注册新的后端。 参数 name (str) – ProcessGroup 扩展的后端名称。它应与 init_process_group() 中的名称匹配。 func (function) – 实例化后端的函数处理程序。该函数应在后端扩展中实现,并接受四个参数,包括 store、rank、world_size 和 timeout。 extended_api (bool, 可选) – 后端是否支持扩展的参数结构。默认值:False。如果设置为 True,后端将获得一个 c10d::DistributedBackendOptions 实例,以及一个由后端实现定义的进程组选项对象。 device (str 或 str 列表, 可选) – 此后端支持的设备类型,例如 "cpu"、"cuda" 等。如果为 None,则假定同时支持 "cpu" 和 "cuda" 注意 对第三方后端的支持是实验性的,可能会更改。 torch.distributed.get_backend(group=None)[source]# 返回给定进程组的后端。 参数 group (ProcessGroup, 可选) – 要操作的进程组。默认为通用主进程组。如果指定了另一个特定组,则调用进程必须是该组的一部分。 返回 给定进程组的后端,作为小写字符串。 返回类型 Backend torch.distributed.get_rank(group=None)[source]# 返回当前进程在提供组中的 rank,否则返回默认组的 rank。Rank 是分配给分布式进程组中每个进程的唯一标识符。它们始终是从 0 到 world_size 的连续整数。 参数 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 返回 进程组的 rank -1,如果不是该组的成员 返回类型 int torch.distributed.get_world_size(group=None)[source]# 返回当前进程组中的进程数。 参数 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 返回 进程组的 world size -1,如果不是该组的成员 返回类型 int 关闭# 通过调用 destroy_process_group() 在退出时清理资源非常重要。最简单的模式是在训练脚本中不再需要通信时(通常在 main() 的末尾),通过为 group 参数传递默认值 None 调用 destroy_process_group() 来销毁每个进程组和后台。每个训练器进程应调用一次,而不是在外层进程启动器级别调用。 如果在超时时间内,pg 中的所有 rank 都未调用 destroy_process_group(),特别是当应用程序中有多个进程组时(例如,用于 N 维并行),退出时可能会出现挂起。这是因为 ProcessGroupNCCL 的析构函数调用 ncclCommAbort,该调用必须集体执行,但如果由 python 的 GC 调用,ProcessGroupNCCL 析构函数的调用顺序是不确定的。调用 destroy_process_group() 有助于确保跨 rank 以一致的顺序调用 ncclCommAbort,并避免在 ProcessGroupNCCL 的析构函数期间调用 ncclCommAbort。 重新初始化# destroy_process_group 也可用于销毁单个进程组。一个用例可能是容错训练,其中进程组可能在运行时被销毁然后重新初始化。在这种情况下,在调用 destroy 之后和随后初始化之前,使用 torch.distributed 原语以外的其他方式同步训练器进程至关重要。由于实现此同步的困难,目前不支持/未测试此行为,并被视为一个已知问题。如果这是一个阻碍您的用例,请提交一个 github issue 或 RFC。 组# 默认情况下,集体操作在默认组(也称为 world)上运行,并要求所有进程进入分布式函数调用。然而,一些工作负载可以从更细粒度的通信中受益。这就是分布式组的作用所在。new_group() 函数可用于创建新组,包含所有进程的任意子集。它返回一个不透明的组句柄,可以作为 group 参数提供给所有集体操作(集体操作是以某些众所周知的编程模式交换信息的分布式函数)。 torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source]# 创建一个新的分布式组。此函数要求主组中的所有进程(即分布式作业中的所有进程)都进入此函数,即使它们不会成为该组的成员。此外,所有进程中的组应以相同的顺序创建。 警告 安全并发使用:当使用多个具有 NCCL 后端的进程组时,用户必须确保跨 rank 的集体操作具有全局一致的执行顺序。如果一个进程内的多个线程发出集体操作,则需要显式同步以确保顺序一致。当使用 torch.distributed 通信 API 的异步变体时,会返回一个工作对象,并且通信内核被排队到单独的 CUDA 流上,允许通信和计算重叠。一旦在一个进程组上发出一个或多个异步操作,必须通过调用 work.wait() 与其他 cuda 流同步,然后才能使用另一个进程组。有关更多详细信息,请参阅并发使用多个 NCCL 通信器 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using-multiple-nccl-communicators-concurrently。 参数 ranks (list[int]) – 组成员的 rank 列表。如果为 None,将设置为所有 rank。默认值为 None。 timeout (timedelta, 可选) – 有关详细信息和默认值,请参阅 init_process_group。 backend (str 或 Backend, 可选) – 要使用的后端。根据构建时配置,有效值为 gloo 和 nccl。默认使用与全局组相同的后端。此字段应作为小写字符串给出(例如,"gloo"),也可以通过 Backend 属性访问(例如,Backend.GLOO)。如果传入 None,将使用与默认进程组对应的后端。默认值为 None。 pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在特定进程组构造期间需要传递哪些额外选项。即,对于 nccl 后端,可以指定 is_high_priority_stream,以便进程组可以选择高优先级 cuda 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tuse_local_synchronization (bool, 可选): 在进程组创建结束时执行组本地屏障。这不同之处在于,非成员 rank 不需要调用 API 并且不加入屏障。 group_desc (str, 可选) – 描述进程组的字符串。 device_id (torch.device, 可选) – 此进程要“绑定”到的单个特定设备,如果给出此字段,new_group 调用将尝试立即为该设备初始化通信后端。 返回 一个分布式组的句柄,可以提供给集体调用,或者如果 rank 不是 ranks 的一部分,则返回 GroupMember.NON_GROUP_MEMBER。 注意 use_local_synchronization 不适用于 MPI。 注意 虽然 use_local_synchronization=True 在较大的集群和较小的进程组中可能显著更快,但必须小心,因为它会改变集群行为,因为非成员 rank 不加入组 barrier()。 注意 当每个 rank 创建多个重叠的进程组时,use_local_synchronization=True 可能导致死锁。为避免这种情况,请确保所有 rank 遵循相同的全局创建顺序。 torch.distributed.get_group_rank(group, global_rank)[source]# 将全局 rank 转换为组 rank。global_rank 必须是 group 的成员,否则会引发 RuntimeError。 参数 group (ProcessGroup) – 要查找相对 rank 的 ProcessGroup。 global_rank (int) – 要查询的全局 rank。 返回 global_rank 相对于 group 的组 rank 返回类型 int 注意 在默认进程组上调用此函数返回 identity torch.distributed.get_global_rank(group, group_rank)[source]# 将组 rank 转换为全局 rank。group_rank 必须是 group 的成员,否则会引发 RuntimeError。 参数 group (ProcessGroup) – 要从中查找全局 rank 的 ProcessGroup。 group_rank (int) – 要查询的组 rank。 返回 group_rank 相对于 group 的全局 rank 返回类型 int 注意 在默认进程组上调用此函数返回 identity torch.distributed.get_process_group_ranks(group)[source]# 获取与 group 关联的所有 rank。 参数 group (Optional[ProcessGroup]) – 要从中获取所有 rank 的 ProcessGroup。如果为 None,将使用默认进程组。 返回 按组 rank 排序的全局 rank 列表。 返回类型 list[int] DeviceMesh# DeviceMesh 是管理进程组(或 NCCL 通信器)的更高级抽象。它允许用户轻松创建节点间和节点内的进程组,而无需担心如何为不同的子进程组正确设置 rank,并有助于轻松管理这些分布式进程组。init_device_mesh() 函数可用于创建新的 DeviceMesh,并使用描述设备拓扑的网格形状。 class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, backend_override=None, _init_backend=True)[source]# DeviceMesh 表示一个设备网格,其中设备的布局可以表示为一个 n 维数组,而 n 维数组的每个值是默认进程组 rank 的全局 id。DeviceMesh 可用于跨集群设置 N 维设备连接,并管理 N 维并行的 ProcessGroup。通信可以在 DeviceMesh 的每个维度上单独发生。DeviceMesh 尊重用户已经选择的设备(即,如果用户在 DeviceMesh 初始化之前调用了 torch.cuda.set_device),并且如果用户事先没有设置设备,将为当前进程选择/设置设备。请注意,手动设备选择应在 DeviceMesh 初始化之前发生。DeviceMesh 也可以在与 DTensor API 一起使用时用作上下文管理器。 注意 DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/rank 上运行。因此,用户需要确保网格数组(描述设备布局)在所有 rank 上相同。不一致的网格将导致静默挂起。 参数 device_type (str) – 网格的设备类型。目前支持:"cpu"、"cuda/cuda-like"。 mesh (ndarray) – 一个多维数组或整数张量,描述设备的布局,其中 ID 是默认进程组的全局 ID。 返回 一个表示设备布局的 DeviceMesh 对象。 返回类型 DeviceMesh 以下程序以 SPMD 方式在每个进程/rank 上运行。在此示例中,我们有 2 台主机,每台有 4 个 GPU。在网格的第一个维度上进行归约将跨列 (0, 4), .. 和 (3, 7) 进行,在网格的第二个维度上进行归约将跨行 (0, 1, 2, 3) 和 (4, 5, 6, 7) 进行。 示例: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # 将设备网格初始化为 (2, 4) 以表示拓扑 >>> # 跨主机(维度 0)和主机内(维度 1)。 >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source]# 从现有的 ProcessGroup 或现有 ProcessGroup 列表构造具有 device_type 的 DeviceMesh。构造的设备网格的维度数等于传递的组数。例如,如果传递单个进程组,则生成的 DeviceMesh 是 1D 网格。如果传递 2 个进程组的列表,则生成的 DeviceMesh 是 2D 网格。如果传递多个组,则需要 mesh 和 mesh_dim_names 参数。传递的进程组的顺序决定了网格的拓扑。例如,第一个进程组将是 DeviceMesh 的第 0 维。传递的网格张量必须具有与传递的进程组数量相同的维度数,并且网格
Comprehensive assistance with pytorch-fsdp development, generated from official documentation.
This skill should be triggered when:
Pattern 1: Generic Join Context Manager# Created On: Jun 06, 2025 | Last Updated On: Jun 06, 2025 The generic join context manager facilitates distributed training on uneven inputs. This page outlines the API of the relevant classes: Join, Joinable, and JoinHook. For a tutorial, see Distributed Training with Uneven Inputs Using the Join Context Manager. class torch.distributed.algorithms.Join(joinables, enable=True, throw_on_early_termination=False, **kwargs)[source]# This class defines the generic join context manager, which allows custom hooks to be called after a process joins. These hooks should shadow the collective communications of non-joined processes to prevent hanging and erroring and to ensure algorithmic correctness. Refer to JoinHook for details about the hook definition. Warning The context manager requires each participating Joinable to call the method notify_join_context() before its own per- iteration collective communications to ensure correctness. Warning The context manager requires that all process_group attributes in the JoinHook objects are the same. If there are multiple JoinHook objects, then the device of the first is used. The process group and device information is used for checking for non- joined processes and for notifying processes to throw an exception if throw_on_early_termination is enabled, both of which using an all- reduce. Parameters joinables (List[Joinable]) – a list of the participating Joinable s; their hooks are iterated over in the given order. enable (bool) – a flag enabling uneven input detection; setting to False disables the context manager’s functionality and should only be set when the user knows the inputs will not be uneven (default: True). throw_on_early_termination (bool) – a flag controlling whether to throw an exception upon detecting uneven inputs (default: False). Example: >>> import os >>> import torch >>> import torch.distributed as dist >>> import torch.multiprocessing as mp >>> import torch.nn.parallel.DistributedDataParallel as DDP >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO >>> from torch.distributed.algorithms.join import Join >>> >>> # On each spawned worker >>> def worker(rank): >>> dist.init_process_group("nccl", rank=rank, world_size=2) >>> model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank]) >>> optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01) >>> # Rank 1 gets one more input than rank 0 >>> inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)] >>> with Join([model, optim]): >>> for input in inputs: >>> loss = model(input).sum() >>> loss.backward() >>> optim.step() >>> # All ranks reach here without hanging/erroring static notify_join_context(joinable)[source]# Notifies the join context manager that the calling process has not yet joined. Then, if throw_on_early_termination=True, checks if uneven inputs have been detected (i.e. if one process has already joined) and throws an exception if so. This method should be called from a Joinable object before its per-iteration collective communications. For example, this should be called at the beginning of the forward pass in DistributedDataParallel. Only the first Joinable object passed into the context manager performs the collective communications in this method, and for the others, this method is vacuous. Parameters joinable (Joinable) – the Joinable object calling this method. Returns An async work handle for the all-reduce meant to notify the context manager that the process has not yet joined if joinable is the first one passed into the context manager; None otherwise. class torch.distributed.algorithms.Joinable[source]# This defines an abstract base class for joinable classes. A joinable class (inheriting from Joinable) should implement join_hook(), which returns a JoinHook instance, in addition to join_device() and join_process_group() that return device and process group information, respectively. abstract property join_device: device# Return the device from which to perform collective communications needed by the join context manager. abstract join_hook(**kwargs)[source]# Return a JoinHook instance for the given Joinable. Parameters kwargs (dict) – a dict containing any keyword arguments to modify the behavior of the join hook at run time; all Joinable instances sharing the same join context manager are forwarded the same value for kwargs. Return type JoinHook abstract property join_process_group: Any# Returns the process group for the collective communications needed by the join context manager itself. class torch.distributed.algorithms.JoinHook[source]# This defines a join hook, which provides two entry points in the join context manager. Entry points : a main hook, which is called repeatedly while there exists a non-joined process, and a post-hook, which is called once all processes have joined. To implement a join hook for the generic join context manager, define a class that inherits from JoinHook and override main_hook() and post_hook() as appropriate. main_hook()[source]# Call this hook while there exists a non-joined process to shadow collective communications in a training iteration. Training iteration i.e., in one forward pass, backward pass, and optimizer step. post_hook(is_last_joiner)[source]# Call hook after all processes have joined. It is passed an additional bool argument is_last_joiner, which indicates if the rank is one of the last to join. Parameters is_last_joiner (bool) – True if the rank is one of the last to join; False otherwise.
Join
Pattern 2: Distributed communication package - torch.distributed# Created On: Jul 12, 2017 | Last Updated On: Sep 04, 2025 Note Please refer to PyTorch Distributed Overview for a brief introduction to all features related to distributed training. Backends# torch.distributed supports four built-in backends, each with different capabilities. The table below shows which functions are available for use with a CPU or GPU for each backend. For NCCL, GPU refers to CUDA GPU while for XCCL to XPU GPU. MPI supports CUDA only if the implementation used to build PyTorch supports it. Backend gloo mpi nccl xccl Device CPU GPU CPU GPU CPU GPU CPU GPU send ✓ ✘ ✓ ? ✘ ✓ ✘ ✓ recv ✓ ✘ ✓ ? ✘ ✓ ✘ ✓ broadcast ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ all_reduce ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ reduce ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ all_gather ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ gather ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ scatter ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ reduce_scatter ✓ ✓ ✘ ✘ ✘ ✓ ✘ ✓ all_to_all ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ barrier ✓ ✘ ✓ ? ✘ ✓ ✘ ✓ Backends that come with PyTorch# PyTorch distributed package supports Linux (stable), MacOS (stable), and Windows (prototype). By default for Linux, the Gloo and NCCL backends are built and included in PyTorch distributed (NCCL only when building with CUDA). MPI is an optional backend that can only be included if you build PyTorch from source. (e.g. building PyTorch on a host that has MPI installed.) Note As of PyTorch v1.8, Windows supports all collective communications backend but NCCL, If the init_method argument of init_process_group() points to a file it must adhere to the following schema: Local file system, init_method="file:///d:/tmp/some_file" Shared file system, init_method="file://////{machine_name}/{share_folder_name}/some_file" Same as on Linux platform, you can enable TcpStore by setting environment variables, MASTER_ADDR and MASTER_PORT. Which backend to use?# In the past, we were often asked: “which backend should I use?”. Rule of thumb Use the NCCL backend for distributed training with CUDA GPU. Use the XCCL backend for distributed training with XPU GPU. Use the Gloo backend for distributed training with CPU. GPU hosts with InfiniBand interconnect Use NCCL, since it’s the only backend that currently supports InfiniBand and GPUDirect. GPU hosts with Ethernet interconnect Use NCCL, since it currently provides the best distributed GPU training performance, especially for multiprocess single-node or multi-node distributed training. If you encounter any problem with NCCL, use Gloo as the fallback option. (Note that Gloo currently runs slower than NCCL for GPUs.) CPU hosts with InfiniBand interconnect If your InfiniBand has enabled IP over IB, use Gloo, otherwise, use MPI instead. We are planning on adding InfiniBand support for Gloo in the upcoming releases. CPU hosts with Ethernet interconnect Use Gloo, unless you have specific reasons to use MPI. Common environment variables# Choosing the network interface to use# By default, both the NCCL and Gloo backends will try to find the right network interface to use. If the automatically detected interface is not correct, you can override it using the following environment variables (applicable to the respective backend): NCCL_SOCKET_IFNAME, for example export NCCL_SOCKET_IFNAME=eth0 GLOO_SOCKET_IFNAME, for example export GLOO_SOCKET_IFNAME=eth0 If you’re using the Gloo backend, you can specify multiple interfaces by separating them by a comma, like this: export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3. The backend will dispatch operations in a round-robin fashion across these interfaces. It is imperative that all processes specify the same number of interfaces in this variable. Other NCCL environment variables# Debugging - in case of NCCL failure, you can set NCCL_DEBUG=INFO to print an explicit warning message as well as basic NCCL initialization information. You may also use NCCL_DEBUG_SUBSYS to get more details about a specific aspect of NCCL. For example, NCCL_DEBUG_SUBSYS=COLL would print logs of collective calls, which may be helpful when debugging hangs, especially those caused by collective type or message size mismatch. In case of topology detection failure, it would be helpful to set NCCL_DEBUG_SUBSYS=GRAPH to inspect the detailed detection result and save as reference if further help from NCCL team is needed. Performance tuning - NCCL performs automatic tuning based on its topology detection to save users’ tuning effort. On some socket-based systems, users may still try tuning NCCL_SOCKET_NTHREADS and NCCL_NSOCKS_PERTHREAD to increase socket network bandwidth. These two environment variables have been pre-tuned by NCCL for some cloud providers, such as AWS or GCP. For a full list of NCCL environment variables, please refer to NVIDIA NCCL’s official documentation You can tune NCCL communicators even further using torch.distributed.ProcessGroupNCCL.NCCLConfig and torch.distributed.ProcessGroupNCCL.Options. Learn more about them using help (e.g. help(torch.distributed.ProcessGroupNCCL.NCCLConfig)) in the interpreter. Basics# The torch.distributed package provides PyTorch support and communication primitives for multiprocess parallelism across several computation nodes running on one or more machines. The class torch.nn.parallel.DistributedDataParallel() builds on this functionality to provide synchronous distributed training as a wrapper around any PyTorch model. This differs from the kinds of parallelism provided by Multiprocessing package - torch.multiprocessing and torch.nn.DataParallel() in that it supports multiple network-connected machines and in that the user must explicitly launch a separate copy of the main training script for each process. In the single-machine synchronous case, torch.distributed or the torch.nn.parallel.DistributedDataParallel() wrapper may still have advantages over other approaches to data-parallelism, including torch.nn.DataParallel(): Each process maintains its own optimizer and performs a complete optimization step with each iteration. While this may appear redundant, since the gradients have already been gathered together and averaged across processes and are thus the same for every process, this means that no parameter broadcast step is needed, reducing time spent transferring tensors between nodes. Each process contains an independent Python interpreter, eliminating the extra interpreter overhead and “GIL-thrashing” that comes from driving several execution threads, model replicas, or GPUs from a single Python process. This is especially important for models that make heavy use of the Python runtime, including models with recurrent layers or many small components. Initialization# The package needs to be initialized using the torch.distributed.init_process_group() or torch.distributed.device_mesh.init_device_mesh() function before calling any other methods. Both block until all processes have joined. Warning Initialization is not thread-safe. Process group creation should be performed from a single thread, to prevent inconsistent ‘UUID’ assignment across ranks, and to prevent races during initialization that can lead to hangs. torch.distributed.is_available()[source]# Return True if the distributed package is available. Otherwise, torch.distributed does not expose any other APIs. Currently, torch.distributed is available on Linux, MacOS and Windows. Set USE_DISTRIBUTED=1 to enable it when building PyTorch from source. Currently, the default value is USE_DISTRIBUTED=1 for Linux and Windows, USE_DISTRIBUTED=0 for MacOS. Return type bool torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]# Initialize the default distributed process group. This will also initialize the distributed package. There are 2 main ways to initialize a process group: Specify store, rank, and world_size explicitly. Specify init_method (a URL string) which indicates where/how to discover peers. Optionally specify rank and world_size, or encode all required parameters in the URL and omit them. If neither is specified, init_method is assumed to be “env://”. Parameters backend (str or Backend, optional) – The backend to use. Depending on build-time configurations, valid values include mpi, gloo, nccl, ucc, xccl or one that is registered by a third-party plugin. Since 2.6, if backend is not provided, c10d will use a backend registered for the device type indicated by the device_id kwarg (if provided). The known default registrations today are: nccl for cuda, gloo for cpu, xccl for xpu. If neither backend nor device_id is provided, c10d will detect the accelerator on the run-time machine and use a backend registered for that detected accelerator (or cpu). This field can be given as a lowercase string (e.g., "gloo"), which can also be accessed via Backend attributes (e.g., Backend.GLOO). If using multiple processes per machine with nccl backend, each process must have exclusive access to every GPU it uses, as sharing GPUs between processes can result in deadlock or NCCL invalid usage. ucc backend is experimental. Default backend for the device can be queried with get_default_backend_for_device(). init_method (str, optional) – URL specifying how to initialize the process group. Default is “env://” if no init_method or store is specified. Mutually exclusive with store. world_size (int, optional) – Number of processes participating in the job. Required if store is specified. rank (int, optional) – Rank of the current process (it should be a number between 0 and world_size-1). Required if store is specified. store (Store, optional) – Key/value store accessible to all workers, used to exchange connection/address information. Mutually exclusive with init_method. timeout (timedelta, optional) – Timeout for operations executed against the process group. Default value is 10 minutes for NCCL and 30 minutes for other backends. This is the duration after which collectives will be aborted asynchronously and the process will crash. This is done since CUDA execution is async and it is no longer safe to continue executing user code since failed async NCCL operations might result in subsequent CUDA operations running on corrupted data. When TORCH_NCCL_BLOCKING_WAIT is set, the process will block and wait for this timeout. group_name (str, optional, deprecated) – Group name. This argument is ignored pg_options (ProcessGroupOptions, optional) – process group options specifying what additional options need to be passed in during the construction of specific process groups. As of now, the only options we support is ProcessGroupNCCL.Options for the nccl backend, is_high_priority_stream can be specified so that the nccl backend can pick up high priority cuda streams when there’re compute kernels waiting. For other available options to config nccl, See device_id (torch.device | int, optional) – a single, specific device this process will work on, allowing for backend-specific optimizations. Currently this has two effects, only under NCCL: the communicator is immediately formed (calling ncclCommInit* immediately rather than the normal lazy call) and sub-groups will use ncclCommSplit when possible to avoid unnecessary overhead of group creation. If you want to know NCCL initialization error early, you can also use this field. If an int is provided, the API assumes that the accelerator type at compile time will be used. Note To enable backend == Backend.MPI, PyTorch needs to be built from source on a system that supports MPI. Note Support for multiple backends is experimental. Currently when no backend is specified, both gloo and nccl backends will be created. The gloo backend will be used for collectives with CPU tensors and the nccl backend will be used for collectives with CUDA tensors. A custom backend can be specified by passing in a string with format “<device_type>:<backend_name>,<device_type>:<backend_name>”, e.g. “cpu:gloo,cuda:custom_backend”. torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None, backend_override=None)[source]# Initializes a DeviceMesh based on device_type, mesh_shape, and mesh_dim_names parameters. This creates a DeviceMesh with an n-dimensional array layout, where n is the length of mesh_shape. If mesh_dim_names is provided, each dimension is labeled as mesh_dim_names[i]. Note init_device_mesh follows SPMD programming model, meaning the same PyTorch Python program runs on all processes/ranks in the cluster. Ensure mesh_shape (the dimensions of the nD array describing device layout) is identical across all ranks. Inconsistent mesh_shape may lead to hanging. Note If no process group is found, init_device_mesh will initialize distributed process group/groups required for distributed communications behind the scene. Parameters device_type (str) – The device type of the mesh. Currently supports: “cpu”, “cuda/cuda-like”, “xpu”. Passing in a device type with a GPU index, such as “cuda:0”, is not allowed. mesh_shape (Tuple[int]) – A tuple defining the dimensions of the multi-dimensional array describing the layout of devices. mesh_dim_names (Tuple[str], optional) – A tuple of mesh dimension names to assign to each dimension of the multi-dimensional array describing the layout of devices. Its length must match the length of mesh_shape. Each string in mesh_dim_names must be unique. backend_override (Dict[int | str, tuple[str, Options] | str | Options], optional) – Overrides for some or all of the ProcessGroups that will be created for each mesh dimension. Each key can be either the index of a dimension or its name (if mesh_dim_names is provided). Each value can be a tuple containing the name of the backend and its options, or just one of these two components (in which case the other will be set to its default value). Returns A DeviceMesh object representing the device layout. Return type DeviceMesh Example: >>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp")) torch.distributed.is_initialized()[source]# Check if the default process group has been initialized. Return type bool torch.distributed.is_mpi_available()[source]# Check if the MPI backend is available. Return type bool torch.distributed.is_nccl_available()[source]# Check if the NCCL backend is available. Return type bool torch.distributed.is_gloo_available()[source]# Check if the Gloo backend is available. Return type bool torch.distributed.distributed_c10d.is_xccl_available()[source]# Check if the XCCL backend is available. Return type bool torch.distributed.is_torchelastic_launched()[source]# Check whether this process was launched with torch.distributed.elastic (aka torchelastic). The existence of TORCHELASTIC_RUN_ID environment variable is used as a proxy to determine whether the current process was launched with torchelastic. This is a reasonable proxy since TORCHELASTIC_RUN_ID maps to the rendezvous id which is always a non-null value indicating the job id for peer discovery purposes.. Return type bool torch.distributed.get_default_backend_for_device(device)[source]# Return the default backend for the given device. Parameters device (Union[str, torch.device]) – The device to get the default backend for. Returns The default backend for the given device as a lower case string. Return type str Currently three initialization methods are supported: TCP initialization# There are two ways to initialize using TCP, both requiring a network address reachable from all processes and a desired world_size. The first way requires specifying an address that belongs to the rank 0 process. This initialization method requires that all processes have manually specified ranks. Note that multicast address is not supported anymore in the latest distributed package. group_name is deprecated as well. import torch.distributed as dist # Use address of one of the machines dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4) Shared file-system initialization# Another initialization method makes use of a file system that is shared and visible from all machines in a group, along with a desired world_size. The URL should start with file:// and contain a path to a non-existent file (in an existing directory) on a shared file system. File-system initialization will automatically create that file if it doesn’t exist, but will not delete the file. Therefore, it is your responsibility to make sure that the file is cleaned up before the next init_process_group() call on the same file path/name. Note that automatic rank assignment is not supported anymore in the latest distributed package and group_name is deprecated as well. Warning This method assumes that the file system supports locking using fcntl - most local systems and NFS support it. Warning This method will always create the file and try its best to clean up and remove the file at the end of the program. In other words, each initialization with the file init method will need a brand new empty file in order for the initialization to succeed. If the same file used by the previous initialization (which happens not to get cleaned up) is used again, this is unexpected behavior and can often cause deadlocks and failures. Therefore, even though this method will try its best to clean up the file, if the auto-delete happens to be unsuccessful, it is your responsibility to ensure that the file is removed at the end of the training to prevent the same file to be reused again during the next time. This is especially important if you plan to call init_process_group() multiple times on the same file name. In other words, if the file is not removed/cleaned up and you call init_process_group() again on that file, failures are expected. The rule of thumb here is that, make sure that the file is non-existent or empty every time init_process_group() is called. import torch.distributed as dist # rank should always be specified dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank) Environment variable initialization# This method will read the configuration from environment variables, allowing one to fully customize how the information is obtained. The variables to be set are: MASTER_PORT - required; has to be a free port on machine with rank 0 MASTER_ADDR - required (except for rank 0); address of rank 0 node WORLD_SIZE - required; can be set either here, or in a call to init function RANK - required; can be set either here, or in a call to init function The machine with rank 0 will be used to set up all connections. This is the default method, meaning that init_method does not have to be specified (or can be env://). Improving initialization time# TORCH_GLOO_LAZY_INIT - establishes connections on demand rather than using a full mesh which can greatly improve initialization time for non all2all operations. Post-Initialization# Once torch.distributed.init_process_group() was run, the following functions can be used. To check whether the process group has already been initialized use torch.distributed.is_initialized(). class torch.distributed.Backend(name)[source]# An enum-like class for backends. Available backends: GLOO, NCCL, UCC, MPI, XCCL, and other registered backends. The values of this class are lowercase strings, e.g., "gloo". They can be accessed as attributes, e.g., Backend.NCCL. This class can be directly called to parse the string, e.g., Backend(backend_str) will check if backend_str is valid, and return the parsed lowercase string if so. It also accepts uppercase strings, e.g., Backend("GLOO") returns "gloo". Note The entry Backend.UNDEFINED is present but only used as initial value of some fields. Users should neither use it directly nor assume its existence. classmethod register_backend(name, func, extended_api=False, devices=None)[source]# Register a new backend with the given name and instantiating function. This class method is used by 3rd party ProcessGroup extension to register new backends. Parameters name (str) – Backend name of the ProcessGroup extension. It should match the one in init_process_group(). func (function) – Function handler that instantiates the backend. The function should be implemented in the backend extension and takes four arguments, including store, rank, world_size, and timeout. extended_api (bool, optional) – Whether the backend supports extended argument structure. Default: False. If set to True, the backend will get an instance of c10d::DistributedBackendOptions, and a process group options object as defined by the backend implementation. device (str or list of str, optional) – device type this backend supports, e.g. “cpu”, “cuda”, etc. If None, assuming both “cpu” and “cuda” Note This support of 3rd party backend is experimental and subject to change. torch.distributed.get_backend(group=None)[source]# Return the backend of the given process group. Parameters group (ProcessGroup, optional) – The process group to work on. The default is the general main process group. If another specific group is specified, the calling process must be part of group. Returns The backend of the given process group as a lower case string. Return type Backend torch.distributed.get_rank(group=None)[source]# Return the rank of the current process in the provided group, default otherwise. Rank is a unique identifier assigned to each process within a distributed process group. They are always consecutive integers ranging from 0 to world_size. Parameters group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. Returns The rank of the process group -1, if not part of the group Return type int torch.distributed.get_world_size(group=None)[source]# Return the number of processes in the current process group. Parameters group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. Returns The world size of the process group -1, if not part of the group Return type int Shutdown# It is important to clean up resources on exit by calling destroy_process_group(). The simplest pattern to follow is to destroy every process group and backend by calling destroy_process_group() with the default value of None for the group argument, at a point in the training script where communications are no longer needed, usually near the end of main(). The call should be made once per trainer-process, not at the outer process-launcher level. if destroy_process_group() is not called by all ranks in a pg within the timeout duration, especially when there are multiple process-groups in the application e.g. for N-D parallelism, hangs on exit are possible. This is because the destructor for ProcessGroupNCCL calls ncclCommAbort, which must be called collectively, but the order of calling ProcessGroupNCCL’s destructor if called by python’s GC is not deterministic. Calling destroy_process_group() helps by ensuring ncclCommAbort is called in a consistent order across ranks, and avoids calling ncclCommAbort during ProcessGroupNCCL’s destructor. Reinitialization# destroy_process_group can also be used to destroy individual process groups. One use case could be fault tolerant training, where a process group may be destroyed and then a new one initialized during runtime. In this case, it’s critical to synchronize the trainer processes using some means other than torch.distributed primitives calling destroy and before subsequently initializing. This behavior is currently unsupported/untested, due to the difficulty of achieving this synchronization, and is considered a known issue. Please file a github issue or RFC if this is a use case that’s blocking you. Groups# By default collectives operate on the default group (also called the world) and require all processes to enter the distributed function call. However, some workloads can benefit from more fine-grained communication. This is where distributed groups come into play. new_group() function can be used to create new groups, with arbitrary subsets of all processes. It returns an opaque group handle that can be given as a group argument to all collectives (collectives are distributed functions to exchange information in certain well-known programming patterns). torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source]# Create a new distributed group. This function requires that all processes in the main group (i.e. all processes that are part of the distributed job) enter this function, even if they are not going to be members of the group. Additionally, groups should be created in the same order in all processes. Warning Safe concurrent usage: When using multiple process groups with the NCCL backend, the user must ensure a globally consistent execution order of collectives across ranks. If multiple threads within a process issue collectives, explicit synchronization is necessary to ensure consistent ordering. When using async variants of torch.distributed communication APIs, a work object is returned and the communication kernel is enqueued on a separate CUDA stream, allowing overlap of communication and computation. Once one or more async ops have been issued on one process group, they must be synchronized with other cuda streams by calling work.wait() before using another process group. See Using multiple NCCL communicators concurrently for more details. Parameters ranks (list[int]) – List of ranks of group members. If None, will be set to all ranks. Default is None. timeout (timedelta, optional) – see init_process_group for details and default value. backend (str or Backend, optional) – The backend to use. Depending on build-time configurations, valid values are gloo and nccl. By default uses the same backend as the global group. This field should be given as a lowercase string (e.g., "gloo"), which can also be accessed via Backend attributes (e.g., Backend.GLOO). If None is passed in, the backend corresponding to the default process group will be used. Default is None. pg_options (ProcessGroupOptions, optional) – process group options specifying what additional options need to be passed in during the construction of specific process groups. i.e. for the nccl backend, is_high_priority_stream can be specified so that process group can pick up high priority cuda streams. For other available options to config nccl, See (bool, optional): perform a group-local barrier at the end of the process group creation. This is different in that non-member ranks don’t need to call into API and don’t join the barrier. group_desc (str, optional) – a string to describe the process group. device_id (torch.device, optional) – a single, specific device to “bind” this process to, The new_group call will try to initialize a communication backend immediately for the device if this field is given. Returns A handle of distributed group that can be given to collective calls or GroupMember.NON_GROUP_MEMBER if the rank is not part of ranks. N.B. use_local_synchronization doesn’t work with MPI. N.B. While use_local_synchronization=True can be significantly faster with larger clusters and small process groups, care must be taken since it changes cluster behavior as non-member ranks don’t join the group barrier(). N.B. use_local_synchronization=True can lead to deadlocks when each rank creates multiple overlapping process groups. To avoid that, make sure all ranks follow the same global creation order. torch.distributed.get_group_rank(group, global_rank)[source]# Translate a global rank into a group rank. global_rank must be part of group otherwise this raises RuntimeError. Parameters group (ProcessGroup) – ProcessGroup to find the relative rank. global_rank (int) – Global rank to query. Returns Group rank of global_rank relative to group Return type int N.B. calling this function on the default process group returns identity torch.distributed.get_global_rank(group, group_rank)[source]# Translate a group rank into a global rank. group_rank must be part of group otherwise this raises RuntimeError. Parameters group (ProcessGroup) – ProcessGroup to find the global rank from. group_rank (int) – Group rank to query. Returns Global rank of group_rank relative to group Return type int N.B. calling this function on the default process group returns identity torch.distributed.get_process_group_ranks(group)[source]# Get all ranks associated with group. Parameters group (Optional[ProcessGroup]) – ProcessGroup to get all ranks from. If None, the default process group will be used. Returns List of global ranks ordered by group rank. Return type list[int] DeviceMesh# DeviceMesh is a higher level abstraction that manages process groups (or NCCL communicators). It allows user to easily create inter node and intra node process groups without worrying about how to set up the ranks correctly for different sub process groups, and it helps manage those distributed process group easily. init_device_mesh() function can be used to create new DeviceMesh, with a mesh shape describing the device topology. class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, backend_override=None, _init_backend=True)[source]# DeviceMesh represents a mesh of devices, where layout of devices could be represented as a n-d dimension array, and each value of the n-d dimensional array is the global id of the default process group ranks. DeviceMesh could be used to setup the N dimensional device connections across the cluster, and manage the ProcessGroups for N dimensional parallelisms. Communications could happen on each dimension of the DeviceMesh separately. DeviceMesh respects the device that user selects already (i.e. if user call torch.cuda.set_device before the DeviceMesh initialization), and will select/set the device for the current process if user does not set the device beforehand. Note that manual device selection should happen BEFORE the DeviceMesh initialization. DeviceMesh can also be used as a context manager when using together with DTensor APIs. Note DeviceMesh follows SPMD programming model, which means the same PyTorch Python program is running on all processes/ranks in the cluster. Therefore, users need to make sure the mesh array (which describes the layout of devices) should be identical across all ranks. Inconsistent mesh will lead to silent hang. Parameters device_type (str) – The device type of the mesh. Currently supports: “cpu”, “cuda/cuda-like”. mesh (ndarray) – A multi-dimensional array or an integer tensor describing the layout of devices, where the IDs are global IDs of the default process group. Returns A DeviceMesh object representing the device layout. Return type DeviceMesh The following program runs on each process/rank in an SPMD manner. In this example, we have 2 hosts with 4 GPUs each. A reduction over the first dimension of mesh will reduce across columns (0, 4), .. and (3, 7), a reduction over the second dimension of mesh reduces across rows (0, 1, 2, 3) and (4, 5, 6, 7). Example: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source]# Constructs a DeviceMesh with device_type from an existing ProcessGroup or a list of existing ProcessGroup. The constructed device mesh has number of dimensions equal to the number of groups passed. For example, if a single process group is passed in, the resulted DeviceMesh is a 1D mesh. If a list of 2 process groups is passed in, the resulted DeviceMesh is a 2D mesh. If more than one group is passed, then the mesh and mesh_dim_names arguments are required. The order of the process groups passed in determines the topology of the mesh. For example, the first process group will be the 0th dimension of the DeviceMesh. The mesh tensor passed in must have the same number of dimensions as the number of process groups passed in, and the order of the dimensions in the mesh tensor must match the order in the process groups passed in. Parameters group (ProcessGroup or list[ProcessGroup]) – the existing ProcessGroup or a list of existing ProcessGroups. device_type (str) – The device type of the mesh. Currently supports: “cpu”, “cuda/cuda-like”. Passing in a device type with a GPU index, such as “cuda:0”, is not allowed. mesh (torch.Tensor or ArrayLike, optional) – A multi-dimensional array or an integer tensor describing the layout of devices, where the IDs are global IDs of the default process group. Default is None. mesh_dim_names (tuple[str], optional) – A tuple of mesh dimension names to assign to each dimension of the multi-dimensional array describing the layout of devices. Its length must match the length of mesh_shape. Each string in mesh_dim_names must be unique. Default is None. Returns A DeviceMesh object representing the device layout. Return type DeviceMesh get_all_groups()[source]# Returns a list of ProcessGroups for all mesh dimensions. Returns A list of ProcessGroup object. Return type list[torch.distributed.distributed_c10d.ProcessGroup] get_coordinate()[source]# Return the relative indices of this rank relative to all dimensions of the mesh. If this rank is not part of the mesh, return None. Return type Optional[list[int]] get_group(mesh_dim=None)[source]# Returns the single ProcessGroup specified by mesh_dim, or, if mesh_dim is not specified and the DeviceMesh is 1-dimensional, returns the only ProcessGroup in the mesh. Parameters mesh_dim (str/python:int, optional) – it can be the name of the mesh dimension or the index None. (of the mesh dimension. Default is) – Returns A ProcessGroup object. Return type ProcessGroup get_local_rank(mesh_dim=None)[source]# Returns the local rank of the given mesh_dim of the DeviceMesh. Parameters mesh_dim (str/python:int, optional) – it can be the name of the mesh dimension or the index None. (of the mesh dimension. Default is) – Returns An integer denotes the local rank. Return type int The following program runs on each process/rank in an SPMD manner. In this example, we have 2 hosts with 4 GPUs each. Calling mesh_2d.get_local_rank(mesh_dim=0) on rank 0, 1, 2, 3 would return 0. Calling mesh_2d.get_local_rank(mesh_dim=0) on rank 4, 5, 6, 7 would return 1. Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 0, 4 would return 0. Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 1, 5 would return 1. Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 2, 6 would return 2. Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 3, 7 would return 3. Example: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) get_rank()[source]# Returns the current global rank. Return type int Point-to-point communication# torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source]# Send a tensor synchronously. Warning tag is not supported with the NCCL backend. Parameters tensor (Tensor) – Tensor to send. dst (int) – Destination rank on global process group (regardless of group argument). Destination rank should not be the same as the rank of the current process. group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. tag (int, optional) – Tag to match send with remote recv group_dst (int, optional) – Destination rank on group. Invalid to specify both dst and group_dst. torch.distributed.recv(tensor, src=None, group=None, tag=0, group_src=None)[source]# Receives a tensor synchronously. Warning tag is not supported with the NCCL backend. Parameters tensor (Tensor) – Tensor to fill with received data. src (int, optional) – Source rank on global process group (regardless of group argument). Will receive from any process if unspecified. group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. tag (int, optional) – Tag to match recv with remote send group_src (int, optional) – Destination rank on group. Invalid to specify both src and group_src. Returns Sender rank -1, if not part of the group Return type int isend() and irecv() return distributed request objects when used. In general, the type of this object is unspecified as they should never be created manually, but they are guaranteed to support two methods: is_completed() - returns True if the operation has finished wait() - will block the process until the operation is finished. is_completed() is guaranteed to return True once it returns. torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source]# Send a tensor asynchronously. Warning Modifying tensor before the request completes causes undefined behavior. Warning tag is not supported with the NCCL backend. Unlike send, which is blocking, isend allows src == dst rank, i.e. send to self. Parameters tensor (Tensor) – Tensor to send. dst (int) – Destination rank on global process group (regardless of group argument) group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. tag (int, optional) – Tag to match send with remote recv group_dst (int, optional) – Destination rank on group. Invalid to specify both dst and group_dst Returns A distributed request object. None, if not part of the group Return type Optional[Work] torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source]# Receives a tensor asynchronously. Warning tag is not supported with the NCCL backend. Unlike recv, which is blocking, irecv allows src == dst rank, i.e. recv from self. Parameters tensor (Tensor) – Tensor to fill with received data. src (int, optional) – Source rank on global process group (regardless of group argument). Will receive from any process if unspecified. group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. tag (int, optional) – Tag to match recv with remote send group_src (int, optional) – Destination rank on group. Invalid to specify both src and group_src. Returns A distributed request object. None, if not part of the group Return type Optional[Work] torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None, use_batch=False)[source]# Sends picklable objects in object_list synchronously. Similar to send(), but Python objects can be passed in. Note that all objects in object_list must be picklable in order to be sent. Parameters object_list (List[Any]) – List of input objects to sent. Each object must be picklable. Receiver must provide lists of equal sizes. dst (int) – Destination rank to send object_list to. Destination rank is based on global process group (regardless of group argument) group (Optional[ProcessGroup]) – (ProcessGroup, optional): The process group to work on. If None, the default process group will be used. Default is None. device (torch.device, optional) – If not None, the objects are serialized and converted to tensors which are moved to the device before sending. Default is None. group_dst (int, optional) – Destination rank on group. Must specify one of dst and group_dst but not both use_batch (bool, optional) – If True, use batch p2p operations instead of regular send operations. This avoids initializing 2-rank communicators and uses existing entire group communicators. See batch_isend_irecv for usage and assumptions. Default is False. Returns None. Note For NCCL-based process groups, internal tensor representations of objects must be moved to the GPU device before communication takes place. In this case, the device used is given by torch.cuda.current_device() and it is the user’s responsibility to ensure that this is set so that each rank has an individual GPU, via torch.cuda.set_device(). Warning Object collectives have a number of serious performance and scalability limitations. See Object collectives for details. Warning send_object_list() uses pickle module implicitly, which is known to be insecure. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling. Only call this function with data you trust. Warning Calling send_object_list() with GPU tensors is not well supported and inefficient as it incurs GPU -> CPU transfer since tensors would be pickled. Please consider using send() instead. Example::>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None, use_batch=False)[source]# Receives picklable objects in object_list synchronously. Similar to recv(), but can receive Python objects. Parameters object_list (List[Any]) – List of objects to receive into. Must provide a list of sizes equal to the size of the list being sent. src (int, optional) – Source rank from which to recv object_list. Source rank is based on global process group (regardless of group argument) Will receive from any rank if set to None. Default is None. group (Optional[ProcessGroup]) – (ProcessGroup, optional): The process group to work on. If None, the default process group will be used. Default is None. device (torch.device, optional) – If not None, receives on this device. Default is None. group_src (int, optional) – Destination rank on group. Invalid to specify both src and group_src. use_batch (bool, optional) – If True, use batch p2p operations instead of regular send operations. This avoids initializing 2-rank communicators and uses existing entire group communicators. See batch_isend_irecv for usage and assumptions. Default is False. Returns Sender rank. -1 if rank is not part of the group. If rank is part of the group, object_list will contain the sent objects from src rank. Note For NCCL-based process groups, internal tensor representations of objects must be moved to the GPU device before communication takes place. In this case, the device used is given by torch.cuda.current_device() and it is the user’s responsibility to ensure that this is set so that each rank has an individual GPU, via torch.cuda.set_device(). Warning Object collectives have a number of serious performance and scalability limitations. See Object collectives for details. Warning recv_object_list() uses pickle module implicitly, which is known to be insecure. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling. Only call this function with data you trust. Warning Calling recv_object_list() with GPU tensors is not well supported and inefficient as it incurs GPU -> CPU transfer since tensors would be pickled. Please consider using recv() instead. Example::>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.batch_isend_irecv(p2p_op_list)[source]# Send or Receive a batch of tensors asynchronously and return a list of requests. Process each of the operations in p2p_op_list and return the corresponding requests. NCCL, Gloo, and UCC backend are currently supported. Parameters p2p_op_list (list[torch.distributed.distributed_c10d.P2POp]) – A list of point-to-point operations(type of each operator is torch.distributed.P2POp). The order of the isend/irecv in the list matters and it needs to match with corresponding isend/irecv on the remote end. Returns A list of distributed request objects returned by calling the corresponding op in the op_list. Return type list[torch.distributed.distributed_c10d.Work] Examples >>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size) >>> recv_op = dist.P2POp( ... dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size ... ) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1 Note Note that when this API is used with the NCCL PG backend, users must set the current GPU device with torch.cuda.set_device, otherwise it will lead to unexpected hang issues. In addition, if this API is the first collective call in the group passed to dist.P2POp, all ranks of the group must participate in this API call; otherwise, the behavior is undefined. If this API call is not the first collective call in the group, batched P2P operations involving only a subset of ranks of the group are allowed. class torch.distributed.P2POp(op, tensor, peer=None, group=None, tag=0, group_peer=None)[source]# A class to build point-to-point operations for batch_isend_irecv. This class builds the type of P2P operation, communication buffer, peer rank, Process Group, and tag. Instances of this class will be passed to batch_isend_irecv for point-to-point communications. Parameters op (Callable) – A function to send data to or receive data from a peer process. The type of op is either torch.distributed.isend or torch.distributed.irecv. tensor (Tensor) – Tensor to send or receive. peer (int, optional) – Destination or source rank. group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. tag (int, optional) – Tag to match send with recv. group_peer (int, optional) – Destination or source rank. Synchronous and asynchronous collective operations# Every collective operation function supports the following two kinds of operations, depending on the setting of the async_op flag passed into the collective: Synchronous operation - the default mode, when async_op is set to False. When the function returns, it is guaranteed that the collective operation is performed. In the case of CUDA operations, it is not guaranteed that the CUDA operation is completed, since CUDA operations are asynchronous. For CPU collectives, any further function calls utilizing the output of the collective call will behave as expected. For CUDA collectives, function calls utilizing the output on the same CUDA stream will behave as expected. Users must take care of synchronization under the scenario of running under different streams. For details on CUDA semantics such as stream synchronization, see CUDA Semantics. See the below script to see examples of differences in these semantics for CPU and CUDA operations. Asynchronous operation - when async_op is set to True. The collective operation function returns a distributed request object. In general, you don’t need to create it manually and it is guaranteed to support two methods: is_completed() - in the case of CPU collectives, returns True if completed. In the case of CUDA operations, returns True if the operation has been successfully enqueued onto a CUDA stream and the output can be utilized on the default stream without further synchronization. wait() - in the case of CPU collectives, will block the process until the operation is completed. In the case of CUDA collectives, will block the currently active CUDA stream until the operation is completed (but will not block the CPU). get_future() - returns torch.(100) if rank == 0: # if the explicit call to wait_stream was omitted, the output below will be # non-deterministically 1 or 101, depending on whether the allreduce overwrote # the value after the add completed. print(output) Collective functions# torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[source]# Broadcasts the tensor to the whole group. tensor must have the same number of elements in all processes participating in the collective. Parameters tensor (Tensor) – Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise. src (int) – Source rank on global process group (regardless of group argument). group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. async_op (bool, optional) – Whether this op should be an async op group_src (int) – Source rank on group. Must specify one of group_src and src but not both. Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[source]# Broadcasts picklable objects in object_list to the whole group. Similar to broadcast(), but Python objects can be passed in. Note that all objects in object_list must be picklable in order to be broadcasted. Parameters object_list (List[Any]) – List of input objects to broadcast. Each object must be picklable. Only objects on the src rank will be broadcast, but each rank must provide lists of equal sizes. src (int) – Source rank from which to broadcast object_list. Source rank is based on global process group (regardless of group argument) group (Optional[ProcessGroup]) – (ProcessGroup, optional): The process group to work on. If None, the default process group will be used. Default is None. device (torch.device, optional) – If not None, the objects are serialized and converted to tensors which are moved to the device before broadcasting. Default is None. group_src (int) – Source rank on group. Must not specify one of group_src and src but not both. Returns None. If rank is part of the group, object_list will contain the broadcasted objects from src rank. Note For NCCL-based process groups, internal tensor representations of objects must be moved to the GPU device before communication takes place. In this case, the device used is given by torch.cuda.current_device() and it is the user’s responsibility to ensure that this is set so that each rank has an individual GPU, via torch.cuda.set_device(). Note Note that this API differs slightly from the broadcast() collective since it does not provide an async_op handle and thus will be a blocking call. Warning Object collectives have a number of serious performance and scalability limitations. See Object collectives for details. Warning broadcast_object_list() uses pickle module implicitly, which is known to be insecure. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling. Only call this function with data you trust. Warning Calling broadcast_object_list() with GPU tensors is not well supported and inefficient as it incurs GPU -> CPU transfer since tensors would be pickled. Please consider using broadcast() instead. Example::>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> objects = [None, None, None] >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]# Reduces the tensor data across all machines in a way that all get the final result. After the call tensor is going to be bitwise identical in all processes. Complex tensors are supported. Parameters tensor (Tensor) – Input and output of the collective. The function operates in-place. op (optional) – One of the values from torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions. group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. async_op (bool, optional) – Whether this op should be an async op Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group Examples >>> # All tensors below are of torch.int64 type. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # All tensors below are of torch.cfloat type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0 tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1 torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[source]# Reduces the tensor data across all machines. Only the process with rank dst is going to receive the final result. Parameters tensor (Tensor) – Input and output of the collective. The function operates in-place. dst (int) – Destination rank on global process group (regardless of group argument) op (optional) – One of the values from torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions. group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. async_op (bool, optional) – Whether this op should be an async op group_dst (int) – Destination rank on group. Must specify one of group_dst and dst but not both. Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]# Gathers tensors from the whole group in a list. Complex and uneven sized tensors are supported. Parameters tensor_list (list[Tensor]) – Output list. It should contain correctly-sized tensors to be used for output of the collective. Uneven sized tensors are supported. tensor (Tensor) – Tensor to be broadcast from current process. group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used. async_op (bool, optional) – Whether this op should be an async op Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group Examples >>> # All tensors below are of torch.int64 dtype. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor_list = [ ... torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0 [tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1 >>> # All tensors below are of torch.cfloat dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [ ... torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1 torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source]# Gather tensors from all ranks and put them in a single output tensor. This function requires all tensors to be the same size on each process. Parameters output_tensor (Tensor) – Output tensor to accommodate tensor elements from all ranks. It must be correctly sized to have one of the following forms: (i) a concatenation of all the input tensors along the primary dimension; for definition of “concatenation”, see torch.cat(); (ii) a stack of all the input tensors along the primary dimension; for definition of “stack”, see torch.stack(). Examples below may better explain the supported output forms. input_tensor (Tensor) – Tensor to
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
106,200 周安装