pytorch中基于DistributedDataParallel实现多卡并行计算
torch.nn.parallel.
DistributedDataParallel提供了更为高效的单机多卡和多机多卡训练接口。
DistributedDataParallel并行库接口参数参考:
https://www.cnblogs.com/jiangkejie/p/13256115.html
这里给出单机多卡的具体实现步骤
参考:
https://github.com/tczhangzhi/pytorch-distributed
https://blog.****.net/zwqjoy/article/details/89415933
https://www.cnblogs.com/yh-blog/p/12877922.html
几个基本概念
-
即进程组。默认情况下,只有一个组,一个 job 即为一个组,也即一个 world。
当需要进行更加精细的通信时,可以通过 new_group 接口,使用 word 的子集,创建新组,用于集体通信等。
-
world size :
表示全局进程个数。
-
rank:
表示进程序号,用于进程间通讯,表征进程优先级。rank = 0 的主机为 master 节点。
-
local_rank:
节点内,GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。比方说, rank = 3,local_rank = 0 表示第 3 个节点内的第 1 块 GPU。节点内每个进程会分配一个 local_rank 参数,表示当前进程在当前主机上的编号。例如:rank=2, local_rank=0 表示第 3 个节点上的第 1 个进程。
1: torch.nn.parallel.DistributedDataParallel
这个从名字上就能看出来与DataParallel相类似,也是一个模型wrapper。这个包是实现多机多卡分布训练最核心东西,它可以帮助我们在不同机器的多个模型拷贝之间平均梯度。
2: torch.utils.data.distributed.DistributedSampler
在多机多卡情况下分布式训练数据的读取也是一个问题,不同的卡读取到的数据应该是不同的。dataparallel的做法是直接将batch切分到不同的卡,这种方法对于多机来说不可取,因为多机之间直接进行数据传输会严重影响效率。于是有了利用sampler确保dataloader只会load到整个数据集的一个特定子集的做法。DistributedSampler就是做这件事的。它为每一个子进程划分出一部分数据集,以避免不同进程之间数据重复
使用流程
Pytorch
中分布式的基本使用流程如下:
-
在使用
distributed
包的任何其他函数之前,需要使用init_process_group
初始化进程组,同时初始化distributed
包。 -
如果需要进行小组内集体通信,用
new_group
创建子分组 -
创建分布式并行(DistributedDataParallel)模型
DDP(model, device_ids=device_ids)
- 为数据集创建distributed
Sampler
-
使用启动工具
torch.distributed.launch
在每个主机上执行一次脚本,开始训练 -
使用
destory_process_group()
销毁进程组
单机多卡--DistributedDataParallel
import torch import torch.nn as nn from torch.autograd import Variable from torch.utils.data import Dataset, DataLoader import os from torch.utils.data.distributed import DistributedSampler # 1) 初始化 torch.distributed.init_process_group(backend="nccl") input_size = 5 output_size = 2 batch_size = 30 data_size = 90 # 2) 配置每个进程的gpu local_rank = torch.distributed.get_rank() # 也可以通过设置args.local_rank得到(见下文) torch.cuda.set_device(local_rank) device = torch.device("cuda", local_rank) class RandomDataset(Dataset): def __init__(self, size, length): self.len = length self.data = torch.randn(length, size).to(\'cuda\') def __getitem__(self, index): return self.data[index] def __len__(self): return self.len dataset = RandomDataset(input_size, data_size) # 3)使用DistributedSampler rand_loader = DataLoader(dataset=dataset, batch_size=batch_size, sampler=DistributedSampler(dataset)) class Model(nn.Module): def __init__(self, input_size, output_size): super(Model, self).__init__() self.fc = nn.Linear(input_size, output_size) def forward(self, input): output = self.fc(input) print(" In Model: input size", input.size(), "output size", output.size()) return output model = Model(input_size, output_size) # 4) 封装之前要把模型移到对应的gpu model.to(device) if torch.cuda.device_count() > 1: print("Let\'s use", torch.cuda.device_count(), "GPUs!") # 5) 封装 model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank) for data in rand_loader: if torch.cuda.is_available(): input_var = data else: input_var = data output = model(input_var) print("Outside: input size", input_var.size(), "output_size", output.size())
torch.distributed.launch 会给模型分配一个args.local_rank的参数,也可以通过torch.distributed.get_rank()获取进程id。
# 这个参数是torch.distributed.launch传递过来的,我们设置位置参数来接受,local_rank代表当前程序进程使用的GPU标号 parser = argparse.ArgumentParser() parser.add_argument(\'--local_rank\', default=-1, type=int, help=\'node rank for distributed training\') args = parser.parse_args() print(args.local_rank))
怎么将程序跑起来。这里也有两种方法:
1. 用 torch.distributed.launch
:
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py
参考:https://pytorch.org/docs/stable/distributed.html#distributed-launch
torch.distributed package在torch. distribued .launch中提供了一个启动实用程序。
这个辅助工具可以用于为每个节点启动多个进程,以进行分布式训练。它是在每个训练节点上生成多个分布式训练进程的模块。
一些Notes参见:https://pytorch.org/docs/stable/distributed.html#distributed-launch
2. 用 torch.multiprocessing:
import torch.multiprocessing as mp def main(rank, your_custom_arg_1, your_custom_arg_2): # 这里第一个 rank 变量会被 mp.spawn 函数自动填充,可以充当 local_rank 来用(参见前面的代码) pass # 将前面那一堆东西包装成一个 main 函数 mp.spawn(main, nprocs=how_many_process, args=(your_custom_arg_1, your_custom_arg_2))
多机多卡训练
参考:https://blog.****.net/zwqjoy/article/details/89415933
Save and Load Checkpoints
在训练期间,通常使用torch.save和torch.load来设置检查点模块并从检查点恢复。有关更多详细信息,请参见保存和加载模型。
使用DDP时,一种优化方法是仅在一个进程中保存模型,然后将其加载到所有进程中,从而减少写入开销。这是正确的,因为所有进程都从相同的参数开始,并且梯度在向后传递中同步,因此优化程序应将不同位置的模型参数设置为相同的值。
如果使用此优化,请确保在保存完成之前所有进程不要开始加载(使用dist.barrier()命令)。
此外,在加载模块时,您需要提供适当的map_location参数,以防止进程进入其他的设备。
如果缺少map_location,torch.load将首先将模块加载到CPU,然后将每个参数复制到保存该参数的位置,这将导致同一台计算机上的所有进程使用相同的设备集。
有关更高级的故障恢复和弹性支持,请参阅TorchElastic。
示例:https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
import os import sys import tempfile import torch import torch.distributed as dist import torch.nn as nn import torch.optim as optim import torch.multiprocessing as mp from torch.nn.parallel import DistributedDataParallel as DDP # On Windows platform, the torch.distributed package only # supports Gloo backend, FileStore and TcpStore. # For FileStore, set init_method parameter in init_process_group # to a local file. Example as follow: # init_method="file:///f:/libtmp/some_file" # dist.init_process_group( # "gloo", # rank=rank, # init_method=init_method, # world_size=world_size) # For TcpStore, same way as on Linux. def setup(rank, world_size): os.environ[\'MASTER_ADDR\'] = \'localhost\' os.environ[\'MASTER_PORT\'] = \'12355\' # initialize the process group dist.init_process_group("gloo", rank=rank, world_size=world_size) def cleanup(): dist.destroy_process_group() class ToyModel(nn.Module): def __init__(self): super(ToyModel, self).__init__() self.net1 = nn.Linear(10, 10) self.relu = nn.ReLU() self.net2 = nn.Linear(10, 5) def forward(self, x): return self.net2(self.relu(self.net1(x))) def demo_basic(rank, world_size): print(f"Running basic DDP example on rank {rank}.") setup(rank, world_size) # create model and move it to GPU with id rank model = ToyModel().to(rank) ddp_model = DDP(model, device_ids=[rank]) loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001) optimizer.zero_grad() outputs = ddp_model(torch.randn(20, 10)) labels = torch.randn(20, 5).to(rank) loss_fn(outputs, labels).backward() optimizer.step() cleanup() def run_demo(demo_fn, world_size): mp.spawn(demo_fn, args=(world_size,), nprocs=world_size, join=True) def demo_checkpoint(rank, world_size): print(f"Running DDP checkpoint example on rank {rank}.") setup(rank, world_size) model = ToyModel().to(rank) ddp_model = DDP(model, device_ids=[rank]) loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001) CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint" if rank == 0: # All processes should see same parameters as they all start from same # random parameters and gradients are synchronized in backward passes. # Therefore, saving it in one process is sufficient. torch.save(ddp_model.state_dict(), CHECKPOINT_PATH) # Use a barrier() to make sure that process 1 loads the model after process # 0 saves it. dist.barrier() # configure map_location properly map_location = {\'cuda:%d\' % 0: \'cuda:%d\' % rank} ddp_model.load_state_dict( torch.load(CHECKPOINT_PATH, map_location=map_location)) optimizer.zero_grad() outputs = ddp_model(torch.randn(20, 10)) labels = torch.randn(20, 5).to(rank) loss_fn = nn.MSELoss() loss_fn(outputs, labels).backward() optimizer.step() # Not necessary to use a dist.barrier() to guard the file deletion below # as the AllReduce ops in the backward pass of DDP already served as # a synchronization. if rank == 0: os.remove(CHECKPOINT_PATH) cleanup()