The code is adapted from https://github.com/WZMIAOMIAO/deep-learning-for-image-processing/blob/master/pytorch_classification/train_multi_GPU
The author's accompanying video explains how to run parallel training with multiple GPUs under the PyTorch framework.
I modified the model and data parts for a quick test; this post is just a record.
Multi-GPU data-parallel training mainly involves the following parts (a minimal sketch combining them follows this list):
1. Data: give each process its own subset of the samples
   train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
   val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)
2. Model: wrap the model with DistributedDataParallel
   model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
3. Results: reduce the loss and other intermediate values across processes
   torch.distributed.all_reduce(value)
4. BN layers: convert BatchNorm to SyncBatchNorm
   model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
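Before the full script, here is a minimal, self-contained sketch (my own illustration, not part of the referenced repo) of how the four pieces fit together in a single training loop. The tiny model and the random dataset are placeholders; it assumes the script is launched with torchrun or torch.distributed.launch --use_env so that RANK, WORLD_SIZE and LOCAL_RANK are set in the environment.
# ddp_minimal_sketch.py -- illustrative only; model and data are placeholders
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

def main():
    # The launcher exports RANK / WORLD_SIZE / LOCAL_RANK; init_method="env://" reads them
    dist.init_process_group(backend="nccl", init_method="env://")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)

    # 1. Data: DistributedSampler gives each process its own slice of the indices
    dataset = TensorDataset(torch.randn(1024, 10), torch.randint(0, 2, (1024,)))
    sampler = DistributedSampler(dataset)
    loader = DataLoader(dataset, batch_size=32, sampler=sampler)

    # 4. BN: convert BatchNorm layers to SyncBatchNorm before wrapping with DDP
    model = torch.nn.Sequential(torch.nn.Linear(10, 32),
                                torch.nn.BatchNorm1d(32),
                                torch.nn.ReLU(),
                                torch.nn.Linear(32, 2)).to(device)
    model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)

    # 2. Model: DDP synchronizes gradients across processes during backward()
    model = DDP(model, device_ids=[local_rank])

    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    loss_fn = torch.nn.CrossEntropyLoss()

    for epoch in range(2):
        sampler.set_epoch(epoch)  # different shuffle each epoch
        for x, y in loader:
            loss = loss_fn(model(x.to(device)), y.to(device))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # 3. Results: sum the loss over all processes, then average it for logging
            with torch.no_grad():
                dist.all_reduce(loss)
                loss /= dist.get_world_size()

    dist.destroy_process_group()

if __name__ == "__main__":
    main()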
Complete code
# -*- coding:utf-8 -*-
import math
import os
import sys
import argparse
import torch
from tqdm import tqdm
import torch.optim as optim
from torchvision.datasets import mnist
import torch.optim.lr_scheduler as lr_scheduler
import torchvision.transforms as transforms
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
def init_distributed_mode(args):
    # Case 1: launched with torch.distributed.launch/torchrun, which export RANK / WORLD_SIZE / LOCAL_RANK
    if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
        args.rank = int(os.environ['RANK'])
        args.world_size = int(os.environ['WORLD_SIZE'])
        args.gpu = int(os.environ.get('LOCAL_RANK', 0))  # default to GPU 0 if LOCAL_RANK is missing
    # Case 2: launched under SLURM, which exports SLURM_PROCID
    elif 'SLURM_PROCID' in os.environ:
        args.rank = int(os.environ['SLURM_PROCID'])
        args.gpu = args.rank % torch.cuda.device_count()
    else:
        print('Not using distributed mode')
        args.distributed = False
        return

    args.distributed = True
    # Bind this process to its GPU
    torch.cuda.set_device(args.gpu)
    # Communication backend; NCCL is recommended for NVIDIA GPUs
    args.dist_backend = 'nccl'
    # Initialize the process group
    print(f'| distributed init (rank {args.rank}): {args.dist_url}', flush=True)
    dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                            world_size=args.world_size, rank=args.rank)
    dist.barrier()
def cleanup():
    """Destroy the process group when training is finished."""
    dist.destroy_process_group()

def is_dist_avail_and_initialized():
    """Check whether the distributed environment is available and initialized."""
    if not dist.is_available():
        return False
    if not dist.is_initialized():
        return False
    return True

def get_world_size():
    # Fall back to 1 when not running distributed
    if not is_dist_avail_and_initialized():
        return 1
    return dist.get_world_size()

def get_rank():
    # Fall back to rank 0 when not running distributed
    if not is_dist_avail_and_initialized():
        return 0
    return dist.get_rank()

def is_main_process():
    """Return True only on the main process (rank 0)."""
    return get_rank() == 0

def reduce_value(value, average=True):
    world_size = get_world_size()
    # Single-GPU (non-distributed) case: nothing to reduce
    if world_size < 2:
        return value
    # Sum the value over all processes without tracking gradients
    with torch.no_grad():
        dist.all_reduce(value)
        # Optionally turn the sum into an average
        if average:
            value /= world_size
    return value
# Model definition
class CNNNet(torch.nn.Module):
    def __init__(self, in_channel, out_channel_one, out_channel_two, out_channel_three, fc_1, fc_2, fc_out):
        super(CNNNet, self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels=in_channel, out_channels=out_channel_one, kernel_size=5, stride=1, padding=1)
        self.pool1 = torch.nn.MaxPool2d(kernel_size=2, stride=2, padding=1)
        self.conv2 = torch.nn.Conv2d(in_channels=out_channel_one, out_channels=out_channel_two, kernel_size=5, stride=1, padding=1)
        self.pool2 = torch.nn.MaxPool2d(kernel_size=2, stride=2, padding=1)
        self.conv3 = torch.nn.Conv2d(in_channels=out_channel_two, out_channels=out_channel_three, kernel_size=5, stride=1, padding=1)
        # With 28x28 MNIST input, the conv/pool stack above produces a 5x5 feature map
        self.fc1 = torch.nn.Linear(5 * 5 * out_channel_three, fc_1)
        self.fc2 = torch.nn.Linear(fc_1, fc_2)
        self.output = torch.nn.Linear(fc_2, fc_out)

    def forward(self, x):
        x = self.pool1(torch.nn.functional.relu(self.conv1(x)))
        x = self.pool2(torch.nn.functional.relu(self.conv2(x)))
        x = torch.nn.functional.relu(self.conv3(x))
        x = x.view(-1, x.size(1) * x.size(2) * x.size(3))
        x = torch.nn.functional.relu(self.fc1(x))
        x = torch.nn.functional.relu(self.fc2(x))
        # Return raw logits: CrossEntropyLoss applies log-softmax internally,
        # so an extra softmax here is unnecessary
        return self.output(x)
def train_one_epoch(model, optimizer, data_loader, device, epoch):
    model.train()
    loss_function = torch.nn.CrossEntropyLoss()
    # Running mean of the loss over this epoch
    mean_loss = torch.zeros(1).to(device)
    optimizer.zero_grad()
    # Only rank 0 shows the progress bar
    if is_main_process():
        data_loader = tqdm(data_loader, file=sys.stdout)
    for step, data in enumerate(data_loader):
        images, labels = data
        # Forward pass
        pred = model(images.to(device))
        # Compute the loss and backpropagate
        loss = loss_function(pred, labels.to(device))
        loss.backward()
        # Average the loss across all processes (for logging only)
        loss = reduce_value(loss, average=True)
        # Update the running mean loss
        mean_loss = (mean_loss * step + loss.detach()) / (step + 1)
        # Show the mean loss on rank 0
        if is_main_process():
            data_loader.desc = "[epoch {}] mean loss {}".format(epoch, round(mean_loss.item(), 3))
        # Abort if the loss is not finite
        if not torch.isfinite(loss):
            print('WARNING: non-finite loss, ending training ', loss)
            sys.exit(1)
        # Optimizer step and gradient reset
        optimizer.step()
        optimizer.zero_grad()
    # Wait until all processes have finished the epoch
    if device != torch.device("cpu"):
        torch.cuda.synchronize(device)
    return mean_loss.item()
@torch.no_grad()
def evaluate(model, data_loader, device):
    model.eval()
    # Number of correctly predicted samples on this process
    sum_num = torch.zeros(1).to(device)
    # Only rank 0 shows the progress bar
    if is_main_process():
        data_loader = tqdm(data_loader, file=sys.stdout)
    for step, data in enumerate(data_loader):
        images, labels = data
        pred = model(images.to(device))
        # Take the index of the largest output as the predicted class
        pred = torch.max(pred, dim=1)[1]
        sum_num += torch.eq(pred, labels.to(device)).sum()
    # Wait until all processes have finished
    if device != torch.device("cpu"):
        torch.cuda.synchronize(device)
    # Sum the correct counts over all processes (no averaging)
    sum_num = reduce_value(sum_num, average=False)
    return sum_num.item()
def main(args):
    if not torch.cuda.is_available():
        raise EnvironmentError("No GPU device found for training.")
    # Initialize the per-process distributed environment
    init_distributed_mode(args=args)
    rank = args.rank
    device = torch.device(args.device)
    batch_size = args.batch_size
    weights_path = args.weights
    args.lr *= args.world_size  # scale the learning rate with the number of GPUs
    checkpoint_path = ""
    if rank == 0:  # print info and create the tensorboard writer only on rank 0
        print(args)
        print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
        tb_writer = SummaryWriter()
        if os.path.exists("./weights") is False:
            os.makedirs("./weights")
    # The original repo reads a custom dataset here; this test uses MNIST instead.
    # train_info, val_info, num_classes = read_split_data(args.data_path)
    # train_images_path, train_images_label = train_info
    # val_images_path, val_images_label = val_info
    # assert args.num_classes == num_classes, "dataset num_classes: {}, input {}".format(args.num_classes, num_classes)
    data_transform = {
        "train": transforms.Compose([transforms.Resize(28),
                                     transforms.ToTensor(),
                                     transforms.Normalize(mean=(0.1307,), std=(0.3081,))]),
        "val": transforms.Compose([transforms.Resize(28),
                                   transforms.ToTensor(),
                                   transforms.Normalize(mean=(0.1307,), std=(0.3081,))])}
    train_data_set = mnist.MNIST('./MNIST', train=True, transform=data_transform['train'], download=True)
    val_data_set = mnist.MNIST("./MNIST", train=False, transform=data_transform['val'], download=True)
    # Assign each rank its own subset of sample indices
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
    val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)
    # Group the training indices into batches of batch_size
    train_batch_sampler = torch.utils.data.BatchSampler(
        train_sampler, batch_size, drop_last=True)
    nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])  # number of dataloader workers
    if rank == 0:
        print('Using {} dataloader workers every process'.format(nw))
    train_loader = torch.utils.data.DataLoader(train_data_set,
                                               batch_sampler=train_batch_sampler,
                                               pin_memory=True,
                                               num_workers=nw)
    val_loader = torch.utils.data.DataLoader(val_data_set,
                                             batch_size=batch_size,
                                             sampler=val_sampler,
                                             pin_memory=True,
                                             num_workers=nw)
    # Instantiate the model
    model = CNNNet(1, 16, 32, 32, 128, 64, 10)
    model = model.to(device)
    # Load pretrained weights if they exist
    if os.path.exists(weights_path):
        weights_dict = torch.load(weights_path, map_location=device)
        load_weights_dict = {k: v for k, v in weights_dict.items()
                             if model.state_dict()[k].numel() == v.numel()}
        model.load_state_dict(load_weights_dict, strict=False)
    else:
        checkpoint_path = os.path.join("./weights", "initial_weights.pt")
        # Without pretrained weights, save rank 0's initial weights and let the other
        # ranks load them, so that every process starts from identical weights
        if rank == 0:
            torch.save(model.state_dict(), checkpoint_path)
        dist.barrier()
        # map_location must be specified here, otherwise every process would load
        # the tensors onto the first GPU and inflate its memory usage
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    # Optionally freeze weights
    if args.freeze_layers:
        for name, para in model.named_parameters():
            # Freeze everything except the fully connected layers
            if "fc" not in name:
                para.requires_grad_(False)
    else:
        # SyncBatchNorm only makes sense for networks that actually contain BN layers
        if args.syncBN:
            # Using SyncBatchNorm makes training slower
            model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
    # Wrap the model with DistributedDataParallel
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
    # optimizer: only parameters that require gradients
    pg = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005)
    # Cosine learning-rate schedule, see https://arxiv.org/pdf/1812.01187.pdf
    lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf
    scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)
    for epoch in range(args.epochs):
        # Reshuffle so that each epoch distributes the samples differently across ranks
        train_sampler.set_epoch(epoch)
        mean_loss = train_one_epoch(model=model,
                                    optimizer=optimizer,
                                    data_loader=train_loader,
                                    device=device,
                                    epoch=epoch)
        scheduler.step()
        # Total number of correct predictions across all processes
        sum_num = evaluate(model=model,
                           data_loader=val_loader,
                           device=device)
        acc = sum_num / val_sampler.total_size
        if rank == 0:
            print(f"[epoch {epoch}] accuracy: {acc:.3f}")
            tags = ["loss", "accuracy", "learning_rate"]
            tb_writer.add_scalar(tags[0], mean_loss, epoch)
            tb_writer.add_scalar(tags[1], acc, epoch)
            tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)
            torch.save(model.module.state_dict(), f"./weights/model-{epoch}.pth")
    # Remove the temporary initial-weights file
    if rank == 0:
        if os.path.exists(checkpoint_path) is True:
            os.remove(checkpoint_path)
    cleanup()
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_classes', type=int, default=10)
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--batch-size', type=int, default=128)
    parser.add_argument('--lr', type=float, default=0.01)
    parser.add_argument('--lrf', type=float, default=0.1)
    # Enable SyncBatchNorm (a store_true flag: type=bool would treat any non-empty string as True)
    parser.add_argument('--syncBN', action='store_true')
    # Dataset root directory (used by the original repo, not by this MNIST test)
    # https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz
    # parser.add_argument('--data-path', type=str, default="/home/wz/data_set/flower_data/flower_photos")
    # Official resnet34 weights used by the original repo
    # https://download.pytorch.org/models/resnet34-333f7ec4.pth
    parser.add_argument('--weights', type=str, default='./weights/initial_weights.pth',
                        help='initial weights path')
    parser.add_argument('--freeze-layers', action='store_true')
    # Do not change this; the device is assigned automatically per process
    parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')
    # Number of processes (not threads); usually no need to set it, it follows nproc_per_node
    parser.add_argument('--world-size', default=4, type=int,
                        help='number of distributed processes')
    parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')
    opt = parser.parse_args()
    main(opt)
Training command
python -m torch.distributed.launch --nproc_per_node=4 --use_env mutil_gpu.py
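Note: on recent PyTorch versions (1.10 and later) torch.distributed.launch is deprecated in favor of torchrun, which sets the same RANK/WORLD_SIZE/LOCAL_RANK environment variables, so the same script should also run with:
torchrun --nproc_per_node=4 mutil_gpu.py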