美文网首页
1.2.2.5MapReduce实例

1.2.2.5MapReduce实例

作者: 寒暄_HX | 来源:发表于2020-03-21 11:42 被阅读0次

    总目录:https://www.jianshu.com/p/e406a9bc93a9

    Hadoop - 子目录:https://www.jianshu.com/p/9428e443b7fd

    天气案例

    经典案例

    MyClient.java :客户端(作业驱动类)

    package com.SL.tq;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.fs.Path;
    
    public class MyClient {

        /**
         * Driver for the weather (TQ) MapReduce job: wires up mapper, partitioner,
         * sort/grouping comparators and reducer, then submits the job and exits
         * with the job's success status.
         *
         * @param args unused; input/output HDFS paths are hard-coded below
         * @throws IOException            on HDFS access failure
         * @throws ClassNotFoundException if a job class cannot be resolved
         * @throws InterruptedException   if the submission wait is interrupted
         */
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

            Configuration conf = new Configuration(true);
            Job job = Job.getInstance(conf);

            job.setJarByClass(com.SL.tq.MyClient.class);

            // ----- map side -----
            job.setMapperClass(TMaper.class);
            job.setMapOutputKeyClass(TQ.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Route records to reducers; must keep each reduce group in one partition.
            job.setPartitionerClass(TPartitioner.class);

            // Shuffle sort order for the composite TQ key.
            job.setSortComparatorClass(TsortComparator.class);

            // ----- reduce side -----
            // Grouping comparator decides which sorted keys share one reduce() call.
            job.setGroupingComparatorClass(TGroupingComparator.class);
            job.setReducerClass(TReducer.class);

            // ----- input / output -----
            Path input = new Path("hdfs://192.168.110.110:9000/data/tq/input");
            FileInputFormat.addInputPath(job, input);

            Path output = new Path("hdfs://192.168.110.110:9000/data/tq/output");
            // MapReduce refuses to start if the output directory already exists,
            // so remove stale results from a previous run.
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            job.setNumReduceTasks(2);

            // FIX: the original ignored waitForCompletion's boolean result, so the
            // process exited 0 even when the job failed. Propagate it as exit code.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }
    

    TMaper.java : map程序

    package com.SL.tq;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.text.ParseException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    
    public class TMaper extends Mapper<LongWritable,Text,TQ,IntWritable>{
        // Reused output key/value objects — Hadoop serializes them on write(),
        // so mutating and re-emitting the same instances is safe and avoids GC churn.
        TQ mkey = new TQ();
        IntWritable mval = new IntWritable();
        // FIX: hoisted out of map() — the original built a new SimpleDateFormat for
        // every input record. An instance field is safe here because one Mapper
        // instance is driven by a single task thread.
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

        /**
         * Parses one tab-separated input line, e.g. "1949-10-01 14:21:02\t34c",
         * into key = (year, month, day, temperature) and value = temperature.
         * Lines whose date field cannot be parsed are skipped (logged).
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TQ, IntWritable>.Context context)
                throws IOException, InterruptedException {
            try{
                String[] strs = StringUtils.split(value.toString(),"\t");
                // Lenient parse reads the leading "yyyy-MM-dd" and ignores the time part.
                Date date = sdf.parse(strs[0]);
                Calendar cal = Calendar.getInstance();
                cal.setTime(date);

                mkey.setYear(cal.get(Calendar.YEAR));
                // Calendar.MONTH is 0-based, hence the +1.
                mkey.setMonth(cal.get(Calendar.MONTH)+1);
                mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));

                // Strip the trailing unit character, e.g. "34c" -> 34.
                int wd = Integer.parseInt(strs[1].substring(0,strs[1].length()-1));
                mkey.setWd(wd);
                mval.set(wd);
                // FIX: removed per-record System.out.println — it flooded task logs
                // and only printed TQ's identity hash (no toString override).
                context.write(mkey, mval);
            } catch (ParseException e) {
                // Bad date field: skip this record rather than failing the task.
                e.printStackTrace();
            }

        }

    }
    

    TPartitioner.java :分区器

    package com.SL.tq;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class TPartitioner extends Partitioner<TQ, IntWritable> {
        /**
         * Routes each record to a reduce partition by year.
         *
         * FIX: the original used {@code key.hashCode() % numPartitions}. TQ does not
         * override hashCode, so equal dates got distinct identity hashes and were
         * scattered across reducers — splitting reduce groups — and a negative hash
         * produced a negative partition index, which the framework rejects.
         * Partitioning by year keeps every record of a year (and therefore of any
         * year/month group) in one partition; the sign mask is defensive.
         */
        @Override
        public int getPartition(TQ key, IntWritable value, int numPartitions) {
            return (key.getYear() & Integer.MAX_VALUE) % numPartitions;
        }
    }
    

    TsortComparator.java :排序比较器

    package com.SL.tq;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class TsortComparator extends WritableComparator{

        /** Registers TQ and asks the parent to instantiate keys for comparison. */
        public TsortComparator() {
            super(TQ.class,true);
        }

        /**
         * Shuffle sort order for TQ keys: year ascending, then month ascending,
         * then temperature ascending.
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b){
            TQ left = (TQ) a;
            TQ right = (TQ) b;

            int byYear = Integer.compare(left.getYear(), right.getYear());
            if (byYear != 0) {
                return byYear;
            }
            int byMonth = Integer.compare(left.getMonth(), right.getMonth());
            if (byMonth != 0) {
                return byMonth;
            }
            // NOTE(review): temperature sorts ascending here. If this job is meant
            // to surface the hottest readings first (the classic variant of this
            // example negates this comparison), confirm against TReducer's logic.
            return Integer.compare(left.getWd(), right.getWd());
        }

    }
    
    

    TQ.java :序列化

    package com.SL.tq;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class TQ implements WritableComparable<TQ>{

        // Composite map-output key: a calendar date plus that day's temperature.
        private int year;
        private int month;  // 1-based (mapper adds 1 to Calendar.MONTH)
        private int day;
        private int wd;     // temperature ("wendu"), unit character already stripped



        public int getYear() {
            return year;
        }

        public void setYear(int year) {
            this.year = year;
        }

        public int getMonth() {
            return month;
        }

        public void setMonth(int month) {
            this.month = month;
        }

        public int getDay() {
            return day;
        }

        public void setDay(int day) {
            this.day = day;
        }

        public int getWd() {
            return wd;
        }

        public void setWd(int wd) {
            this.wd = wd;
        }

        /** Hadoop serialization: field order must mirror readFields exactly. */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(year);
            out.writeInt(month);
            out.writeInt(day);
            out.writeInt(wd);
        }

        /** Hadoop deserialization: field order must mirror write exactly. */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.year=in.readInt();
            this.month=in.readInt();
            this.day=in.readInt();
            this.wd=in.readInt();
        }

        /** Natural order: date ascending (year, then month, then day). */
        @Override
        public int compareTo(TQ that) {
            int c1 = Integer.compare(this.year,that.getYear());

            if(c1 == 0) {
                int c2 = Integer.compare(this.month,that.getMonth());
                if (c2 == 0) {
                    return Integer.compare(this.day, that.getDay());
                }
                return c2;
            }

            return c1;
        }

        // FIX: the original overrode neither equals nor hashCode even though this
        // class is used as a key with a hash-based partitioner — identity hashes
        // made partitioning nondeterministic. Both are defined over all four fields.
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TQ)) {
                return false;
            }
            TQ that = (TQ) o;
            return year == that.year && month == that.month
                    && day == that.day && wd == that.wd;
        }

        @Override
        public int hashCode() {
            int h = year;
            h = 31 * h + month;
            h = 31 * h + day;
            h = 31 * h + wd;
            return h;
        }

        /** Human-readable form for logs, e.g. "1949-10-1 34". */
        @Override
        public String toString() {
            return year + "-" + month + "-" + day + " " + wd;
        }

    }
    
    

    TsortComparator.java :自定义排序(注:此段与上文“排序比较器”一节的代码相同;原文未给出 TGroupingComparator 与 TReducer 的代码)

    package com.SL.tq;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class TsortComparator extends WritableComparator{

        /** Registers TQ so the parent comparator can create key instances. */
        public TsortComparator() {
            super(TQ.class,true);
        }

        /**
         * Orders TQ keys by year, then month, then temperature — all ascending.
         * Cascades through the fields, keeping the first non-zero comparison.
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b){
            TQ t1 = (TQ) a;
            TQ t2 = (TQ) b;

            int result = Integer.compare(t1.getYear(), t2.getYear());
            if (result == 0) {
                result = Integer.compare(t1.getMonth(), t2.getMonth());
            }
            if (result == 0) {
                result = Integer.compare(t1.getWd(), t2.getWd());
            }
            return result;
        }

    }
    
    

    相关文章

      网友评论

          本文标题:1.2.2.5MapReduce实例

          本文链接:https://www.haomeiwen.com/subject/tvlyyhtx.html