Spark ML Feature Processing (1)

Author: 董可伦 | Published 2018-06-28 14:02

Please credit the original source when reposting: https://dongkelun.com/2018/05/17/sparkMlFeatureProcessing1/

Preface

While reviewing common machine-learning algorithms recently, I noticed that Spark's official decision-tree example uses several feature-processing classes I hadn't studied before. After looking them up, it turns out Spark has quite a few classes for feature processing, so I plan to write up their usage, starting with the ones used in the decision-tree example.

1. VectorIndexer

According to the source-code comment, VectorIndexer is a "Class for indexing categorical feature columns in a dataset of Vector". That is not very intuitive on its own, so let's go straight to its usage with an example.

1.1 Data

A plain data format will do:
data1.txt

    1,-1.0 1.0
    0,0.0 3.0
    1,-1.0 5.0
    0,0.0 1.0
    

The first column is the label and the remaining values are the features.
Spark code for reading the data (for reference):

    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.Row
    import spark.implicits._

    val data_path = "files/ml/featureprocessing/data1.txt"
    // Each line is "label,feature1 feature2"; split it into a (label, Vector) pair
    val data = spark.read.text(data_path).map {
      case Row(line: String) =>
        val arr = line.split(',')
        (arr(0), Vectors.dense(arr(1).split(' ').map(_.toDouble)))
    }.toDF("label", "features")
    data.show(false)
    


Result:

    +-----+----------+
    |label|features  |
    +-----+----------+
    |1    |[-1.0,1.0]|
    |0    |[0.0,3.0] |
    |1    |[-1.0,5.0]|
    |0    |[0.0,1.0] |
    +-----+----------+
    

1.2 Code

Usage 1: via setMaxCategories

    import org.apache.spark.ml.feature.VectorIndexer
    val indexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexed")
      .setMaxCategories(2)
    
    val indexerModel = indexer.fit(data)
    
    val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
    println(s"Chose ${categoricalFeatures.size} categorical features: " +
      categoricalFeatures.mkString(", "))
    
    val indexedData = indexerModel.transform(data)
    indexedData.show(false)
    

Result

    Chose 1 categorical features: 0
    +-----+----------+---------+
    |label|features  |indexed  |
    +-----+----------+---------+
    |1    |[-1.0,1.0]|[1.0,1.0]|
    |0    |[0.0,3.0] |[0.0,3.0]|
    |1    |[-1.0,5.0]|[1.0,5.0]|
    |0    |[0.0,1.0] |[0.0,1.0]|
    +-----+----------+---------+
    

One categorical feature was chosen, at index 0, i.e. the first feature. With maxCategories set to 2, the first feature has only two distinct values, [-1.0, 0.0], which is <= 2, so it is indexed (indices start at 0); the second feature has three distinct values, [1.0, 3.0, 5.0], so it is treated as continuous and its original values are kept.
Now set maxCategories to 5 and rerun the code above.
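Only the parameter changes; a minimal sketch (indexer5 is an illustrative name):

    val indexer5 = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexed")
      .setMaxCategories(5) // both features now have no more than 5 distinct values
    indexer5.fit(data).transform(data).show(false)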

    Chose 2 categorical features: 0, 1
    +-----+----------+---------+
    |label|features  |indexed  |
    +-----+----------+---------+
    |1    |[-1.0,1.0]|[1.0,0.0]|
    |0    |[0.0,3.0] |[0.0,1.0]|
    |1    |[-1.0,5.0]|[1.0,2.0]|
    |0    |[0.0,1.0] |[0.0,0.0]|
    +-----+----------+---------+
    

This time both features are indexed, because each has fewer than 5 distinct values. Indices again start at 0 and increase by 1. Note the mapping order: category values are assigned indices in ascending order, except that the value 0.0, when present, is always given index 0 (this preserves vector sparsity), which is why in the first feature 0.0 maps to 0.0 and -1.0 maps to 1.0.
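The exact value-to-index mappings can be inspected through the fitted model's categoryMaps, a Map from feature index to a value-to-index map:

    // Print how each categorical feature's values were mapped to indices
    indexerModel.categoryMaps.foreach { case (featureIndex, valueToIndex) =>
      println(s"feature $featureIndex: $valueToIndex")
    }
    // With maxCategories = 5 this prints something like:
    // feature 0: Map(0.0 -> 0, -1.0 -> 1)
    // feature 1: Map(1.0 -> 0, 3.0 -> 1, 5.0 -> 2)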

2. StringIndexer

StringIndexer is similar to the VectorIndexer above, except that it re-indexes the label column. To make this easier to follow, let's construct new data; only the label column matters here.

2.1 Data

    data2.txt

    10,-1.0 1.0
    10,0.0 3.0
    7,-1.0 5.0
    5,0.0 1.0
    6,0.0 1.0
    6,0.0 1.0
    6,0.0 1.0
    

The code for reading the data is the same as above, so let's look directly at the StringIndexer code and its output.

2.2 Code

    import org.apache.spark.ml.feature.StringIndexer
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
    
    val indexerModel = labelIndexer.fit(data)
    
    val indexedData = indexerModel.transform(data)
    indexedData.show(false)
    

Result

    +-----+----------+------------+
    |label|features  |indexedLabel|
    +-----+----------+------------+
    |10   |[-1.0,1.0]|1.0         |
    |10   |[0.0,3.0] |1.0         |
    |7    |[-1.0,5.0]|2.0         |
    |5    |[0.0,1.0] |3.0         |
    |6    |[0.0,1.0] |0.0         |
    |6    |[0.0,1.0] |0.0         |
    |6    |[0.0,1.0] |0.0         |
    +-----+----------+------------+
    

The result shows at a glance that the labels were re-indexed, again starting from 0, but not by numeric value (as the name suggests, StringIndexer converts string values to indices). Labels are ordered by frequency: the most frequent label gets index 0, and so on. Since 6 appears three times, it is indexed as 0.0.
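The fitted model exposes this ordering through its labels array: the label at position i is the one encoded as index i.

    // The label at position i is the one encoded as index i
    println(indexerModel.labels.mkString(", "))
    // For data2.txt this prints: 6, 10, 7, 5
    // (7 and 5 tie on frequency, so their relative order may vary)
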
Since this class indexes strings, let's make up some string data and look at the result; in real classification tasks string labels are common, e.g. classifying fat vs. thin from features.
Keep the file name data2.txt:

    thin,-1.0 1.0
    fat,0.0 3.0
    fat,-1.0 5.0
    thin,0.0 1.0
    fat,0.0 1.0
    

Replace the data and rerun the program above:

    +-----+----------+------------+
    |label|features  |indexedLabel|
    +-----+----------+------------+
    |thin |[-1.0,1.0]|1.0         |
    |fat  |[0.0,3.0] |0.0         |
    |fat  |[-1.0,5.0]|0.0         |
    |thin |[0.0,1.0] |1.0         |
    |fat  |[0.0,1.0] |0.0         |
    +-----+----------+------------+
    

2.3 Caveat

When a fitted model is used to transform new data, you may hit an exception. Staying with the thin/fat example above: what happens if the new data contains a label that is neither thin nor fat, say middle (just as an example), or some bad value?
    data2new.txt

    thin,-1.0 1.0
    fat,0.0 3.0
    middle,1.0 2.0
    

Spark provides two ways to handle this:

    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      // .setHandleInvalid("error")
      .setHandleInvalid("skip")
      .setOutputCol("indexedLabel")
    

1. The default setting, .setHandleInvalid("error"), throws an exception:

    Caused by: org.apache.spark.SparkException: Unseen label: middle
    

2. .setHandleInvalid("skip") ignores the rows containing such labels and runs normally, producing the following output:

    +-----+----------+------------+
    |label|features  |indexedLabel|
    +-----+----------+------------+
    |thin |[-1.0,1.0]|1.0         |
    |fat  |[0.0,3.0] |0.0         |
    +-----+----------+------------+
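
Spark 2.2 and later also offer a third option, "keep", which assigns unseen labels an extra index (equal to the number of labels seen at fit time) instead of dropping the rows. A minimal sketch, reusing data and data_new from the complete code below:

    // Spark 2.2+: unseen labels are kept and mapped to the extra index numLabels
    val keepIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .setHandleInvalid("keep")
    keepIndexer.fit(data).transform(data_new).show(false)
    // here middle would be indexed as 2.0 instead of being skipped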
    

That was a bit long-winded; here is the complete code:

2.4 Complete code (for reference)

    package com.dkl.leanring.spark.ml
    
    import org.apache.spark.sql.SparkSession
    
    object StringIndexerDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder
          .master("local")
          .getOrCreate()
        import spark.implicits._
    
        import org.apache.spark.ml.linalg.Vectors
        import org.apache.spark.sql.Row
        val data_path = "files/ml/featureprocessing/data2.txt"
        val data = spark.read.text(data_path).map {
          case Row(line: String) =>
            val arr = line.split(',')
            (arr(0), Vectors.dense(arr(1).split(' ').map(_.toDouble)))
        }.toDF("label", "features")
    
        import org.apache.spark.ml.feature.StringIndexer
        val labelIndexer = new StringIndexer()
          .setInputCol("label")
          // .setHandleInvalid("error")
          .setHandleInvalid("skip")
          .setOutputCol("indexedLabel")
    
        val indexerModel = labelIndexer.fit(data)
    
        val indexedData = indexerModel.transform(data)
        indexedData.show(false)
    
        val data_new_path = "files/ml/featureprocessing/data2new.txt"
        val data_new = spark.read.text(data_new_path).map {
          case Row(line: String) =>
            val arr = line.split(',')
            (arr(0), Vectors.dense(arr(1).split(' ').map(_.toDouble)))
        }.toDF("label", "features")
        val indexedData_new = indexerModel.transform(data_new)
        indexedData_new.show(false)
      }
    }
    

3. IndexToString

IndexToString reverses the transformation done by StringIndexer. In practice the workflow looks like this: string labels are converted to numbers and fed to a model for training and prediction; the predictions then come back as the numeric indices produced by StringIndexer, and for readable output they need to be converted back to the original labels.

3.1 Code

Continuing from the code in section 2.2:

    import org.apache.spark.ml.feature.IndexToString
    // No setLabels call needed here: the labels are recovered from the
    // metadata that StringIndexer attached to the indexedLabel column
    val labelConverter = new IndexToString()
      .setInputCol("indexedLabel")
      .setOutputCol("predictedLabel")
    val predict = labelConverter.transform(indexedData)
    predict.show(false)
    

3.2 Result

    +-----+----------+------------+--------------+
    |label|features  |indexedLabel|predictedLabel|
    +-----+----------+------------+--------------+
    |thin |[-1.0,1.0]|1.0         |thin          |
    |fat  |[0.0,3.0] |0.0         |fat           |
    |fat  |[-1.0,5.0]|0.0         |fat           |
    |thin |[0.0,1.0] |1.0         |thin          |
    |fat  |[0.0,1.0] |0.0         |fat           |
    +-----+----------+------------+--------------+
    

As the result shows, the class converted indexedLabel back into the same values as the original label column.
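Since the labels came from column metadata here, a column without that metadata (for example, a raw numeric column) needs them supplied explicitly. A minimal sketch, reusing indexerModel from section 2.2 (explicitConverter is an illustrative name):

    // Explicitly supply the label array instead of relying on column metadata
    val explicitConverter = new IndexToString()
      .setInputCol("indexedLabel")
      .setOutputCol("predictedLabel")
      .setLabels(indexerModel.labels)

The appendix below uses the same setLabels approach to map predictions back to the original string labels.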

4. Appendix

In a real algorithm it is of course the predicted values that get converted back, so to aid understanding, here is a decision-tree classification example. Since this post is not about decision trees, I won't walk through it in detail.

4.1 Data

    data3.txt

    thin,1.5 50
    fat,1.5 60
    thin,1.6 40
    fat,1.6 60
    thin,1.7 60
    fat,1.7 80
    thin,1.8 60
    fat,1.8 90
    thin,1.9 70
    fat,1.9 80
    

The first feature is height and the second is weight. The data comes from 用Python开始机器学习(2:决策树分类算法) (the data was made up by that author; it has some internal logic, but please ignore how realistic it is).

4.2 Code and results

The code is adapted from the official Spark example.

    package com.dkl.leanring.spark.ml
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.DecisionTreeClassificationModel
    import org.apache.spark.ml.classification.DecisionTreeClassifier
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.feature.{ IndexToString, StringIndexer, VectorIndexer }
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.Row
    
    object DecisionTreeExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder
          .master("local")
          .getOrCreate()
        import spark.implicits._
        val data_path = "files/ml/featureprocessing/data3.txt"
        val data = spark.read.text(data_path).map {
          case Row(line: String) =>
            val arr = line.split(',')
            (arr(0), Vectors.dense(arr(1).split(' ').map(_.toDouble)))
        }.toDF("label", "features")
    
        // Index labels, adding metadata to the label column.
        // Fit on whole dataset to include all labels in index.
        val labelIndexer = new StringIndexer()
          .setInputCol("label")
          .setOutputCol("indexedLabel")
          .fit(data)
        // Automatically identify categorical features, and index them.
        val featureIndexer = new VectorIndexer()
          .setInputCol("features")
          .setOutputCol("indexedFeatures")
          .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
          .fit(data)
    
        // Split the data into training and test sets (30% held out for testing).
        val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    
        // Train a DecisionTree model.
        val dt = new DecisionTreeClassifier()
          .setLabelCol("indexedLabel")
          .setFeaturesCol("indexedFeatures")
    
        // Convert indexed labels back to original labels.
        val labelConverter = new IndexToString()
          .setInputCol("prediction")
          .setOutputCol("predictedLabel")
          .setLabels(labelIndexer.labels)
    
        // Chain indexers and tree in a Pipeline.
        val pipeline = new Pipeline()
          .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
    
        // Train model. This also runs the indexers.
        val model = pipeline.fit(trainingData)
    
        // Make predictions.
        val predictions = model.transform(testData)
    
        // Select example rows to display.
        predictions.select("predictedLabel", "label", "features").show(5)
    
        // Select (prediction, true label) and compute test error.
        val evaluator = new MulticlassClassificationEvaluator()
          .setLabelCol("indexedLabel")
          .setPredictionCol("prediction")
          .setMetricName("accuracy")
        val accuracy = evaluator.evaluate(predictions)
        println("Test Error = " + (1.0 - accuracy))
    
        val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
        println("Learned classification tree model:\n" + treeModel.toDebugString)
      }
    }
    

Result:
Because the training and test data are split randomly, the output differs slightly from run to run.

    +--------------+-----+----------+
    |predictedLabel|label|  features|
    +--------------+-----+----------+
    |           fat|  fat|[1.5,60.0]|
    |           fat|  fat|[1.9,80.0]|
    |           fat| thin|[1.5,50.0]|
    +--------------+-----+----------+
    
    Test Error = 0.33333333333333337
    Learned classification tree model:
    DecisionTreeClassificationModel (uid=dtc_4f851a4dd870) of depth 3 with 7 nodes
      If (feature 1 <= 70.0)
       If (feature 0 <= 1.6)
        If (feature 1 <= 40.0)
         Predict: 0.0
        Else (feature 1 > 40.0)
         Predict: 1.0
       Else (feature 0 > 1.6)
        Predict: 0.0
      Else (feature 1 > 70.0)
       Predict: 1.0
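
To make runs repeatable, the split can be seeded; a small variation on the randomSplit line above (the seed value 1234L is arbitrary):

    // Fixing the seed makes the train/test split, and hence the output, repeatable
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)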
    

References

    https://blog.csdn.net/shenxiaoming77/article/details/63715525
