An Example of Solving Data Skew (for reduceByKey)

Author: Aluha_f289 | Published 2020-06-26 17:00
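
Two-stage aggregation (key salting) is the idea behind the code below: when a few keys carry most of the records, a single reduceByKey task ends up doing almost all the work. Prepending a random prefix (0-9 here) spreads each hot key across up to ten salted keys, so the first reduceByKey aggregates them in smaller, evenly distributed groups; stripping the prefix and reducing again then yields the final counts. The word-count job below deliberately repeats the same input lines so that "spark" and "flume" dominate, and walks through the four steps.
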
    package com.imooc;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    
    public class HelloWorld {
        public static void main(String[] args) {
    
    
            SparkConf conf = new SparkConf();
            conf.setMaster("local[2]").setAppName("HelloWorld");
            JavaSparkContext sc = new JavaSparkContext(conf);
            List<String> list = new ArrayList<String>();
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
            list.add("spark flume spark");
            list.add("hadoop flume hive");
    
    
            JavaRDD<String> rdd = sc.parallelize(list);
    
            JavaRDD<String> rdd2 =  rdd.flatMap(line ->
                    Arrays.asList(line.split(" ")).iterator());
    
            JavaPairRDD<String, Integer> counts= rdd2.mapToPair(word ->
                    new Tuple2<String,Integer>(word, 1));
    
            counts.foreach(o -> {
                System.out.println(o);
            });
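            // The (word, 1) pairs above print roughly as follows (order varies across partitions):
            // (spark,1) (flume,1) (spark,1) (hadoop,1) (flume,1) (hive,1) ...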
    
            // Step 1: prepend a random prefix to every key in the RDD.
            JavaPairRDD<String, Integer> randomPrefixRdd =  counts.mapToPair(
                    new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                            Random random = new Random();
                            int prefix = random.nextInt(10);
                            return new Tuple2<String, Integer>(prefix + "_" + stringIntegerTuple2._1, stringIntegerTuple2._2);
                        }
                    }
            );
    
            randomPrefixRdd.foreach(o -> {
                System.out.println(o);
            });
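            // Sample output after salting; the prefixes are random, so the exact
            // values differ from run to run: (3_spark,1) (7_flume,1) (0_spark,1) ...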
    
    
            // Step 2: do a local aggregation on the keys that carry the random prefix.
            JavaPairRDD<String, Integer> localAggrRdd = randomPrefixRdd.reduceByKey(
                   new Function2<Integer, Integer, Integer>() {
                       @Override
                       public Integer call(Integer v1, Integer v2) throws Exception {
                           return v1 + v2;
                       }
                   }
           );
    
            localAggrRdd.foreach(o -> {
                System.out.println(o);
            });
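            // Sample output of the local aggregation: each salted key has been summed
            // within its own prefix group, e.g. (3_spark,4) (7_flume,2) (5_hive,1) ...
            // (the exact split depends on the random prefixes).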
    
    
            // Step 3: strip the random prefix from every key in the RDD.
            JavaPairRDD<String, Integer> removedRandomPrefixRdd =   localAggrRdd.mapToPair(
                    new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                            String originalKey = stringIntegerTuple2._1.split("_")[1];
                            return new Tuple2<String, Integer>(originalKey, stringIntegerTuple2._2);
                        }
                    }
            );
    
            removedRandomPrefixRdd.foreach(o -> {
                System.out.println(o);
            });
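            // Sample output after removing the prefix: the original words reappear,
            // still carrying partially aggregated counts, e.g. (spark,4) (spark,7) (flume,2) ...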
    
            // Step 4: do the global aggregation on the de-salted keys.
            JavaPairRDD<String, Integer> globalAggrRdd =  removedRandomPrefixRdd.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer integer, Integer integer2) throws Exception {
                            return integer + integer2;
                        }
                    }
            );
    
            globalAggrRdd.foreach(o -> {
                System.out.println(o);
            });
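            // Final result for this input: (spark,28) (flume,28) (hadoop,14) (hive,14)
            // (printed per partition, so the order may vary).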
            // Release Spark resources once the job is finished.
            sc.stop();
        }
    }
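
For reference, the same four steps can also be written in the compact lambda style the listing already uses for flatMap and mapToPair. This is a minimal sketch rather than part of the original post; it assumes the same counts pair RDD built above and the Java 8 / Spark 2.x API used throughout.

    // Compact lambda-style version of the four-step two-stage aggregation (sketch).
    JavaPairRDD<String, Integer> result = counts
            // Step 1: salt each key with a random prefix 0-9.
            .mapToPair(t -> new Tuple2<String, Integer>(new Random().nextInt(10) + "_" + t._1, t._2))
            // Step 2: local aggregation on the salted keys.
            .reduceByKey(Integer::sum)
            // Step 3: strip the salt to recover the original key.
            .mapToPair(t -> new Tuple2<String, Integer>(t._1.split("_")[1], t._2))
            // Step 4: global aggregation on the original keys.
            .reduceByKey(Integer::sum);
    result.foreach(o -> System.out.println(o));

Because each hot key is spread over up to ten salted keys, the heavy lifting of the first reduceByKey is shared by several tasks instead of landing on a single one; the second reduceByKey then only has to merge at most ten partial sums per original key.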
    
    
    

Source: https://www.haomeiwen.com/subject/wasufktx.html