美文网首页机器学习与数据挖掘
深度学习分布式训练(上)-Pytorch实现篇

深度学习分布式训练(上)-Pytorch实现篇

作者: 老居搞机 | 来源:发表于2020-05-23 12:45 被阅读0次

前言

随着数据量越来越多以及模型层数的越来越复杂,深度学习展现了更强劲的效果,但随之而来也带来了负面影响:训练的时间也跟着变的更长,有的模型一训练就是好几天,怎么加快训练的时间?

俗话说人多力量大,很自然想到我们可以把多个GPU组合在一起形成一个集群训练模型来加快训练速度

本着先使用后深入理论,本篇主要讲Pytorch分布式训练的使用,下一篇将详细介绍分布式训练的原理

分布式训练

分布式训练根据并行策略的不同,可以分为模型并行和数据并行

数据并行中根据梯度同步的策略不同,又可以分为参数服务器同步和All-Reduce方式同步(当然这些都放在下一篇讲解)

本篇讲的Pytorch分布式训练采用数据并行方式,梯度信息同步采用All-Reduce

Pytorch分布式训练

废话不多说,我们在实战中学习,先跑个例子然后再慢慢解释每一段的意思,下面一段代码拷下来可以保存成mnist.py文件:

# -*- encoding: utf8 -*-
import torch
from torch import nn
import torch.nn.functional as F
import torch.distributed as dist
from torchvision import datasets, transforms
import argparse
import torch.optim as optim
from torch.utils.data.distributed import DistributedSampler
​
DATA_DIR = '~/data/mnist'
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
​
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4*4*50, 500)
        self.fc2 = nn.Linear(500, 10)
​
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4*4*50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
​
def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
                
def test(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
            pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
​
    test_loss /= len(test_loader.dataset)
    print('\naccuracy={:.4f}\n'.format(float(correct) / len(test_loader.dataset)))
​
def load_data(dist, batch_size=64, test_batch_size=64):
    train_kwargs = {'num_workers': 1, 'pin_memory': True}
    test_kwargs = {'num_workers': 1, 'pin_memory': True}
​
    train_data_set = datasets.MNIST(DATA_DIR, train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ]))
​
    if dist.is_initialized():
        # 如果采用分布式训练, 使用DistributedSampler让每个worker拿到训练数据集不同的子集
        datasampler = DistributedSampler(train_data_set)
        # sampler shuffle must be `False`
        train_kwargs.update({'sampler': datasampler,
                             'shuffle': False
                             })
​
    train_loader = torch.utils.data.DataLoader(train_data_set, batch_size=batch_size, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST(DATA_DIR, train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
        batch_size=test_batch_size, shuffle=True, **test_kwargs)
​
    return train_loader, test_loader
​
def main():
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--backend', type=str, help='Distributed backend',
                        choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
                        default=dist.Backend.GLOO)
    parser.add_argument('--init-method', default=None, type=str,
                        help='Distributed init_method')
    parser.add_argument('--rank', default=-1, type=int,
                        help='Distributed rank')
    parser.add_argument('--world-size', default=-1, type=int,
                        help='Distributed world_size')
    args = parser.parse_args()
​
    dist.init_process_group(backend=args.backend,
                            init_method=args.init_method,
                            rank=args.rank,
                            world_size=args.world_size
                            )
​
    train_loader, test_loader = load_data(dist)
    model = Net().to(DEVICE)
    model = nn.parallel.DistributedDataParallel(model)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
​
    for epoch in range(1, 2):
        train(model, train_loader, optimizer, epoch)
        test(model, test_loader)
​
​
if __name__ == '__main__':
    main()

如果没有多台GPU机器,可以用本地指定端口号的方式来测试一下

在第一个终端运行:

$ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 0 --world-size 3

在第二个第三个终端再运行:

$ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 1 --world-size 3
$ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 2 --world-size 3

wow 激动人心! 在三个进程里面我们等于模拟了三台机器在做分布式训练了,训练输出结果:

我们来先看一下第一段:

parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--backend', type=str, help='Distributed backend',
                        choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
                        default=dist.Backend.GLOO)
    parser.add_argument('--init-method', default=None, type=str,
                        help='Distributed init_method')
    parser.add_argument('--rank', default=-1, type=int,
                        help='Distributed rank')
    parser.add_argument('--world-size', default=-1, type=int,
                        help='Distributed world_size')
    args = parser.parse_args()
​
    dist.init_process_group(backend=args.backend,
                            init_method=args.init_method,
                            rank=args.rank,
                            world_size=args.world_size
                            )

这一段初始化Pytorch分布式训练的参数:

  • rank:等级, 0为master, >0为worker
  • world_size:进程总数量,Pytorch会等到所有world_size个进程就绪之后才会开心训练
  • backend:指定当前进程要使用的通信后端,支持的通信后端有 gloo,mpi,nccl方式,支持如下所示:
  • init_method:分布式训练的初始化方式,默认使用环境变量方式env

1.env:读取环境变量方式,会自动读取系统中的这些环境变量:

MASTER_ADDR: 要求(0级除外), 等级0节点的地址
MASTER_PORT: 机器上的自由端口
RANK: 等级, 0为master, >0为worker,也可以在调用init函数时设置
WORLD_SIZE:  进程数量,也可以在调用init函数时设置

env方式可以很方便的跟Kubeflow结合进行分布式训练,如果本地测试可以使用Pytorch提供的测试工具torch.distributed.launch来提交环境变量

$ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=0 --master_addr="127.0.0.1" --master_port=22225 mnist.py
$ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=1 --master_addr="127.0.0.1" --master_port=22225 mnist.py
$ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=2 --master_addr="127.0.0.1" --master_port=22225 mnist.py

2.host+port的方式:指定通信的ip和端口号,可以在运行的时候输入
3.共享文件的方式:有个多个机器都能访问到的文件夹,那么可以在这里创建个文件来实现初始化

接下来加载数据load_data():

def load_data(dist, batch_size=64, test_batch_size=64):
    train_kwargs = {'num_workers': 1, 'pin_memory': True}
    test_kwargs = {'num_workers': 1, 'pin_memory': True}
​
    train_data_set = datasets.MNIST(DATA_DIR, train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ]))
​
    if dist.is_initialized():
        # 如果采用分布式训练, 使用DistributedSampler让每个worker拿到训练数据集不同的子集
        datasampler = DistributedSampler(train_data_set)
        # sampler shuffle must be `False`
        train_kwargs.update({'sampler': datasampler,
                             'shuffle': False
                             })
​
    train_loader = torch.utils.data.DataLoader(train_data_set, batch_size=batch_size, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST(DATA_DIR, train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
        batch_size=test_batch_size, shuffle=True, **test_kwargs)
​
    return train_loader, test_loader
  • Pytorch使用的是数据分布式训练,每个进程实际上是独立加载数据的,所以需要加载相同数据集后用一定的规则根据rank来顺序切割获取不同的数据子集,DistributedSampler就是用来确保dataloader只会load到整个数据集的一个特定子集的做法(实际上不用Pytorch提供的DistributedSampler工具,自己做加载数据后切分word_size个子集按rank顺序拿到子集效果也是一样)

  • 同时为了能够按顺序划分数据子集,拿到不同部分数据,所以数据集不能够进行随机打散,所以用了参数 'shuffle': False

再接下来看模型的分布式:

model = nn.parallel.DistributedDataParallel(model)
  • DistributedDataParallel 是实现多机多卡分布训练最核心东西,封装了All-Reduce方法,可以帮助我们在不同机器的多个模型拷贝之间平均梯度

总结

通过上面例子,我们看到Pytorch做分布式训练实现起来还是比较简单的:

  • Pytorch模型使用DistributedDataParallel方法包装来实现梯度参数的All-Reduce传递
  • 数据集需要在不同机器上按Rank进行切分,以保证每个GPU进程训练的数据集是不一样的
  • 使用Kubeflow创建Docker Pod的方式配合Pytorch env环境变量的训练非常方便

参考


长按二维码,关注本公众号

微信公众号

相关文章

网友评论

    本文标题:深度学习分布式训练(上)-Pytorch实现篇

    本文链接:https://www.haomeiwen.com/subject/xrqkahtx.html