Spark MLlib LinearRegression (Linear Regression)


Author: SmileySure | Published: 2018-03-07 18:20

    Linear Regression

    Jianshu does not support LaTeX, so the mathematical derivation for this part is written up on CSDN:
    http://blog.csdn.net/u010557442/article/details/79474920
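
    For quick reference while reading the source below, the objective minimized by the SGD-based
    solver is the squared-error loss quoted in the MLlib source comments:

        L(weights) = 1/(2n) * ||A weights - y||^2

    where A is the matrix of feature rows and y is the vector of labels.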

    Source Code Analysis

    • MLlib source code analysis

      • Setting up the linear regression

      • org/apache/spark/mllib/regression/LinearRegression.scala

      • object LinearRegressionWithSGD // companion object of LinearRegressionWithSGD (think of it as a singleton)
        // It mainly defines the train methods, which pass the training parameters and the input RDD
        // on to run(); it is the entry point for the LinearRegressionWithSGD class.
        
        // Linear regression model trained with stochastic gradient descent.
        // Loss function: f(weights) = 1/n ||A weights - y||^2
        class LinearRegressionWithSGD private[mllib] ( // constructor visible only inside the mllib package
            private var stepSize: Double,          // step size for each gradient step
            private var numIterations: Int,        // number of iterations
            private var regParam: Double,          // regularization parameter
            private var miniBatchFraction: Double) // fraction of the data sampled for each mini-batch
          extends GeneralizedLinearAlgorithm[LinearRegressionModel] with Serializable {
          // gradient of the least-squares (squared error) loss
          private val gradient = new LeastSquaresGradient()
          // simple gradient update, no regularization
          private val updater = new SimpleUpdater()
          @Since("0.8.0")
          // build the gradient-descent optimizer from the gradient and updater above
          override val optimizer = new GradientDescent(gradient, updater)
            .setStepSize(stepSize)
            .setNumIterations(numIterations)
            .setRegParam(regParam)
            .setMiniBatchFraction(miniBatchFraction)
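
      • The excerpt above only wires the pieces together. A minimal usage sketch of the train entry point
        (not part of the MLlib source; it assumes an existing SparkContext sc and made-up toy data):

        import org.apache.spark.mllib.linalg.Vectors
        import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

        // Toy data following y = 2 * x; cache it so run() does not warn about uncached input
        val training = sc.parallelize(Seq(
          LabeledPoint(2.0, Vectors.dense(1.0)),
          LabeledPoint(4.0, Vectors.dense(2.0)),
          LabeledPoint(6.0, Vectors.dense(3.0)),
          LabeledPoint(8.0, Vectors.dense(4.0))
        )).cache()

        val numIterations = 200
        val stepSize = 0.01
        // train() constructs a LinearRegressionWithSGD instance and calls run() on it
        val model = LinearRegressionWithSGD.train(training, numIterations, stepSize)

        println(s"weights = ${model.weights}, intercept = ${model.intercept}")
        println(s"prediction for x = 5.0: ${model.predict(Vectors.dense(5.0))}")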
        
      • The run method that performs training

      • org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala

      •   /**
           * Run the algorithm with the configured parameters on an input RDD
           * of LabeledPoint entries starting from the initial weights provided.
           *
           */
          @Since("1.0.0")
          def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
            // feature dimension:
            // if not yet set, take the size of the features vector of the first record in the RDD
            if (numFeatures < 0) {
              numFeatures = input.map(_.features.size).first()
            }
            // warn if the input RDD is not cached
            if (input.getStorageLevel == StorageLevel.NONE) {
              logWarning("The input data is not directly cached, which may hurt performance if its"
                + " parent RDDs are also uncached.")
            }
        
            // Check the data properties before running the optimizer
            if (validateData && !validators.forall(func => func(input))) {
              throw new SparkException("Input validation failed.")
            }
        
            /**
                Feature scaling: convergence of the optimizer depends on the conditioning of the
                training data, and scaling the features to unit variance improves the convergence rate.
                Currently this is only enabled for logistic regression.
             */
            val scaler = if (useFeatureScaling) {
              new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
            } else {
              null
            }
        
            // Prepend an extra variable consisting of all 1.0's for the intercept.
            // add the bias term: the constant intercept θ0
            // TODO: Apply feature scaling to the weight vector instead of input data.
            val data =
              if (addIntercept) {
                if (useFeatureScaling) {
                  input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
                } else {
                  // append the bias after the features and cache the result in memory
                  input.map(lp => (lp.label, appendBias(lp.features))).cache()
                }
              } else {
                if (useFeatureScaling) {
                  input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
                } else {
                  input.map(lp => (lp.label, lp.features))
                }
              }
        
            /**
             * TODO: For better convergence, in logistic regression, the intercepts should be computed
             * from the prior probability distribution of the outcomes; for linear regression,
             * the intercept should be set as the average of response.
             * initial weights, with the bias term appended when an intercept is used
             */
            val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
              appendBias(initialWeights)
            } else {
              /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
              initialWeights
            }
            // optimize the weights: run gradient descent and return the learned weights
            val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
            // extract the intercept (the last component when a bias column was appended)
            val intercept = if (addIntercept && numOfLinearPredictor == 1) {
              weightsWithIntercept(weightsWithIntercept.size - 1)
            } else {
              0.0
            }
            // extract the feature weights (everything except the intercept)
            var weights = if (addIntercept && numOfLinearPredictor == 1) {
              Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
            } else {
              weightsWithIntercept
            }
        
            /**
             * The weights and intercept are trained in the scaled space; we're converting them back to
             * the original scale.
             * When feature scaling was used, the weights must be converted back to the original scale.
             * Math shows that if we only perform standardization without subtracting means, the intercept
             * will not be changed. w_i = w_i' / v_i where w_i' is the coefficient in the scaled space, w_i
             * is the coefficient in the original space, and v_i is the variance of the column i.
             */
            if (useFeatureScaling) {
              if (numOfLinearPredictor == 1) {
                weights = scaler.transform(weights)
              } else {
                /**
                 * For `numOfLinearPredictor > 1`, we have to transform the weights back to the original
                 * scale for each set of linear predictor. Note that the intercepts have to be explicitly
                 * excluded when `addIntercept == true` since the intercepts are part of weights now.
                 */
                var i = 0
                val n = weights.size / numOfLinearPredictor
                val weightsArray = weights.toArray
                while (i < numOfLinearPredictor) {
                  val start = i * n
                  val end = (i + 1) * n - { if (addIntercept) 1 else 0 }
        
                  val partialWeightsArray = scaler.transform(
                    Vectors.dense(weightsArray.slice(start, end))).toArray
        
                  System.arraycopy(partialWeightsArray, 0, weightsArray, start, partialWeightsArray.length)
                  i += 1
                }
                weights = Vectors.dense(weightsArray)
              }
            }
        
            // Warn at the end of the run as well, for increased visibility.
            if (input.getStorageLevel == StorageLevel.NONE) {
              logWarning("The input data was not directly cached, which may hurt performance if its"
                + " parent RDDs are also uncached.")
            }
        
            // Unpersist cached data
            if (data.getStorageLevel != StorageLevel.NONE) {
              data.unpersist(false)
            }
            // build and return the trained model
            createModel(weights, intercept)
          }
        }
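
      • To make the intercept handling in run() concrete, here is a small standalone sketch (plain Scala,
        not the MLlib appendBias helper): a constant 1.0 feature is appended, the optimizer learns the
        intercept as the last weight component, and run() then splits it back out.

        // Illustration only: appendBias adds a trailing 1.0 column so the intercept is learned as a weight
        def appendBiasSketch(features: Array[Double]): Array[Double] = features :+ 1.0

        val features = Array(0.5, -1.2)
        val withBias = appendBiasSketch(features)        // Array(0.5, -1.2, 1.0)

        // Suppose the optimizer returned these weights; the last entry corresponds to the bias column
        val weightsWithIntercept = Array(2.0, 0.3, 4.5)
        val intercept = weightsWithIntercept.last        // 4.5, cf. weightsWithIntercept(size - 1) above
        val weights = weightsWithIntercept.dropRight(1)  // Array(2.0, 0.3)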
        
      • Solving for the weights with gradient descent

      • org/apache/spark/mllib/optimization/GradientDescent.scala

      • Call chain: optimizer.optimize -> GradientDescent.optimize -> GradientDescent.runMiniBatchSGD

      •   def runMiniBatchSGD(
              data: RDD[(Double, Vector)],
              gradient: Gradient,
              updater: Updater,
              stepSize: Double,
              numIterations: Int,
              regParam: Double,
              miniBatchFraction: Double,
              initialWeights: Vector,
              convergenceTol: Double): (Vector, Array[Double]) = {
            // history of the stochastic loss recorded at each iteration
            val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
            // Record previous weight and current one to calculate solution vector difference
            
            var previousWeights: Option[Vector] = None
            var currentWeights: Option[Vector] = None
            
            // number of training examples m
            val numExamples = data.count()
        
            // if no data, return initial weights to avoid NaNs
            if (numExamples == 0) {
              logWarning("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
              return (initialWeights, stochasticLossHistory.toArray)
            }
        
            if (numExamples * miniBatchFraction < 1) {
              logWarning("The miniBatchFraction is too small")
            }
        
            // Initialize weights as a column vector
            var weights = Vectors.dense(initialWeights.toArray)
            val n = weights.size
        
            /**
             * For the first iteration, the regVal will be initialized as sum of weight squares
             * if it's L2 updater; for L1 updater, the same logic is followed.
             */
            // this compute() call (with a zero gradient) only initializes regVal, e.g. the weighted squared norm of the weights for an L2 updater
            var regVal = updater.compute(
              weights, Vectors.zeros(weights.size), 0, 1, regParam)._2
        
            var converged = false // indicates whether converged based on convergenceTol
            var i = 1
            // iterative weight updates
            while (!converged && i <= numIterations) {
              // broadcast the weights so that every executor receives the current values
              val bcWeights = data.context.broadcast(weights)
              // Sample a subset (fraction miniBatchFraction) of the total data
              // sample a fraction (miniBatchFraction) of the data and aggregate over the sampled subset with treeAggregate:
              // each sample contributes its gradient and loss, and these are summed across the subset in one map-reduce pass
              // compute and sum up the subgradients on this subset (this is one map-reduce)
              val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
                .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))( // zero value: (gradient accumulator, loss sum, sample count)
                  // fold one sample v into the accumulator c
                  seqOp = (c, v) => {
                    // c: (grad, loss, count), v: (label, features)
                    // compute() adds this sample's gradient into c._1 in place and returns the sample's loss
                    val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
                    (c._1, c._2 + l, c._3 + 1)
                  },
                  // merge two accumulators
                  combOp = (c1, c2) => {
                    // c: (grad, loss, count)
                    (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
                  })
              bcWeights.destroy(blocking = false)
        
              if (miniBatchSize > 0) {
                /**
                 * lossSum is computed using the weights from the previous iteration
                 * and regVal is the regularization value computed in the previous iteration as well.
                 */
                // record the loss, then update the weights and the regularization value
                stochasticLossHistory += lossSum / miniBatchSize + regVal
                val update = updater.compute(
                  weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
                  stepSize, i, regParam)
                weights = update._1
                regVal = update._2
        
                previousWeights = currentWeights
                currentWeights = Some(weights)
                if (previousWeights != None && currentWeights != None) {
                  converged = isConverged(previousWeights.get,
                    currentWeights.get, convergenceTol)
                }
              } else {
                logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
              }
              i += 1
            }
        
            logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
              stochasticLossHistory.takeRight(10).mkString(", ")))
            // return the final weights and the array of losses
            (weights, stochasticLossHistory.toArray)
        
          }
        
          // test whether the solution has converged
          private def isConverged(
              previousWeights: Vector,
              currentWeights: Vector,
              convergenceTol: Double): Boolean = {
            // To compare with convergence tolerance.
            val previousBDV = previousWeights.asBreeze.toDenseVector
            val currentBDV = currentWeights.asBreeze.toDenseVector
        
            // This represents the difference of updated weights in the iteration.
            val solutionVecDiff: Double = norm(previousBDV - currentBDV)
        
            solutionVecDiff < convergenceTol * Math.max(norm(currentBDV), 1.0)
          }
        
        }
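
      • Stripped of the Spark machinery (broadcast, sampling, treeAggregate), each iteration above averages
        the per-sample gradients and takes a step of size stepSize / sqrt(iteration). A minimal single-machine
        sketch of the same recipe (plain Scala collections instead of RDDs, least-squares gradient, no
        regularization and no convergence check):

        // Local sketch of runMiniBatchSGD: full-batch gradient descent on (label, features) pairs
        def localGradientDescent(
            data: Seq[(Double, Array[Double])],
            stepSize: Double,
            numIterations: Int): Array[Double] = {
          val n = data.head._2.length
          var weights = Array.fill(n)(0.0)
          for (i <- 1 to numIterations) {
            // gradient of 1/2 * (w.x - y)^2 for one sample is (w.x - y) * x; sum it over the batch
            val gradSum = Array.fill(n)(0.0)
            for ((label, x) <- data) {
              val diff = weights.zip(x).map { case (w, xi) => w * xi }.sum - label
              for (j <- 0 until n) gradSum(j) += diff * x(j)
            }
            // SimpleUpdater-style step: apply the averaged gradient with step size stepSize / sqrt(i)
            val thisIterStepSize = stepSize / math.sqrt(i)
            for (j <- 0 until n) weights(j) -= thisIterStepSize * gradSum(j) / data.size
          }
          weights
        }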
        
      • Gradient computation

      • gradient.compute computes the gradient and loss for each sample; linear regression uses LeastSquaresGradient (least squares).

      • org/apache/spark/mllib/optimization/Gradient.scala

      • /**
         * :: DeveloperApi ::
         * Compute gradient and loss for a Least-squared loss function, as used in linear regression.
         * This is correct for the averaged least squares loss function (mean squared error)
         *              L = 1/2n ||A weights-y||^2
         * See also the documentation for the precise formulation.
         */
        @DeveloperApi
        class LeastSquaresGradient extends Gradient {
          override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
            // diff = h_w(x) - y (prediction minus label)
            val diff = dot(data, weights) - label
            
            val loss = diff * diff / 2.0
            val gradient = data.copy
            scal(diff, gradient) // gradient = diff * x = (h_w(x) - y) * x
            (gradient, loss)
          }
        
          override def compute(
              data: Vector,
              label: Double,
              weights: Vector,
              cumGradient: Vector): Double = {
            val diff = dot(data, weights) - label // diff = h_w(x) - y
            axpy(diff, data, cumGradient) // cumGradient += (h_w(x) - y) * x
            diff * diff / 2.0
          }
        }
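
      • A quick numerical check of the formulas above (made-up numbers): with weights w = (1.0, 2.0),
        a sample x = (3.0, 1.0) and label y = 4.0, diff = w.x - y = 5.0 - 4.0 = 1.0, so the loss is
        diff^2 / 2 = 0.5 and the gradient is diff * x = (3.0, 1.0).

        import org.apache.spark.mllib.linalg.Vectors
        import org.apache.spark.mllib.optimization.LeastSquaresGradient

        val lsg = new LeastSquaresGradient()
        val (grad, loss) = lsg.compute(
          Vectors.dense(3.0, 1.0),   // data (features x)
          4.0,                       // label y
          Vectors.dense(1.0, 2.0))   // weights w
        // grad = [3.0, 1.0], loss = 0.5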
        
      • Weight update

    /**
     * :: DeveloperApi ::
     * A simple updater for gradient descent *without* any regularization.
     * Uses a step-size decreasing with the square root of the number of iterations.
     */
    @DeveloperApi
    class SimpleUpdater extends Updater {
      override def compute(
          weightsOld: Vector,
          gradient: Vector,
          stepSize: Double,
          iter: Int,
          regParam: Double): (Vector, Double) = {
        // the step size decays with the iteration number: stepSize / sqrt(iter)
        val thisIterStepSize = stepSize / math.sqrt(iter)
        val brzWeights: BV[Double] = weightsOld.asBreeze.toDenseVector
        brzAxpy(-thisIterStepSize, gradient.asBreeze, brzWeights)
    
        (Vectors.fromBreeze(brzWeights), 0)
      }
    }
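
    Putting the pieces together, the update applied each iteration is
    newWeights = weightsOld - (stepSize / sqrt(iter)) * gradient, and the returned regularization value
    is 0 because SimpleUpdater applies no regularization. A small worked example (made-up numbers):

        import org.apache.spark.mllib.linalg.Vectors
        import org.apache.spark.mllib.optimization.SimpleUpdater

        // iter = 4  =>  effective step = 0.1 / sqrt(4) = 0.05
        // newWeights = (1.0, 2.0) - 0.05 * (3.0, 1.0) = (0.85, 1.95), regVal = 0.0
        val updater = new SimpleUpdater()
        val (newWeights, regVal) = updater.compute(
          Vectors.dense(1.0, 2.0),   // weightsOld
          Vectors.dense(3.0, 1.0),   // gradient
          0.1,                       // stepSize
          4,                         // iter
          0.0)                       // regParam (ignored by SimpleUpdater)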
    
