1. How reduceByKey works
[image: reduceByKey principle diagram]
2. Code
2.1 See which partition each element lands in
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

SparkConf conf = new SparkConf()
        .setMaster("local[2]") // the N in local[N] is also the default parallelism, so parallelize() splits the data into N partitions
        .setAppName("ReduceByKeyTest");
JavaSparkContext jsc = new JavaSparkContext(conf);

String line = "hadoop spark hbase hbase hadoop spark hbase spark hive hbase spark hadoop oozie hue spark spark hadoop hadoop storm";
List<String> list = new ArrayList<>(Arrays.asList(line.split(" ")));
JavaRDD<String> parallelizeRdd = jsc.parallelize(list);

// Tag each element with the index of the partition it lives in
JavaRDD<String> stringJavaRDD = parallelizeRdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
        List<String> list = new ArrayList<>();
        while (iterator.hasNext()) {
            list.add("partition" + index + ":" + iterator.next());
        }
        return list.iterator();
    }
}, true);
System.out.println(stringJavaRDD.collect());
The output is:
[partition0:hadoop, partition0:spark, partition0:hbase, partition0:hbase, partition0:hadoop, partition0:spark, partition0:hbase, partition0:spark, partition0:hive,
partition1:hbase, partition1:spark, partition1:hadoop, partition1:oozie, partition1:hue, partition1:spark, partition1:spark, partition1:hadoop, partition1:hadoop, partition1:storm]
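With local[2] the default parallelism is 2, so the words are split across two partitions. If you want the partition count to be independent of the master setting, parallelize() also accepts an explicit slice count. A minimal sketch, reusing the jsc and list from above (the variable name threePartitions is just for illustration):
// Explicitly request 3 partitions instead of relying on the default parallelism
JavaRDD<String> threePartitions = jsc.parallelize(list, 3);
System.out.println("partitions: " + threePartitions.getNumPartitions());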
2.2 Convert each word with mapToPair
// Map every word to a (word, 1) tuple
JavaPairRDD<String, Integer> pairRDD = parallelizeRdd.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<>(s, 1);
    }
});
System.out.println(pairRDD.collect());
The output is:
[(hadoop,1), (spark,1), (hbase,1), (hbase,1), (hadoop,1), (spark,1), (hbase,1), (spark,1), (hive,1), (hbase,1), (spark,1), (hadoop,1), (oozie,1), (hue,1), (spark,1), (spark,1), (hadoop,1), (hadoop,1), (storm,1)]
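On Java 8+, the same transformation can be written as a lambda, since PairFunction is a functional interface. A minimal equivalent sketch (pairRDDLambda is just an illustrative name):
// Same (word, 1) mapping, written as a Java 8 lambda
JavaPairRDD<String, Integer> pairRDDLambda =
        parallelizeRdd.mapToPair(s -> new Tuple2<>(s, 1));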
2.3 Aggregate by key
// reduceByKey merges values pair by pair; the println calls trace each merge
JavaPairRDD<String, Integer> reduceByKeyRdd = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        System.out.println("v1:" + v1);
        System.out.println("v2:" + v2);
        return v1 + v2;
    }
});
The output is:
Note: if a word appears only once, v1 + v2 is never computed for it; the merge function only runs when a key has at least two values.
v1:1
v2:1
v1:1
v2:1
v1:1
v2:1
v1:1
v2:1
v1:2
v2:1
v1:2
v2:1
v1:1
v2:1
v1:2
v2:1
v1:2
v2:1
--------------------------------------------- (above this line: merges within each partition; below: merges across partitions)
v1:2
v2:3
v1:3
v2:1
v1:3
v2:3
Note:
In the first pass, reduceByKey merges the values of identical keys inside each partition (the map-side combine), as in the diagram above:
[partition0:hadoop, partition0:spark, partition0:hbase, partition0:hbase, partition0:hadoop, partition0:spark, partition0:hbase, partition0:spark, partition0:hive,
partition1:hbase, partition1:spark, partition1:hadoop, partition1:oozie, partition1:hue, partition1:spark, partition1:spark, partition1:hadoop, partition1:hadoop, partition1:storm]
Merges inside partition0:
hadoop: v1=1, v2=1
spark:  v1=1, v2=1, then v1=2, v2=1
hbase:  v1=1, v2=1, then v1=2, v2=1
Merges inside partition1:
spark:  v1=1, v2=1, then v1=2, v2=1
hadoop: v1=1, v2=1, then v1=2, v2=1
In the second pass, values with the same key are merged across the two partitions (after the shuffle), as in the diagram above:
hadoop: v1=2, v2=3
spark:  v1=3, v2=3
hbase:  v1=3, v2=1
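This map-side combine is exactly what makes reduceByKey cheaper than grouping first and summing afterwards: with groupByKey all 19 (word, 1) pairs cross the shuffle, while reduceByKey ships only one pre-summed value per key per partition. For comparison, a sketch of a groupByKey equivalent over the same pairRDD (viaGroupByKey is an illustrative name; the counts are identical, only the shuffle volume differs):
// Equivalent result via groupByKey: every pair is shuffled, then summed per key
JavaPairRDD<String, Integer> viaGroupByKey = pairRDD.groupByKey()
        .mapValues(values -> {
            int sum = 0;
            for (Integer v : values) {
                sum += v;
            }
            return sum;
        });
System.out.println(viaGroupByKey.collect());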
2.4 Print the aggregated result
System.out.println(reduceByKeyRdd.collect());
The output is:
(hive,1)
(oozie,1)
(hue,1)
(spark,6)
(hadoop,5)
(storm,1)
(hbase,4)
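reduceByKey also has an overload that takes the number of result partitions, which controls the reduce-side parallelism. A minimal sketch using a Java 8 lambda for brevity (singlePartitionResult is an illustrative name):
// Same sum, but ask for a single result partition
JavaPairRDD<String, Integer> singlePartitionResult =
        pairRDD.reduceByKey((v1, v2) -> v1 + v2, 1);
System.out.println(singlePartitionResult.collect());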