美文网首页机器学习与数据挖掘
深度学习分布式训练(上)-Pytorch实现篇

深度学习分布式训练(上)-Pytorch实现篇

作者: 老居搞机 | 来源:发表于2020-05-23 12:45 被阅读0次

    前言

    随着数据量越来越多以及模型层数的越来越复杂,深度学习展现了更强劲的效果,但随之而来也带来了负面影响:训练的时间也跟着变的更长,有的模型一训练就是好几天,怎么加快训练的时间?

    俗话说人多力量大,很自然想到我们可以把多个GPU组合在一起形成一个集群训练模型来加快训练速度

    本着先使用后深入理论,本篇主要讲Pytorch分布式训练的使用,下一篇将详细介绍分布式训练的原理

    分布式训练

    分布式训练根据并行策略的不同,可以分为模型并行和数据并行

    数据并行中根据梯度同步的策略不同,又可以分为参数服务器同步和All-Reduce方式同步(当然这些都放在下一篇讲解)

    本篇讲的Pytorch分布式训练采用数据并行方式,梯度信息同步采用All-Reduce

    Pytorch分布式训练

    废话不多说,我们在实战中学习,先跑个例子然后再慢慢解释每一段的意思,下面一段代码拷下来可以保存成mnist.py文件:

    # -*- encoding: utf8 -*-
    import torch
    from torch import nn
    import torch.nn.functional as F
    import torch.distributed as dist
    from torchvision import datasets, transforms
    import argparse
    import torch.optim as optim
    from torch.utils.data.distributed import DistributedSampler
    ​
    DATA_DIR = '~/data/mnist'
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    ​
    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 20, 5, 1)
            self.conv2 = nn.Conv2d(20, 50, 5, 1)
            self.fc1 = nn.Linear(4*4*50, 500)
            self.fc2 = nn.Linear(500, 10)
    ​
        def forward(self, x):
            x = F.relu(self.conv1(x))
            x = F.max_pool2d(x, 2, 2)
            x = F.relu(self.conv2(x))
            x = F.max_pool2d(x, 2, 2)
            x = x.view(-1, 4*4*50)
            x = F.relu(self.fc1(x))
            x = self.fc2(x)
            return F.log_softmax(x, dim=1)
    ​
    def train(model, train_loader, optimizer, epoch):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(DEVICE), target.to(DEVICE)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 100 == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))
                    
    def test(model, test_loader):
        model.eval()
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(DEVICE), target.to(DEVICE)
                output = model(data)
                test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
                pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
                correct += pred.eq(target.view_as(pred)).sum().item()
    ​
        test_loss /= len(test_loader.dataset)
        print('\naccuracy={:.4f}\n'.format(float(correct) / len(test_loader.dataset)))
    ​
    def load_data(dist, batch_size=64, test_batch_size=64):
        train_kwargs = {'num_workers': 1, 'pin_memory': True}
        test_kwargs = {'num_workers': 1, 'pin_memory': True}
    ​
        train_data_set = datasets.MNIST(DATA_DIR, train=True, download=True,
                           transform=transforms.Compose([
                               transforms.ToTensor(),
                               transforms.Normalize((0.1307,), (0.3081,))
                           ]))
    ​
        if dist.is_initialized():
            # 如果采用分布式训练, 使用DistributedSampler让每个worker拿到训练数据集不同的子集
            datasampler = DistributedSampler(train_data_set)
            # sampler shuffle must be `False`
            train_kwargs.update({'sampler': datasampler,
                                 'shuffle': False
                                 })
    ​
        train_loader = torch.utils.data.DataLoader(train_data_set, batch_size=batch_size, **train_kwargs)
        test_loader = torch.utils.data.DataLoader(
            datasets.MNIST(DATA_DIR, train=False, transform=transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,))
            ])),
            batch_size=test_batch_size, shuffle=True, **test_kwargs)
    ​
        return train_loader, test_loader
    ​
    def main():
        parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
        parser.add_argument('--backend', type=str, help='Distributed backend',
                            choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
                            default=dist.Backend.GLOO)
        parser.add_argument('--init-method', default=None, type=str,
                            help='Distributed init_method')
        parser.add_argument('--rank', default=-1, type=int,
                            help='Distributed rank')
        parser.add_argument('--world-size', default=-1, type=int,
                            help='Distributed world_size')
        args = parser.parse_args()
    ​
        dist.init_process_group(backend=args.backend,
                                init_method=args.init_method,
                                rank=args.rank,
                                world_size=args.world_size
                                )
    ​
        train_loader, test_loader = load_data(dist)
        model = Net().to(DEVICE)
        model = nn.parallel.DistributedDataParallel(model)
        optimizer = optim.Adam(model.parameters(), lr=0.001)
    ​
        for epoch in range(1, 2):
            train(model, train_loader, optimizer, epoch)
            test(model, test_loader)
    ​
    ​
    if __name__ == '__main__':
        main()
    

    如果没有多台GPU机器,可以用本地指定端口号的方式来测试一下

    在第一个终端运行:

    $ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 0 --world-size 3
    

    在第二个第三个终端再运行:

    $ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 1 --world-size 3
    $ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 2 --world-size 3
    

    wow 激动人心! 在三个进程里面我们等于模拟了三台机器在做分布式训练了,训练输出结果:

    我们来先看一下第一段:

    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
        parser.add_argument('--backend', type=str, help='Distributed backend',
                            choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
                            default=dist.Backend.GLOO)
        parser.add_argument('--init-method', default=None, type=str,
                            help='Distributed init_method')
        parser.add_argument('--rank', default=-1, type=int,
                            help='Distributed rank')
        parser.add_argument('--world-size', default=-1, type=int,
                            help='Distributed world_size')
        args = parser.parse_args()
    ​
        dist.init_process_group(backend=args.backend,
                                init_method=args.init_method,
                                rank=args.rank,
                                world_size=args.world_size
                                )
    

    这一段初始化Pytorch分布式训练的参数:

    • rank:等级, 0为master, >0为worker
    • world_size:进程总数量,Pytorch会等到所有world_size个进程就绪之后才会开心训练
    • backend:指定当前进程要使用的通信后端,支持的通信后端有 gloo,mpi,nccl方式,支持如下所示:
    • init_method:分布式训练的初始化方式,默认使用环境变量方式env

    1.env:读取环境变量方式,会自动读取系统中的这些环境变量:

    MASTER_ADDR: 要求(0级除外), 等级0节点的地址
    MASTER_PORT: 机器上的自由端口
    RANK: 等级, 0为master, >0为worker,也可以在调用init函数时设置
    WORLD_SIZE:  进程数量,也可以在调用init函数时设置
    

    env方式可以很方便的跟Kubeflow结合进行分布式训练,如果本地测试可以使用Pytorch提供的测试工具torch.distributed.launch来提交环境变量

    $ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=0 --master_addr="127.0.0.1" --master_port=22225 mnist.py
    $ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=1 --master_addr="127.0.0.1" --master_port=22225 mnist.py
    $ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=2 --master_addr="127.0.0.1" --master_port=22225 mnist.py
    

    2.host+port的方式:指定通信的ip和端口号,可以在运行的时候输入
    3.共享文件的方式:有个多个机器都能访问到的文件夹,那么可以在这里创建个文件来实现初始化

    接下来加载数据load_data():

    def load_data(dist, batch_size=64, test_batch_size=64):
        train_kwargs = {'num_workers': 1, 'pin_memory': True}
        test_kwargs = {'num_workers': 1, 'pin_memory': True}
    ​
        train_data_set = datasets.MNIST(DATA_DIR, train=True, download=True,
                           transform=transforms.Compose([
                               transforms.ToTensor(),
                               transforms.Normalize((0.1307,), (0.3081,))
                           ]))
    ​
        if dist.is_initialized():
            # 如果采用分布式训练, 使用DistributedSampler让每个worker拿到训练数据集不同的子集
            datasampler = DistributedSampler(train_data_set)
            # sampler shuffle must be `False`
            train_kwargs.update({'sampler': datasampler,
                                 'shuffle': False
                                 })
    ​
        train_loader = torch.utils.data.DataLoader(train_data_set, batch_size=batch_size, **train_kwargs)
        test_loader = torch.utils.data.DataLoader(
            datasets.MNIST(DATA_DIR, train=False, transform=transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,))
            ])),
            batch_size=test_batch_size, shuffle=True, **test_kwargs)
    ​
        return train_loader, test_loader
    
    • Pytorch使用的是数据分布式训练,每个进程实际上是独立加载数据的,所以需要加载相同数据集后用一定的规则根据rank来顺序切割获取不同的数据子集,DistributedSampler就是用来确保dataloader只会load到整个数据集的一个特定子集的做法(实际上不用Pytorch提供的DistributedSampler工具,自己做加载数据后切分word_size个子集按rank顺序拿到子集效果也是一样)

    • 同时为了能够按顺序划分数据子集,拿到不同部分数据,所以数据集不能够进行随机打散,所以用了参数 'shuffle': False

    再接下来看模型的分布式:

    model = nn.parallel.DistributedDataParallel(model)
    
    • DistributedDataParallel 是实现多机多卡分布训练最核心东西,封装了All-Reduce方法,可以帮助我们在不同机器的多个模型拷贝之间平均梯度

    总结

    通过上面例子,我们看到Pytorch做分布式训练实现起来还是比较简单的:

    • Pytorch模型使用DistributedDataParallel方法包装来实现梯度参数的All-Reduce传递
    • 数据集需要在不同机器上按Rank进行切分,以保证每个GPU进程训练的数据集是不一样的
    • 使用Kubeflow创建Docker Pod的方式配合Pytorch env环境变量的训练非常方便

    参考


    长按二维码,关注本公众号

    微信公众号

    相关文章

      网友评论

        本文标题:深度学习分布式训练(上)-Pytorch实现篇

        本文链接:https://www.haomeiwen.com/subject/xrqkahtx.html