美文网首页
pytorch并行训练

pytorch并行训练

作者: carry_xz | 来源:发表于2020-04-10 19:02 被阅读0次

参考:
https://zhuanlan.zhihu.com/p/105755472

torch.nn.DataParallel 与torch.distributed 的不同

  • distributed较新,每个进程对应一个独立的训练过程,只有梯度等少量数据在进程中交互传递。
    在各进程梯度计算完成之后,各进程需要将梯度进行汇总平均,然后再由 rank=0 的进程,将其 broadcast 到所有进程。之后,各进程用该梯度来更新参数。
    由于各进程中的模型,初始参数一致 (初始时刻进行一次 broadcast),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。
  • DataParallel 中,全程维护一个 optimizer,对各 GPU 上梯度进行求和,而在主 GPU 进行参数更新,之后再将模型参数 broadcast 到其他 GPU。传输参数+梯度 数据量大,等待时间长。
  • distributed 进程包含独立的进程锁,因此可以减少解释器和 GIL 使用冲突,这对于严重依赖 Python runtime 的 models 而言,比如说包含 RNN 层或大量小组件的 models 而言,这尤为重要。

RingAllReduce 与 TreeAllReduce

  • TreeAllReduce 采用 PS(参数服务器) 计算模型的分布式,gpu多时通常会遇到网络的问题。由于仅使用某一个 GPU 做服务器,该GPU需要接收其他所有 GPU 的梯度,并求平均以及 broadcast 回去,若 GPU 数量越大时,通信成本也就越高。
  • RingAllreduce GPU 集群被组织成一个逻辑环,每个 GPU 只从左邻居接受数据、并发送数据给右邻居,即每次同步每个 gpu 只获得部分梯度更新,等一个完整的 Ring 完成,每个 GPU 都获得了完整的参数。算法的每次通信成本是恒定的,与系统中 gpu 的数量无关,完全由系统中 gpu 之间最慢的连接决定。

分布式使用流程

并行训练又分为数据并行 (Data Parallelism)模型并行两种。
数据并行指的是,多张 GPU 使用相同的模型副本,但是使用不同的数据批进行训练。而模型并行指的是,多张GPU 分别训练模型的不同部分,使用同一批数据。

基本概念

  • group
    即进程组。默认情况下,只有一个组,一个 job 即为一个组,也即一个 world。

当需要进行更加精细的通信时,可以通过 new_group 接口,使用 word 的子集,创建新组,用于集体通信等。

  • world size:
    表示全局进程个数,进程总数。

  • rank:
    表示进程序号,用于进程间通讯,表征进程优先级。rank = 0 的主机为 master 节点。

  • local_rank:
    进程内,GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。比方说, rank = 3,local_rank = 0 表示第 3 个进程内的第 1 块 GPU。

数据并行

import torch
net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])

nn.DataParallel使用起来更加简单(通常只要封装模型然后跑训练代码就ok了)。

  • 模型的权重都是在 一个进程上先算出来 然后再把他们分发到每个GPU上,所以网络通信就成为了一个瓶颈,而GPU使用率也通常很低。
  • 需要所有的GPU都在一个节点(一台机器)上,且并不支持 [Apex] 的 [混合精度训练]
  • 容易出现GPU负载不均衡,一个GPU占满,其他占用很少。

模型并行

nn.DistributedDataParallel 进行Multiprocessing可以在多个gpu之间复制该模型,每个gpu由一个进程控制。这些GPU可以位于同一个节点上,也可以分布在多个节点上。每个进程都执行相同的任务,并且每个进程与所有其他进程通信。

DistributedDataParallel

torch.nn.parallel.DistributedDataParallel 与 apex.parallel.DistributedDataParallel 区别在于初始化方式不一样,基本可以等价替换。

from apex.parallel import DistributedDataParallel as DDP
model = DDP(model)

from torch.nn.parallel import DistributedDataParallel as DDP
model = DDP(model,device_ids=[local_rank],output_device=local_rank)

带fp16的混合精度、多gpu训练最小实例

import os
from datetime import datetime
import argparse
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
# from apex.parallel import DistributedDataParallel as DDP
from torch.nn.parallel import DistributedDataParallel as DDP
from apex import amp


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N',
                        help='number of data loading workers (default: 4)')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('--epochs', default=10, type=int, metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('--local_rank', default=-1, type=int,
                    help='node rank for distributed training')  # 注意此参数必须加上
    args = parser.parse_args()
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '8889' # 多个程序同一机器调试时需要改变port,一个程序时,这两行可以删除
    dist.init_process_group(backend='nccl')  # 初始化
    torch.cuda.set_device(args.local_rank)
    train(args)

class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out


def train(args):
    gpu = args.local_rank # 可以用来控制模型保存,打印等
    torch.manual_seed(0)
    model = ConvNet()
    model.cuda()
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Wrap the model
    model, optimizer = amp.initialize(model, optimizer, opt_level='O1')
    print('Error args.local_rank:',args.local_rank)
    # model = DDP(model) # apex 两种方式都可以,注意对应关系
    model = DDP(model,device_ids=[args.local_rank],output_device=args.local_rank) # torch
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(
        root='./data',
        train=True,
        transform=transforms.ToTensor(),
        download=True
    )
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset) # 数据分割,加速训练
    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=False, # 并行时需要关闭此项
        num_workers=0,
        pin_memory=True,
        sampler=train_sampler
    )

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            with amp.scale_loss(loss, optimizer) as scaled_loss:
                scaled_loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1,
                    args.epochs,
                    i + 1,
                    total_step,
                    loss.item())
                )
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))


if __name__ == '__main__':
    main()

训练命令
python -m torch.distributed.launch --nproc_per_node=2 distributed_train.py

并行训练的模型加载

并行训练保存的模型与单卡训练的模型有些不同,会导致加载模型出问题

def _Single2Parallel(self, origin_state):
    """
    将串行的权值参数转换为并行的权值参数
    :param origin_state : 原始串行权值参数
    :return             : 并行的权值参数
    """
    converted = OrderedDict()

    for k, v in origin_state.items():
      name = "module." + k
      converted[name] = v

    return converted


def _Parallel2Single(self, origin_state):
    """
    将并行的权值参数转换为串行的权值参数
    :param origin_state : 原始串行权值参数
    :return             : 并行的权值参数
    """

    converted = OrderedDict()

    for k, v in origin_state.items():
      name = k[7:]
      converted[name] = v

    return converted

相关文章

  • Pytorch单机多卡分布式训练 数据并行

    Pytorch单机多卡训练(数据并行训练) Pytorch的数据并行训练,已经被封装的十分完善。全程只需两步: 1...

  • pytorch并行训练

    参考:https://zhuanlan.zhihu.com/p/105755472 torch.nn.DataPa...

  • Pytorch学习笔记(5) Pytorch GPU加速训练

    主要介绍在Pytorch中如何使用GPU进行加速训练,同时在多GPU的条件下如何进行单机并行计算。 一、Pytor...

  • pytorch多GPU并行

    一、pytorch多GPU并行 (1)引用库 (2)加载模型 (3) 并行化 二、GPU数据转成list (1)引...

  • PyTorch 训练

     PyTorch 训练与加速神经网络训练. 更多可以查看官网 :* PyTorch 官网 批训练 Torch 中提...

  • 20201114-pytorch指定GPU

    使用pytorch的并行GPU接口 model= torch.nn.DataParallel(model, dev...

  • Pytorch袖珍手册之十一

    第六章 Pytorch加速及优化(性能提升) 之二 模型并行处理 model parallel processin...

  • 基于Pytorch的MLP实现

    基于Pytorch的MLP实现 目标 使用pytorch构建MLP网络 训练集使用MNIST数据集 使用GPU加速...

  • pytorch finetune模型

    pytorch finetune模型 文章主要讲述如何在pytorch上读取以往训练的模型参数,在模型的名字已经变...

  • Pytorch Fine-tuning

    pytorch 使用预训练过的ResNet 进行微调,训练新的数据集CIFAR100

网友评论

      本文标题:pytorch并行训练

      本文链接:https://www.haomeiwen.com/subject/wlcdmhtx.html