美文网首页
JAVA+SCALA实现SPARK求TopN

JAVA+SCALA实现SPARK求TopN

作者: _Kantin | 来源:发表于2017-11-27 21:01 被阅读44次

    代码如下:使用的spark版本为:spark-core_2.11,基于 Windows 平台,使用IDEA+MAVEN

    
    package com.spark.test;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    /**
     * Hello world!
     *
     */
    /**
     * Spark driver: reads "key value" lines from a local text file and, for
     * each key, prints its five largest integer values in descending order.
     */
    public class App 
    {
        public static void main( String[] args )
        {
            // Spark configuration: application name and local (single-JVM) master.
            SparkConf sparkConf = new SparkConf().setAppName("JavaTopNGroup").setMaster("local");
            // Entry point for the Java Spark API.
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
            // Read the input file; each RDD element is one text line.
            JavaRDD<String> lines = ctx.textFile("file:///D://TopN.txt");
            // Parse each "key value" line into a (key, value) pair.
            // NOTE(review): assumes every line has at least two space-separated
            // tokens and the second parses as an integer — malformed lines throw.
            JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String line) throws Exception {
                    String[] str = line.split(" ");
                    return new Tuple2<String, Integer>(str[0],Integer.valueOf(str[1]));
                }
            });
            // Group all values by key (groupByKey groups only; it does NOT sort).
            JavaPairRDD<String,Iterable<Integer>> groupPairs = pairs.groupByKey();
            // For each key, keep only the five largest values, largest first.
            final JavaPairRDD<String,Iterable<Integer>> top5 = groupPairs.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
                public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedData) throws Exception {
                    // Fixed-size buffer holding the current top 5 in descending
                    // order; unfilled slots remain null and always form a
                    // contiguous suffix (values are inserted left-to-right and
                    // shifts only move entries toward the right).
                    Integer[] top5 = new Integer[5];
                    // Key of this group.
                    String groupByKey = groupedData._1();
                    // All values recorded for this key.
                    Iterator<Integer> groupValue = groupedData._2().iterator();
                    while (groupValue.hasNext()){
                        Integer value = groupValue.next();
                        // Insertion-sort the value into the descending buffer.
                        for (int i = 0; i <5 ; i++) {
                            if(top5[i] == null){
                                // Empty slot: buffer not yet full, take value as-is.
                                top5[i] = value;
                                break;
                            }else if (value>top5[i]){
                                // Shift smaller entries right to make room,
                                // dropping the smallest if the buffer is full.
                                for (int j = 4; j > i; j--) {
                                    top5[j] = top5[j-1];
                                }
                                top5[i]=value;
                                break;
                            }
                        }
                    }
                    // BUGFIX: trim the trailing null slots so groups with fewer
                    // than five values do not emit literal nulls in the output.
                    int count = 0;
                    while (count < 5 && top5[count] != null) {
                        count++;
                    }
                    return new Tuple2<String, Iterable<Integer>>(groupByKey, Arrays.asList(Arrays.copyOf(top5, count)));
                }
            });
    
            // Print each key followed by its top values, one per line.
            top5.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                public void call(Tuple2<String, Iterable<Integer>> topped) throws Exception {
                    System.out.println("Group By Key: "+topped._1());
                    Iterator<Integer> toppedValue = topped._2().iterator();
                    while(toppedValue.hasNext()){
                        Integer value = toppedValue.next();
                        System.out.println(value);
                    }
                    System.out.println("*************************************");
                }
    
            });
            ctx.stop();
        }
    }
    
    

    基于SCALA实现

    /**
     * Scala driver: reads "key value" lines and prints, per key, the five
     * largest integer values in descending order.
     */
    object TopNGroup {
       import org.apache.spark.{SparkConf, SparkContext}
    
       // BUGFIX: the original put all logic in the bare object body, which
       // only runs on first reference — spark-submit (and plain `java`)
       // requires a main method to launch the job.
       def main(args: Array[String]): Unit = {
         val conf = new SparkConf().setAppName("JavaTopNGroup").setMaster("local")
         val sc = new SparkContext(conf)
         val lines = sc.textFile("file:///D://TopN.txt")
         val top5 = lines.map { line =>
           // BUGFIX: split the individual line, not the RDD — the original
           // called `lines.split(" ")`, which does not compile (RDD has no
           // split method).
           val splitedLine = line.split(" ")
           (splitedLine(0), splitedLine(1).toInt)
         }.groupByKey().map { groupedData =>
           // Sort each group's values descending and keep the five largest.
           val groupByKey = groupedData._1
           val topValues: List[Int] = groupedData._2.toList.sortWith(_ > _).take(5)
           (groupByKey, topValues)
         }
    
         top5.foreach { topped =>
           println("Group key: " + topped._1)
           topped._2.foreach(println)
           println("*****************")
         }
         sc.stop()
       }
    }
    

    POM.XML文件如下所示:

     <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>2.2.0</version>
        </dependency>
    

    相关文章

      网友评论

          本文标题:JAVA+SCALA实现SPARK求TopN

          本文链接:https://www.haomeiwen.com/subject/pecjbxtx.html