美文网首页
Spark中文文本分类

Spark中文文本分类

作者: 郭彦超 | 来源:发表于2021-04-25 15:32 被阅读0次

    文本分类是指将一篇文章归到事先定义好的某一类或者某几类,互联网时代到来,数据以指数级增长,自媒体的兴起,让文本的增长更是突飞猛进,文档作为一种非结构化的数据(MySQL 中存放的是结构化数据),对于它的分析本来就存在一定的难度,再加上数据量的猛增,让原本 Python 的单机机器学习也压力倍增,显得力不从心。。
    本文介绍使用Spark MLlib提供的朴素贝叶斯(Naive Bayes)及随机森林算法,完成对中文文本的分类过程。主要包括中文分词、文本向量化表示(TF-IDF、word2vec)、模型训练、分类预测等。

    中文分词

    对于中文文本分类而言,需要先对文章进行分词,我使用的是Hanlp中文分析工具

    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.8.1</version>
    </dependency>
    

    中文特征向量化处理

    对文本特征处理,即文本向量化的过程。常用的特征处理方法有:


    TF-IDF 从字面意思来看分为 TF 和 IDF,TF 的意思是 Term Frequency,也就是词在文章中出现的频率,可以简单的认为是:一个词在文章中出现的频率越高,代表这个词越重要。比如:“坦克”这个词在军事类文章中出现了很多次,那么这个词对这类文章就会很重要,可能经济类的文章也会偶尔出现“坦克”,但肯定不会出现很多,那么这个词对经济类文章相对而言就不是那么重要。

    IDF 的意思是 Inverse Document Frequency,也就是逆文本频率,可以认为是:一些词在一类文章中出现很多,如“坦克”,但在其他经济、政治类文章中很少出现,那么这个词就具有很好的分类能力,但相反,一些词在很多文章中都出现,如“有的”、“我们”等,它们虽然在很多文章中都出现了,但并没有很好的分类的能力,这个时候逆词频就发挥作用了,你出现的越多,你的比重反而下降了。

    • TF-IDF 的基本思想是:
      一个词的重要性随着它在文件中出现的次数成正比增加,但同时会随着它包含的文档数成反比下降。

    分好词后,每一个词都作为一个特征,但需要将中文词语转换成Double型来表示,通常使用该词语的TF-IDF值作为特征值,Spark提供了全面的特征抽取及转换的API,非常方便,详见:TF-IDF的API

    • 用以下数据举例
    0,苹果 官网 苹果 宣布
    1,苹果 梨 香蕉
    

    举个例子,“苹果”在 1 篇文章共 1000 个词中总共出现了 10 次,那么“苹果”的 TF 就是 10/1000 = 0.01,“苹果”在 10000 篇文章中只在 10 篇里面出现过,那么“苹果”的 IDF 就是lg(10000/10) = 3,那么“苹果”的 TF-IDF 值就是 0.01*3 = 0.03。
    TFIDF特征处理如下:

    case class RawDataRecord(category: String, text: String)
     
    //将原始数据映射到DataFrame中,字段category为分类编号,字段text为分好的词,以空格分隔
    srcDF.select("category", "text").take(2).foreach(println)
    [0,苹果 官网 苹果 宣布]
    [1,苹果 梨 香蕉]
    //将分好的词转换为数组
    var tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    var wordsData = tokenizer.transform(srcDF)
     
    wordsData.select($"category",$"text",$"words").take(2).foreach(println)
    [0,苹果 官网 苹果 宣布,WrappedArray(苹果, 官网, 苹果, 宣布)]
    [1,苹果 梨 香蕉,WrappedArray(苹果, 梨, 香蕉)]
     
    //将每个词转换成Int型,并计算其在文档中的词频(TF)
    var hashingTF = 
    new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(100)
    var featurizedData = hashingTF.transform(wordsData)
    

    这里将中文词语转换成INT型的Hashing算法,类似于Bloomfilter,上面的setNumFeatures(100)表示将Hash分桶的数量设置为100个,这个值默认为2的20次方,即1048576,可以根据你的词语数量来调整,一般来说,这个值越大,不同的词被计算为一个Hash值的概率就越小,数据也更准确,但需要消耗更大的内存,和Bloomfilter是一个道理。

    featurizedData.select($"category", $"words", $"rawFeatures").take(2).foreach(println)
    [0,WrappedArray(苹果, 官网, 苹果, 宣布),(100,[23,81,96],[2.0,1.0,1.0])]
    [1,WrappedArray(苹果, 梨, 香蕉),(100,[23,72,92],[1.0,1.0,1.0])]
    

    结果中,“苹果”用23来表示,第一个文档中,词频为2,第二个文档中词频为1.

    //计算TF-IDF值
    var idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    var idfModel = idf.fit(featurizedData)
    var rescaledData = idfModel.transform(featurizedData)
    rescaledData.select($"category", $"words", $"features").take(2).foreach(println)
     
    [0,WrappedArray(苹果, 官网, 苹果, 宣布),(100,[23,81,96],[0.0,0.4054651081081644,0.4054651081081644])]
    [1,WrappedArray(苹果, 梨, 香蕉),(100,[23,72,92],[0.0,0.4054651081081644,0.4054651081081644])]
     
    //因为一共只有两个文档,且都出现了“苹果”,因此该词的TF-IDF值为0.
    

    特征转换

    最后将上述数据转换为Bayes输入格式

    var trainDataRdd = rescaledData.select($"category",$"features").map {
        case Row(label: String, features: Vector) =>
        LabeledPoint(label.toDouble, Vectors.dense(features.toArray))
    }
    

    每一个LabeledPoint中,特征数组的长度为100(setNumFeatures(100)),”官网”和”宣布”对应的特征索引号分别为81和96,因此,在特征数组中,第81位和第96位分别为它们的TF-IDF值。

    模型训练

    数据准备好了,接下来进行模型训练及分类预测,代码:

    %spark
    import org.apache.spark.ml.attribute.Attribute
    import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
    import com.hankcs.hanlp.HanLP;
    import scala.collection.JavaConversions._
    import com.hankcs.hanlp.seg.common.Term;
    import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
    import org.apache.spark.ml.classification.NaiveBayes
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.tuning.{TrainValidationSplit, CrossValidator, ParamGridBuilder}
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.Word2Vec
    import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
    import org.apache.spark.ml.classification.{RandomForestClassifier, GBTClassifier}
    // import spark.implicits._
    
    val df = spark.read.option("header",true).csv("/data/stat/recommend/ireg2/2021-05-12.csv")
    
    
    case class wordFearture(category:String, wordsFearture:String)
    
    
    val wordsSet = df.where("category_id is not null and name<>'会员免费'").rdd.map(row=>{
    //   try {
        //  print(row)
         var words = ""
        
         val ls = HanLP.segment(row.getAs("title").toString())
         for(item <- ls){
            if(item.word.length>=1 && !item.word.startsWith("%")){
                words = words + item.word + " "
            }
         }
         wordFearture(row.getAs[String]("name") , words)
    //   }catch {
    //         //如果解析报错赋予空值
    //      case e:Exception=> print(e)
    //   }
     })
    
    val wordsDF = spark.createDataFrame(wordsSet)
    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("label")
      .fit(wordsDF)
    //val indexed = indexer.transform(wordsDF)
    
    val tokenizer = new Tokenizer().setInputCol("wordsFearture").setOutputCol("words")
     
    //TF IDF
    
    val hashingTF = new HashingTF()
       .setInputCol("words").setOutputCol("rawFeatures")
    
    //将上一步的 TF 计算 IDF
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    
    val VECTOR_SIZE = 512
    //word2vec
    val word2Vec = new Word2Vec()
      .setInputCol("words")
      .setOutputCol("features")
      .setVectorSize(VECTOR_SIZE)
      .setMinCount(1)
    
    val nb = new NaiveBayes()
    val layers = Array[Int](VECTOR_SIZE,6,5,indexer.labels.size)
    // val md = new MultilayerPerceptronClassifier().setLayers(layers).setBlockSize(512).setSeed(1234L).setMaxIter(128).setFeaturesCol("features").setPredictionCol("prediction")
    
    val md = new RandomForestClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setNumTrees(20)
      .setMaxDepth(5)
    
    val converter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictionName") 
      .setLabels(indexer.labels) 
    
    //贝叶斯分类  
    //val pipeline = new Pipeline().setStages(Array(indexer, tokenizer, hashingTF, idf, nb, converter))
    //随机森林分类树
    val pipeline = new Pipeline().setStages(Array(indexer, tokenizer, word2Vec, md, converter))
    
    //网格参数使得超参数调优更加的方便,只需要在网格中加入可能的参数
    val paramGrid = new ParamGridBuilder()
      .addGrid(nb.smoothing, Array(0.5, 1,1.5))
      .build()
    
    //将所有的步骤加入到 TrainValidationSplit 中,包括 训练器、评估方法、模型的网格参数、并行度等
    // val cv = new TrainValidationSplit()
    //   .setEstimator(pipeline)
    //   .setEvaluator(new MulticlassClassificationEvaluator)
    //   .setEstimatorParamMaps(paramGrid)
    //   .setTrainRatio(0.7)
    //   .setParallelism(2)
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(new MulticlassClassificationEvaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(5)  // Use 3+ in practice
      .setParallelism(2)  // Evaluate up to 2 parameter settings in parallel
      
    val Array(training, test) = wordsDF.randomSplit(Array(0.8, 0.2), seed = 12345)
    
    // val model = cv.fit(training)
    val model = pipeline.fit(training)
    
    val predictions = model.transform(test)
    
    //评估模型
    val evaluator = new MulticlassClassificationEvaluator()
       .setLabelCol("label")
       .setPredictionCol("prediction")
       .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test set accuracy = $accuracy")
     
    import org.apache.spark.ml.functions.vector_to_array
    import org.apache.spark.sql.functions._
    predictions.select($"category", $"predictionName",round(element_at(vector_to_array($"probability"),1),4)).show
    
    
    • 效果展示

    相关文章

      网友评论

          本文标题:Spark中文文本分类

          本文链接:https://www.haomeiwen.com/subject/wxbdrltx.html