Optimus: Paper and Source Code Reading Notes

Author: 懒羊羊3号 | Published 2022-03-31 15:02

Code repository

https://github.com/pengyanghua/optimus

Paper overview

Reading notes on "Optimus: An Efficient Dynamic Resource Scheduler for Deep Learning Clusters"

Goal

Dynamically allocate cluster resources to reduce the total training time of all jobs.

Conclusion

The authors implement Optimus on Kubernetes and evaluate it on a deep learning cluster with 7 CPU servers and 6 GPU servers, running 9 training jobs on the MXNet framework. The results show that Optimus outperforms representative cluster schedulers by about 139% and 63% in terms of job completion time and makespan, respectively.

Background and assumptions

Distributed training frameworks (MXNet [59], TensorFlow [23], PaddlePaddle [17], Angel [43], Petuum [67])

  • PS (parameter server): holds the model parameters; it receives gradient updates, updates the parameters, and sends them back to the workers
  • Worker: training node that computes gradients


    The numbers of PS and workers determine training speed: increasing either can accelerate training, but too many can slow it down because of communication overhead. In the paper, figure (a) shows the effect of the PS/worker ratio and figure (b) the effect of their absolute numbers.

Convergence

  • Not every experimental model converges (e.g., some DNNs); fortunately, production models are mature and converge well.
    • One epoch is a training pass in which all mini-batches are processed once.
    • After tens to hundreds of epochs, a model converges.
    • Training is an iterative process: the dataset is divided into chunks, and each chunk is further divided into mini-batches. A training step processes one mini-batch and applies the gradient update to the parameters. Training metrics can also be computed at the end of each mini-batch. When all mini-batches have been processed, one epoch is complete. A model is usually trained for many epochs (tens to hundreds) until it converges, i.e., until its metrics stabilize.



Existing scheduling systems have the following characteristics:

1. Static resource allocation: idle resources cannot be used unless the configuration is changed manually or the job is resubmitted.
2. Scheduling decisions ignore the characteristics of the jobs themselves, so short jobs can starve for lack of resources.
3. FIFO scheduling makes many jobs wait too long.

The scheduler in this paper

Models used:
https://github.com/apache/incubator-mxnet/tree/master/example

Approach

  • Job convergence model [estimates each job's remaining training time]

    • For each type of model, fit the relationship between loss and number of iterations, so that the remaining iterations can be predicted from the current loss.
    • This first model estimates the number of steps/epochs a job still needs. Given a step count k, SGD converges at a rate of O(1/k), so the training loss curve can be approximated by a model of that form (the exact formula is given in the paper).


  • As more data points are collected after each step, the model fit (and its prediction error) improves, as the paper shows.


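To make the convergence model concrete: since SGD converges at O(1/k), a curve of the form l(k) ≈ 1/(a·k + b) + c can be fitted to the (step, loss) points observed so far and then inverted to predict how many more steps are needed to reach a target loss. The sketch below is mine, on synthetic data, and uses a simple grid search plus least squares rather than the repo's actual fitting code:

```python
import numpy as np

def loss_curve(k, a, b, c):
    # SGD converges at O(1/k): loss ~ 1/(a*k + b) + c
    return 1.0 / (a * k + b) + c

# synthetic, noiseless (step, loss) observations
steps = np.arange(1.0, 30.0)
losses = loss_curve(steps, 0.5, 2.0, 0.1)

# For a fixed c, 1/(loss - c) = a*k + b is linear in k, so grid-search c
# and solve the remaining line fit by least squares (numpy only).
best = (np.inf, 0.0, 0.0, 0.0)
for c in np.linspace(0.0, losses.min() - 1e-6, 200):
    y = 1.0 / (losses - c)
    a, b = np.polyfit(steps, y, 1)
    resid = float(np.sum((a * steps + b - y) ** 2))
    if resid < best[0]:
        best = (resid, a, b, c)
_, a, b, c = best

# invert the fitted curve: how many more steps until loss reaches a target
target = 0.12
k_needed = (1.0 / (target - c) - b) / a
print(round(k_needed))   # close to 96 for this synthetic curve
```

Inverting the fitted curve at a target loss is exactly what lets the scheduler estimate each job's remaining work.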
  • Resource-speed model [the key to acceleration]

    • Training speed is determined by the forward loss computation time, the backward gradient computation time, communication time, and other factors (the formula differs between synchronous and asynchronous training).
    • Asynchronous training: the workers of a job are not synchronized; the PS updates whenever any worker reports progress. Synchronous training: the PS updates only after collecting updates from all workers.
    • The numbers of PS and workers determine the factors above.
    • Training speed can therefore be predicted from the numbers of PS and workers.


  • Resource allocation

    • Combining the two models above determines the numbers of PS and workers for every job currently in the system.
    • Since the problem is NP-hard, a heuristic determines the numbers of PS and workers:
      • Initialize each job with one PS and one worker.
      • For each job, compute the gain from adding one PS or one worker, and greedily allocate to the option with the largest gain.
      • Repeat until resources are exhausted or adding resources yields negative gain.
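The heuristic above can be sketched as follows. This is my simplified, single-budget toy: the job names, the made-up speed functions, and the single `capacity` slot count are all illustrative, whereas the real scheduler tracks CPU, memory, bandwidth, and GPU separately:

```python
def marginal_gain(job, task_type):
    """Reduction in estimated remaining time from adding one ps or one worker."""
    cur = job['rem_epochs'] / job['speed'](job['ps'], job['workers'])
    p = job['ps'] + (task_type == 'ps')
    w = job['workers'] + (task_type == 'worker')
    return cur - job['rem_epochs'] / job['speed'](p, w)

def allocate(jobs, capacity):
    # step 1: initialize every job with one ps and one worker
    for job in jobs:
        job['ps'], job['workers'] = 1, 1
    used = 2 * len(jobs)
    # steps 2-3: greedily add the single task with the largest gain
    while used < capacity:
        gain, job, task_type = max(
            ((marginal_gain(j, t), j, t) for j in jobs for t in ('ps', 'worker')),
            key=lambda x: x[0])
        if gain <= 0:          # adding more resources no longer helps
            break
        job['ps' if task_type == 'ps' else 'workers'] += 1
        used += 1
    return jobs

# toy speed model: more workers help, more ps reduce communication overhead
jobs = [
    {'name': 'resnet50', 'rem_epochs': 40, 'speed': lambda p, w: w / (1 + 0.3 * w / p)},
    {'name': 'vgg16',    'rem_epochs': 80, 'speed': lambda p, w: w / (1 + 0.5 * w / p)},
]
allocate(jobs, capacity=10)
for job in jobs:
    print(job['name'], job['ps'], job['workers'])
```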
  • Task placement

    • After deciding each job's PS and worker counts, decide how to place them on servers.
    • Placing a job's PS and workers on the same servers reduces communication time.
    • Theorem: use the minimum number of servers to place the workers and PS, such that each server hosts the same number of workers and the same number of PS.


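As a toy illustration of the equal-division constraint in this theorem (the per-server capacity parameters here are hypothetical slot counts, not the paper's exact resource model):

```python
def min_servers(num_ps, num_worker, ps_cap, worker_cap):
    """Smallest server count m such that the ps and workers split evenly
    across m servers without exceeding the per-server caps."""
    for m in range(1, num_ps + num_worker + 1):
        if (num_ps % m == 0 and num_worker % m == 0
                and num_ps // m <= ps_cap
                and num_worker // m <= worker_cap):
            return m
    return None   # no even split fits

print(min_servers(4, 8, ps_cap=2, worker_cap=4))   # -> 2 servers: 2 ps + 4 workers each
```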

Source code reading

Steps from the paper

1. Job convergence: for each type of model, fit the relationship between loss and iterations, so the remaining iterations can be predicted from the current loss.
2. Resource-speed model
3. Resource allocation
4. Task placement

Questions to answer

1. How is the remaining time computed?
2. How are resources allocated?
3. How are tasks placed?
4. How does the system interact with Kubernetes?

Starting from the directory layout


estimator.py: estimates training speed and remaining epochs
experimentor.py: entry point; the components run as threads and communicate via messages
job.py: concrete scheduling of each job
jobrepo.py: a repository of three job definitions:

ResNet-50_ImageNet
VGG-16_ImageNet
ResNext-110_Cifar10

optimus_scheduler.py: the main scheduler
params.py: parameter values

Entry point: experimentor.py

  • Clear all jobs:
def clear():
    os.system('kubectl delete jobs --all')
  • A dedicated thread is started for each component:
    Timer: drives the clock
    Hub: forwards messages
    Generator: randomly generates and submits jobs (this code uses the three jobs in jobrepo.py)
    Progressor: executes jobs
    Statsor: collects job statistics
    UTIL_Scheduler: the job scheduler, where the core logic lives

Main scheduler: optimus_scheduler.py

1. In the init function, besides parameter initialization, there is one key line:
self.msg_handler = threading.Thread(target=self._msg_handle, args=())

The key function to read is _msg_handle:



Each iteration of the while loop receives one message and handles it according to its type.

On the first round of data collection and speed estimation, every uncompleted job is allocated one PS and one worker:

        for job in self.uncompleted_jobs:
            cpu_req = job.worker_cpu + job.ps_cpu
            mem_req = job.worker_mem + job.ps_mem
            bw_req = job.worker_bw + job.ps_bw
            gpu_req = job.worker_gpu

            suff_resr = self.__check_cluster_resource_full(cpu_req, mem_req, bw_req, gpu_req)
            if suff_resr:
                job.num_worker = 1
                job.num_ps = 1
                self.cluster_used_cpu += cpu_req
                self.cluster_used_mem += mem_req
                self.cluster_used_bw += bw_req
                self.cluster_used_gpu += gpu_req
                # compute initial utility
                self.__update_util_queue(job, util_queue)
            else:
                continue

If resources remain, the scheduler keeps adding tasks, one at a time, according to task_type:

            if suff_resr:
                # currently no mechanism to reduce resources
                if task_type == "ps":
                    job.num_ps += 1
                elif task_type == "worker":
                    job.num_worker += 1
                self.cluster_used_cpu += cpu_req
                self.cluster_used_mem += mem_req
                self.cluster_used_bw += bw_req
                self.cluster_used_gpu += gpu_req

                self.__update_util_queue(job, util_queue)
            else:
                # no enough resource
                break
        # how to handle not_ready_jobs

__update_util_queue

  • Compute the remaining time:
        # compute utility
        # allocate 1 ps or 1 worker each time.
        # sometimes can allocate multiple ps or worker for optimization, to avoid stuck in local optimal.
        end_epoch = self.estimator.est_epoch(job)
        if end_epoch <= 0:
            # error when estimating epoch
            end_epoch = job.progress + 20

        rem_epoch = end_epoch - job.progress  # the rem_epoch is negative if estimated epoch return -1
        est_speed = self.estimator.est_speed(job, job.num_ps, job.num_worker)
        self.logger.debug("estimated speed: " + str(est_speed))
        if est_speed <= 0:
            self.not_ready_jobs.add(job)
            return
        rem_time = rem_epoch / est_speed
  • Compute the remaining time with one more PS (and likewise for a worker):
        # if add ps 1
        est_speed = self.estimator.est_speed(job, job.num_ps + 1, job.num_worker)
        if est_speed <= 0:
            self.not_ready_jobs.add(job)
            return
        ps_rem_time = rem_epoch / est_speed
        resource_reqs = (job.ps_cpu, job.ps_mem, job.ps_bw)
        shares = (1.0 * job.ps_cpu / self.cluster_num_cpu, 1.0 * job.ps_mem / self.cluster_num_mem,
                  1.0 * job.ps_bw / self.cluster_num_bw)
        dom_res = shares.index(max(shares))
        ps_util = (rem_time - ps_rem_time)/ resource_reqs[dom_res]
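The ps_util above is the time saved per unit of the task's dominant resource, i.e., the resource that takes the largest share of the cluster. A standalone version of that computation, with made-up numbers:

```python
def utility(rem_time, new_rem_time, task_req, cluster_cap):
    """Time saved by the extra task, normalized by its dominant-resource demand.
    task_req / cluster_cap map resource name -> amount, e.g. {'cpu': 3, 'mem': 9}.
    """
    shares = {r: task_req[r] / cluster_cap[r] for r in task_req}
    dom = max(shares, key=shares.get)              # dominant resource
    return (rem_time - new_rem_time) / task_req[dom]

# adding one ps (3 cpu cores, 9 GB) cuts remaining time from 120 to 100
u = utility(120.0, 100.0, {'cpu': 3, 'mem': 9}, {'cpu': 100, 'mem': 500})
print(u)   # cpu share 0.03 > mem share 0.018, so utility = 20 / 3
```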

est_speed: given the numbers of PS and workers, predict the training speed, using the real measured value if it already exists in the dict:

    def est_speed(self, job, num_ps, num_worker):
        """Give the number of ps and the number of worker, predict the training speed.
        Use the real one if already exists in the dict
        """
        if (num_ps, num_worker) in job.training_speeds:
            return job.training_speeds[(num_ps, num_worker)]
        else:
            # do training speed curve fitting here
            if 'async' in job.kv_store:
                if len(job.training_speeds) >= 4:
                    # do not need curve fitting each time, can be further optimized. future work
                    ps_list = []
                    worker_list = []
                    speed_list = []
                    for key, value in job.training_speeds.items():
                        (ps, worker) = key
                        ps_list.append(float(ps))
                        worker_list.append(float(worker))
                        speed_list.append(value)
                    params = self._async_speed_curve_fitting(np.array(ps_list), np.array(worker_list), np.array(speed_list))
                    if params is None:
                        self.logger.error(self.name+":: " + job.name + " " + str((num_ps, num_worker)) + " speed estimation error")
                        return -1
                    else:
                        [a, b, c, d] = params
                        est_speed = self.__async_speed_fit_func((num_ps, num_worker), a, b, c, d)
                        return est_speed
                else:
                    return -1
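The article does not show `__async_speed_fit_func` itself, so as a hypothetical illustration assume a four-parameter surface f(p, w) = w / (a + b·w/p + c·w + d·p). Since w/f is then linear in (a, b, c, d), ordinary least squares can recover the parameters from at least four measured (ps, worker, speed) points, which matches the `len(job.training_speeds) >= 4` guard above:

```python
import numpy as np

def speed(p, w, a, b, c, d):
    # hypothetical async speed surface, NOT necessarily the repo's exact form
    return w / (a + b * w / p + c * w + d * p)

# measured (num_ps, num_worker) -> speed samples (synthetic, noiseless)
true = (0.5, 0.2, 0.05, 0.01)
samples = [(1, 1), (1, 2), (2, 2), (2, 4), (3, 3)]
data = [(p, w, speed(p, w, *true)) for p, w in samples]

# w/speed = a + b*(w/p) + c*w + d*p is linear in the parameters
A = np.array([[1.0, w / p, w, p] for p, w, _ in data])
y = np.array([w / s for _, w, s in data])
a, b, c, d = np.linalg.lstsq(A, y, rcond=None)[0]

est = speed(3, 5, a, b, c, d)   # predict an unmeasured (ps, worker) configuration
```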

Back in the scheduler: once the resources for all uncompleted jobs in this scheduling interval have been computed, placement begins:

   # placement
        ps_placements, worker_placements = self.__place(self.uncompleted_jobs)

Sort jobs by the resources they request:

        # sort jobs based on num_ps and num_worker
        job_sort_queue = Queue.PriorityQueue()
        for job in jobs:
            job_sort_queue.put((job.num_ps + job.num_worker, job))

Sort the nodes:

        cpu_avail_queue = Queue.PriorityQueue()
        # sort nodes based on available cpus, since cpu is usually the bottleneck
        for i in range(len(params.NODE_LIST)):
            cpu_avail_queue.put((self.node_used_cpu_list[i], i))

Jobs are then placed, largest first, onto nodes ordered from least to most used:

                for i in range(job.num_ps):
                    # place ps evenly
                    node = cand_place_nodes[i % len(cand_place_nodes)]
                    # check whether resource is enough to place this ps
                    suff_resr = self.__check_node_resource_full(node, job.ps_cpu, job.ps_mem, job.ps_bw)
                    if suff_resr:
                        ps_nodes.append(node)
                        # minus temporary resources
                        self.__deduct_resr(job, "ps", 1, node)
                    else:
                        # since node is already sorted based on resources,
                        # if a larger node can not place the task, the following one can not too
                        fit_flag = False
                        # add the deducted resource back
                        for node in ps_nodes:
                            self.__add_back_resr(job, "ps", 1, node)
                        ps_already_deduct = True
                        break

Finally, for jobs that could not be placed with the remaining resources, their PS and workers are split up and placed individually, while checking whether training would slow down:

                        # have tried all nodes but still can not place; then check if we can place some tasks
                        # and place ps and worker alternatively
                        self.logger.debug("last placed job: " + job.name)
                        ps_nodes = []
                        worker_nodes = []
                        flag_place_ps = True
                        for i in range(job.num_ps + job.num_worker):
                            flag_no_resource = True
                            if flag_place_ps:
                                # place ps task
                                for node in range(len(params.NODE_LIST)):
                                    suff_resr = self.__check_node_resource_full(node, job.ps_cpu, job.ps_mem, job.ps_bw)
                                    if suff_resr:
                                        ps_nodes.append(node)
                                        self.__deduct_resr(job, "ps", 1, node)
                                        flag_no_resource = False
                                        break
                            else:
                                # place worker task
                                for node in range(len(params.NODE_LIST)):
                                    suff_resr = self.__check_node_resource_full(node, job.worker_cpu, job.worker_mem,
                                                                                job.worker_bw, job.worker_gpu)
                                    if suff_resr:
                                        worker_nodes.append(node)
                                        self.__deduct_resr(job, "worker", 1, node)
                                        flag_no_resource = False
                                        break

Interaction between jobs, resources, and Kubernetes

Creating a job

    def start(self):
        # start the job in k8s
        self.logger.info("starting job " + self.name + "...")

        # job working dir
        os.system('mkdir -p ' + self.dir)
        self.ps_mount_dirs = self._set_mount_dirs('ps', self.host_workdir_prefix)  # ps container mount
        self.worker_mount_dirs = self._set_mount_dirs('worker', self.host_workdir_prefix)  # worker container mount
        self.__set_batch_size()

        # create job yamls
        self._create()

        # prepare data
        self._read_data()

        # start pods in k8s
        subprocess.check_output("kubectl create -f " + self.yaml, shell=True)

How the yaml file is generated

        # copy template file
        self.jinja = self.dir + self.name + '.jinja'
        os.system("cp ../templates/k8s-mxnet-template.jinja " + self.jinja)

        # replace variables in jinja file
        temp_file = self.jinja + '.temp'
        for key, value in variables.items():
            os.system('sed -e "s@\$' + key + '@' + value + '@g" "' + self.jinja + '"' + ' > ' + temp_file)
            os.system('rm ' + self.jinja)
            os.system('mv ' + temp_file + ' ' + self.jinja)

        # generate yaml file
        self.yaml = self.dir + self.name + '.yaml'
        os.system("python ../templates/render-template.py " + self.jinja + " > " + self.yaml)
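The sed loop above rewrites the template once per variable, spawning a shell each time. A functionally similar substitution can be done in a few lines of Python (a hedged sketch of mine, using the same `$KEY` convention as the snippet above):

```python
import os
import tempfile

def render(template_path, out_path, variables):
    # in-memory equivalent of the sed loop: replace each $KEY with its value
    with open(template_path) as f:
        text = f.read()
    for key, value in variables.items():
        text = text.replace('$' + key, str(value))
    with open(out_path, 'w') as f:
        f.write(text)

# tiny demo template (the real one is k8s-mxnet-template.jinja in the repo)
workdir = tempfile.mkdtemp()
tpl = os.path.join(workdir, 'demo.jinja')
out = os.path.join(workdir, 'demo.yaml')
with open(tpl, 'w') as f:
    f.write('name: $NAME\nreplicas: $NUM\n')

render(tpl, out, {'NAME': 'job1', 'NUM': 2})
with open(out) as f:
    rendered = f.read()
print(rendered)
```

Unlike repeated `sed -e "s@$KEY@...@g"` invocations, this avoids trouble when a value happens to contain `@` or quote characters.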

Other related interactions

 cmd = 'kubectl get pods --selector=' + 'name=' + self.name + ',' + 'job=' + task + ' --namespace=default' + ' |grep ' + task
# get heapster cluster ip
# heapster               192.168.192.16    <none>        80/TCP              5d
cmd = "kubectl get services --namespace=kube-system | grep heapster |awk '{print $2}'"
# in case not delete all
subprocess.check_output('kubectl delete jobs --selector=name=' + self.name, shell=True)

The system relies on the load balancing built into Kubernetes:

The workers/parameter servers are placed in a load balancing way, according to the default behavior of Kubernetes.
It uses two Kubernetes scheduling mechanisms: FitPredicate and PriorityFunction.

candidates = []
for pod in pods:
    for node in nodes:
        if pod.label-selector == node.label or
           pod.resources.limits + node.allocated_resources < node.resources.capacity:   // fit predicate
            candidates.append(node)
    select one candidate according to priorities (e.g., least used resources)   // priority function

General steps: (1) filter the nodes, (2) prioritize the filtered node list, (3) select the best-fit node.
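These three steps can be sketched in plain Python (the node and pod fields here are invented for illustration; real Kubernetes predicates and priority functions are richer):

```python
def schedule(pod, nodes):
    # (1) filter: keep nodes that pass every predicate (here: enough cpu and mem)
    candidates = [n for n in nodes
                  if n['free_cpu'] >= pod['cpu'] and n['free_mem'] >= pod['mem']]
    if not candidates:
        return None   # pod stays pending
    # (2) prioritize: score candidates, e.g. least-used -> most free cpu wins
    # (3) select the best-fit node
    return max(candidates, key=lambda n: n['free_cpu'])

nodes = [{'name': 'n1', 'free_cpu': 2, 'free_mem': 8},
         {'name': 'n2', 'free_cpu': 6, 'free_mem': 4}]
chosen = schedule({'cpu': 2, 'mem': 4}, nodes)
print(chosen['name'])   # n2: both nodes fit, n2 has more free cpu
```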
Available Predicates

  • Static predicates:
    PodFitsPorts,
    PodFitsResources,
    NoDiskConflict,
    MatchNodeSelector,
    HostName
  • Configurable predicates:
    ServiceAffinity,
    LabelsPresence

Available Priority Functions

Reference example: ResNet-50_ImageNet

'''
ResNet-50_ImageNet
'''
def _set_resnet50_job(job):
    num_ps = params.DEFAULT_NUM_PS
    num_worker = params.DEFAULT_NUM_WORKER
    ps_cpu = 3
    ps_mem = 9
    ps_bw = 0
    worker_cpu = 2
    worker_mem = 8
    worker_gpu = 1
    worker_bw = 0

    job.set_ps_resources(num_ps, ps_cpu, ps_mem, ps_bw)
    job.set_worker_resources(num_worker, worker_cpu, worker_mem, worker_bw, worker_gpu)

    image = 'xxx'
    script = '/init.sh'

    # must end with /, save everything including training data, validation data,
    # training log and training model into this dir
    work_dir = '/mxnet/example/image-classification/data/'
    host_workdir_prefix = '/data/k8s-workdir/experiment/'
    job.set_container(image, script, work_dir, host_workdir_prefix)

    prog = 'python train_imagenet.py --network resnet --num-layers 50 --disp-batches 5 --num-epochs 100 --data-train /data/imagenet-train.rec'
    kv_store = 'dist_sync'
    prog += ' --kv-store ' + kv_store
    if worker_gpu > 0:
        prog += " --gpus" + " " + ",".join([str(i) for i in range(int(worker_gpu))])

    job.set_train(prog=prog, batch_size=32, kv_store=kv_store, scale_bs=True)
    hdfs_data = ['/k8s-mxnet/imagenet/imagenet-train.rec']
    data_dir = '/data/'
    host_data_dir = '/data/mxnet-data/imagenet/'
    job.set_data(hdfs_data=hdfs_data, data_dir=data_dir, host_data_dir=host_data_dir, data_mounted=True)
    job.set_mxnet(kv_store_big_array_bound=1000 * 1000, ps_verbose='')

Deploying and running the source code

How a job gets onto the cluster

The scheduler computes how many PS and workers each job needs, then sets them via:

set_ps_resources
set_worker_resources

To place these PS and workers, the resources are read from environment variables. (Running MXNet distributed deep learning on YARN requires configuring the MXNet distributed cluster environment, which has not been set up here yet.)
set_worker_placement
set_ps_placement

Directories on the host that are mounted into the containers:

_set_mount_dirs

        mount_dirs = []
        if type == 'ps':
            for i in xrange(self.num_ps):
                postfix = self.name + '-ps-' + str(i) + '/'
                mount_dir = host_workdir_prefix + postfix
                mount_dirs.append(mount_dir)
                cmd = 'ssh ' + self.ps_placement[i] + ' "sudo rm -rf ' + mount_dir + '; mkdir -p ' + mount_dir + '"'
                os.system(cmd)
Configuring the container:

set_container

        # container description
        self.image = image
        self.script = script
        self.work_dir = work_dir
        self.host_workdir_prefix = host_workdir_prefix
        self.work_volume = work_volume
If the data is not on the local host, it is pulled from HDFS; the dataset list includes both training data and validation data.

The data can also be downloaded manually and placed in the corresponding directory; for example, a cifar10 training set and validation set are available here:
'http://data.mxnet.io/data/cifar10/cifar10_val.rec'
'http://data.mxnet.io/data/cifar10/cifar10_train.rec'
set_data: configures the data
_read_data: reads the data from HDFS (not executed if the data already exists locally)

        self.hdfs_data = hdfs_data
        self.data_dir = data_dir
        self.host_data_dir = host_data_dir
        self.data_mounted = data_mounted
        self.data_volume = data_volume
Configuring training:

set_train
__set_batch_size
set_mxnet

Creating the job: fill in the jinja template with the parameters to generate the corresponding yaml file.

_create


(The template-copying, sed substitution, and render-template.py invocation here are identical to the snippet shown earlier for generating the yaml file.)

When I tried running it, some yaml files were generated, but the IP parameter inside them was wrong and other parameters may be off as well, so the jobs could not actually start.


Getting the training status, mainly the progress list and the validation loss list:

_read_progress_stats
get_training_progress_stats

        self._read_progress_stats()
        return (list(self.progress_list), list(self.val_loss_list))
Getting the training speed, read from a local file:

_read_training_speed
get_training_speed

            '''
            cmd = 'scp ' + node + ':' + local_file + ' ' + self.dir # the new txt will replace the old one, no need to delete
            os.system(cmd)
            try:
                with open(self.dir+speed_fn, 'r') as fh:
                    stb_speed = float(fh.readline().replace('\n', '').split(' ')[1])
                    self.speed_list[i] = float('%.3f'%(stb_speed))
            except Exception as e:
                print e
                continue
            '''
            cmd = "ssh " + node + " 'cat " + local_file + "'"
Getting the k8s pods:

__get_pods

        cmd = 'kubectl get pods --selector=' + 'name=' + self.name + ',' + 'job=' + task + ' --namespace=default' + ' |grep ' + task
        output = subprocess.check_output(cmd, shell=True)
Getting a job's metrics:

_read_metrics
get_metrics

        # cpu: milli core, mem: bytes, net: bytes/second
        metric_keys = ['cpu/usage_rate', 'memory/usage', 'network/tx_rate', 'network/rx_rate']
The start function:

start

(This start() method is identical to the one shown earlier under job creation.)
Deleting a job (and deleting all jobs):
        subprocess.check_output('kubectl delete jobs --selector=name=' + self.name, shell=True)

Problems encountered during setup

1. Configuring the ssh node list for the cluster: ssh commands failed because a bastion host requires an extra jump.



2. The getenv variables were not fully configured; they have to be set one by one, or some removed before retrying.



3. The datasets and training data for all relevant training jobs still need to be collected.

Conclusion

Dynamic deployment can follow the approach in this paper: generate configuration files, and interact with the cluster and the outside environment directly through Python plus shell commands.

Original article: https://www.haomeiwen.com/subject/xxknjrtx.html