On Spark reduceByKey(func: (V, V) => V)

Author: 程序媛啊 | Published 2018-03-23 23:00

    1. How reduceByKey works

    [Figure: reduceByKey schematic. Values for the same key are combined within each partition first, then the per-partition results are merged across partitions.]

    2. Code

    2.1 Check which partition each element lands in

    // Imports used throughout this article
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    SparkConf conf = new SparkConf()
            .setMaster("local[2]") // the number inside local[n] determines how many partitions the data is split into
            .setAppName("ReduceByKeyTest");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    String line = "hadoop spark hbase hbase hadoop spark hbase spark hive hbase spark hadoop oozie hue spark spark hadoop hadoop storm";
    List<String> list = new ArrayList<>(Arrays.asList(line.split(" ")));
    JavaRDD<String> parallelizeRdd = jsc.parallelize(list);
    // Tag every element with the index of the partition it lives in
    JavaRDD<String> stringJavaRDD = parallelizeRdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
        @Override
        public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
            List<String> result = new ArrayList<>();
            while (iterator.hasNext()) {
                result.add("partition" + index + ":" + iterator.next());
            }
            return result.iterator();
        }
    }, true);
    System.out.println(stringJavaRDD.collect());
    

    The output is:

    [partition0:hadoop, partition0:spark, partition0:hbase, partition0:hbase, partition0:hadoop, partition0:spark, partition0:hbase, partition0:spark, partition0:hive, 
    partition1:hbase, partition1:spark, partition1:hadoop, partition1:oozie, partition1:hue, partition1:spark, partition1:spark, partition1:hadoop, partition1:hadoop, partition1:storm]
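
    For comparison, a quicker way to see the same split is the standard glom() API, which collects each partition into a single list. A minimal sketch (the output shape shown in the comment is an assumption for this 2-partition example):

    // glom() turns each partition into one List, so the partition
    // boundaries are visible directly in the collected result.
    System.out.println(parallelizeRdd.glom().collect());
    // expected shape: [[hadoop, spark, ..., hive], [hbase, spark, ..., storm]]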
    

    2.2 Apply the mapToPair transformation to each word

    JavaPairRDD<String, Integer> pairRDD = parallelizeRdd.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<>(s, 1); // each word becomes a (word, 1) pair
        }
    });
    System.out.println(pairRDD.collect());
    

    The output is:

    [(hadoop,1), (spark,1), (hbase,1), (hbase,1), (hadoop,1), (spark,1), (hbase,1), (spark,1), (hive,1), (hbase,1), (spark,1), (hadoop,1), (oozie,1), (hue,1), (spark,1), (spark,1), (hadoop,1), (hadoop,1), (storm,1)]
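
    For reference, the same transformation written as a Java 8 lambda (equivalent semantics, less boilerplate; the variable name pairRddLambda is illustrative):

    JavaPairRDD<String, Integer> pairRddLambda =
            parallelizeRdd.mapToPair(s -> new Tuple2<>(s, 1));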
    

    2.3 Aggregate by key

    JavaPairRDD<String, Integer> reduceByKeyRdd = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            // log every invocation of the merge function
            System.out.println("v1:" + v1);
            System.out.println("v2:" + v2);
            return v1 + v2;
        }
    });
    

    The output is:

    Note: if a word appears only once, the v1+v2 computation is never invoked for it.
    v1:1
    v2:1
    v1:1
    v2:1
    v1:1
    v2:1
    v1:1
    v2:1
    v1:2
    v2:1
    v1:2
    v2:1
    v1:1
    v2:1
    v1:2
    v2:1
    v1:2
    v2:1
    ---------------------------------------------
    v1:2
    v2:3
    v1:3
    v2:1
    v1:3
    v2:3
    

    Note:

    In the first pass, values for the same key are combined within each partition, as shown in the schematic above:

    [partition0:hadoop, partition0:spark, partition0:hbase, partition0:hbase, partition0:hadoop, partition0:spark, partition0:hbase, partition0:spark, partition0:hive, 
    partition1:hbase, partition1:spark, partition1:hadoop, partition1:oozie, partition1:hue, partition1:spark, partition1:spark, partition1:hadoop, partition1:hadoop, partition1:storm]
    partition0 returns:
    hadoop 
    1 1
    spark
    1 1
    2 1
    hbase
    1 1
    2 1
    
    partition1 returns:
    spark
    1 1
    2 1
    hadoop 
    1 1
    2 1
    

    In the second pass, the values for the same key from the two partitions are merged, as shown in the schematic above:

    hadoop
    2 3
    spark 
    3 3
    hbase 
    3 1
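
    This two-pass behavior is why the function passed to reduceByKey must be associative. reduceByKey is in fact a special case of combineByKey; a minimal sketch of the equivalent call (the variable name viaCombine is illustrative):

    JavaPairRDD<String, Integer> viaCombine = pairRDD.combineByKey(
            v -> v,               // createCombiner: the first value seen for a key in a partition
            (c, v) -> c + v,      // mergeValue: within-partition combine (the first pass above)
            (c1, c2) -> c1 + c2); // mergeCombiners: cross-partition merge (the second pass)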
    

    2.4 Print the aggregated result

    System.out.println(reduceByKeyRdd.collect());
    

    The output is:

    (hive,1)
    (oozie,1)
    (hue,1)
    (spark,6)
    (hadoop,5)
    (storm,1)
    (hbase,4)
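
    For contrast, the same counts computed with groupByKey (a sketch; the variable name viaGroup is illustrative). groupByKey performs no per-partition pre-aggregation, so every (word, 1) pair is shuffled across the network, which is why reduceByKey is usually preferred for aggregations like this word count:

    JavaPairRDD<String, Integer> viaGroup = pairRDD.groupByKey()
            .mapValues(values -> {
                int sum = 0;
                for (Integer v : values) {
                    sum += v;
                }
                return sum;
            });
    System.out.println(viaGroup.collect()); // same counts, more shuffle traffic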
    
