MapReduce API (2): Finding the Two Hottest Days of Each Month

Author: geekAppke | Published 2018-11-21 14:40
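    Approach:
        For each year and each month
        find the highest temperatures
        on 2 different days
        What if one day has several records?
    Thinking further:
        Group by year and month
        Sort by temperature in descending order
        So the key must contain both the date and the temperature!
    MR primitive: records with the same key end up in the same group (one reduce() call)
        Define what "the same key" means with a GroupingComparator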
    

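    Considerations

    1. MR
        * Preserve the primitive (same key, same group)
        How to partition the data, and how to define a group
    2. Designing the k:v mapping
        Consider the computational cost on the reduce side
    3. Can multiple reducers be used?
        Skew: sample the data
        Available cluster resources
    4. Custom data types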
    

    Custom type
    Partitioner
    Sort comparator
    Grouping comparator

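    Sample data (the timestamp and the temperature are separated by a tab, which is what the mapper splits on)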
    1949-10-01 14:21:02  34c
    1949-10-01 19:21:02  38c
    1949-10-02 14:01:02  36c
    1950-01-01 11:21:02  32c
    1950-10-01 12:21:02  37c
    1951-12-01 12:21:02  23c
    1950-10-02 12:21:02  41c
    1950-10-03 12:21:02  27c
    1951-07-01 12:21:02  45c
    1951-07-02 12:21:02  46c
    1951-07-03 12:21:03  47c
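The following is a minimal sketch that turns one sample line into the four fields the key needs. It assumes the timestamp and the temperature are separated by a tab, because the mapper splits on '\t'. The sketch uses `java.time.LocalDate` in place of the mapper's SimpleDateFormat/Calendar pair. This is a swapped-in technique, not the author's code. Note that `getMonthValue()` is already 1-based, so no `+1` is needed. `LineParser` and `parse` are hypothetical names.

```java
import java.time.LocalDate;

public class LineParser {
    // Hypothetical helper: returns {year, month, day, temperature}
    // for a line like "1949-10-01 14:21:02<TAB>34c"
    public static int[] parse(String line) {
        String[] parts = line.split("\t");                  // [timestamp, temperature]
        String datePart = parts[0].trim().substring(0, 10); // keep only yyyy-MM-dd
        LocalDate date = LocalDate.parse(datePart);         // ISO format, month is 1-based
        String t = parts[1].trim();
        int wd = Integer.parseInt(t.substring(0, t.length() - 1)); // drop the trailing 'c'
        return new int[] {date.getYear(), date.getMonthValue(), date.getDayOfMonth(), wd};
    }

    public static void main(String[] args) {
        int[] r = parse("1949-10-01 14:21:02\t34c");
        System.out.println(r[0] + "-" + r[1] + "-" + r[2] + " " + r[3]); // 1949-10-1 34
    }
}
```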
    

    Top-K: find the two hottest days of each month (driver)

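    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MyTQ {
        public static void main(String[] args) throws Exception {

            // 1. Configuration (true = load the default resources)
            Configuration conf = new Configuration(true);

            // 2. Job
            Job job = Job.getInstance(conf);
            job.setJarByClass(MyTQ.class);

            // 3. Input and output paths
            Path input = new Path("/data/tq/input");
            FileInputFormat.addInputPath(job, input);

            Path output = new Path("/data/tq/output");
            // Delete an existing output directory; otherwise the job refuses to start
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            // 4. Map side
            job.setMapperClass(TqMapper.class);
            job.setMapOutputKeyClass(TQ.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setPartitionerClass(TqPartitioner.class);
            job.setSortComparatorClass(TqSortComparator.class);
            // No combiner: TqReducer emits <Text, IntWritable>, but a combiner must emit
            // the map output types <TQ, IntWritable>, so reusing it would fail at runtime

            // 5. Reduce side
            // Grouping comparator: same year and month = one reduce() call
            job.setGroupingComparatorClass(TqGroupingComparator.class);
            job.setReducerClass(TqReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setNumReduceTasks(2);

            // 6. Submit, wait, and report success through the exit code
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }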
    

    Custom type

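    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    // A comparable, serializable data structure:
    // implements WritableComparable (Writable for serialization, Comparable for sorting)
    public class TQ implements WritableComparable<TQ> {

        private int year;
        private int month;
        private int day;
        private int wd;   // temperature (the number before the trailing 'c')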
        
        public int getYear() {
            return year;
        }
    
        public void setYear(int year) {
            this.year = year;
        }
    
        public int getMonth() {
            return month;
        }
    
        public void setMonth(int month) {
            this.month = month;
        }
    
        public int getDay() {
            return day;
        }
    
        public void setDay(int day) {
            this.day = day;
        }
    
        public int getWd() {
            return wd;
        }
    
        public void setWd(int wd) {
            this.wd = wd;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(year);
            out.writeInt(month);
            out.writeInt(day);
            out.writeInt(wd);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.year=in.readInt();
            this.month=in.readInt();
            this.day=in.readInt();
            this.wd=in.readInt();
        }
    
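        @Override
        public int compareTo(TQ that) {
            // Natural order by convention: date ascending
            int c1 = Integer.compare(this.getYear(), that.getYear());
            if (c1 == 0) {
                int c2 = Integer.compare(this.getMonth(), that.getMonth());
                if (c2 == 0) {
                    // Once the full date has been compared, we are done
                    // (this job overrides the order with TqSortComparator anyway)
                    return Integer.compare(this.getDay(), that.getDay());
                }
                return c2;
            }
            return c1;
        }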
    }
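The Writable contract can be checked without a cluster. readFields must read the fields in exactly the order write wrote them. The following is a minimal plain-Java sketch. The class `TqRoundTrip` and its nested `Key` are hypothetical stand-ins that copy TQ's four fields and serialization order. They are not Hadoop classes. The round trip through a byte array roughly mimics what happens to a key when it is spilled and shuffled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class TqRoundTrip {
    // Hypothetical stand-in for TQ: same fields, same write/readFields order, no Hadoop dependency
    static class Key {
        int year, month, day, wd;

        void write(DataOutput out) throws IOException {
            out.writeInt(year);
            out.writeInt(month);
            out.writeInt(day);
            out.writeInt(wd);
        }

        void readFields(DataInput in) throws IOException {
            year = in.readInt();
            month = in.readInt();
            day = in.readInt();
            wd = in.readInt();
        }
    }

    // Serializes the key into a byte array
    static byte[] toBytes(Key k) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            k.write(new DataOutputStream(buf));
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the bytes back into a fresh object
    static Key fromBytes(byte[] bytes) {
        try {
            Key k = new Key();
            k.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
            return k;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Key k = new Key();
        k.year = 1951; k.month = 7; k.day = 3; k.wd = 47;
        byte[] bytes = toBytes(k);   // four ints = 16 bytes
        Key copy = fromBytes(bytes);
        System.out.println(bytes.length + " bytes -> "
                + copy.year + "-" + copy.month + "-" + copy.day + " " + copy.wd); // 16 bytes -> 1951-7-3 47
    }
}
```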
    

    Map phase

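    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TqMapper extends Mapper<LongWritable, Text, TQ, IntWritable> {
        // Kept as fields so they are not re-created for every record
        // (fine here: a map task calls map() from a single thread)
        TQ mkey = new TQ();
        IntWritable mval = new IntWritable();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                // value: "1949-10-01 14:21:02<TAB>34c"  >>  TQ
                String[] strs = value.toString().split("\t");

                // parse() stops after "yyyy-MM-dd" and ignores the time of day
                Date date = sdf.parse(strs[0]);
                cal.setTime(date);

                mkey.setYear(cal.get(Calendar.YEAR));
                mkey.setMonth(cal.get(Calendar.MONTH) + 1);   // Calendar.MONTH is 0-based
                mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));

                // "34c" -> 34: drop the trailing unit character
                String t = strs[1].trim();
                int wd = Integer.parseInt(t.substring(0, t.length() - 1));
                mkey.setWd(wd);
                mval.set(wd);
                context.write(mkey, mval);
            } catch (ParseException e) {
                // Malformed date: log it and skip this line
                e.printStackTrace();
            }
        }
    }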
    

    Partitioning

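    The partitioner decides which reducer (partition) each key is sent to.
    It is called once for every <key, value> record the map emits,
    so it should be cheap, and it must not cause data skew. Sample the data first!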
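Before relying on `year % numPartitions`, you can count how many sampled records each reducer would receive. The following is a minimal sketch with a hypothetical helper, `PartitionSkewCheck`. Here the "sample" is simply the years of the 11 sample records listed earlier. On real data you would draw a random sample instead.

```java
import java.util.Map;
import java.util.TreeMap;

public class PartitionSkewCheck {
    // Same rule as TqPartitioner: partition = year % numPartitions
    static int partition(int year, int numPartitions) {
        return year % numPartitions;
    }

    // Counts how many sampled records each reducer would receive
    static Map<Integer, Integer> countPerPartition(int[] sampledYears, int numPartitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int y : sampledYears) {
            counts.merge(partition(y, numPartitions), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Years of the 11 sample records, in the order they appear
        int[] years = {1949, 1949, 1949, 1950, 1950, 1951, 1950, 1950, 1951, 1951, 1951};
        System.out.println(countPerPartition(years, 2)); // {0=4, 1=7}
    }
}
```

With 2 reducers, this rule already sends 7 of the 11 records to reducer 1. That is a mild skew, and it would grow if some years had far more data than others.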

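    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class TqPartitioner extends Partitioner<TQ, IntWritable> {
        @Override
        public int getPartition(TQ key, IntWritable value, int numPartitions) {
            // TQ does not override hashCode(), so a hash-based rule would not reflect year/month:
            // return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
            // Partition by year: all months of a year go to the same reducer,
            // so a year-month group is never split across reducers
            return key.getYear() % numPartitions;
        }
    }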
    

    Sorting

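    When the map-side in-memory buffer reaches its threshold, it starts spilling to disk. Records are sorted with this comparator before each spill, and the spill files are merged in the same order.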
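You can preview the order that TqSortComparator produces with a plain `java.util.Comparator` before running the job. This sketch assumes a key is represented as an `int[]` of `{year, month, day, wd}`. `SortOrderSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortOrderSketch {
    // Same order as TqSortComparator: year asc, month asc, temperature desc.
    // Keys are {year, month, day, wd}.
    static final Comparator<int[]> SORT =
        Comparator.<int[]>comparingInt(k -> k[0])
                  .thenComparingInt(k -> k[1])
                  .thenComparing((a, b) -> Integer.compare(b[3], a[3])); // swapped args = descending

    static List<int[]> sorted(List<int[]> keys) {
        List<int[]> copy = new ArrayList<>(keys);
        copy.sort(SORT);
        return copy;
    }

    public static void main(String[] args) {
        List<int[]> keys = List.of(
            new int[] {1950, 10, 1, 37},
            new int[] {1950, 10, 2, 41},
            new int[] {1950, 10, 3, 27},
            new int[] {1950, 1, 1, 32});
        for (int[] k : sorted(keys)) {
            System.out.println(k[0] + "-" + k[1] + "-" + k[2] + " " + k[3]);
        }
        // 1950-1-1 32, 1950-10-2 41, 1950-10-1 37, 1950-10-3 27
    }
}
```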

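    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Extends a class (WritableComparator), not an interface
    public class TqSortComparator extends WritableComparator {
        public TqSortComparator() {
            // true = create key instances so compare() receives deserialized TQ objects
            super(TQ.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TQ t1 = (TQ) a;
            TQ t2 = (TQ) b;

            // year ascending, month ascending, then temperature descending
            int c1 = Integer.compare(t1.getYear(), t2.getYear());
            if (c1 == 0) {
                int c2 = Integer.compare(t1.getMonth(), t2.getMonth());
                if (c2 == 0) {
                    // Largest first: descending order
                    return -Integer.compare(t1.getWd(), t2.getWd());
                }
                return c2;
            }
            return c1;
        }
    }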
    

    Grouping comparator

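    Records with the same year and month form one group (one reduce() call). The grouping comparator does not filter out same-day records; the reducer does that.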
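Grouping only ever merges neighboring keys. The framework walks the sorted stream and starts a new reduce() call whenever the grouping comparator says the next key differs from the current group. That is why the sort comparator must order by year and month first. The following is a simplified plain-Java sketch of that behavior. It uses hypothetical names (`GroupingSketch`, `group`) and represents keys as `int[] {year, month, day, wd}`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class GroupingSketch {
    // Same rule as TqGroupingComparator: only year and month matter. Keys are {year, month, day, wd}.
    static final Comparator<int[]> GROUP =
        Comparator.<int[]>comparingInt(k -> k[0]).thenComparingInt(k -> k[1]);

    // Splits an already-sorted key list into runs of consecutive keys that GROUP treats as equal;
    // each run corresponds to one reduce() call
    static List<List<int[]>> group(List<int[]> sortedKeys) {
        List<List<int[]>> groups = new ArrayList<>();
        List<int[]> current = null;
        for (int[] k : sortedKeys) {
            if (current == null || GROUP.compare(current.get(0), k) != 0) {
                current = new ArrayList<>();   // key differs: start a new group
                groups.add(current);
            }
            current.add(k);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Already in sort-comparator order (year, month, temperature desc)
        List<int[]> sorted = List.of(
            new int[] {1949, 10, 1, 38},
            new int[] {1949, 10, 2, 36},
            new int[] {1949, 10, 1, 34},
            new int[] {1950, 1, 1, 32});
        System.out.println(group(sorted).size()); // 2
    }
}
```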

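    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class TqGroupingComparator extends WritableComparator {
        public TqGroupingComparator() {
            super(TQ.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TQ t1 = (TQ) a;
            TQ t2 = (TQ) b;

            // Only year and month are compared; day and temperature are ignored
            int c1 = Integer.compare(t1.getYear(), t2.getYear());
            if (c1 == 0) {
                return Integer.compare(t1.getMonth(), t2.getMonth());
            }
            return c1;
        }
    }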
    

    Reduce phase

    Only the first 2 records that fall on different days are needed!
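You can try the reducer's loop outside Hadoop on one year-month group that is already sorted by temperature in descending order. This sketch uses hypothetical names (`TopTwoDays`, `topTwo`). The group in `main` is made-up data in which the two hottest readings fall on the same day, which is the case the day check exists for. The loop breaks as soon as the second day is found and never stores the values, so memory use stays constant however large the group is.

```java
import java.util.ArrayList;
import java.util.List;

public class TopTwoDays {
    // Mirrors TqReducer: walk one year-month group (sorted by temperature desc) and keep
    // the first record plus the first later record from a different day.
    // Keys are {year, month, day, wd}; output lines look like "1949-10-1<TAB>38".
    static List<String> topTwo(List<int[]> group) {
        List<String> out = new ArrayList<>();
        int firstDay = -1;
        for (int[] k : group) {
            if (out.isEmpty()) {
                out.add(k[0] + "-" + k[1] + "-" + k[2] + "\t" + k[3]);
                firstDay = k[2];
            } else if (k[2] != firstDay) {
                out.add(k[0] + "-" + k[1] + "-" + k[2] + "\t" + k[3]);
                break; // two distinct days found; stop reading the group
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up group: the two hottest readings are both on day 5
        List<int[]> group = List.of(
            new int[] {2000, 1, 5, 30},
            new int[] {2000, 1, 5, 28},
            new int[] {2000, 1, 9, 25},
            new int[] {2000, 1, 3, 20});
        topTwo(group).forEach(System.out::println); // "2000-1-5<TAB>30" then "2000-1-9<TAB>25"
    }
}
```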

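    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TqReducer extends Reducer<TQ, IntWritable, Text, IntWritable> {
        Text rkey = new Text();
        IntWritable rval = new IntWritable();

        @Override
        protected void reduce(TQ key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int flg = 0;
            int day = 0;

            // key          value
            // 1970 01 20  34     34
            // 1970 01 12  28     28
            for (IntWritable v : values) {
                // v itself is not needed: Hadoop reuses the key object and refills it
                // as the iterator advances, so key always holds the current record
                if (flg == 0) {
                    // First (hottest) record, e.g. 1970-1-20  34
                    rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                    rval.set(key.getWd());
                    context.write(rkey, rval);

                    day = key.getDay();
                    flg++;
                } else if (day != key.getDay()) {
                    // Skip further records from the same day; emit the hottest other day
                    rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                    rval.set(key.getWd());
                    context.write(rkey, rval);

                    break;
                }
            }
        }
    }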
    

    Output (the two reducers' part files combined)

    1949-10-1   38
    1949-10-2   36
    1950-1-1    32
    1950-10-2   41
    1950-10-1   37
    1951-7-3    47
    1951-7-2    46
    1951-12-1   23
    

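    Summary

    Custom data type (TQ in this example)
        contains the date
        contains the temperature
        defines its own sort order
    Custom grouping comparator
        records with the same year and month are treated as the same key
    So while the reducer iterates, records of the same year-month may come from the same day
        the reducer must check whether a record is from the same day
        watch out for OOM: stream through the values instead of buffering them
    When the data volume is large
        the full data set can be split into pieces of no less than one month's data each
        in this kind of scenario, multiple reducers can be used
        by implementing a custom partitioner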
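One possible reading of "split into pieces of at least one month's data" is a partitioner keyed on year and month instead of year alone. The grouping comparator groups by year and month, so such a rule still keeps each group on a single reducer while spreading the months of one year across reducers. This is a hypothetical alternative to TqPartitioner, shown in plain Java. In the job, the same arithmetic would go into `getPartition` using `key.getYear()` and `key.getMonth()`.

```java
public class MonthPartitionSketch {
    // Hypothetical rule: one index per (year, month), so a whole month always lands on one reducer.
    // Non-negative as long as year >= 0.
    static int partition(int year, int month, int numPartitions) {
        int monthIndex = year * 12 + (month - 1);
        return monthIndex % numPartitions;
    }

    public static void main(String[] args) {
        // The five year-months that occur in the sample data
        int[][] months = {{1949, 10}, {1950, 1}, {1950, 10}, {1951, 7}, {1951, 12}};
        for (int[] m : months) {
            System.out.println(m[0] + "-" + m[1] + " -> reducer " + partition(m[0], m[1], 4));
        }
        // 1949-10 -> 1, 1950-1 -> 0, 1950-10 -> 1, 1951-7 -> 2, 1951-12 -> 3
    }
}
```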
    
    Link to this article: https://www.haomeiwen.com/subject/wtsqqqtx.html