
Data Algorithms: Hadoop/Spark Big Data Processing --- Chapter 14

Author: _Kantin | Published 2018-07-08 11:22

This chapter covers the Naive Bayes algorithm.

The idea behind the Naive Bayes algorithm
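The lost figure presumably showed the standard Naive Bayes decision rule, which is exactly what both implementations below compute. Assuming the attributes $a_1, \dots, a_n$ of a record are conditionally independent given the class, the classifier assigns the class

$$\hat{c} = \arg\max_{c \in C} \; P(c) \prod_{i=1}^{n} P(a_i \mid c)$$

where $P(c)$ is estimated from the class counts in the training data and $P(a_i \mid c)$ from the per-class attribute counts.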

Implementations in this chapter

  • 1. A Spark-based implementation (Java)
  • 2. A Scala-based implementation

++Spark-based implementation (Java)++

1. Build the classifier

 //number of records in the training set
        long trainingDataSize = training.count();

        //emit ((attribute, class), 1) for every attribute and (("CLASS", class), 1) once per record
        JavaPairRDD<Tuple2<String, String>, Integer> pairs = training.flatMapToPair(new PairFlatMapFunction<String, Tuple2<String, String>, Integer>() {
            @Override
            public Iterator<Tuple2<Tuple2<String, String>, Integer>> call(String rec) throws Exception {
                List<Tuple2<Tuple2<String, String>, Integer>> result =
                        new ArrayList<Tuple2<Tuple2<String, String>, Integer>>();
                String[] tokens = rec.split(",");
                //the last token is the class label
                int classificationIndex = tokens.length - 1;
                String theClassification = tokens[classificationIndex];
                //every token before the label is an attribute
                for (int i = 0; i < classificationIndex; i++) {
                    Tuple2<String, String> K = new Tuple2<>(tokens[i], theClassification);
                    result.add(new Tuple2<Tuple2<String, String>, Integer>(K, 1));
                }
                Tuple2<String, String> V = new Tuple2<>("CLASS", theClassification);
                result.add(new Tuple2<>(V, 1));
                return result.iterator();
            }
        });
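As a concrete illustration (the record below is made up, not from the original post): for a training record `sunny,hot,high,false,no`, with four attributes and the class label `no`, the flatMap emits

    ((sunny, no), 1)
    ((hot, no), 1)
    ((high, no), 1)
    ((false, no), 1)
    ((CLASS, no), 1)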
        //sum the 1s into per-(attribute, class) and per-class counts
        JavaPairRDD<Tuple2<String, String>, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });
        Map<Tuple2<String, String>, Integer> countsMap = counts.collectAsMap();

        //build the probability table PT and the list of all classes CLASSIFICATIONS;
        //both are stored in HDFS for the classification step to read back
        HashMap<Tuple2<String, String>, Double> PT = new HashMap<>();
        List<String> CLASSIFICATIONS = new ArrayList<>();

        for(Map.Entry<Tuple2<String, String>, Integer> entry : countsMap.entrySet()){
            Tuple2<String, String> k = entry.getKey();
            //the second element of the key is the class label
            String classification = k._2;
            if(k._1.equals("CLASS")){
                //P(class) = count(class) / number of training records
                PT.put(k, ((double) entry.getValue()) / trainingDataSize);
                //remember each distinct class label
                CLASSIFICATIONS.add(k._2);
            }else{
                Tuple2<String, String> k2 = new Tuple2<>("CLASS", classification);
                Integer count = countsMap.get(k2);
                if(count == null){
                    PT.put(k, 0.0);
                }else{
                    //P(attribute | class) = count(attribute, class) / count(class)
                    PT.put(k, ((double) entry.getValue()) / count);
                }
            }
        }
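To make the arithmetic concrete with hypothetical counts: if 30 of 100 training records carry the class `no`, PT gets the entry (("CLASS", "no"), 30/100 = 0.3); if the attribute value `sunny` appears in 12 of those 30 records, PT gets (("sunny", "no"), 12/30 = 0.4). The cast to double before dividing matters here: plain integer division would truncate both ratios to 0.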
        //save PT and CLASSIFICATIONS to HDFS
        List<Tuple2<PairOfStrings, DoubleWritable>> list = toWritableList(PT);
        JavaRDD<Tuple2<PairOfStrings, DoubleWritable>> ptRDD = ctx.parallelize(list);
        ptRDD.saveAsHadoopFile("/naivebayes/pt",              // name of path
                PairOfStrings.class,              // key class
                DoubleWritable.class,             // value class
                SequenceFileOutputFormat.class    // output format class
        );
        JavaRDD<String> classificationsRDD = ctx.parallelize(CLASSIFICATIONS);
        classificationsRDD.saveAsTextFile("/naivebayes/classes"); // name of path

        // done
        ctx.close();
        System.exit(0);

    }
    //save each PT entry as a (PairOfStrings key, DoubleWritable value) pair
     static List<Tuple2<PairOfStrings, DoubleWritable>> toWritableList(HashMap<Tuple2<String, String>, Double> pt) {
         List<Tuple2<PairOfStrings, DoubleWritable>> list =
                 new ArrayList<Tuple2<PairOfStrings, DoubleWritable>>();
         for(Map.Entry<Tuple2<String, String>, Double> entry : pt.entrySet()){
             list.add(new Tuple2<PairOfStrings, DoubleWritable>(
                     new PairOfStrings(entry.getKey()._1, entry.getKey()._2),
                     new DoubleWritable(entry.getValue())));
         }
         return list;
    }
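For context, the fragment above runs inside a Spark driver. A minimal sketch of the surrounding setup, assuming an app name and input path that do not appear in the original post:

    //assumed driver setup; the app name and input path are illustrative
    SparkConf conf = new SparkConf().setAppName("NaiveBayesClassifierBuilder");
    JavaSparkContext ctx = new JavaSparkContext(conf);
    //one CSV record per line; the last token of each record is the class label
    JavaRDD<String> training = ctx.textFile("/naivebayes/input", 1);

PairOfStrings is a Hadoop Writable pair type (the book's code takes it from the Cloud9 library), which is what allows the probability table to be written out as a SequenceFile.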


2. Testing the trained classifier

 //read back the ptRDD that was just saved to HDFS
        JavaPairRDD<PairOfStrings, DoubleWritable> ptRDD = ctx.hadoopFile(nbProbTablePath, SequenceFileInputFormat.class, PairOfStrings.class, DoubleWritable.class);

        //convert ptRDD back into the regular classification probability table
        JavaPairRDD<Tuple2<String,String>, Double> classifierRDD = ptRDD.mapToPair(
                new PairFunction<
                        Tuple2<PairOfStrings,DoubleWritable>, // T
                        Tuple2<String,String>,                // K2,
                        Double                                // V2
                        >() {
                    @Override
                    public Tuple2<Tuple2<String,String>,Double> call(Tuple2<PairOfStrings,DoubleWritable> rec) {
                        PairOfStrings pair = rec._1;
                        Tuple2<String,String> K2 = new Tuple2<String,String>(pair.getLeftElement(), pair.getRightElement());
                        Double V2 = rec._2.get();
                        return new Tuple2<Tuple2<String,String>,Double>(K2, V2);
                    }
                });
        //collect the classifierRDD into a Map and broadcast it to the executors
        Map<Tuple2<String, String>, Double> classifier = classifierRDD.collectAsMap();
        Broadcast<Map<Tuple2<String, String>, Double>> broadcastClassifier = ctx.broadcast(classifier);

        //also load the saved list of classes
        JavaRDD<String> classesRDD = ctx.textFile("/naivebayes/classes", 1);
        List<String> CLASSES = classesRDD.collect();
        final Broadcast<List<String>> broadcastClasses = ctx.broadcast(CLASSES);

        JavaPairRDD<String, String> classified = newdata.mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String rec) throws Exception {
                //the broadcast classification probability table
                Map<Tuple2<String, String>, Double> CLASSIFIER = broadcastClassifier.value();
                //the broadcast list of classes
                List<String> CLASSES = broadcastClasses.value();

                String[] attributes = rec.split(",");
                String selectedClass = null;
                double maxPosterior = 0.0;
                for(String aClass : CLASSES){
                    //start from the prior P(aClass), then multiply in P(attribute | aClass)
                    double posterior = CLASSIFIER.get(new Tuple2<>("CLASS", aClass));
                    for (int i = 0; i < attributes.length; i++){
                        Double probability = CLASSIFIER.get(new Tuple2<>(attributes[i], aClass));
                        if(probability == null){
                            //an (attribute, class) pair never seen in training zeroes out this class
                            posterior = 0.0;
                            break;
                        }else{
                            posterior *= probability;
                        }
                    }
                    //compare classes only after the full posterior has been computed
                    if (selectedClass == null || posterior > maxPosterior){
                        maxPosterior = posterior;
                        selectedClass = aClass;
                    }
                }
                return new Tuple2<String, String>(rec, selectedClass);
            }
        });
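The classification job would typically end by persisting the predictions; a minimal sketch, with an assumed output path:

    //save the (record, predictedClass) pairs; the output path is illustrative
    classified.saveAsTextFile("/naivebayes/classified");
    ctx.close();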

++Scala-based implementation++

1. Build the classifier

 val training = sc.textFile(input)
    //number of training records
    val trainingDataSize = training.count()

    //split each record into (("CLASS", class), 1) plus ((attribute, class), 1) for every attribute
    val pairs = training.flatMap(line => {
      val tokens = line.split(",")
      val theClassification = tokens.last
      (("CLASS", theClassification), 1) :: tokens.init.map(token => ((token, theClassification), 1)).toList
    })

    val counts = pairs.reduceByKey(_ + _)

    val countsAsMap = counts.collectAsMap()
    //P(class) = count(class) / training size; P(attribute | class) = count(attribute, class) / count(class)
    val pt = countsAsMap.map(tuple => {
      if (tuple._1._1 == "CLASS") (tuple._1, tuple._2 / trainingDataSize.toDouble) else {
        val count = countsAsMap.getOrElse(("CLASS", tuple._1._2), 0)
        if (count == 0) (tuple._1, 0d) else (tuple._1, tuple._2 / count.toDouble)
      }
    })
    val ptRDD = sc.parallelize(pt.toList)

    //debug output: one "attribute,class,probability" line per entry
    pt.foreach(f => println(s"${f._1._1},${f._1._2},${f._2}"))

    ptRDD.saveAsObjectFile(output + "/naivebayes/pt")
  }

2. Classifying the data

 val newdata = sc.textFile(input)
    //load the classification probability table from HDFS
    val classifierRDD = sc.objectFile[Tuple2[Tuple2[String, String], Double]](nbProbabilityTablePath)

    //restore the saved probability table and the class list
    val classifier = classifierRDD.collectAsMap()
    val broadcastClassifier = sc.broadcast(classifier)
    val classesRDD = sc.textFile(classesPath)
    val broadcastClasses = sc.broadcast(classesRDD.collect())

    val classified = newdata.map(rec => {
      val classifier = broadcastClassifier.value
      val classes = broadcastClasses.value
      val attributes = rec.split(",")

      val class_score = classes.map(aClass => {
        val posterior = classifier.getOrElse(("CLASS", aClass), 1d)
        //probability of each attribute given this class
        val probabilities = attributes.map(attribute => {
          classifier.getOrElse((attribute, aClass), 0d)
        })
        //multiply the attribute probabilities together with the prior
        (aClass, probabilities.product * posterior)
      })
      //pick the class with the highest score
      val maxClass = class_score.maxBy(_._2)
      //emit the record together with its predicted class
      (rec, maxClass._1)
    })
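For example, classifying a made-up record `sunny,hot,high,false` produces one score per class in class_score, e.g. ("yes", P(yes) * P(sunny|yes) * P(hot|yes) * P(high|yes) * P(false|yes)) and likewise for "no"; maxBy(_._2) then selects the class with the largest posterior. As in the Java version, an attribute never seen with a class contributes 0d and zeroes out that class's score.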
