PyTorch多卡分布式训练DDP单机多卡

发布时间 2023-08-30 09:04:03作者: 海_纳百川
PyTorch多卡分布式训练:DistributedDataParallel (DDP) 简要分析

前言

因为课题组发的卡还没有下来,先向导师问了实验室的两张卡借用。之前都是单卡训练模型,正好在这个机会实践以下单机多卡训练模型的方法。关于 DDP 网上有很多资料,但都比较零碎(有些博客的代码甚至没办法 run),Pytorch 给出的官方文档看起来也比较吃力。因此这篇文章的主要目的是梳理一下笔者学习过程中认为比较好的资料,用通俗的语言介绍一下 DDP 的原理,最后给出使用 DDP 的模板以及一份详细的运行案例。

当代研究生应当掌握的并行训练方法(单机多卡):

https://zhuanlan.zhihu.com/p/98535650

这篇文章中介绍了目前常用的并行训练方法。其中,nn.Dataparallel 的使用最简单,只需要使用 Dataparallel 包装模型,再设置一些参数就可以实现。参数中需要指定参与训练的 GPU,device_ids=gpus;汇总梯度的 GPU,output_device=gpus[0]。

model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])

nn.Dataparallel 方法实际上是使用单进程将模型和数据加载到多个 GPU 上,控制数据在 GPU 之间流动,协同不同的 GPU 上的模型进行并行训练。这篇文章 [8] 中提到 nn.Dataparallel 方法的弊端:在训练的过程中,每个 batch 的模型权重是在一个进程上计算出来之后,再分发到每个 GPU 上。

这会导致负载不均衡的问题,可能第一个 GPU(12GB)占用了 10GB,剩余 GPU 却只使用了 4GB。因为在数据并行的时候,loss 会在第一个 GPU 上相加计算,更新好以后把权重分发到其余卡。这就造成了第一个 GPU 的负载远大于其他显卡。

 

nn.DistributedDataParallel 原理

与 nn.Dataparallel 使用单进程控制多个 GPU 不同, nn.DistributedDataParallel 为每个 GPU 都创建一个进程。这些 GPU 可以位于同一个结点上(单机多卡),也可以分布在多个节点上(多机多卡)。每个进程都执行相同的任务,每个进程都与其他进程进行通信。

另外一点不同是,只有梯度会在进程(GPU)之间传播。以单机多卡举例,假设我们有三张卡并行训练,那么在每个 epoch 中,数据集会被划分成三份给三个 GPU,每个 GPU 使用自己的 minibatch 数据做自己的前向计算,然后梯度在 GPU 之间全部约简。在反向传播结束的时候,每个 GPU 都有平均的梯度,确保模型权值保持同步(synchronized)。

 

运行模板

这一小节以官方文档给出的 demo 作为例子,介绍 DDP 的使用模板以及运行流程:

https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

首先我们导入库文件,其中 torch.multiprocessing 用于创建进程,后面会详细介绍。


import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mpfrom torch.nn.parallel import DistributedDataParallel as DDP

前文提到,DDP 模型对于每个 GPU 都会创建一个单独的进程管理。在程序并发执行的过程中,进程之间需要同步和通信。因此我们需要一个方法管理进程组,这个方法需要知道如何找到进程 0。也要知道进程组中同步了多少个进程。init_process_group 方法能够实现上述功能,其中参数的含义解释如下:

  • backend:使用的后端。包括 mpi, gloo, 和 nccl。根据官方文档的介绍,nccl 是运行速度最快的,因此大多设置为这个

  • rank:当前进程的等级。在 DDP 管理的进程组中,每个独立的进程需要知道自己在所有进程中的阶序,我们称为 rank

  • world_size:在 DDP 管理的进程组中,每个独立的进程还需要知道进程组中管理进程的数量,我们称为 world_size

下面 setup 以及 cleanup 分别实现了进程组的设置以及销毁。


ef setup(rank, world_size):os.environ['MASTER_ADDR'] = 'localhost'os.environ['MASTER_PORT'] = '12355'# initialize the process groupdist.init_process_group("nccl", rank=rank, world_size=world_size)def cleanup():dist.destroy_process_group()

本例中我们训练一个简单的网络结构 ToyModel


class ToyModel(nn.Module):def __init__(self):super(ToyModel, self).__init__()self.net1 = nn.Linear(10, 10)self.relu = nn.ReLU()self.net2 = nn.Linear(10, 5)def forward(self, x):return self.net2(self.relu(self.net1(x)))

下面是模型训练部分的模板。数据和模型加载到当前进程使用的 GPU 中,正常进行正反向传播,需要注意以下几点:

  • 每个进程都需要复制一份模型以及数据。我们需要根据前文提到的 rank 和 world_size 两个参数初始化进程组。这样进程之间才能相互通信。使用我们前文定义的 setup() 方法实现;

  • model = ToyModel().to(rank) 这条语句将我们的模型移动到对应的 GPU中, rank 参数作为进程之间的阶序,可以理解为当前进程 index。由于每个进程都管理自己的 GPU,因此通过阶序可以索引到对应的 GPU;

  • ddp_model = DDP(model, device_ids=[rank])这条语句包装了我们的模型;

  • 其他与 pytorch 中训练模型的模板相同,最后一点需要注意的是,在我们将 tensor 移动到 GPU 的时候,同样需要使用 rank 索引,代码中体现在第 14 行。


def demo_basic(rank, world_size):print(f"Running basic DDP example on rank {rank}.")setup(rank, world_size)# create model and move it to GPU with id rankmodel = ToyModel().to(rank)ddp_model = DDP(model, device_ids=[rank])loss_fn = nn.MSELoss()optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)optimizer.zero_grad()outputs = ddp_model(torch.randn(20, 10))labels = torch.randn(20, 5).to(rank)loss_fn(outputs, labels).backward()optimizer.step()cleanup()

最后是启动器的介绍。DDP 的启动有两种方式,分别对应不同的代码。

  • torch.distributed.launch 启动器,用于在命令行分布式地执行 python 文件。在执行过程中,启动器会将当前进程的(其实就是 GPU 的)index 通过参数传递给 python。在使用的时候执行语句CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py 调用启动器 torch.distributed.launch。

  • 本例中使用的是第二种方法 torch.multiprocessing.spawn。使用时,只需要调用 torch.multiprocessing.spawn,torch.multiprocessing 就会帮助我们自动创建进程。

如下面的代码所示,spawn 开启了 world_size 个进程,每个进程执行 demo_fn 并向其中传入 local_rank(当前进程 index)作为参数。这里需要结合前文 demo_basic 的定义来看。args 中的 world_size 对应 demo_basic 的 world_size 参数;mp.spawn 中 nprocs 则是创建进程的数量;至于demo_basic 中的 rank 参数,应当是框架内部实现了索引机制因此不需要我们显示对应(笔者自己的理解)。


def run_demo(demo_fn, world_size):mp.spawn(demo_fn,args=(world_size,),nprocs=world_size,join=True)

完整代码如下:


import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mpfrom torch.nn.parallel import DistributedDataParallel as DDP# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.def setup(rank, world_size):os.environ['MASTER_ADDR'] = 'localhost'os.environ['MASTER_PORT'] = '12355'# initialize the process groupdist.init_process_group("nccl", rank=rank, world_size=world_size)def cleanup():dist.destroy_process_group()class ToyModel(nn.Module):def __init__(self):super(ToyModel, self).__init__()self.net1 = nn.Linear(10, 10)self.relu = nn.ReLU()self.net2 = nn.Linear(10, 5)def forward(self, x):return self.net2(self.relu(self.net1(x)))def demo_basic(rank, world_size):print(f"Running basic DDP example on rank {rank}.")setup(rank, world_size)# create model and move it to GPU with id rankmodel = ToyModel().to(rank)ddp_model = DDP(model, device_ids=[rank])loss_fn = nn.MSELoss()optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)optimizer.zero_grad()outputs = ddp_model(torch.randn(20, 10))labels = torch.randn(20, 5).to(rank)loss_fn(outputs, labels).backward()optimizer.step()cleanup()def run_demo(demo_fn, world_size):mp.spawn(demo_fn,args=(world_size,),nprocs=world_size,join=True)if __name__ == "__main__":n_gpus = torch.cuda.device_count()assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"world_size = n_gpusrun_demo(demo_basic, world_size)

这份代码可以直接运行,输入结果如下,笔者使用两张卡,因此对应的 rank 分别是 0 和 1:

Running basic DDP  example on Rank 1
Running basic DDP  example on Rank 0

在官网给的这份 demo 之外,其实还有一点需要注意。我们使用 pytorch 处理数据集创建 dataloader 的过程中,需要使用 DistributedSampler 采样器。我们已经知道,每个进程都会拷贝一份模型和数据的副本,但是在并行计算的过程中,单个进程只会处理自己的 minibatch 的数据。

假设我们使用五个 GPU 并行,其对应的五个进程都有模型和数据的副本,我们假设训练集有一万条数据,那么在单个 epoch 中每个进程实际上只需要使用两千条数据训练,之后进行梯度整合。那么进程如何知道自己需要处理哪些数据呢?这是 DistributedSampler 的功能。

关于 DistributedSampler 具体做了什么,可以参考文献 7。

在此基础上,笔者根据官网的 demo 总结了一份使用 DDP 进行多卡并行加速模型的模板,读者在使用过程中根据需要进行简单更改即可使用:

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mpfrom torch.nn.parallel import DistributedDataParallel as DDPdef setup(rank, world_size):os.environ['MASTER_ADDR'] = 'localhost'os.environ['MASTER_PORT'] = '12355'# initialize the process groupdist.init_process_group("nccl", rank=rank, world_size=world_size)def run(demo_fn, world_size):setup(rank, world_size)torch.manual_seed(18)torch.cuda.manual_seed_all(18)torch.backends.cudnn.deterministic = Truetorch.cuda.set_device(rank) # 这里设置 device ,后面可以直接使用 data.cuda(),否则需要指定 ranktrain_dataset = ...train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)model = ...model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])optimizer = optim.SGD(model.parameters())for epoch in range(100):train_sampler.set_epoch(epoch)for batch_idx, (data, target) in enumerate(train_loader):data = data.cuda()target = target.cuda()...output = model(images)loss = criterion(output, target)...optimizer.zero_grad()loss.backward()optimizer.step()if __name__ == "__main__":n_gpus = torch.cuda.device_count()assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"world_size = n_gpusmp.spawn(run,args=(world_size,),nprocs=world_size,join=True)

总结一下,使用 DDP 进行多卡并行加速模型的重点:

  • init_process_group 函数管理进程组

  • 在创建 Dataloader 的过程中,需要使用 DistributedSampler 采样器

  • 正反向传播之前需要将数据以及模型移动到对应 GPU,通过参数 rank 进行索引,还要将模型使用 DistributedDataParallel 进行包装

  • 在每个 epoch 开始之前,需要使用 train_sampler.set_epoch(epoch)为 train_sampler 指定 epoch,这样做可以使每个 epoch 划分给不同进程的  minibatch 不同,从而在整个训练过程中,不同的进程有机会接触到更多的训练数据

  • 使用启动器进行启动。不同启动器对应不同的代码。torch.distributed.launch 通过命令行的方法执行,torch.multiprocessing.spawn 则可以直接运行程序。

最后,笔者使用上述模板实现了一份基于 Roberta 的文本分类任务,使用 DDP 进行单机双卡并行加速,运行的结果如下,有感兴趣的读者再放代码吧,这里展示一下运行结果:

 

 

参考文献

注:下述文献大多使用 torch.distributed.launch 启动器执行程序

[1] 当代研究生应当掌握的并行训练方法(单机多卡):

https://zhuanlan.zhihu.com/p/98535650

[2] PyTorch Parallel Training(单机多卡并行、混合精度、同步BN训练指南文档):https://zhuanlan.zhihu.com/p/145427849

[3] pytorch多卡分布式训练简要分析:https://zhuanlan.zhihu.com/p/159404316

[4] Pytorch中的Distributed Data Parallel与混合精度训练(Apex):https://zhuanlan.zhihu.com/p/105755472

[5] PyTorch分布式训练基础--DDP使用:https://zhuanlan.zhihu.com/p/358974461

[6] 使用PyTorch编写分布式应用程序:https://www.jianshu.com/p/be9f8b90a1b8?utm_campaign=hugo&utm_medium=reader_share&utm_content=note&utm_source=weixin-friends

[7] DistributedSampler 具体做了什么:https://blog.csdn.net/searobbers_duck/article/details/115299691?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link

[8] Pytorch的nn.DataParallel:https://zhuanlan.zhihu.com/p/102697821