美文网首页Spark ML
使用Spark ML进行数据分析

使用Spark ML进行数据分析

作者: 涛O_O | 来源:发表于2018-12-27 22:05 被阅读141次

    Spark版本:2.4.0
    语言:Scala
    任务:分类

    这里对数据的处理步骤如下:

    1. 载入数据
    2. 归一化
    3. PCA降维
    4. 划分训练/测试集
    5. 线性SVM分类
    6. 验证精度
    7. 输出cvs格式的结果

    前言

    从Spark 2.0开始,Spark机器学习API是基于DataFrame的spark.ml。而之前的基于RDD的API spark.mllib已进入维护模式。
    也就是说,Spark ML是Spark MLlib的一种新的API,它主要有以下几个优点:

    • 面向DataFrame,在RDD基础上进一步封装,提供更强大更方便的API
    • Pipeline功能,便于实现复杂的机器学习模型
    • 性能提升

    基于Pipeline的Spark ML中的几个概念:

    • DataFrame:从Spark SQL 的引用的概念,表示一个数据集,它可以容纳多种数据类型。例如可以存储文本,特征向量,标签和预测值等
    • Transformer:是可以将一个DataFrame变换成另一个DataFrame的算法。例如,一个训练好的模型是一个Transformer,通过transform方法,将原始DataFrame转化为一个包含预测值的DataFrame
    • Estimator:是一个算法,接受一个DataFrame,产生一个Transformer。例如,一个学习算法(如PCA,SVM)是一个Estimator,通过fit方法,训练DataFrame并产生模型Transformer
    • Pipeline: Pipeline将多个Transformers和Estimators连接起来组合成一个机器学习工作流程
    • Parameter:用于对Transformers和Estimators指定参数的统一接口

    本次实验使用的是Spark ML的API

    首先要创建SparkSession

    // 创建SparkSession
    val spark = SparkSession
      .builder
      .appName("LinearSVCExample")
      .master("local")
      .getOrCreate()
    

    数据处理步骤

    1 载入数据

    数据载入的方式有多种,这里使用libsvm格式的数据作为数据源,libsvm格式常被用来存储稀疏的矩阵数据,它每一行的格式如下:

    label index1:value1 index2:value2 ...
    

    第一个值是标签,后面是由“列号:值”组成键值对,只需要记录非0项即可。

    数据加载使用load方法完成:

    // 加载训练数据,生成DataFrame
    val data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")
    

    2 归一化

    作为数据预处理的第一步,需要对原始数据做归一化处理,即把原始数据的每一维减去其平均值,再除以其标准差,使得数据总体分布为以0为中心,且标准差为1。

    // 归一化
    val scaler = new StandardScaler()
       .setInputCol("features")
       .setOutputCol("scaledFeatures")
       .setWithMean(true)
       .setWithStd(true)
       .fit(data)
    
    val scaleddata = scaler.transform(data).select("label", "scaledFeatures").toDF("label","features")
    

    3 PCA降维

    有时数据的维数可能很大,直接进行分类不仅计算量很大,而且对数据量的要求也很高,常常会出现过拟合。因此需要进行降维,常用的是主成分分析(PCA)算法。

    // 创建PCA模型,生成Transformer
    val pca = new PCA()
      .setInputCol("features")
      .setOutputCol("pcaFeatures")
      .setK(5)
      .fit(scaleddata)
    
    //  transform数据,生成主成分特征
    val pcaResult = pca.transform(scaleddata).select("label","pcaFeatures").toDF("label","features")
    

    4 划分训练/测试集

    经过降维的数据就可以拿来训练分类器了,但是在此之前要将数据划分为训练集和测试集,分类器只能在训练集上进行训练,在测试集上验证其分类精度。Spark提供了很方便的接口,按给定的比例随机划分训练/测试集。

    // 将经过主成分分析的数据,按比例划分为训练数据和测试数据
    val Array(trainingData, testData) = pcaResult.randomSplit(Array(0.7, 0.3), seed = 20)
    

    5 线性SVM分类

    这一步构建线性SVM模型,设置最大迭代次数和正则化项的系数,使用训练集进行训练。

    // 创建SVC分类器(Estimator)
    val lsvc = new LinearSVC()
      .setMaxIter(10)
      .setRegParam(0.1)
    
    // 训练分类器,生成模型(Transformer)
    val lsvcModel = lsvc.fit(trainingData)
    

    6 验证精度

    将训练好的分类器作用于测试集上,获得分类结果。

    分类结果的好坏有很多种衡量的方法,如查准率、查全率等,这里我们使用最简单的一种衡量标准——精度,即正确分类的样本数占总样本数的比值。

    // 用训练好的模型,验证测试数据
    val res = lsvcModel.transform(testData).select("prediction","label")
    
    // 计算精度
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(res)
    
    println(s"Accuracy = ${accuracy}")
    

    7 输出cvs格式的结果

    Spark的DataFrame类型支持导出多种格式,这里以常用的csv格式为例。

    这里输出的目的是为了使用Python进行可视化,在降维后进行,可以直观的看出降维后的数据是否明显可分。

    使用VectorAssembler,将标签与特征合并为一列,再进行输出。

    (这里是将合并后的列转换为String再输出的,因此输出的csv文件是带有引号和括号的,至于为什么要这样输出,请看第二部分)

    // 将标签与主成分合成为一列
    val assembler = new VectorAssembler()
      .setInputCols(Array("label","features"))
      .setOutputCol("assemble")
    val output = assembler.transform(pcaResult)
    
    // 输出csv格式的标签和主成分,便于可视化
    val ass = output.select(output("assemble").cast("string"))
    ass.write.mode("overwrite").csv("output.csv")
    

    当然也可以用同样的方法输出训练/预测的结果,这里就不再详细介绍。

    遇到的问题

    完成这个简单的分类实验,花了我两天多的时间,从配置环境到熟悉API,再到遇见各种奇怪的问题……这里我都把他们记录下来,供以后参考。

    1 配置环境

    起初,我想通过在本机编写代码,然后访问安装在虚拟机中的Spark节点(单节点)这种方式进行实验的(不是提交jar包然后执行spark-submit),也就在是创建SparkSession时,指定虚拟机中的Spark:

    val spark = SparkSession
      .builder
      .appName("LinearSVCExample")
      .master("spark://192.168.1.128:7077") // 虚拟机IP
      .getOrCreate()
    

    然而,这样并没有成功。遇到的问题有:

    • 拒绝连接
    • Spark的worker里可以查看到提交的任务,但是一直处于等待状态,没有响应。并且提示:
      WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources(实际上,内存和CPU是够的)
    • 报错RuntimeException: java.io.EOFException......

    在尝试过各种方案都没有解决问题之后,我放弃了,最后还是在本机中安装Spark,在local模式下运行。(如果有同学成功实现上面的访问方法,欢迎留言告诉我~

    至于如何在本机(Windows)安装Spark,百度搜索即可

    2 导出CSV格式的数据

    将DataFrame导出为cvs格式的时候,遇到了这个问题:
    java.lang.UnsupportedOperationException: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.

    而我要导出的DataFrame只是一个多行数组而已啊:

    image.png

    根据StackOverflow上面的提问,Spark的csv导出不支持复杂结构,array<double>都不行。

    然后有人给了一种办法,把数组转化为String,就可以导出了。

    但是导出的结果是这样的:

    image.png

    需要进一步处理。

    所以还不如手动实现导出csv文件,或者你有更好的办法,欢迎留言告诉我,非常感谢~

    3 PCA维数限制

    当我想跑一个10万维度的数据时,程序运行到PCA报错:
    java.lang.IllegalArgumentException: Argument with more than 65535 cols: 109600

    原来,Spark ML的PCA不支持超过65535维的数据。参见源码

    4 SVM核

    翻阅了Spark ML文档,只找到Linear Support Vector Machine,即线性核的支持向量机。对于高斯核和其他非线性的核,Spark ML貌似还没有实现。

    image.png

    5 withColumn操作

    起初我认为对数据进行降维前,需要把DataFrame中的标签label与特征feature分开,然后对feature进行降维,再使用withColumn方法,把label与降维后的feature组合成新的DataFrame。

    发现这样既不可行也没有必要。

    首先,withColumn只能添加当前DataFrame的数据(对DataFrame某一列进行一些操作,再添加到这个DataFrame本身),不能把来自于不同DataFrame的Column添加到当前DataFrame中。

    其次,PCA降维时,只需指定InputCoulum作为特征列,指定OutputColumn作为输出列,其他列的存在并不影响PCA的执行,PCA也不会改变它们,在新生成的DataFrame中依然会保留原来所有Column,并且添加上降维后的数据Column,后面再使用select方法选择出所需的Column即可。

    完整代码(Pipeline版)

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature._
    import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
    import org.apache.spark.ml.feature.PCA
    import org.apache.spark.ml.classification.LinearSVC
    import org.apache.spark.sql.SparkSession
    
    object Hello {
      def main(args: Array[String]) {
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.8.3")
        //  屏蔽日志
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    
        // 创建sparkSession
        val spark = SparkSession
          .builder
          .appName("LinearSVCExample")
          .master("local")
          .getOrCreate()
    
        // 加载训练数据,生成DataFrame
        val data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")
    
        println(data.count())
    
        // 归一化
        val scaler = new StandardScaler()
          .setInputCol("features")
          .setOutputCol("scaledFeatures")
          .setWithMean(true)
          .setWithStd(true)
          .fit(data)
    
        val scaleddata = scaler.transform(data).select("label", "scaledFeatures").toDF("label","features")
    
        // 创建PCA模型,生成Transformer
        val pca = new PCA()
          .setInputCol("features")
          .setOutputCol("pcaFeatures")
          .setK(5)
          .fit(scaleddata)
    
        //  transform 数据,生成主成分特征
        val pcaResult = pca.transform(scaleddata).select("label","pcaFeatures").toDF("label","features")
    
        //  pcaResult.show(truncate=false)
    
        // 将标签与主成分合成为一列
        val assembler = new VectorAssembler()
          .setInputCols(Array("label","features"))
          .setOutputCol("assemble")
        val output = assembler.transform(pcaResult)
    
        // 输出csv格式的标签和主成分,便于可视化
        val ass = output.select(output("assemble").cast("string"))
        ass.write.mode("overwrite").csv("output.csv")
    
        // 将经过主成分分析的数据,按比例划分为训练数据和测试数据
        val Array(trainingData, testData) = pcaResult.randomSplit(Array(0.7, 0.3), seed = 20)
    
        // 创建SVC分类器(Estimator)
        val lsvc = new LinearSVC()
          .setMaxIter(10)
          .setRegParam(0.1)
    
        // 创建pipeline, 将上述步骤连接起来
        val pipeline = new Pipeline()
          .setStages(Array(scaler, pca, lsvc))
        
        // 使用串联好的模型在训练集上训练
        val model = pipeline.fit(trainingData)
        
        // 在测试集上测试
        val predictions = model.transform(testData).select("prediction","label")
    
        // 计算精度
        val evaluator = new MulticlassClassificationEvaluator()
          .setLabelCol("label")
          .setPredictionCol("prediction")
          .setMetricName("accuracy")
        val accuracy = evaluator.evaluate(predictions)
    
        println(s"Accuracy = ${accuracy}")
    
        spark.stop()
      }
    }
    
    

    最后的精度为1.0,这里使用的测试数据比较好分,从PCA后对前两维的可视化结果可以看出:

    image.png

    参考资料

    Spark ML文档
    DataFrame API
    PCA列数限制-源码
    导出cvs文件方法-stackoverflow
    无法导出csv文件-stackoverflow
    示例数据

    相关文章

      网友评论

        本文标题:使用Spark ML进行数据分析

        本文链接:https://www.haomeiwen.com/subject/tvtdlqtx.html