代码如下:使用的spark版本为:spark-core_2.11,基于window平台,使用IDEA+MAVEN
package com.spark.test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
//创建spark的配置对象sparkConf,并设置相关的信息
SparkConf sparkConf = new SparkConf().setAppName("JavaTopNGroup").setMaster("local");
//创建spark的入口程序SparkContext
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
//RDD读取本地文件
JavaRDD<String> lines = ctx.textFile("file:///D://TopN.txt");
//把每一行数据编程key-value的方式
JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String line) throws Exception {
String[] str = line.split(" ");
return new Tuple2<String, Integer>(str[0],Integer.valueOf(str[1]));
}
});
//按照key进行排序
JavaPairRDD<String,Iterable<Integer>> groupPairs = pairs.groupByKey();
final JavaPairRDD<String,Iterable<Integer>> top5 = groupPairs.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedData) throws Exception {
//用于保存TOP5的数据
Integer[] top5 = new Integer[5];
//获取分组的组名key
String groupByKey = groupedData._1();
//获取分组的内容集合
Iterator<Integer> groupValue = groupedData._2().iterator();
while (groupValue.hasNext()){
Integer value = groupValue.next();
for (int i = 0; i <5 ; i++) {
//还不够五个的情况
if(top5[i] == null){
top5[i] = value;
break;
//只取值topK的情况
}else if (value>top5[i]){
for (int j = 4; j > i; j--) {
top5[j] = top5[j-1];
}
top5[i]=value;
break;
}
}
}
return new Tuple2<String, Iterable<Integer>>(groupByKey, Arrays.asList(top5));
}
});
top5.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
public void call(Tuple2<String, Iterable<Integer>> topped) throws Exception {
System.out.println("Group By Key: "+topped._1());
Iterator<Integer> toppedValue = topped._2().iterator();
while(toppedValue.hasNext()){
Integer value = toppedValue.next();
System.out.println(value);
}
System.out.println("*************************************");
}
});
ctx.stop();
}
}
基于SCALA实现
object TopNGroup{
val conf=new SparkConf().setAppName("JavaTopNGroup").setMaster("local");
val sc = new SparkContext(conf);
val lines = sc.textFile("file:///D://TopN.txt");
val top5 =lines.map{
line => val splitedLine = lines.split(" ")
(splitedLine(0),splitedLine(1).toInt)
}.groupByKey().map{
groupedData => val groupByKey = groupedData._1
val top5:List[Int] = groupedData._2.toList.sortWith(_>_).take(5)
Tuple2(groupByKey,top5)
}
top5.foreach{
topped => println("Group key: "+topped._1)
val toopedValue:Iterator[Int] = topped._2.iterator
while(toopedValue.hasNext){
val value:Integer = toopedValue.next
println(value)
}
println("*****************")
}
sc.stop()
}
POM.XML文件如下所示:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
网友评论