1、Kmeans概述
属于无监督学习,对诸多样本根据特征向量自动划分若干类别。
基本思想就是初始随机给定K个簇中心,按照最邻近原则将待分类样本点分到各个簇。然后按平均法重新计算各个簇的质心,从而确定新的簇心。一直迭代,直到簇心的移动距离小于给定的某个值。
2、Kmeans算法原理
<1> 设置需要聚类的类别个数K,以及n个训练样本,随机初始化K个聚类中心
<2> 计算每个样本与聚类中心的距离,样本选择最近的聚类中心作为其类别
<3> 计算每个聚类中所有点的坐标平均值,并将这个平均值作为新的聚类中心
<4> 迭代执行上一步,知道算法收敛为止
3、示例图:
![](https://img.haomeiwen.com/i7803976/44ae5157b716b11d.png)
4、选择初始中心点的基本思想
初始的聚类中心之间的相互距离要尽量远,初始化过程如下:
<1> 从输入的数据点集合中随机选择一个点作为第一个聚类中心
<2> 对于数据集中的每个点x,计算它与最近聚类中心(指已选择的聚类中心)的距离D(x)
<3> 选择一个新的数据点作为新的聚类中心,选择的原则是:D(x)较大的点,被选择作为聚类中心的概率较高
![](https://img.haomeiwen.com/i7803976/5b372dc5ab07fd69.png)
<4> 重复2和3,直到k个聚类中心被选出
<5> 利用这k个初始的聚类中心来运行标准的K-means算法
5、源码介绍
<1> KMeansModel类
![](https://img.haomeiwen.com/i7803976/63ed1d426be3e894.png)
该类的底层实现还是基于一个MLlibKMeansModel,这个类是mllib包下的。
transform方法:
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
val predictUDF = udf((vector: Vector) => predict(vector))
dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
}
其中predict方法,实际调用的parentModel(那个Mllib包下的)的predict方法
computeCost方法,也是调用的parentModel的computeCost方法
/**
* Return the K-means cost (sum of squared distances of points to their nearest center) for this
* model on the given data.
*/
// TODO: Replace the temp fix when we have proper evaluators defined for clustering.
@Since("2.0.0")
def computeCost(dataset: Dataset[_]): Double = {
SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
case Row(point: Vector) => OldVectors.fromML(point)
}
parentModel.computeCost(data)
}
<2> Kmeans的fit方法,也是训练出parentModel,基于mllib
<3> spark mllib实现聚类算法:
1)首先随机生成聚类中心点(支持随机选择样本点当做初始中心点,还支持kmeans方法选择最优的聚类中心点)
2)迭代计算样本的中心点(分布式实现是先计算每个样本属于哪个中心点,之后采用聚合函数统计属于每个中心点的样本值之和以及样本数量,最后求得最新中心点,并判断中心点是否改变)
![](https://img.haomeiwen.com/i7803976/70e9642086602e05.png)
mllib计算样本属于哪一个中心点,计算距离的优化:
![](https://img.haomeiwen.com/i7803976/7d7b6cff124a06a0.png)
L2范数:向量所有元素的平方和的开平方
![](https://img.haomeiwen.com/i7803976/82a94470f859ede2.png)
<4>
该表格中的都是基于mllib包下的:
![](https://img.haomeiwen.com/i7803976/8cc496153704308a.png)
6、源码解析
<1> mllib包下 Kmeans伴生对象的train方法:
/**
* Trains a k-means model using the given set of parameters.
*
* @param data Training points as an `RDD` of `Vector` types.
* @param k Number of clusters to create.
* @param maxIterations Maximum number of iterations allowed.
* @param initializationMode The initialization algorithm. This can either be "random" or
* "k-means||". (default: "k-means||")
* @param seed Random seed for cluster initialization. Default is to generate seed based
* on system time.
*/
@Since("2.1.0")
def train(
data: RDD[Vector],
k: Int,
maxIterations: Int,
initializationMode: String,
seed: Long): KMeansModel = {
new KMeans().setK(k)
.setMaxIterations(maxIterations)
.setInitializationMode(initializationMode)
.setSeed(seed)
.run(data)
}
参数说明:
data:数据样本,格式为RDD[Vector]
k:聚类数量
maxIterations:最大迭代次数
runs:并行计算数,返回最佳模型
initializationMode:初始化中心,支持"random"或者"k-means||"
seed:初始化时的随机种子
<2> mllib包下Kmeans类的参数及构造方法
![](https://img.haomeiwen.com/i7803976/70a2bf8c44e528dc.png)
其中epsilon是中心距离阈值(所有中心的移动距离都小于该阈值,将停止迭代运行),
initializationSteps是在initializationMode设置为"k-means||"时,寻找初始中心点的迭代次数
<3>mllib包下Kmeans类的run方法
训练数据应该要缓存起来,这是一个迭代训练的过程。
/**
* Train a K-means model on the given set of points; `data` should be cached for high
* performance, because this is an iterative algorithm.
*/
@Since("0.8.0")
def run(data: RDD[Vector]): KMeansModel = {
run(data, None)
}
private[spark] def run(
data: RDD[Vector],
instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {
if (data.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data is not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
}
// Compute squared norms and cache them.
val norms = data.map(Vectors.norm(_, 2.0))
norms.persist()
val zippedData = data.zip(norms).map { case (v, norm) =>
new VectorWithNorm(v, norm)
}
val model = runAlgorithm(zippedData, instr)
norms.unpersist()
// Warn at the end of the run as well, for increased visibility.
if (data.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data was not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
}
model
}
过程:计算RDD[Vector]中每个Vector的L2范数,并且缓存;
zippedData的格式为(向量,向量的L2范数);
runAlgorithm运行Kmeans算法
<4> runAlgorithm 方法 (Kmeans算法的实现)
分析在代码注释中
/**
* Implementation of K-Means algorithm.
*/
private def runAlgorithm(
data: RDD[VectorWithNorm],
instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {
val sc = data.sparkContext
val initStartTime = System.nanoTime()
// 初始化中心,支持“random”或者“k-means||”
val centers = initialModel match {
case Some(kMeansCenters) =>
kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
case None =>
if (initializationMode == KMeans.RANDOM) {
initRandom(data)
} else {
initKMeansParallel(data)
}
}
val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
var converged = false
var cost = 0.0
var iteration = 0
val iterationStartTime = System.nanoTime()
instr.foreach(_.logNumFeatures(centers.head.vector.size))
// Execute iterations of Lloyd's algorithm until converged
// Kmeans迭代执行,计算每个样本属于哪个中心点,中心点累加的样本值以及计数
// 然后根据中心点的所有样本数据进行中心点的更新,并且比较更新前的数值,根据两者距离判断是否完成
// 其中runs代表并行度
while (iteration < maxIterations && !converged) {
//迭代次数小于最大迭代次数,并行计算的中心点还没有收敛
//损失值累加器
val costAccum = sc.doubleAccumulator
//广播中心点
val bcCenters = sc.broadcast(centers)
// Find the new centers
val newCenters = data.mapPartitions { points =>
//当前聚类中心
val thisCenters = bcCenters.value
//中心点的维度
val dims = thisCenters.head.vector.size
//每一个聚类下样本向量和
val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
//每一个聚类下样本点的数量
val counts = Array.fill(thisCenters.length)(0L)
points.foreach { point =>
//通过当前的聚类中心点,找出最近的聚类中心点
val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
//损失累加值
costAccum.add(cost)
//指定是哪一个聚类中心点的sum
val sum = sums(bestCenter)
//将该点加到对应最佳聚类中心点的sum,sum=sum+point
axpy(1.0, point.vector, sum)
counts(bestCenter) += 1
}
counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
}
//将不同分区中属于相同中心点的样本进行聚合
.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
axpy(1.0, sum2, sum1)
(sum1, count1 + count2)
}.mapValues { case (sum, count) =>
//sum=sum/count
scal(1.0 / count, sum)
new VectorWithNorm(sum)
}.collectAsMap()
bcCenters.destroy(blocking = false)
// Update the cluster centers and costs
converged = true
newCenters.foreach { case (j, newCenter) =>
if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { //计算中心点之间的移动距离
converged = false //距离大于,则说明中心点位置改变
}
//更新中心点
centers(j) = newCenter
}
cost = costAccum.value
iteration += 1
}
val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")
if (iteration == maxIterations) {
logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
} else {
logInfo(s"KMeans converged in $iteration iterations.")
}
logInfo(s"The cost is $cost.")
new KMeansModel(centers.map(_.vector))
}
7、demo
ml 参数:
![](https://img.haomeiwen.com/i7803976/779d7af72679c8dc.png)
其中,setTol是设置收敛阈值,对应mllib.KMeans中的epsilon,当所有聚类中心点的移动距离小于该值,说明不需要迭代了
网友评论