原始数据
1949-10-01 14:21:02 34 23
1949-10-01 19:21:02 38 34
1949-10-02 14:01:02 36 56
1950-01-01 11:21:02 32 67
1950-10-01 12:21:02 37 11
1951-12-01 12:21:02 23 78
1950-10-02 12:21:02 41 39
1950-10-03 12:21:02 27 88
。。。。。
思路:
1.将数据读取到RDD1中
2.将RDD1中的数据转换成K-V格式的RDD2
3.对RDD2使用sortByKey排序
代码
public class SecondSort {
public static void main(String[] args) {
//获取温度 湿度信息
SparkConf conf = new SparkConf().setAppName("SecondSort").setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> weatherRdd = sc.textFile("weather");
/**
* mapToPair 算子是java api只能够独有的,在scala api中没有这个算子 在scala中相当于map
* mapToPair可以返回一个KV格式的RDD
* 泛型解释:
* String:wetherRDD中每一条元素的类型, SortObj:返回的RDD的key类型, String:返回的RDD的value类型
*/
weatherRdd.mapToPair(new PairFunction<String, SortObj, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<SortObj, String> call(String log) throws Exception {
//log = weatherRdd中每一条记录
String[] splited = log.split("\t");
Integer temperature = Integer.parseInt(splited[1].trim());
Integer shidu = Integer.parseInt(splited[2]);
SortObj sortObj = new SortObj(temperature,shidu);
return new Tuple2<SortObj, String>(sortObj,log);
}
}).sortByKey()//对RDD2的温度进行排序
.foreach(new VoidFunction<Tuple2<SortObj,String>>() {//遍历RDD2中每一条数据
/**
* SortObj RDD2的key的类型 String RDD2的value类型
*/
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<SortObj, String> t) throws Exception {
// TODO Auto-generated method stub
System.out.println(t);
}
/**
* SortObj RDD2 Key的类型
*/
});
sc.stop();
}
}
其中SortObj用来寻找温度相同的元素
public class SortObj implements Serializable,Comparable<SortObj> {
private Integer temperature;
private Integer shidu;
public SortObj() {
super();
}
public SortObj(Integer temperature, Integer shidu) {
super();
this.temperature = temperature;
this.shidu = shidu;
}
public Integer getTemperature() {
return temperature;
}
public void setTemperature(Integer temperature) {
this.temperature = temperature;
}
public Integer getShidu() {
return shidu;
}
public void setShidu(Integer shidu) {
this.shidu = shidu;
}
@Override
public int compareTo(SortObj o) {
if(o.getTemperature() - getTemperature() == 0){
return o.getShidu() - getShidu();
}else{
return o.getTemperature() - getTemperature();
}
}
}
问题:
在scala中如何将一个非KV格式的RDD变成KV格式的RDD?
原则: 只要是xxToPair这样的方法,他的返回值一定是一个KV格式的RDD
网友评论