美文网首页
Spark Mllib中聚类_Kmeans

Spark Mllib中聚类_Kmeans

作者: LZhan | 来源:发表于2019-08-04 11:19 被阅读0次

1、Kmeans概述
属于无监督学习,对诸多样本根据特征向量自动划分若干类别。
基本思想就是初始随机给定K个簇中心,按照最邻近原则将待分类样本点分到各个簇。然后按平均法重新计算各个簇的质心,从而确定新的簇心。一直迭代,直到簇心的移动距离小于给定的某个值。

2、Kmeans算法原理
<1> 设置需要聚类的类别个数K,以及n个训练样本,随机初始化K个聚类中心
<2> 计算每个样本与聚类中心的距离,样本选择最近的聚类中心作为其类别
<3> 计算每个聚类中所有点的坐标平均值,并将这个平均值作为新的聚类中心
<4> 迭代执行上一步,知道算法收敛为止

3、示例图:


image.png

4、选择初始中心点的基本思想
初始的聚类中心之间的相互距离要尽量远,初始化过程如下:
<1> 从输入的数据点集合中随机选择一个点作为第一个聚类中心
<2> 对于数据集中的每个点x,计算它与最近聚类中心(指已选择的聚类中心)的距离D(x)
<3> 选择一个新的数据点作为新的聚类中心,选择的原则是:D(x)较大的点,被选择作为聚类中心的概率较高

image.png

<4> 重复2和3,直到k个聚类中心被选出
<5> 利用这k个初始的聚类中心来运行标准的K-means算法

5、源码介绍
<1> KMeansModel类

image.png
该类的底层实现还是基于一个MLlibKMeansModel,这个类是mllib包下的。

transform方法:

  @Since("2.0.0")
  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    val predictUDF = udf((vector: Vector) => predict(vector))
    dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
  }

其中predict方法,实际调用的parentModel(那个Mllib包下的)的predict方法

computeCost方法,也是调用的parentModel的computeCost方法

  /**
   * Return the K-means cost (sum of squared distances of points to their nearest center) for this
   * model on the given data.
   */
  // TODO: Replace the temp fix when we have proper evaluators defined for clustering.
  @Since("2.0.0")
  def computeCost(dataset: Dataset[_]): Double = {
    SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
    val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
      case Row(point: Vector) => OldVectors.fromML(point)
    }
    parentModel.computeCost(data)
  }

<2> Kmeans的fit方法,也是训练出parentModel,基于mllib

<3> spark mllib实现聚类算法:
1)首先随机生成聚类中心点(支持随机选择样本点当做初始中心点,还支持kmeans方法选择最优的聚类中心点)
2)迭代计算样本的中心点(分布式实现是先计算每个样本属于哪个中心点,之后采用聚合函数统计属于每个中心点的样本值之和以及样本数量,最后求得最新中心点,并判断中心点是否改变)


image.png

mllib计算样本属于哪一个中心点,计算距离的优化:


image.png

L2范数:向量所有元素的平方和的开平方

image.png

<4>
该表格中的都是基于mllib包下的:

image.png

6、源码解析
<1> mllib包下 Kmeans伴生对象的train方法:

/**
   * Trains a k-means model using the given set of parameters.
   *
   * @param data Training points as an `RDD` of `Vector` types.
   * @param k Number of clusters to create.
   * @param maxIterations Maximum number of iterations allowed.
   * @param initializationMode The initialization algorithm. This can either be "random" or
   *                           "k-means||". (default: "k-means||")
   * @param seed Random seed for cluster initialization. Default is to generate seed based
   *             on system time.
   */
  @Since("2.1.0")
  def train(
      data: RDD[Vector],
      k: Int,
      maxIterations: Int,
      initializationMode: String,
      seed: Long): KMeansModel = {
    new KMeans().setK(k)
      .setMaxIterations(maxIterations)
      .setInitializationMode(initializationMode)
      .setSeed(seed)
      .run(data)
  }

参数说明:
data:数据样本,格式为RDD[Vector]
k:聚类数量
maxIterations:最大迭代次数
runs:并行计算数,返回最佳模型
initializationMode:初始化中心,支持"random"或者"k-means||"
seed:初始化时的随机种子

<2> mllib包下Kmeans类的参数及构造方法


image.png

其中epsilon是中心距离阈值(所有中心的移动距离都小于该阈值,将停止迭代运行),
initializationSteps是在initializationMode设置为"k-means||"时,寻找初始中心点的迭代次数

<3>mllib包下Kmeans类的run方法
训练数据应该要缓存起来,这是一个迭代训练的过程。

  /**
   * Train a K-means model on the given set of points; `data` should be cached for high
   * performance, because this is an iterative algorithm.
   */
  @Since("0.8.0")
  def run(data: RDD[Vector]): KMeansModel = {
    run(data, None)
  }

  private[spark] def run(
      data: RDD[Vector],
      instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {

    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("The input data is not directly cached, which may hurt performance if its"
        + " parent RDDs are also uncached.")
    }

    // Compute squared norms and cache them.
    val norms = data.map(Vectors.norm(_, 2.0))
    norms.persist()
    val zippedData = data.zip(norms).map { case (v, norm) =>
      new VectorWithNorm(v, norm)
    }
    val model = runAlgorithm(zippedData, instr)
    norms.unpersist()

    // Warn at the end of the run as well, for increased visibility.
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("The input data was not directly cached, which may hurt performance if its"
        + " parent RDDs are also uncached.")
    }
    model
  }

过程:计算RDD[Vector]中每个Vector的L2范数,并且缓存;
zippedData的格式为(向量,向量的L2范数);
runAlgorithm运行Kmeans算法

<4> runAlgorithm 方法 (Kmeans算法的实现)
分析在代码注释中

/**
   * Implementation of K-Means algorithm.
   */
  private def runAlgorithm(
      data: RDD[VectorWithNorm],
      instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {

    val sc = data.sparkContext

    val initStartTime = System.nanoTime()
    
    // 初始化中心,支持“random”或者“k-means||”
    val centers = initialModel match {
      case Some(kMeansCenters) =>
        kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
      case None =>
        if (initializationMode == KMeans.RANDOM) {
          initRandom(data)
        } else {
          initKMeansParallel(data)
        }
    }
    val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
    logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")

    var converged = false
    var cost = 0.0
    var iteration = 0

    val iterationStartTime = System.nanoTime()

    instr.foreach(_.logNumFeatures(centers.head.vector.size))

    // Execute iterations of Lloyd's algorithm until converged
    // Kmeans迭代执行,计算每个样本属于哪个中心点,中心点累加的样本值以及计数 
    // 然后根据中心点的所有样本数据进行中心点的更新,并且比较更新前的数值,根据两者距离判断是否完成
    // 其中runs代表并行度
    while (iteration < maxIterations && !converged) {
    //迭代次数小于最大迭代次数,并行计算的中心点还没有收敛
     //损失值累加器
      val costAccum = sc.doubleAccumulator
      //广播中心点
      val bcCenters = sc.broadcast(centers)

      // Find the new centers
      val newCenters = data.mapPartitions { points =>
       //当前聚类中心
        val thisCenters = bcCenters.value
       //中心点的维度 
       val dims = thisCenters.head.vector.size
        //每一个聚类下样本向量和 
        val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
        //每一个聚类下样本点的数量
        val counts = Array.fill(thisCenters.length)(0L)

        points.foreach { point =>
         //通过当前的聚类中心点,找出最近的聚类中心点
          val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
         //损失累加值 
         costAccum.add(cost)
         //指定是哪一个聚类中心点的sum
          val sum = sums(bestCenter)
          //将该点加到对应最佳聚类中心点的sum,sum=sum+point
          axpy(1.0, point.vector, sum)
          counts(bestCenter) += 1
        }

        counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
      }
//将不同分区中属于相同中心点的样本进行聚合
.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
        axpy(1.0, sum2, sum1)
        (sum1, count1 + count2)
      }.mapValues { case (sum, count) =>
        //sum=sum/count
        scal(1.0 / count, sum)
        new VectorWithNorm(sum)
      }.collectAsMap()

      bcCenters.destroy(blocking = false)

      // Update the cluster centers and costs
      converged = true
      newCenters.foreach { case (j, newCenter) =>
        if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { //计算中心点之间的移动距离
          converged = false  //距离大于,则说明中心点位置改变
        }
        //更新中心点
        centers(j) = newCenter
      }

      cost = costAccum.value
      iteration += 1
    }

    val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
    logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")

    if (iteration == maxIterations) {
      logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
    } else {
      logInfo(s"KMeans converged in $iteration iterations.")
    }

    logInfo(s"The cost is $cost.")

    new KMeansModel(centers.map(_.vector))
  }

7、demo
ml 参数:

image.png

其中,setTol是设置收敛阈值,对应mllib.KMeans中的epsilon,当所有聚类中心点的移动距离小于该值,说明不需要迭代了

相关文章

网友评论

      本文标题:Spark Mllib中聚类_Kmeans

      本文链接:https://www.haomeiwen.com/subject/gumjdctx.html