美文网首页
数据倾斜解决实例【适用于reduceByKey】

数据倾斜解决实例【适用于reduceByKey】

作者: Aluha_f289 | 来源:发表于2020-06-26 17:00 被阅读0次
package com.imooc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class HelloWorld {
    public static void main(String[] args) {


        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]").setAppName("HelloWorld");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = new ArrayList<String>();
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");
        list.add("spark flume spark");
        list.add("hadoop flume hive");


        JavaRDD<String> rdd = sc.parallelize(list);

        JavaRDD<String> rdd2 =  rdd.flatMap(line ->
                Arrays.asList(line.split(" ")).iterator());

        JavaPairRDD<String, Integer> counts= rdd2.mapToPair(word ->
                new Tuple2<String,Integer>(word, 1));

        counts.foreach(o -> {
            System.out.println(o);
        });

        // 第一步,给RDD中的每个key都打上一个随机前缀。
        JavaPairRDD<String, Integer> randomPrefixRdd =  counts.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        Random random = new Random();
                        int prefix = random.nextInt(10);
                        return new Tuple2<String, Integer>(prefix + "_" + stringIntegerTuple2._1, stringIntegerTuple2._2);
                    }
                }
        );

        randomPrefixRdd.foreach(o -> {
            System.out.println(o);
        });


        //第二步,对打上随机前缀的key进行局部聚合。
        JavaPairRDD<String, Integer> localAggrRdd = randomPrefixRdd.reduceByKey(
               new Function2<Integer, Integer, Integer>() {
                   @Override
                   public Integer call(Integer v1, Integer v2) throws Exception {
                       return v1 + v2;
                   }
               }
       );

        localAggrRdd.foreach(o -> {
            System.out.println(o);
        });


        //第三步,去除RDD中每个key的随机前缀。
        JavaPairRDD<String, Integer> removedRandomPrefixRdd =   localAggrRdd.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        String originalKey = stringIntegerTuple2._1.split("_")[1];
                        return new Tuple2<String, Integer>(originalKey, stringIntegerTuple2._2);
                    }
                }
        );

        removedRandomPrefixRdd.foreach(o -> {
            System.out.println(o);
        });

        //第四步,对去掉随机数的key进行全局聚合
        JavaPairRDD<String, Integer> globalAggrRdd =  removedRandomPrefixRdd.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }
        );

        globalAggrRdd.foreach(o -> {
            System.out.println(o);
        });
    }
}


相关文章

网友评论

      本文标题:数据倾斜解决实例【适用于reduceByKey】

      本文链接:https://www.haomeiwen.com/subject/wasufktx.html