美文网首页Spark开发代码
根据温度排序,温度相同按照湿度排序

根据温度排序,温度相同按照湿度排序

作者: 0_9f3a | 来源:发表于2018-01-03 16:51 被阅读0次

    原始数据

    1949-10-01 14:21:02 34  23
    1949-10-01 19:21:02 38  34
    1949-10-02 14:01:02 36  56
    1950-01-01 11:21:02 32  67
    1950-10-01 12:21:02 37  11
    1951-12-01 12:21:02 23  78
    1950-10-02 12:21:02 41  39
    1950-10-03 12:21:02 27  88
    。。。。。
    

    思路:
    1.将数据读取到RDD1中
    2.将RDD1中的数据转换成K-V格式的RDD2
    3.对RDD2使用sortByKey排序
    代码

    public class SecondSort {
        public static void main(String[] args) {
    //获取温度 湿度信息
            SparkConf conf = new SparkConf().setAppName("SecondSort").setMaster("local[1]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> weatherRdd = sc.textFile("weather");
            
            /**
             * mapToPair 算子是java api只能够独有的,在scala api中没有这个算子 在scala中相当于map
             * mapToPair可以返回一个KV格式的RDD
             * 泛型解释:
             *  String:wetherRDD中每一条元素的类型, SortObj:返回的RDD的key类型, String:返回的RDD的value类型
             */
            weatherRdd.mapToPair(new PairFunction<String, SortObj, String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<SortObj, String> call(String log) throws Exception {
                    //log = weatherRdd中每一条记录
                    String[] splited = log.split("\t");
                    Integer temperature = Integer.parseInt(splited[1].trim());
                    Integer shidu = Integer.parseInt(splited[2]);
                    SortObj sortObj = new SortObj(temperature,shidu);
                    return new Tuple2<SortObj, String>(sortObj,log);
                }
            }).sortByKey()//对RDD2的温度进行排序
            .foreach(new VoidFunction<Tuple2<SortObj,String>>() {//遍历RDD2中每一条数据
    
                /**
                 * SortObj RDD2的key的类型 String RDD2的value类型
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(Tuple2<SortObj, String> t) throws Exception {
                    // TODO Auto-generated method stub
                    System.out.println(t);
                }
                /**
                 * SortObj RDD2 Key的类型
                 */
            });
            sc.stop();
        }
    }
    

    其中SortObj用来寻找温度相同的元素

    public class SortObj implements Serializable,Comparable<SortObj> {
        private Integer temperature;
        private Integer shidu;
        public SortObj() {
            super();
        }
        public SortObj(Integer temperature, Integer shidu) {
            super();
            this.temperature = temperature;
            this.shidu = shidu;
        }
        public Integer getTemperature() {
            return temperature;
        }
        public void setTemperature(Integer temperature) {
            this.temperature = temperature;
        }
        public Integer getShidu() {
            return shidu;
        }
        public void setShidu(Integer shidu) {
            this.shidu = shidu;
        }
        @Override
        public int compareTo(SortObj o) {
            if(o.getTemperature() - getTemperature() == 0){
                return o.getShidu() - getShidu();
            }else{
                return o.getTemperature() - getTemperature();
            }
        }
    }
    

    问题:
    在scala中如何将一个非KV格式的RDD变成KV格式的RDD?
    原则: 只要是xxToPair这样的方法,他的返回值一定是一个KV格式的RDD

    相关文章

      网友评论

        本文标题:根据温度排序,温度相同按照湿度排序

        本文链接:https://www.haomeiwen.com/subject/wczqnxtx.html