美文网首页
pytorch并行训练

pytorch并行训练

作者: carry_xz | 来源:发表于2020-04-10 19:02 被阅读0次

    参考:
    https://zhuanlan.zhihu.com/p/105755472

    torch.nn.DataParallel 与torch.distributed 的不同

    • distributed较新,每个进程对应一个独立的训练过程,只有梯度等少量数据在进程中交互传递。
      在各进程梯度计算完成之后,各进程需要将梯度进行汇总平均,然后再由 rank=0 的进程,将其 broadcast 到所有进程。之后,各进程用该梯度来更新参数。
      由于各进程中的模型,初始参数一致 (初始时刻进行一次 broadcast),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。
    • DataParallel 中,全程维护一个 optimizer,对各 GPU 上梯度进行求和,而在主 GPU 进行参数更新,之后再将模型参数 broadcast 到其他 GPU。传输参数+梯度 数据量大,等待时间长。
    • distributed 进程包含独立的进程锁,因此可以减少解释器和 GIL 使用冲突,这对于严重依赖 Python runtime 的 models 而言,比如说包含 RNN 层或大量小组件的 models 而言,这尤为重要。

    RingAllReduce 与 TreeAllReduce

    • TreeAllReduce 采用 PS(参数服务器) 计算模型的分布式,gpu多时通常会遇到网络的问题。由于仅使用某一个 GPU 做服务器,该GPU需要接收其他所有 GPU 的梯度,并求平均以及 broadcast 回去,若 GPU 数量越大时,通信成本也就越高。
    • RingAllreduce GPU 集群被组织成一个逻辑环,每个 GPU 只从左邻居接受数据、并发送数据给右邻居,即每次同步每个 gpu 只获得部分梯度更新,等一个完整的 Ring 完成,每个 GPU 都获得了完整的参数。算法的每次通信成本是恒定的,与系统中 gpu 的数量无关,完全由系统中 gpu 之间最慢的连接决定。

    分布式使用流程

    并行训练又分为数据并行 (Data Parallelism)模型并行两种。
    数据并行指的是,多张 GPU 使用相同的模型副本,但是使用不同的数据批进行训练。而模型并行指的是,多张GPU 分别训练模型的不同部分,使用同一批数据。

    基本概念

    • group
      即进程组。默认情况下,只有一个组,一个 job 即为一个组,也即一个 world。

    当需要进行更加精细的通信时,可以通过 new_group 接口,使用 word 的子集,创建新组,用于集体通信等。

    • world size:
      表示全局进程个数,进程总数。

    • rank:
      表示进程序号,用于进程间通讯,表征进程优先级。rank = 0 的主机为 master 节点。

    • local_rank:
      进程内,GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。比方说, rank = 3,local_rank = 0 表示第 3 个进程内的第 1 块 GPU。

    数据并行

    import torch
    net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
    

    nn.DataParallel使用起来更加简单(通常只要封装模型然后跑训练代码就ok了)。

    • 模型的权重都是在 一个进程上先算出来 然后再把他们分发到每个GPU上,所以网络通信就成为了一个瓶颈,而GPU使用率也通常很低。
    • 需要所有的GPU都在一个节点(一台机器)上,且并不支持 [Apex] 的 [混合精度训练]
    • 容易出现GPU负载不均衡,一个GPU占满,其他占用很少。

    模型并行

    nn.DistributedDataParallel 进行Multiprocessing可以在多个gpu之间复制该模型,每个gpu由一个进程控制。这些GPU可以位于同一个节点上,也可以分布在多个节点上。每个进程都执行相同的任务,并且每个进程与所有其他进程通信。

    DistributedDataParallel

    torch.nn.parallel.DistributedDataParallel 与 apex.parallel.DistributedDataParallel 区别在于初始化方式不一样,基本可以等价替换。

    from apex.parallel import DistributedDataParallel as DDP
    model = DDP(model)
    
    from torch.nn.parallel import DistributedDataParallel as DDP
    model = DDP(model,device_ids=[local_rank],output_device=local_rank)
    

    带fp16的混合精度、多gpu训练最小实例

    import os
    from datetime import datetime
    import argparse
    import torchvision
    import torchvision.transforms as transforms
    import torch
    import torch.nn as nn
    import torch.distributed as dist
    # from apex.parallel import DistributedDataParallel as DDP
    from torch.nn.parallel import DistributedDataParallel as DDP
    from apex import amp
    
    
    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N',
                            help='number of data loading workers (default: 4)')
        parser.add_argument('-g', '--gpus', default=1, type=int,
                            help='number of gpus per node')
        parser.add_argument('-nr', '--nr', default=0, type=int,
                            help='ranking within the nodes')
        parser.add_argument('--epochs', default=10, type=int, metavar='N',
                            help='number of total epochs to run')
        parser.add_argument('--local_rank', default=-1, type=int,
                        help='node rank for distributed training')  # 注意此参数必须加上
        args = parser.parse_args()
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '8889' # 多个程序同一机器调试时需要改变port,一个程序时,这两行可以删除
        dist.init_process_group(backend='nccl')  # 初始化
        torch.cuda.set_device(args.local_rank)
        train(args)
    
    class ConvNet(nn.Module):
        def __init__(self, num_classes=10):
            super(ConvNet, self).__init__()
            self.layer1 = nn.Sequential(
                nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
                nn.BatchNorm2d(16),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2))
            self.layer2 = nn.Sequential(
                nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
                nn.BatchNorm2d(32),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2))
            self.fc = nn.Linear(7*7*32, num_classes)
    
        def forward(self, x):
            out = self.layer1(x)
            out = self.layer2(out)
            out = out.reshape(out.size(0), -1)
            out = self.fc(out)
            return out
    
    
    def train(args):
        gpu = args.local_rank # 可以用来控制模型保存,打印等
        torch.manual_seed(0)
        model = ConvNet()
        model.cuda()
        batch_size = 100
        # define loss function (criterion) and optimizer
        criterion = nn.CrossEntropyLoss().cuda()
        optimizer = torch.optim.SGD(model.parameters(), 1e-4)
        # Wrap the model
        model, optimizer = amp.initialize(model, optimizer, opt_level='O1')
        print('Error args.local_rank:',args.local_rank)
        # model = DDP(model) # apex 两种方式都可以,注意对应关系
        model = DDP(model,device_ids=[args.local_rank],output_device=args.local_rank) # torch
        # Data loading code
        train_dataset = torchvision.datasets.MNIST(
            root='./data',
            train=True,
            transform=transforms.ToTensor(),
            download=True
        )
        train_sampler = torch.utils.data.distributed.DistributedSampler(
            train_dataset) # 数据分割,加速训练
        train_loader = torch.utils.data.DataLoader(
            dataset=train_dataset,
            batch_size=batch_size,
            shuffle=False, # 并行时需要关闭此项
            num_workers=0,
            pin_memory=True,
            sampler=train_sampler
        )
    
        start = datetime.now()
        total_step = len(train_loader)
        for epoch in range(args.epochs):
            for i, (images, labels) in enumerate(train_loader):
                images = images.cuda(non_blocking=True)
                labels = labels.cuda(non_blocking=True)
                # Forward pass
                outputs = model(images)
                loss = criterion(outputs, labels)
    
                # Backward and optimize
                optimizer.zero_grad()
                with amp.scale_loss(loss, optimizer) as scaled_loss:
                    scaled_loss.backward()
                optimizer.step()
                if (i + 1) % 100 == 0 and gpu == 0:
                    print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                        epoch + 1,
                        args.epochs,
                        i + 1,
                        total_step,
                        loss.item())
                    )
        if gpu == 0:
            print("Training complete in: " + str(datetime.now() - start))
    
    
    if __name__ == '__main__':
        main()
    

    训练命令
    python -m torch.distributed.launch --nproc_per_node=2 distributed_train.py

    并行训练的模型加载

    并行训练保存的模型与单卡训练的模型有些不同,会导致加载模型出问题

    def _Single2Parallel(self, origin_state):
        """
        将串行的权值参数转换为并行的权值参数
        :param origin_state : 原始串行权值参数
        :return             : 并行的权值参数
        """
        converted = OrderedDict()
    
        for k, v in origin_state.items():
          name = "module." + k
          converted[name] = v
    
        return converted
    
    
    def _Parallel2Single(self, origin_state):
        """
        将并行的权值参数转换为串行的权值参数
        :param origin_state : 原始串行权值参数
        :return             : 并行的权值参数
        """
    
        converted = OrderedDict()
    
        for k, v in origin_state.items():
          name = k[7:]
          converted[name] = v
    
        return converted
    

    相关文章

      网友评论

          本文标题:pytorch并行训练

          本文链接:https://www.haomeiwen.com/subject/wlcdmhtx.html