On Spark's reduceByKey(func: (V, V) => V)

Author: 程序媛啊 | Published 2018-03-23 23:00

1. How reduceByKey works (diagram)

[Figure: reduceByKey diagram — values with the same key are first combined inside each partition, then the per-partition results are merged across partitions.]
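In the Java API the corresponding method on JavaPairRDD has this signature:

JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func)

It merges the values of each key using func, first within each partition and then across partitions, which is why func must be associative and commutative.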

2. Code

2.1 Checking which partition each record is in

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

SparkConf conf = new SparkConf()
        .setMaster("local[2]") // the N in local[N] determines how many partitions parallelize() creates by default
        .setAppName("ReduceByKeyTest");
JavaSparkContext jsc = new JavaSparkContext(conf);
String line = "hadoop spark hbase hbase hadoop spark hbase spark hive hbase spark hadoop oozie hue spark spark hadoop hadoop storm";
List<String> list = new ArrayList<>(Arrays.asList(line.split(" ")));
JavaRDD<String> parallelizeRdd = jsc.parallelize(list);
// Tag every element with the index of the partition it belongs to.
JavaRDD<String> stringJavaRDD = parallelizeRdd.mapPartitionsWithIndex(
        new Function2<Integer, Iterator<String>, Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> tagged = new ArrayList<>();
                while (iterator.hasNext()) {
                    tagged.add("partition" + index + ":" + iterator.next());
                }
                return tagged.iterator();
            }
        }, true); // true = preserve the existing partitioning
System.out.println(stringJavaRDD.collect());

The output is:

[partition0:hadoop, partition0:spark, partition0:hbase, partition0:hbase, partition0:hadoop, partition0:spark, partition0:hbase, partition0:spark, partition0:hive, 
partition1:hbase, partition1:spark, partition1:hadoop, partition1:oozie, partition1:hue, partition1:spark, partition1:spark, partition1:hadoop, partition1:hadoop, partition1:storm]
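An alternative way to inspect partition contents (not used in the original post) is the standard glom() method, which collapses each partition into a single List:

// glom(): one List per partition; collect() then returns a list of those lists
System.out.println(parallelizeRdd.glom().collect());
// e.g. [[hadoop, spark, hbase, ...], [hbase, spark, hadoop, ..., storm]]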

2.2 Converting each word into a pair with mapToPair

// Map each word to a (word, 1) tuple so the counts can be aggregated by key.
JavaPairRDD<String, Integer> pairRDD = parallelizeRdd.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<>(s, 1);
    }
});
System.out.println(pairRDD.collect());

The output is:

[(hadoop,1), (spark,1), (hbase,1), (hbase,1), (hadoop,1), (spark,1), (hbase,1), (spark,1), (hive,1), (hbase,1), (spark,1), (hadoop,1), (oozie,1), (hue,1), (spark,1), (spark,1), (hadoop,1), (hadoop,1), (storm,1)]
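With Java 8 lambdas the same conversion can be written more compactly (a sketch, semantically identical to the anonymous class above):

// each word s becomes the tuple (s, 1)
JavaPairRDD<String, Integer> pairRDD = parallelizeRdd.mapToPair(s -> new Tuple2<>(s, 1));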

2.3 Aggregating values by key

// reduceByKey merges the values of each key with the given function;
// the println calls let us observe every invocation of func(v1, v2).
JavaPairRDD<String, Integer> reduceByKeyRdd = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        System.out.println("v1:" + v1);
        System.out.println("v2:" + v2);
        return v1 + v2;
    }
});

The output is:

Note: if a word appears only once, v1+v2 is never computed for it, because reduceByKey only invokes func when a key has at least two values to merge.
v1:1
v2:1
v1:1
v2:1
v1:1
v2:1
v1:1
v2:1
v1:2
v2:1
v1:2
v2:1
v1:1
v2:1
v1:2
v2:1
v1:2
v2:1
--------------------------------------------- (above: first pass, combining within each partition; below: second pass, merging across partitions)
v1:2
v2:3
v1:3
v2:1
v1:3
v2:3

Note:

First pass: within each partition, the values of identical keys are combined (the map-side combine), as shown in the diagram above:

[partition0:hadoop, partition0:spark, partition0:hbase, partition0:hbase, partition0:hadoop, partition0:spark, partition0:hbase, partition0:spark, partition0:hive, 
partition1:hbase, partition1:spark, partition1:hadoop, partition1:oozie, partition1:hue, partition1:spark, partition1:spark, partition1:hadoop, partition1:hadoop, partition1:storm]
partition0 returns:
hadoop: v1=1, v2=1 → 2
spark:  v1=1, v2=1 → 2; then v1=2, v2=1 → 3
hbase:  v1=1, v2=1 → 2; then v1=2, v2=1 → 3

partition1 returns:
spark:  v1=1, v2=1 → 2; then v1=2, v2=1 → 3
hadoop: v1=1, v2=1 → 2; then v1=2, v2=1 → 3

Second pass: the per-partition results for each key are merged across the two partitions, as shown in the diagram above:

hadoop: v1=2, v2=3 → 5
spark:  v1=3, v2=3 → 6
hbase:  v1=3, v2=1 → 4
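To make the two passes concrete, here is a minimal plain-Java simulation of the same computation (not Spark code; the class name and structure are illustrative): phase 1 combines within each partition, phase 2 merges the per-partition results.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceByKeySimulation {
    public static void main(String[] args) {
        // The same data, pre-split into the two partitions shown above.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("hadoop", "spark", "hbase", "hbase", "hadoop", "spark", "hbase", "spark", "hive"),
                Arrays.asList("hbase", "spark", "hadoop", "oozie", "hue", "spark", "spark", "hadoop", "hadoop", "storm"));

        // Phase 1: combine within each partition, using func(v1, v2) = v1 + v2.
        List<Map<String, Integer>> perPartition = new ArrayList<>();
        for (List<String> partition : partitions) {
            Map<String, Integer> local = new HashMap<>();
            for (String word : partition) {
                local.merge(word, 1, Integer::sum);
            }
            perPartition.add(local);
        }

        // Phase 2: merge the per-partition results across partitions.
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> local : perPartition) {
            local.forEach((word, count) -> result.merge(word, count, Integer::sum));
        }

        System.out.println(result);
        // e.g. {hive=1, oozie=1, hue=1, spark=6, hadoop=5, storm=1, hbase=4} (order may vary)
    }
}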

2.4 Printing the aggregated result

System.out.println(reduceByKeyRdd.collect());

The output is:

(hive,1)
(oozie,1)
(hue,1)
(spark,6)
(hadoop,5)
(storm,1)
(hbase,4)
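For reference, the whole word count from sections 2.2 to 2.4 can be condensed with lambdas (a sketch; collectAsMap() is the standard JavaPairRDD method that returns the result as a java.util.Map):

Map<String, Integer> counts = jsc.parallelize(list)
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey(Integer::sum)   // same as func(v1, v2) = v1 + v2
        .collectAsMap();
System.out.println(counts);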
