美文网首页
在Spark上运行KMeans聚类

在Spark上运行KMeans聚类

作者: Threathunter | 来源:发表于2021-02-03 19:34 被阅读0次

来源:https://rsandstroem.github.io/sparkkmeans.html

填充空值的相关文章:

https://blog.csdn.net/sunjinshengli/article/details/90766113

https://zhuanlan.zhihu.com/p/143933094

https://zhuanlan.zhihu.com/p/97461977

https://dblab.xmu.edu.cn/blog/1779-2/

在最近的一个项目中,我面临着在大约100tb的数据上运行机器学习的任务。这个数据量已经超过了我的工作站上的容量,所以我使用PySpark API将代码从在scikit-learn上运行转换到Apache Spark。这让我能够使用内存分布式计算来处理这些数据。

这篇博文是基于这个最近的项目,在Spark上介绍机器学习的。

一、安装

本节中的说明是指在单个机器上的本地安装。虽然这在生产环境中没有多大意义,但对于我在这篇博文中所做的演示来说,这是很好的尝试。

首先访问http://spark.apache.org/downloads.html并下载您所选择的预构建Hadoop版本。对于我们今天将要演示的KMeans算法,Spark 1.6.x和2.x之间有一些变化,所以如果选择使用旧版本,这个演示中的所有行都不能工作。

下载并解压缩后,就可以将它移动到/opt,并设置到新版本的符号链接(假设您使用的是linux操作系统)。

sudo mv spark-2.1.1-bin-hadoop2.7 /opt/spark-2.1.1

sudo ln -s /opt/spark-2.1.1 /opt/spark

在.bashrc中添加以下代码行,您将使pyspark类对您的python安装可用,并且您将能够通过从命令行输入pyspark来启动一个启用了pyspark的新jupyter笔记本。

# Spark

export SPARK_HOME=/opt/spark

export PATH=$SPARK_HOME/bin:$PATH

export PYSPARK_DRIVER_PYTHON=jupyter

export PYSPARK_DRIVER_PYTHON_OPTS='notebook'

# Add the PySpark classes to the Python path:

export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH

二、开始

现在应该有一个新的jupyter笔记本在你面前。首先,我们导入一些稍后需要的东西。

2.1.1+hadoop2.7

三、生成一些输入数据

我们也想要一些数据来处理。一个简单的方法是使用scikit-learn的方法在三维空间中生成10个 blob。

为了使它更真实,我们还添加了一个字符串'id'列,在现实生活中,它通常是客户id或物联网设备的IP地址,或类似的。

最后,我们将数据集写入为CSV文件,尽管CSV文件的格式很糟糕,但这是我每天都会遇到的文件。

这是我们刚刚生成的数据的可视化,其中每个数据点的真实类型用一种唯一的颜色表示。

四、SQL上下文

默认情况下,SparkContext使用变量名sc创建,并以启动启用spark的笔记本的方式创建。但是,我们确实希望创建一个SQLContext。根据Spark文档https://spark.apache.org/docs/latest/sql-programming-guide.html:

Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了更多关于数据结构和正在执行的计算的信息。”

这意味着我们可以使用Spark dataframes,它类似于Pandas dataframes,是一个组织成命名列的数据集。

#sc = SparkContext(appName="PythonKMeansExample")

# exist by default

sqlContext=SQLContext(sc)

将数据从CSV读入Spark dataframes

数据包含四列,'id', 'x', 'y', 'z',而后三列是我们希望在我们的聚类模型中使用的特征。

FEATURES_COL=['x','y','z']

path='input.csv'

如果你使用的是最近的Spark版本,读取CSV文件可以在一行中完成:

df=sqlContext.read.csv(path,header=True)

# requires spark 2.0

df.show()

如果使用较早的版本,可能会得到类似这样的结果:

lines=sc.textFile(path)

data=lines.map(lambdaline:line.split(","))

data.take(2)

[[u'id', u'x', u'y', u'z'],

[u'row0', u'-6.07769967697', u'-2.90961030573', u'-1.51817288652']]

注意,第一行包含了表头。我们暂时忽略这一点,将数据转换为Spark dataframes。

df=data.toDF(['id','x','y','z'])

print(df)

df.show()

五、将所有数据列转换为float

数据帧现在由四列字符串组成。机器学习模型需要数据为数字,所以我们必须将特征转换为浮点数。

将所有数据转换为float可以在一行中完成。然而,这将使'id'列填充null,否则我们将不得不省略它:

df_feat=df.select(*(df[c].cast("float").alias(c)forcindf.columns[1:]))

df_feat.show()

因为我们知道哪些列需要转换,所以通过逐个转换,我们得到了一个更清晰的结果。

六、删除空值

由于在数据帧的第一行额外包含了标题行,该行现在被null值填充。让我们删除这一行,以及任何其他可能包含空值的行。

df=df.na.drop()

df.show()

七、创建要在集群中使用的features列

Spark的KMeans实现与例如scikit-learn的版本有一点不同。我们需要将所有的特征存储为一个浮点数组,并将这个数组存储为一个名为“features”的列。因为我们不再需要原始的列,所以我们用select语句将它们过滤掉。

八、k的最优选择

与更先进的聚类算法相比,KMeans的一个缺点是算法必须被告知它应该尝试寻找多少个聚类k。为了优化k,我们对k的不同选择聚集一部分数据,并在代价函数中寻找一个“弯头”(elbow)。

看起来在k=10之后收益很少,所以我们在处理完整数据集时坚持这个选择。

九、训练机器学习模型

现在我们已经准备好在完整数据集上训练模型了。在运行时,您可能希望打开http://localhost:4040,其中显示有关作业的详细信息。

一旦训练集合,我们就可以打印出这十个集群的质心。

九、将聚类分配给事件

他们还有一件重要的事情要做;将各个行分配给最近的簇形质心。这可以通过transform方法实现,它将“预测”列添加到数据帧中。预测值为0 ~ k之间的整数,但与输入的y值没有相关性。

[Row(id=u'row0', prediction=2), Row(id=u'row1', prediction=0), Row(id=u'row2', prediction=8)]

从collect方法返回的行中,使用SQL上下文创建一个新的数据框架非常简单。

十、将预测与原始数据连接起来

原始dataframe中的x、y、z可以通过'id'列上的两个dataframe来添加到新创建的dataframe中。

十一、转换为Pandas dataframe

通常在这一点上,我需要对数据做一些其他的事情,这并不需要Spark,所以让我们将Spark数据帧转换为一个很好的旧的Pandas数据帧,以便进一步处理。

十二、可视化的结果

最后一步是直观地检查输出,看看KMeans模型是否做得很好。与第一个图相比,很明显大多数星团确实被发现了,但是左边的蓝色星团应该被分成两个,橙色+棕色星团应该只有一个星团。

如果使用了Spark,我们现在可以关闭Spark上下文:

sc.stop()

结论

这篇博文演示了如何在本地机器上安装Spark,并使用Spark运行一个简单的集群算法。

相关文章

网友评论

      本文标题:在Spark上运行KMeans聚类

      本文链接:https://www.haomeiwen.com/subject/cqeptltx.html