美文网首页
Spark分组取TopN

Spark分组取TopN

作者: 阿坤的博客 | 来源:发表于2018-09-28 13:58 被阅读156次

    本文记录了利用Scala和Java两种语言来实现先分组,然后取每个分组的TopN。

    1.文本内容

    class1 90
    class2 56
    class1 87
    class1 76
    class2 88
    class1 95
    class1 74
    class2 87
    class2 67
    class2 77
    

    每行的格式为:班级名、一个空格、分数(整数)。

    2.scala实现分组TopN

    /**
     * Groups "className score" lines by class and prints the highest scores of
     * each class, classes in ascending name order and scores in descending order.
     */
    object ScalaGroupTop3 {
      def main(args: Array[String]): Unit = {
        // Number of highest scores kept per class. A local val is captured by
        // value in the closures below.
        val topN = 3

        val conf = new SparkConf()
          .setAppName("ScalaGroupTop3")
          .setMaster("local[1]")

        val sc = new SparkContext(conf)

        try {
          sc.textFile("D:\\workspaces\\idea\\hadoop\\spark\\data\\score.txt")
            // Skip blank lines so a stray empty line does not fail the split below.
            .filter(_.trim.nonEmpty)
            .map { line =>
              val fields = line.split(" ")
              // Parse the score as Int: comparing the raw strings would order them
              // lexicographically (e.g. "100" would rank below "95").
              (fields(0), fields(1).toInt)
            }
            .groupByKey()
            .mapValues(_.toList.sortWith(_ > _).take(topN))
            .sortByKey()
            // Bring the (small) result to the driver before printing; RDD.foreach
            // runs on the executors and does not guarantee the sorted order.
            .collect()
            .foreach { case (className, scores) =>
              println(className)
              scores.foreach(println)
            }
        } finally {
          // Always release the context, even when the job fails.
          sc.stop()
        }
      }
    }
    

    计算结果:

    class1
    95
    90
    87
    class2
    88
    87
    77
    

    3.java实现分组TopN

    /**
     * Groups "className score" lines by class and prints the highest scores of
     * each class in descending order.
     */
    public class GroupTop3 {

        /** Number of highest scores kept for each class. */
        private static final int TOP_N = 3;

        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("Top3")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            try {
                JavaRDD<String> lines = sc.textFile("D:\\workspaces\\idea\\hadoop\\spark\\data\\score.txt");

                // Parse each line into a (className, score) pair.
                JavaPairRDD<String, Integer> pairs = lines.mapToPair(
                        new PairFunction<String, String, Integer>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> call(String line) {
                                String[] lineSplited = line.split(" ");
                                return new Tuple2<>(lineSplited[0], Integer.valueOf(lineSplited[1]));
                            }
                        });

                JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();

                // Reduce every class to its TOP_N highest scores, in descending order.
                JavaPairRDD<String, Iterable<Integer>> top3Score = groupedPairs.mapToPair(
                        new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {

                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> classScores) {
                                // Kept sorted in descending order; unused slots stay null
                                // and are always contiguous at the end of the array.
                                Integer[] top = new Integer[TOP_N];

                                String className = classScores._1;

                                for (Integer score : classScores._2) {
                                    for (int i = 0; i < TOP_N; i++) {
                                        if (top[i] == null) {
                                            top[i] = score;
                                            break;
                                        } else if (score > top[i]) {
                                            // Shift the lower entries one slot to the right to
                                            // make room, dropping the last one.
                                            for (int j = TOP_N - 1; j > i; j--) {
                                                top[j] = top[j - 1];
                                            }
                                            top[i] = score;
                                            break;
                                        }
                                    }
                                }

                                // A class with fewer than TOP_N scores leaves trailing nulls;
                                // return only the filled prefix so no null is emitted.
                                int filled = 0;
                                while (filled < TOP_N && top[filled] != null) {
                                    filled++;
                                }
                                return new Tuple2<>(className, Arrays.asList(Arrays.copyOf(top, filled)));
                            }

                        });

                // NOTE(review): foreach runs on the executors, so the order in which
                // classes are printed is not guaranteed outside single-threaded local mode.
                top3Score.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                        System.out.println("class: " + t._1);
                        for (Integer score : t._2) {
                            System.out.println(score);
                        }
                        System.out.println("=======================================");
                    }
                });
            } finally {
                // Always release the context, even when the job fails.
                sc.close();
            }
        }
    }
    

    计算结果:

    class: class1
    95
    90
    87
    =======================================
    class: class2
    88
    87
    77
    =======================================
    

    相关文章

      网友评论

          本文标题:Spark分组取TopN

          本文链接:https://www.haomeiwen.com/subject/qtbsoftx.html