美文网首页
16利用hadoop进行天气情况的统计(附源码及详解)

16利用hadoop进行天气情况的统计(附源码及详解)

作者: 文茶君 | 来源:发表于2019-12-08 10:10 被阅读0次
要求

思考:
要确定K,V.年月怎么分?年月应该排序,正序排出,而关于温度希望在map端本身倒序输出。这样就可以直接取出最顶两个。一天可能有多个高温。字符串34c,怎么把c去掉。自定义类应该实现比较器
对于这个例子,我先贴出代码,然后再分部分进行分析
首先,新建工程,我们这里使用15提到的工程,在此基础上新建项目


新建项目

代码如下

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TQMR {
    
    
    public static void main(String[] args) throws Exception {
        
        
        
        //1,conf
        Configuration conf = new Configuration(true);
        
        //2,job
        Job job = Job.getInstance(conf);
        job.setJarByClass(TQMR.class);
        
        //3,input,output
        
        Path input = new Path("/tq/input");
        FileInputFormat.addInputPath(job, input);
        
        Path output = new Path("/tq/output");
        if(output.getFileSystem(conf).exists(output)){
            output.getFileSystem(conf).delete(output,true);
        }
        FileOutputFormat.setOutputPath(job, output );
        
        
        //4,map
        job.setMapperClass(TqMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(Text.class);
        
        
        //5,reduce
        job.setReducerClass(TqReducer.class);
        job.setNumReduceTasks(2);
        
        //6,other:sort,part..,group...
        job.setPartitionerClass(TqPartitioner.class);//分区
        job.setSortComparatorClass(TqSortComparator.class);//比较器
        job.setGroupingComparatorClass(TqGroupingComparator.class);//组排序器
        
        job.setCombinerClass(TqReducer.class);
        job.setCombinerKeyGroupingComparatorClass(TqGroupingComparator.class);
        //7,submit
        
        job.waitForCompletion(true);
        
        
        
        
        
        
        
        
    }
    

}

首先进行配置,和15类似,new一个conf,第二设置输入输出路径,在这里进行一个判断,如果存在文件递归的删除。要保证输出的文件目录里没有文件


接下来设置mapper。因为job.setMapperClass()先定义key.就是mapper类
然后就是天气(tq)类,必须要实现WritableComprable接口,因为他的作用就是序列化反序列化输入输出这种。泛型类就应该是自己。作为自定义的key,就应该对属性进行规范。类似于javabean。该项目中key需要年月日,温度。这里设置为private int。然后get,set方法。

@Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.getYear());
        out.writeInt(this.getMonth());
        out.writeInt(this.getDay());
        out.writeInt(this.getWd());
        
    }
@Override
    public void readFields(DataInput in) throws IOException {
        this.year=in.readInt();
        this.month=in.readInt();
        this.day=in.readInt();
        this.wd=in.readInt();
    }

重写write方法,这里out,把内容写出去.这里最好用this.getxx()。通过getset来保证他的权限访问。readint表示读进来,完成反序列化,还原了这些属性。this.setYear(in.readInt());比较好。

这里注意写的顺序和读的顺序必须一致,

如果类型不一致,比如一个是int,另一个是string,不匹配就会报eof异常。
接下来写比较器

@Override
    public int compareTo(TQ that) {
        
        int c1=Integer.compare(this.getYear(), that.getYear());
        if(c1==0){
            int c2=Integer.compare(this.getMonth(), that.getMonth());
            if(c2==0){
                return Integer.compare(this.getDay(), that.getDay());
            }
            return c2;
        }
    return c1;
    }
    

在这里先设定一个标准的排序规则。c1==0就表示两个值相等。这是基本比较器,和业务相关不大

import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TQ implements  WritableComparable<TQ>{

    private int year;
    private int month;
    private int day;
    private int wd;
    
    
    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getWd() {
        return wd;
    }

    public void setWd(int wd) {
        this.wd = wd;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);
        
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year=in.readInt();
        this.month=in.readInt();
        this.day=in.readInt();
        this.wd=in.readInt();
    }

    @Override
    public int compareTo(TQ that) {
        
        int c1=Integer.compare(this.getYear(), that.getYear());
        if(c1==0){
            int c2=Integer.compare(this.getMonth(), that.getMonth());
            if(c2==0){
                return Integer.compare(this.getDay(), that.getDay());
            }
            return c2;
        }
        
        return c1;
    }
    

}

接下来可以写mapper类了


public class TqMapper extends Mapper<LongWritable, Text, TQ, Text>

在这里可以看到,输出的是天气类型,至于valueout如果是text类型就是输出34c,如果是intwritable就是34。不过下面的方法也把c截取掉了,输出了text类型
接下来就重写map方法、
为了方便我将注释写在代码里

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class TqMapper extends Mapper<LongWritable, Text, TQ, Text> {
    
    
    
    TQ tq= new TQ();//好处是:因为map循环调用,写在内部的话每次循环会重新调用
    Text vwd = new Text();
    
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        
        
        //数据:value:  1949-10-01 14:21:02    34c  >>  TQ

        try {
            String[] strs = StringUtils.split(value.toString(), '\t');//用制表符来切
            
            SimpleDateFormat  sdf = new SimpleDateFormat("yyyy-MM-dd");
            Date date = null;
        
            date = sdf.parse(strs[0]);//就是第一个value:  1949-10-01 14:21:02    34c  
            
            Calendar  cal = Calendar.getInstance();//日历类
            cal.setTime(date);//拿到时间
            
            tq.setYear(cal.get(Calendar.YEAR));//得到年份
            tq.setMonth(cal.get(Calendar.MONTH)+1);//cal月份少一个
            tq.setDay(cal.get(Calendar.DAY_OF_MONTH));//这是月的第几天
            
            int wd = Integer.parseInt(strs[1].substring(0, strs[1].length()-1));
            tq.setWd(wd);//设置温度,把c去掉strs[1].substring(0, strs[1].lastIndexof("c"))也可以
            vwd.set(wd+"");//因为输出为text类型,所以加“”,也可以把输出设置为intwritabl,就不用加“了”
            
            context.write(tq, vwd);
            
        } catch (ParseException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
                
        
    }
    

}

此时

                job.setMapperClass(TqMapper.class);
                job.setMapOutputKeyClass(TQ.class);
                job.setMapOutputValueClass(Text.class);

已经设置完成,这里要注意设置 job.setMapOutputKeyClass(TQ.class);
job.setMapOutputValueClass(Text.class);。

接下来应该自定义比较器。而不应忙着reduce。继承WritableComparator。
重写compare方法
实现天气年月正序,温度倒序

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TqSortComparator  extends  WritableComparator {

    
    
    public TqSortComparator() {
        super(TQ.class,true);
    }
    
    
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        
        TQ t1 = (TQ)a;//向下转型,强转
        TQ t2 = (TQ)b;

        int c1=Integer.compare(t1.getYear(), t2.getYear());//按照正序来比较
        if(c1==0){
            int c2=Integer.compare(t1.getMonth(), t2.getMonth());
            if(c2==0){
                return -Integer.compare(t1.getWd(), t2.getWd());//因为想要逆序输出附上compare方法代码 return (x < y) ? -1 : ((x == y) ? 0 : 1);
    
            }
            return c2;
        }
        
        return c1;
        
        
    }
    
    
    
    
}

接下来写自定义分区器。分区器定义数据分发策略,默认使用哈希取模的方式

 job.setPartitionerClass(TqPartitioner.class);

key是天气tq

package com.sxt.hadoop.mr.tq;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class TqPartitioner  extends  Partitioner<TQ, Text> {

    @Override
    public int getPartition(TQ key, Text value, int numPartitions) {
        //获取当前分区所属编号。因为分区的数量来自于reduce,所以先设置reducetask数量,程序中设置job.setNumReduceTasks(2);说明分区数两个
        
        return key.getYear() % numPartitions;//模2分区就是0和1
    }
    

}

接下来就是reduce端发挥作用了。因为年月日正序,温度倒序,所以要二次排序。自定义组排序器

job.setGroupingComparatorClass(TqGroupingComparator.class);
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TqGroupingComparator  extends WritableComparator {
    
    
    public TqGroupingComparator() {
        super(TQ.class,true);
    }

    public int compare(WritableComparable a, WritableComparable b) {
            
            TQ t1 = (TQ)a;
            TQ t2 = (TQ)b;
    
            int c1=Integer.compare(t1.getYear(), t2.getYear());
            if(c1==0){
                return Integer.compare(t1.getMonth(), t2.getMonth());//相同的年月做一组,并没有改变修改原则,只是对界定产生了修改
            }           
            return c1;
            
            
    }
    
}

开始设置reduce
因为map结构如下所示
protected void map(LongWritable key, Text value,Context context)

         int wd = Integer.parseInt(strs[1].substring(0, strs[1].length()-1));
            tq.setWd(wd);//设置温度,把c去掉strs[1].substring(0, strs[1].lastIndexof("c"))也可以
            vwd.set(wd+"");//因为输出为text类型,所以加“”,也可以把输出设置为intwritabl,就不用加“了”
            
            context.write(tq, vwd);
数据传过来有可能的情况
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TqReducer extends Reducer<TQ, Text, Text, Text> {
    
    Text rkey = new Text();
    Text rval = new Text();
    
    
    @Override
    protected void reduce(TQ key, Iterable<Text> vals, Context context)
            throws IOException, InterruptedException {
        
        
        int flg=0;
        int day=0;
        
        for (Text v : vals) {
            
            if(flg==0){
                day=key.getDay();   
                
                rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
                rval.set(key.getWd()+"");//转为text。在这里注意一点,也是我想解释的一点,一般rval.set(v.get()),但是这里v:vals只是遍历,记录次数。但是这里我在map端天气类里set了,所以可以用key.getWd()
                context.write(rkey,rval );
                flg++;
                
            }
            if(flg!=0 && day != key.getDay()){//判断是不是同一天
                
                rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
                rval.set(key.getWd()+"");
                context.write(rkey,rval );
                break;//得到两条后达到目标了就应该退出了  或者使用return也可以

            }
                            
            
        }
        
    }

}

启动集群


1.png 2.png 3.png 4.png
5.png
6.png
7.png

处理


8.png
得到结果

相关文章

网友评论

      本文标题:16利用hadoop进行天气情况的统计(附源码及详解)

      本文链接:https://www.haomeiwen.com/subject/fyuugctx.html