Intel BigDL Training Process (DistriOptimizer)


Author: 由木人_番茄 | Published 2019-03-27 15:12

    An analysis of the internal workflow of Intel BigDL's DistriOptimizer

    Training a model in distributed mode with Intel's deep learning library BigDL is straightforward. The user only needs to provide the model to train, the training set (RDD[Sample] or DataSet), the loss function (Criterion), and the batchSize (not needed if the training set is already a DataSet), choose an optimization method (SGD by default), create an Optimizer instance with `val optimizer = Optimizer(...)`, and finally call `optimizer.optimize()`. Depending on the use case, validation can also be run during training (by providing a validation set, validationMethod, batchSize, etc.), and the training process can be visualized (via TrainSummary and ValidationSummary).
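    Putting those pieces together, a minimal usage sketch might look like the following. This is only an illustration: `model`, `trainSamples`, `testSamples` and `param` are placeholders for the user's own network, RDD[Sample] data and configuration, and the exact imports may vary with the BigDL version.

    import com.intel.analytics.bigdl.nn.ClassNLLCriterion
    import com.intel.analytics.bigdl.optim.{Optimizer, SGD, Top1Accuracy, Trigger}

    // build the optimizer from a model, an RDD[Sample] and a criterion
    val optimizer = Optimizer(
      model = model,                               // user-defined Module
      sampleRDD = trainSamples,                    // RDD[Sample[Float]]
      criterion = new ClassNLLCriterion[Float](),
      batchSize = param.batchSize
    )

    // optional: validate on a held-out RDD[Sample] at the end of every epoch
    optimizer.setValidation(
      trigger = Trigger.everyEpoch,
      sampleRDD = testSamples,
      vMethods = Array(new Top1Accuracy[Float]()),
      batchSize = param.batchSize
    )

    // optional: choose an optimization method (SGD is the default)
    optimizer.setOptimMethod(new SGD[Float](learningRate = 0.01))

    // run distributed training and get the trained model back
    val trainedModel = optimizer.optimize()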

    BigDL Distributed Training Process

    Training a model in BigDL proceeds in the following steps:

    1. Create an Optimizer instance and configure the training end condition, the optimization method, whether to enable visualization, and so on.
    2. Call the Optimizer instance's optimize() method to start training.
      1. Initialize the training parameters.
      2. For each Iteration:
        • Initialize accumulators, metrics, etc. on the Driver.
        • On each Executor, fetch the latest parameters and split a randomly drawn batch into _subModelNumber miniBatches.
        • Compute the gradients from the current model and each miniBatch.
        • Aggregate the gradients.
        • Update the parameters using the aggregated gradients.
        • Run validation and save checkpoints and summaries when their triggers fire.
    3. Fetch the trained parameters and return the trained Model.

    The overall flow is illustrated below:

    [Figure: optim-1.jpg]

    Step 2 is the most important part and is explained in detail below.

    Creating an Optimizer Instance

    When an Optimizer instance is created, BigDL returns a specific Optimizer based on the types of the arguments passed in. BigDL currently provides two implementations, DistriOptimizer and LocalOptimizer, of which DistriOptimizer is by far the more commonly used; the discussion below covers DistriOptimizer only.

    // create optimizer
    val optimizer = Optimizer(
      model = model,
      sampleRDD = trainSamples,
      criterion = new ClassNLLCriterion[Float](),
      batchSize = param.batchSize
    )
    

    The mapping between the arguments passed when creating an Optimizer and the Optimizer instance that is returned is roughly as follows:

    // simplified pseudocode of the Optimizer.apply overloads
    object Optimizer {
      def apply(model, sampleRDD, criterion, batchSize, featurePadding = null, labelPadding = null) =
        new DistriOptimizer[T](model, DataSet.rdd(sampleRDD) -> SampleToMiniBatch(batchSize, featurePadding, labelPadding).asInstanceOf[DistributedDataSet[MiniBatch[T]]], criterion).asInstanceOf[Optimizer[T, MiniBatch[T]]]

      def apply(model, sampleRDD, criterion, batchSize, miniBatchImpl) =
        new DistriOptimizer[T](model, DataSet.rdd(sampleRDD) -> SampleToMiniBatch(miniBatchImpl, batchSize, None).asInstanceOf[DistributedDataSet[MiniBatch[T]]], criterion).asInstanceOf[Optimizer[T, MiniBatch[T]]]

      def apply(model, dataset, criterion) = dataset match {
        case d: DistributedDataSet[_] => new DistriOptimizer[T](model, dataset.asInstanceOf[DistributedDataSet[MiniBatch[T]]], criterion).asInstanceOf[Optimizer[T, D]]
        case d: LocalDataSet[_] => new LocalOptimizer[T](model, dataset.asInstanceOf[LocalDataSet[MiniBatch[T]]], criterion).asInstanceOf[Optimizer[T, D]]
      }
    }
    

    Model Initialization

    The parameter initialization phase performs the following work:

    • Clear the optimMethods' history and initialize some (hyper)parameters;
    • Build the mapping between optimMethods and subModels, exposing the model's weights and biases as subModuleName -> (storageOffset, length, AllReduceParameter);
    • Convert the training set to the required type and cache it in memory (prepareInput());
    • Initialize the model parameters and build an RDD[Cache] whose partitioning matches the training set;
    • Optionally, create the directory where checkpoints will be saved.

    The most important of these are building the RDD[Cache] from the training set and initializing the model parameters. Note in particular that all models on the same Worker share the same parameters (weights and biases), i.e. they use the same underlying storage, whereas gradients are not shared.

    1. Clear the history and set the (hyper)parameters.
    // clear history
    optimMethods.values.foreach { optimMethod =>
              optimMethod.clearHistory()
    }
    // set HyperParameters
    if (optimMethods.size == 1) {
        optimMethods.head._2.loadFromTable(state)
    }
    
    2. Return the model parameters as subModelName -> AllReduceParameter.
    An AllReduceParameter represents a parameter stored in the block manager. In distributed mode, each worker synchronizes the parameter from the block manager, which therefore plays the role of a parameter server. The Tensor is split into `partitionNum` chunks, and each chunk is stored on its corresponding node (Spark executor). Gradients are stored across the nodes in the same way, so gradient aggregation and parameter updates can proceed independently.
    
    // the main functions defined in AllReduceParameter
    def getWeights(localParameter: Tensor[T]): FutureResult[Int] 
    def aggregateGradientPartition(avgNumbers: Int): Unit
    def putGradients(parameter: Tensor[T]): Unit
    def sendWeightPartition(): Unit
    

    The code that builds the subModelName-to-AllReduceParameter mapping is:

        val parameters = if (optimMethods.size != 1) {
          val p = optimMethods.map{case (subModuleName, optimMethods) =>
            val subModule = model(subModuleName)
            val subModuleWeights = subModule.get.getParameters()._1
            (subModuleName, subModuleWeights)
          }
          val sortedWeights = p.values.toArray.sortWith((a, b) => a.storageOffset() < b.storageOffset())
          val compactWeights = Module.isCompact(sortedWeights)
          p.map{case (subModuleName, weights) =>
            (subModuleName, AllReduceParameter.newParameter[T](
              partitionNum, weights.nElement(), weights.storageOffset()))
          }
        } else if (optimMethods.contains(model.getName())) {
          Map(model.getName() -> AllReduceParameter.newParameter[T](
            partitionNum, modelParameters._1.nElement()))
        } else {
          throw new IllegalArgumentException(...)
        }
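    How a parameter of nElement() values is laid out over `partitionNum` chunks is not shown here, but the even-split arithmetic (a base chunk of size / partitionNum elements, with the remainder spread over the first partitions) appears repeatedly in DistriOptimizer, e.g. in getModel in the last section. A small stand-alone sketch of that arithmetic, with hypothetical names:

    // even split of a parameter of `size` elements over `partitionNum` partitions
    // (hypothetical helper; mirrors the taskSize/extraSize arithmetic used below)
    def chunkRanges(size: Int, partitionNum: Int, paramOffset: Int = 1): Seq[(Int, Int)] = {
      val taskSize = size / partitionNum
      val extraSize = size % partitionNum
      (0 until partitionNum).map { pid =>
        val start = paramOffset + pid * taskSize + math.min(pid, extraSize)
        val length = taskSize + (if (pid < extraSize) 1 else 0)
        (start, length) // 1-based offset into the flattened parameter, chunk length
      }
    }

    println(chunkRanges(10, 3)) // Vector((1,4), (5,3), (8,3))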
    
    3. Convert the training set to the required type and cache it in memory.
    if (!dataset.asInstanceOf[DistributedDataSet[MiniBatch[T]]].isCached){
          DistriOptimizer.logger.info("caching training rdd ...")
         DistriOptimizer.prepareInput(this.dataset,this.validationDataSet)
     }
    
    4. Initialize the model parameters and build the RDD[Cache].

    DistriOptimizer calls DistriOptimizer.initThreadModels(...) to partition the model parameters, initialize them, and build the RDD[Cache]. The relevant code is:

    // Init engine and cache models, weights, gradients, criterions, state tables and validation methods on worker nodes.
    val broadcast = sc.broadcast((criterion, state, validationMethods, optimMethod))
     // Notes: All models returned by modelBroadcast.value() share the same weight&bias, while gradWeight&gradBias is unshared.
     val modelBroadcast = ModelBroadcast[T]().broadcast(sc, model)
    
    // then, on each worker:
    val partitionId = TaskContext.getPartitionId
    val (broadcastCriterion, broadcastState, broadcastMethod, broadcastOptim) = broadcast.value
    // singleton check: true if the current execution is a singleton on this JVM
    
     val cached = (0 until _subModelNumber).map { _ =>
            val localModel = modelBroadcast.value(true)
            // differentiate partition models from each other by partition ID
            setModelId(localModel, partitionId)
            val localCriterion = broadcastCriterion.cloneCriterion()
            val localState = broadcastState.clone()
            val localMethod =
              if (broadcastMethod.isDefined) Some(broadcastMethod.get.map(_.clone())) else None
            val (weights, grads) = localModel.getParameters()
            (localModel, weights, grads, localCriterion, localState, localMethod)
          }.toArray
    
    // initialize the parameters by calling AllReduceParameter.init(...)
    val weights = cached.head._2
          parameters.foreach(v =>
            v._2.init(weights.narrow(1, v._2.paramOffset, v._2.size))
          )
    
    val models: RDD[Cache[T]] = ... .persist() // StorageLevel.MEMORY_ONLY
    

    Cache is a case class with the following structure:

    // models are cached in memory in the following format
    /**
      * Optimizer cache some metadata on each executor
      *
      * @param localModels cached models
      * @param modelWeights weights of the cached models
      * @param modelGradients gradients of the cached models
      * @param localCriterions cached criterion
      * @param localStates cached state
      * @param moduleTimeList module running time
      * @param localMethods cached validation methods
      * @param optimMethods cached optim methods
      * @tparam T Tensor element type
      */
    case class Cache[T](
                         localModels: Array[Module[T]],
                         modelWeights: Array[Tensor[T]],
                         modelGradients: Array[Tensor[T]],
                         localCriterions: Array[Criterion[T]],
                         localStates: Array[Table],
                         var moduleTimeList: Array[Long] = null,
                         localMethods: Array[Option[Array[ValidationMethod[T]]]],
                         optimMethods: Map[String, OptimMethod[T]]
                       )
    

    The most important parts of this step are initializing the model parameters and storing them. During actual training, dataRDD is zipped partition-by-partition with models (the RDD[Cache[T]]) via zipPartitions, so that gradient computation runs on every Executor.
    After parameter initialization, the RDD[Cache[T]] has the following structure:

    [Figure: optim-2.png]
    5. Optionally, create the checkpoint directory.
    if (checkpointPath.isDefined) {
          val file = checkpointPath.get + "/" +
            new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime())
          new File(file).mkdir()
          checkpointPath = Some(file)
    }
    

    Distributed Training

    BigDL trains in units of Iterations. First, training bookkeeping is initialized on the Driver side: how many Iterations and Epochs have been trained so far, the current loss, and so on. This information is kept as Tables in optimMethod.state and driverState.
    Distributed training is itself fairly involved, and so that users can track the training process in real time, BigDL also collects various statistics during training; these too are initialized before training starts.
    Once all of this is done, training proper begins. For each iteration, the accumulators (lossSum and recordsSum) and some required state are initialized first, and then dataRDD.zipPartitions(models, preservesPartitioning = true) { (data, modelIter) => ... } is called to run the actual computation on each Executor.

    Synchronizing the latest parameters to the workers

    Note: All models in cached share the same storage for weights, so we only need to copy the weights from parameter server into the first model's weights.

    val weightsResults = parameters.values.map(p =>
    p.getWeights(cached.modelWeights.head.narrow(1, p.paramOffset,p.size))
    ).toArray
    

    Splitting the batch into miniBatches

    For each Task, BigDL randomly draws one batch from the current Partition and splits it into several miniBatches. The number of miniBatches equals the number of map tasks the Executor can run in parallel, i.e. the Executor's core count (_subModelNumber).

    val cached = modelIter.next()
    val batch = data.next()
    ...
    var b = 0
    while (b < _subModelNumber) {
       miniBatchBuffer(b) = batch.slice(b * stackSize + 1, stackSize)
       b += 1
    }
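    stackSize is not defined in the snippet above; it is the per-core share of the batch, roughly batchSize / _subModelNumber. Under that assumption, the (1-based) slice offsets work out as in this stand-alone illustration (all values are made up for the example):

    // assuming stackSize = batchSize / _subModelNumber (BigDL offsets are 1-based)
    val batchSize = 128
    val coresPerExecutor = 4                      // plays the role of _subModelNumber
    val stackSize = batchSize / coresPerExecutor
    val sliceStarts = (0 until coresPerExecutor).map(b => b * stackSize + 1)
    println(s"stackSize=$stackSize, slice starts=$sliceStarts")
    // stackSize=32, slice starts=Vector(1, 33, 65, 97)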
    

    Gradient computation (the model forward-backward Job)

    Once parameter synchronization and miniBatch splitting are done, forward and backward are called to compute the gradients.

    [Figure: optim-3.jpg]

    The relevant code is:

    val trainingThreads = Engine.default.invokeAndWait2((0 until _subModelNumber).map(i =>() => {
        val trainStart = System.nanoTime()
        val localModel = cached.localModels(i)
        localModel.training()
        val localCriterion = cached.localCriterions(i)
        val input = miniBatchBuffer(i).getInput()
        val target = miniBatchBuffer(i).getTarget()
        val output = localModel.forward(input)
        lossArray(i) = ev.toType[Double](localCriterion.forward(output, target))
        val errors = localCriterion.backward(output, target)
        localModel.backward(input, errors)
        cached.moduleTimeList(i + pre) = System.nanoTime() - trainStart + weightSyncTime
        i
     }))
    

    Once the gradient computation for this Partition is finished, BigDL sums the gradients of the _subModelNumber map tasks together and stores the result in the BlockManager, from where other workers later fetch it when aggregating gradients and updating parameters.
    This gradient aggregation happens within a single executor.

    // gradients from the threads that have finished
    val finishedGradients = finishedThreads.map(cached.modelGradients(_))
    // aggregate (sum) the gradients
    if (finishedThreads.nonEmpty) {
     parameters.values.foreach { p =>
       time = System.nanoTime()
       val pOffset = p.paramOffset
       val pLength = p.size
       val taskSize = pLength / _subModelNumber
       val extraTask = pLength % _subModelNumber
       // Aggregate multi-model's gradient to the first model's gradient
       val parallelNum = if (taskSize == 0) extraTask else _subModelNumber
       Engine.default.invokeAndWait((0 until parallelNum).map(tid => () => {
         val offset = pOffset + tid * taskSize + math.min(tid, extraTask)
         val length = taskSize + (if (tid < extraTask) 1 else 0)
         var i = 1
         while (i < finishedGradients.length) {
         finishedGradients(0).narrow(1, offset, length)
           .add(finishedGradients(i).narrow(1, offset, length))
           i += 1
         }
         }))
       // store the aggregated gradient in the BlockManager:
       // put the first finished model's gradient, which now holds the sum of
       // all other models' gradients, into AllReduceParameter
       p.putGradients(finishedGradients(0).narrow(1, pOffset, pLength))
     }
    } else {
      // zero the gradient in the BlockManager when no thread finished
      cached.modelGradients(0).zero()
      parameters.values.foreach { p =>
        p.putGradients(cached.modelGradients(0).narrow(1, p.paramOffset, p.size))
      }
    }

    Parameter update (the parameter synchronization Job)

    Aggregate gradients
    When merging gradients, BigDL first divides each task's gradient into n pieces (where n is the number of tasks), then launches n tasks to aggregate them, each task handling only its own piece; for example, Task 1 aggregates only the first piece of every local gradient.

    [Figure: optim-4.jpg]
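    Conceptually, every partition fetches its own slice of every other partition's gradient from the BlockManager, sums those slices, and divides by the number of contributing models. The following toy illustration of that data movement uses plain Scala arrays and is not BigDL code:

    // toy all-reduce: n partitions, each holding a local gradient of 8 values;
    // partition `pid` is responsible only for slice `pid` of every gradient
    val n = 4
    val localGradients = Array.fill(n)(Array.fill(8)(1.0))
    val sliceSize = 8 / n
    val aggregated = (0 until n).map { pid =>
      val from = pid * sliceSize
      (0 until n)
        .map(src => localGradients(src).slice(from, from + sliceSize)) // fetch slice `pid` from every node
        .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })       // sum, like aggregateGradientPartition
        .map(_ / n)                                                    // average, like gradientPartition.div(avgNumbers)
    }
    println(aggregated.map(_.mkString(",")).mkString(" | "))           // every slice ends up as 1.0 after averaging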

    This gradient aggregation happens across different Executors:

    // enough records were processed for this batch, so update the model
    val value = lossSum.value / numFinishedModelUpdates
    
    driverState("numFinishedModel") = numFinishedModelUpdates
    // isGradientUpdated is flag to mark whether gradient is updated. May changed in the future.
    driverState("isGradientUpdated") = false
    // parameterProcesser like L2NormClippingProcessor may aggregate gradient,
    // and change the value of isGradientUpdated in driverState.
    parameters.foreach { p =>
      parameterProcessers.foreach(_.collectGlobalData(models, p._2, metrics, driverState))
    }
    val isGradientUpdated = driverState[Boolean]("isGradientUpdated")
    val stateBroadcast = sc.broadcast(driverState)
    ...
    }.reduce(_ + _) // gradient-computation Spark Job
    

    The code that performs the gradient aggregation is:

    /**
     * Retrieve gradients for the slice of the model that this node is responsible for from all the
     * other nodes. A new thread is created for each separate node. The gradients are then summed
     * and then stored in decompressed form in `gradientPartition`.
     * @param avgNumbers average numbers.
     */
    def aggregateGradientPartition(avgNumbers: Int): Unit = {
      require(partitionId < partitionNum, s"This parameter was created with $partitionNum " +
        s"partitions. It cannot be used on RDDs with > $partitionNum partitions.")
      // fetch the corresponding gradient slices from the other nodes
      val params = new Array[CompressedTensor[T]](partitionNum)
      val sgThreads = (0 until partitionNum).map { pid =>
        new Callable[Int] {
          override def call(): Int = {
            try {
              val blockId = getGradientBlockId(pid, partitionId)
              val tmp = BlockManagerWrapper.getLocalOrRemoteBytes(blockId).get
              params(pid) = SerializerInstance.create(tmp)
              BlockManagerWrapper.unlock(blockId)
              pid
            } catch {
              case t: Throwable =>
                logger.error("Error: " + ExceptionUtils.getStackTrace(t))
                throw t
            }
          }
        }
      }
      syncPool.invokeAll(sgThreads.asJava)
    
        // sum up the gradient slices from all nodes
      val length = taskSize + (if (partitionId < extraSize) 1 else 0)
      val poolSize = Engine.default.getPoolSize
      val innerTaskSize = length / poolSize
      val innerExtraSize = length % poolSize
      val availableTask = if (innerTaskSize == 0) innerExtraSize else poolSize
      computePool.invokeAll((0 until availableTask).map(tid =>
        new Callable[Int] {
          override def call(): Int = {
            val innerStart = tid * innerTaskSize + math.min(innerExtraSize, tid)
            val innerLength = innerTaskSize + (if (tid < innerExtraSize) 1 else 0)
            params.reduce { (l, r) =>
              l.add(r.bytes(innerStart, innerLength), innerStart, innerLength)
            }
            tid
          }
        }
      ).asJava)
      params.head.deCompress(gradientPartition)
      gradientPartition.div(ev.fromType(avgNumbers))
    }
    

    Once all gradients have been aggregated, they are first processed by the parameterProcessers. BigDL currently ships ConstantClippingProcessor and L2NormClippingProcessor; note that ConstantClippingProcessor does not change the value of driverState("isGradientUpdated").
    After that, the parameters are updated and the optimMethods' bookkeeping is updated as well; the code is:

     models.mapPartitions { modelIter =>
        val modelCache = modelIter.next()
        // if the gradient aggregation has not happened yet, wait for it / do it before updating
        if (!isGradientUpdated) {
            val getG = System.nanoTime()
            // aggregate the gradients across Executors
            parameters.values.foreach(_.aggregateGradientPartition(numFinishedModelUpdates))
            driverMetrics.add("aggregrateGradientParition average executor",
            System.nanoTime() - getG)
         }
        parameters.foreach { p =>
        parameterProcessers.foreach(_.processParameters(p._2, modelCache, driverState))
         }
        // update the parameters
        modelCache.optimMethods.foreach { case (name, optimMethod) =>
          // update optimMethod.state
          // compute the new weights
          ...
          val p = parameters(name)
          optimMethod.optimize(_ => (ev.fromType(value), p.gradientPartition), p.weightPartition)
          // store the updated weight partition back into the BlockManager
          p.sendWeightPartition()
         }
         Iterator.empty
    }.count()  // parameter-update Spark Job
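    optimMethod.optimize receives a function that returns (loss, gradient) together with the weight slice to update in place; for plain SGD the update on this node's partition is just w := w - lr * g. A toy, array-based sketch of that contract (not BigDL's OptimMethod API):

    // toy version of the optimize(feval, weight) contract used above:
    // feval returns (loss, gradient); the weight slice is updated in place
    def sgdStep(feval: Array[Double] => (Double, Array[Double]),
                weight: Array[Double],
                lr: Double = 0.1): Unit = {
      val (_, grad) = feval(weight)
      var i = 0
      while (i < weight.length) {
        weight(i) -= lr * grad(i)
        i += 1
      }
    }

    val w = Array(1.0, 2.0, 3.0)
    sgdStep(_ => (0.5, Array(0.1, 0.1, 0.1)), w)
    println(w.mkString(","))   // 0.99,1.99,2.99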
    

    Updating the hyperparameters

    After the parameters have been updated, the hyperparameters are updated as well, and some information about this Iteration is printed.

    // update the hyperparameters
    optimMethods.foreach{ v =>
        v._2.updateHyperParameter()
    }
    // print intermediate training information: epoch, iteration, neval, learningRate, Loss, Throughput, records, etc.
    

    In addition, BigDL recomputes the drop Threshold (used, when dropPercentage > 0, to drop the slowest models), and afterwards clears the timing information stored on each node.

    if (dropPercentage > 0.0 && iteration > warmupIterationNum &&
        iteration % computeThresholdbatchSize == 0) {
        val moduleTimeList = models.mapPartitions { iter =>
           iter.next().moduleTimeList.iterator}.collect()
        val k = (dropPercentage * computeThresholdbatchSize * driverSubModelNum).toInt
        if (k > dropModelNumBatch) {
            threshold = Util.kthLargest(moduleTimeList, 0, moduleTimeList.length-1,
            k - dropModelNumBatch)
        } else {
            threshold = (threshold * 1.01).toLong
        }
        logger.info("threshold: " + threshold)
         // clear moduleTimeList in each node
         models.mapPartitions { iter =>
            val timeList = iter.next.moduleTimeList
            var i = 0
             while (i < timeList.length) {
               timeList(i) = 0
               i += 1
             }
             Iterator.empty
            }.count()
          dropModelNumBatch = 0
        }
    

    Shuffling the training set

    When the number of records processed equals the total size of the training set, BigDL considers one Epoch finished; it then shuffles the whole training set, redistributing the records across Partitions.

    // shuffle the data once an epoch has finished
    if (recordsProcessedThisEpoch >= numSamples) {
        ...
        dataset.shuffle()
    dataRDD = dataset.data(train = true)
        recordsProcessedThisEpoch = 0
    }
    

    Note: because in each Iteration BigDL draws a random batch from each Partition, when recordsProcessedThisEpoch >= numSamples the epoch may in fact not have covered every sample.

    Validation

    When the trigger fires, BigDL validates the model with the validation DataSet and ValidationMethods configured when the Optimizer was created, and then records the results in the validationSummary.

    // set before calling optimizer.optimize()
    optimizer.setValidation(
      trigger = Trigger.everyEpoch,
      sampleRDD = testSamples,
      vMethods = Array(new Top1Accuracy, new Top5Accuracy, new Loss),
      batchSize = param.batchSize
    )
    // decide whether validation should run
    val trigger = validationTrigger.get
    if (!trigger(state)) {
         return
    }
    // validate
    val results = ZippedPartitionsWithLocalityRDD(models, validateRDD)((modelIter, dataIter) => {
          val cached = modelIter.next()
          val vMethodsArr = cached.localMethods
          val workingModels = cached.localModels
          workingModels.foreach(_.evaluate())
          dataIter.map(batch => {
          ...
          Engine.default.invokeAndWait(
              (0 until parallelism).map(b =>
          ...
    // validation Summary
    if(validationSummary.isDefined) {
          results.foreach { r =>
            val result = r._1.result
            validationSummary.get.addScalar(r._2.toString(), result._1,
              state[Int]("neval") - 1
            )
          }
        }
    

    TrainSummary

    BigDL can visualize the training process with TensorBoard. By default it collects the Loss, LearningRate and Threshold; other tags can be enabled through TrainSummary. Officially, only LearningRate, Loss, Throughput and Parameters are supported (Parameters covers weight, gradWeight, bias, gradBias and some running statistics, e.g. runningMean and runningVar in BatchNormalization). Users can also implement their own TrainSummary and ValidationSummary by extending the abstract class Summary.

    // set before calling optimizer.optimize()
    val trainSummary = TrainSummary(param.summaryPath.get,param.appname)
    val validationSummary = ValidationSummary(param.summaryPath.get,param.appname)
    trainSummary.setSummaryTrigger("LearningRate",Trigger.everyEpoch)
    trainSummary.setSummaryTrigger("Parameters",Trigger.everyEpoch)
    optimizer.setTrainSummary(trainSummary)
    optimizer.setValidationSummary(validationSummary)
    ...
     val parametersTrigger = trainSummary.getSummaryTrigger("Parameters")
    if (parametersTrigger.isDefined && parametersTrigger.get(driverState)) {
        // Parallelize to create Histogram
       ....
       val scalarTrigger = trainSummary.getScalarTriggers()
       // Not parallelizable, because driverState is changing each iteration.
    
       scalarTrigger.foreach { v =>
          if (v._2(driverState)) {
              trainSummary.addScalar(
              v._1, driverState[Float](v._1), currentIteration)
          }
       }
    

    checkpoint

    According to a user-defined trigger, BigDL can save checkpoints during training; if training is interrupted, it can be resumed from a checkpoint.

    // set before calling optimizer.optimize()
    // save checkpoint
    if(param.checkpoint.isDefined){
     optimizer.setCheckpoint(param.checkpoint.get,Trigger.severalIteration(param.checkpointIteration))
    }
    if(param.overWriteCheckpoint){
      optimizer.overWriteCheckpoint()
    }
    ...
    cacheTrigger.foreach { trigger =>
          cachePath.foreach { path =>
          if (trigger(state)) {
              saveModel(getModel(models, parameters, trainingModel), cachePath, isOverWrite, s".${state[Int]("neval")}")
              optimMethods.foreach{case (name, optimMethod) =>
              saveOptimMethod(optimMethod, cachePath, isOverWrite, s"-$name.${state[Int]("neval")}") }
          }
       }
    }
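    Resuming from such a checkpoint amounts to loading the saved model and optim-method snapshots and building a new Optimizer from them. A hedged sketch, assuming BigDL's Module.load and OptimMethod.load serialization calls and hypothetical snapshot paths (the exact load call can differ across BigDL versions):

    import com.intel.analytics.bigdl.nn.{ClassNLLCriterion, Module}
    import com.intel.analytics.bigdl.optim.{OptimMethod, Optimizer}

    // hypothetical snapshot paths written by the checkpoint trigger above
    val modelSnapshot = "/tmp/bigdl_checkpoint/20190327_151200/model.1000"
    val optimSnapshot = "/tmp/bigdl_checkpoint/20190327_151200/optimMethod-myModel.1000"

    val restoredModel = Module.load[Float](modelSnapshot)       // reload the saved model
    val restoredOptim = OptimMethod.load[Float](optimSnapshot)  // reload the optim-method state

    val optimizer = Optimizer(
      model = restoredModel,
      sampleRDD = trainSamples,                   // same training RDD[Sample] as before (placeholder)
      criterion = new ClassNLLCriterion[Float](),
      batchSize = param.batchSize
    )
    optimizer.setOptimMethod(restoredOptim)       // continue from the restored state
    val trainedModel = optimizer.optimize()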
    

    Fetch parameters and return trainedModel

    When training is finished, BigDL fetches the model's current parameters to the Driver, copies them into trainingModel, and returns the trained model. This is done by calling DistriOptimizer.getModel(models, parameters, trainingModel).

    // Fetch current model parameters to driver, and copy to trainingModel.
    val partitionNum = models.partitions.length
    val extraState = models.map(_.localModels.head.getExtraParameter()).first()
    trainingModel.setExtraParameter(extraState)
        
    // make sure gradient is as the same length as weight
    val parameterArray = trainingModel.parameters()
    (0 until parameterArray._2.length).foreach(i =>
        parameterArray._2(i).resizeAs(parameterArray._1(i))
    )
        
    val (parameter, gradientParameter) = trainingModel.getParameters()
        
    parameters.foreach { case (moduleName, p) =>
        val currentModule = trainingModel(moduleName)
        require(currentModule.isDefined, s"Couldn't find $moduleName in $trainingModel")
        val (weights, gradients) = models.mapPartitions(iter => {
             val cached = iter.next()
             val curPartitionId = TaskContext.getPartitionId()
             Iterator.single((Map(curPartitionId -> p.weightPartition),
                  Map(curPartitionId -> p.gradientPartition)))
         }).reduce((a, b) => (a._1 ++ b._1, a._2 ++ b._2))
        
         val taskSize = p.size / partitionNum
         require(taskSize != 0, "parameter length should not less than partition number")
         val extraSize = p.size % partitionNum
        
         (0 until partitionNum).map(pid => {
            val start = p.paramOffset + pid * taskSize + math.min(pid, extraSize)
            val length = taskSize + (if (pid < extraSize) 1 else 0)
            parameter.narrow(1, start, length).copy(weights(pid))
            gradientParameter.narrow(1, start,length).copy(gradients(pid))
              })
            }
    trainingModel
    

    This concludes BigDL's distributed training process; from the user's point of view it is all triggered by a single call:

    val trainedModel = optimizer.optimize()
    

