package com.imooc;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Word-count demo that aggregates in two stages using random key prefixes
 * ("salting"):
 * <ol>
 *   <li>prepend a random prefix to every key,</li>
 *   <li>aggregate per prefixed key (local aggregation),</li>
 *   <li>strip the prefix again,</li>
 *   <li>aggregate per original key (global aggregation).</li>
 * </ol>
 * The intermediate RDD of every step is printed to standard output.
 */
public class HelloWorld {

    /** Random prefixes are drawn from {@code [0, PREFIX_BUCKETS)}. */
    private static final int PREFIX_BUCKETS = 10;

    /** Text placed between the random prefix and the original key. */
    private static final String PREFIX_SEPARATOR = "_";

    /** How many times the two sample lines are repeated in the input. */
    private static final int SAMPLE_REPETITIONS = 14;

    /**
     * Runs the demo on a local two-thread Spark master.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("HelloWorld");

        // try-with-resources guarantees the context is closed even if a job fails.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> lines = sc.parallelize(buildSampleLines());
            JavaRDD<String> words = lines.flatMap(line ->
                    Arrays.asList(line.split(" ")).iterator());
            JavaPairRDD<String, Integer> counts = words.mapToPair(word ->
                    new Tuple2<String, Integer>(word, 1));
            printAll(counts);

            // Step 1: tag every key with a random prefix.
            JavaPairRDD<String, Integer> randomPrefixRdd = counts.mapToPair(pair ->
                    new Tuple2<String, Integer>(addRandomPrefix(pair._1), pair._2));
            printAll(randomPrefixRdd);

            // Step 2: local aggregation on the prefixed keys.
            JavaPairRDD<String, Integer> localAggrRdd =
                    randomPrefixRdd.reduceByKey((v1, v2) -> v1 + v2);
            printAll(localAggrRdd);

            // Step 3: remove the random prefix from every key.
            JavaPairRDD<String, Integer> removedRandomPrefixRdd = localAggrRdd.mapToPair(pair ->
                    new Tuple2<String, Integer>(stripRandomPrefix(pair._1), pair._2));
            printAll(removedRandomPrefixRdd);

            // Step 4: global aggregation on the original keys.
            JavaPairRDD<String, Integer> globalAggrRdd =
                    removedRandomPrefixRdd.reduceByKey((v1, v2) -> v1 + v2);
            printAll(globalAggrRdd);
        }
    }

    /** Builds the sample input: two fixed lines, alternating, repeated SAMPLE_REPETITIONS times. */
    private static List<String> buildSampleLines() {
        List<String> lines = new ArrayList<String>(2 * SAMPLE_REPETITIONS);
        for (int i = 0; i < SAMPLE_REPETITIONS; i++) {
            lines.add("spark flume spark");
            lines.add("hadoop flume hive");
        }
        return lines;
    }

    /** Returns {@code key} prefixed with a random bucket number and the separator. */
    private static String addRandomPrefix(String key) {
        // ThreadLocalRandom avoids allocating a new generator for every record.
        int prefix = ThreadLocalRandom.current().nextInt(PREFIX_BUCKETS);
        return prefix + PREFIX_SEPARATOR + key;
    }

    /**
     * Removes the prefix added by {@link #addRandomPrefix(String)}.
     *
     * Only the first separator is treated as the prefix boundary, so keys that
     * themselves contain the separator (or are empty) are restored intact.
     * A key without any separator is returned unchanged.
     */
    private static String stripRandomPrefix(String prefixedKey) {
        int boundary = prefixedKey.indexOf(PREFIX_SEPARATOR);
        if (boundary < 0) {
            return prefixedKey;
        }
        return prefixedKey.substring(boundary + PREFIX_SEPARATOR.length());
    }

    /** Prints every element of {@code rdd} to standard output. */
    private static void printAll(JavaPairRDD<String, Integer> rdd) {
        // Deliberately a lambda, not System.out::println: the method reference
        // would capture the PrintStream instance, which is not serializable.
        rdd.foreach(pair -> System.out.println(pair));
    }
}
// Reader comments (this line appears to be stray page text, not part of the program).