![](https://img.haomeiwen.com/i13219358/340c7aec7cf24315.png)
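Thinking it through:
First decide on the key and value. How should the year and month be split out? Year and month should be sorted in ascending order, while the temperature should already come out of the map side in descending order, so that the top two records can be taken directly. A single day may have several high-temperature readings. The temperature arrives as a string such as 34c, so the trailing c has to be stripped. The custom key class should implement a comparator.
For this example I will post the code first and then analyze it part by part.
First, create the project. Here we reuse the project mentioned in part 15 and add a new project on top of it.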
![](https://img.haomeiwen.com/i13219358/53eb795eaa0a3dae.png)
The code is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TQMR {
public static void main(String[] args) throws Exception {
//1,conf
Configuration conf = new Configuration(true);
//2,job
Job job = Job.getInstance(conf);
job.setJarByClass(TQMR.class);
//3,input,output
Path input = new Path("/tq/input");
FileInputFormat.addInputPath(job, input);
Path output = new Path("/tq/output");
if(output.getFileSystem(conf).exists(output)){
output.getFileSystem(conf).delete(output,true);
}
FileOutputFormat.setOutputPath(job, output );
//4,map
job.setMapperClass(TqMapper.class);
job.setMapOutputKeyClass(TQ.class);
job.setMapOutputValueClass(Text.class);
//5,reduce
job.setReducerClass(TqReducer.class);
job.setNumReduceTasks(2);
//6,other:sort,part..,group...
job.setPartitionerClass(TqPartitioner.class);// partitioner
job.setSortComparatorClass(TqSortComparator.class);// sort comparator
job.setGroupingComparatorClass(TqGroupingComparator.class);// grouping comparator
// No combiner is set: TqReducer emits (Text, Text), but a combiner must emit the map output types (TQ, Text)
//7,submit
job.waitForCompletion(true);
}
}
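First comes the configuration: as in part 15, create a new conf. Second, set the input and output paths. A check is made here: if the output path already exists, it is deleted recursively, because the output directory must not exist when the job starts.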
![](https://img.haomeiwen.com/i13219358/f69779e44d73000b.png)
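Next, set up the mapper. job.setMapperClass() takes the mapper class, but the mapper emits a custom key, so the key class is defined first.
That key is the weather (TQ) class. It must implement the WritableComparable interface, which covers serialization and deserialization for input and output as well as comparison. The generic type parameter should be the class itself. As a custom key, its fields should follow JavaBean conventions. In this project the key needs year, month, day and temperature; they are declared as private int, with getter and setter methods.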
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.getYear());
out.writeInt(this.getMonth());
out.writeInt(this.getDay());
out.writeInt(this.getWd());
}
@Override
public void readFields(DataInput in) throws IOException {
this.year=in.readInt();
this.month=in.readInt();
this.day=in.readInt();
this.wd=in.readInt();
}
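write is overridden to write the fields to out. It is better to use this.getXxx() here, so that field access goes through the getters and setters. readInt reads a value back in, which completes deserialization and restores the fields; this.setYear(in.readInt()); would be the better style.
Note that the order of the writes and the order of the reads must match.
If the types do not match, for example one side is an int and the other a String, the read can fail with an EOFException.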
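The point about matching order and types can be tried outside Hadoop with plain java.io streams. The following is only a sketch: the class name FieldOrderDemo and its helper methods are made up for illustration, and it mirrors the four int fields of TQ rather than using the Writable interface itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical demo class, not part of the article's project.
class FieldOrderDemo {
    // Writes four ints in the order year, month, day, wd (16 bytes in total).
    static byte[] write(int year, int month, int day, int wd) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(year);
            out.writeInt(month);
            out.writeInt(day);
            out.writeInt(wd);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in the same order and with the same types.
    static int[] readInOrder(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new int[] { in.readInt(), in.readInt(), in.readInt(), in.readInt() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads with the wrong type: four longs need 32 bytes, but only 16 exist.
    static boolean readAsLongsHitsEof(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (int i = 0; i < 4; i++) {
                in.readLong();
            }
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = write(1949, 10, 1, 34);
        System.out.println(Arrays.toString(readInOrder(data)));
        System.out.println(readAsLongsHitsEof(data));
    }
}
```

A type mismatch does not always end in an exception: if the wrong read happens to consume the same number of bytes, the fields are simply restored with wrong values, which is harder to notice.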
Next, write the comparison method:
@Override
public int compareTo(TQ that) {
int c1=Integer.compare(this.getYear(), that.getYear());
if(c1==0){
int c2=Integer.compare(this.getMonth(), that.getMonth());
if(c2==0){
return Integer.compare(this.getDay(), that.getDay());
}
return c2;
}
return c1;
}
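This sets up a standard ordering first. c1==0 means the two years are equal, so the comparison moves on to the month, and then to the day. This is the basic comparator and has little to do with the business logic.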
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class TQ implements WritableComparable<TQ>{
private int year;
private int month;
private int day;
private int wd;
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public int getDay() {
return day;
}
public void setDay(int day) {
this.day = day;
}
public int getWd() {
return wd;
}
public void setWd(int wd) {
this.wd = wd;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(year);
out.writeInt(month);
out.writeInt(day);
out.writeInt(wd);
}
@Override
public void readFields(DataInput in) throws IOException {
this.year=in.readInt();
this.month=in.readInt();
this.day=in.readInt();
this.wd=in.readInt();
}
@Override
public int compareTo(TQ that) {
int c1=Integer.compare(this.getYear(), that.getYear());
if(c1==0){
int c2=Integer.compare(this.getMonth(), that.getMonth());
if(c2==0){
return Integer.compare(this.getDay(), that.getDay());
}
return c2;
}
return c1;
}
}
Now the mapper class can be written:
![](https://img.haomeiwen.com/i13219358/0a57ff970351cc41.png)
public class TqMapper extends Mapper<LongWritable, Text, TQ, Text>
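As shown here, the output key is the weather type. As for VALUEOUT, a Text value could carry 34c as-is, while an IntWritable would carry 34. The method below strips the c anyway and still outputs a Text.
Next, override the map method.
For convenience, the explanations are written as comments in the code.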
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
public class TqMapper extends Mapper<LongWritable, Text, TQ, Text> {
TQ tq= new TQ();// created once as a field: map() is called once per record, so creating it inside map() would allocate a new object on every call
Text vwd = new Text();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
// data: value: 1949-10-01 14:21:02 34c >> TQ
try {
String[] strs = StringUtils.split(value.toString(), '\t');// split on the tab character
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date date = null;
date = sdf.parse(strs[0]);// the first field, e.g. 1949-10-01 14:21:02; only the date part is parsed
Calendar cal = Calendar.getInstance();// calendar class
cal.setTime(date);// set the parsed time
tq.setYear(cal.get(Calendar.YEAR));// get the year
tq.setMonth(cal.get(Calendar.MONTH)+1);// Calendar months are zero-based, so add 1
tq.setDay(cal.get(Calendar.DAY_OF_MONTH));// day of the month
int wd = Integer.parseInt(strs[1].substring(0, strs[1].length()-1));
tq.setWd(wd);// set the temperature with the c removed; strs[1].substring(0, strs[1].lastIndexOf("c")) also works
vwd.set(wd+"");// the output value is Text, hence the +""; with IntWritable as the output type the +"" would not be needed
context.write(tq, vwd);
context.write(tq, vwd);
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
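The parsing step in map() can be checked on its own, without a cluster. Below is a minimal sketch that assumes the input line has the shape shown in the comment above (a timestamp, a tab, then a temperature ending in c). The class WeatherLineParser is hypothetical, and it uses java.time.LocalDate in place of the article's SimpleDateFormat and Calendar, which avoids the zero-based month adjustment.

```java
import java.time.LocalDate;

// Hypothetical helper, not part of the article's project.
class WeatherLineParser {
    // Returns {year, month, day, wd} for a line like "1949-10-01 14:21:02\t34c".
    static int[] parse(String line) {
        String[] parts = line.split("\t");
        // The first 10 characters are the ISO date (yyyy-MM-dd); the time is ignored.
        LocalDate date = LocalDate.parse(parts[0].substring(0, 10));
        // Drop the trailing unit character ("c") before parsing the number.
        String temp = parts[1].trim();
        int wd = Integer.parseInt(temp.substring(0, temp.length() - 1));
        // getMonthValue() is already 1-based, unlike Calendar.MONTH.
        return new int[] { date.getYear(), date.getMonthValue(), date.getDayOfMonth(), wd };
    }

    public static void main(String[] args) {
        int[] r = parse("1949-10-01 14:21:02\t34c");
        System.out.println(r[0] + "-" + r[1] + "-" + r[2] + " " + r[3]);
    }
}
```

A malformed line makes this sketch throw an unchecked exception; a real mapper would need to decide whether to skip or count such records.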
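At this point
job.setMapperClass(TqMapper.class);
job.setMapOutputKeyClass(TQ.class);
job.setMapOutputValueClass(Text.class);
are all in place. Take care not to forget job.setMapOutputKeyClass(TQ.class);
and job.setMapOutputValueClass(Text.class);.
Next comes the custom comparator, before rushing into the reducer. It extends WritableComparator
and overrides the compare method
so that year and month sort in ascending order and temperature sorts in descending order.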
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TqSortComparator extends WritableComparator {
public TqSortComparator() {
super(TQ.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TQ t1 = (TQ)a;// downcast
TQ t2 = (TQ)b;
int c1=Integer.compare(t1.getYear(), t2.getYear());// ascending comparison
if(c1==0){
int c2=Integer.compare(t1.getMonth(), t2.getMonth());
if(c2==0){
return -Integer.compare(t1.getWd(), t2.getWd());// negated for descending order; Integer.compare is return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
return c2;
}
return c1;
}
}
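To see what ordering this comparator produces, the same rule can be written with java.util.Comparator and applied to a small in-memory list. This is only an illustration under stated assumptions: each record is a plain int array {year, month, day, wd}, and SortOrderSketch is a made-up class, not something Hadoop calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the sort rule: year asc, month asc, temperature desc.
class SortOrderSketch {
    static final Comparator<int[]> YEAR_MONTH_ASC_TEMP_DESC =
            Comparator.<int[]>comparingInt(r -> r[0])
                    .thenComparingInt(r -> r[1])
                    .thenComparing(Comparator.<int[]>comparingInt(r -> r[3]).reversed());

    // Returns a sorted copy so the caller's list is left untouched.
    static List<int[]> sorted(List<int[]> records) {
        List<int[]> copy = new ArrayList<>(records);
        copy.sort(YEAR_MONTH_ASC_TEMP_DESC);
        return copy;
    }

    public static void main(String[] args) {
        List<int[]> records = Arrays.asList(
                new int[] {1949, 10, 1, 34},
                new int[] {1950, 1, 1, 32},
                new int[] {1949, 10, 2, 38},
                new int[] {1949, 10, 1, 36});
        for (int[] r : sorted(records)) {
            System.out.println(Arrays.toString(r));
        }
    }
}
```

Note that the day is deliberately left out of the ordering: within one month the hottest reading comes first, whichever day it belongs to.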
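Next, write the custom partitioner. The partitioner defines how records are distributed to the reducers; the default strategy is hash modulo.
job.setPartitionerClass(TqPartitioner.class);
The key is the weather object TQ.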
package com.sxt.hadoop.mr.tq;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class TqPartitioner extends Partitioner<TQ, Text> {
@Override
public int getPartition(TQ key, Text value, int numPartitions) {
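// Returns the partition number for this record. The number of partitions comes from the number of reduce tasks, so set that first; job.setNumReduceTasks(2); in the driver means two partitions
return key.getYear() % numPartitions;// modulo 2 gives partition 0 or 1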
}
}
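The partition rule is small enough to try by hand. The sketch below uses a hypothetical class YearPartitionSketch and swaps the % operator for Math.floorMod; that is a defensive substitution of mine, not the article's code. For the positive years in this data set both give the same result, but % can return a negative number for a negative operand, and a negative partition number is not valid.

```java
// Hypothetical sketch of partitioning by year, not part of the article's project.
class YearPartitionSketch {
    // Always returns a value in [0, numPartitions), even for negative input.
    static int partition(int year, int numPartitions) {
        return Math.floorMod(year, numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partition(1949, 2));
        System.out.println(partition(1950, 2));
    }
}
```

With two reduce tasks this sends odd years to one reducer and even years to the other, so every record of a given year, and therefore of a given year and month, lands in the same partition.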
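Now the reduce side comes into play. Because year and month are sorted in ascending order and temperature in descending order, this is a secondary sort, and it needs a custom grouping comparator.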
job.setGroupingComparatorClass(TqGroupingComparator.class);
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TqGroupingComparator extends WritableComparator {
public TqGroupingComparator() {
super(TQ.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TQ t1 = (TQ)a;
TQ t2 = (TQ)b;
int c1=Integer.compare(t1.getYear(), t2.getYear());
if(c1==0){
return Integer.compare(t1.getMonth(), t2.getMonth());// records with the same year and month form one group; the sort order is unchanged, only the group boundary is redefined
}
return c1;
}
}
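The effect of the grouping comparator can be pictured as cutting an already sorted stream into runs: a new group starts whenever the year or month changes. The following sketch shows that idea on in-memory data. It assumes records are int arrays {year, month, day, wd} that are already sorted, and GroupingSketch is a hypothetical class, not how Hadoop implements grouping internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of grouping consecutive records by (year, month).
class GroupingSketch {
    // Same rule as the grouping comparator: equal year and equal month.
    static boolean sameGroup(int[] a, int[] b) {
        return a[0] == b[0] && a[1] == b[1];
    }

    // Splits a sorted list into runs of records that belong to one group.
    static List<List<int[]>> groups(List<int[]> sortedRecords) {
        List<List<int[]>> result = new ArrayList<>();
        List<int[]> current = null;
        int[] previous = null;
        for (int[] r : sortedRecords) {
            if (previous == null || !sameGroup(previous, r)) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(r);
            previous = r;
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> sorted = Arrays.asList(
                new int[] {1949, 10, 2, 38},
                new int[] {1949, 10, 1, 36},
                new int[] {1949, 10, 1, 34},
                new int[] {1950, 1, 1, 32});
        System.out.println(groups(sorted).size());
    }
}
```

Each run corresponds to one reduce() call, and because the sort already put the hottest reading first, the first element of each run is that month's maximum.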
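Now set up the reducer.
Recall that the map side is structured as follows:
protected void map(LongWritable key, Text value,Context context)
int wd = Integer.parseInt(strs[1].substring(0, strs[1].length()-1));
tq.setWd(wd);// set the temperature with the c removed; strs[1].substring(0, strs[1].lastIndexOf("c")) also works
vwd.set(wd+"");// the output value is Text, hence the +""; with IntWritable as the output type the +"" would not be needed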
context.write(tq, vwd);
![](https://img.haomeiwen.com/i13219358/d0bcef7c598fba46.png)
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TqReducer extends Reducer<TQ, Text, Text, Text> {
Text rkey = new Text();
Text rval = new Text();
@Override
protected void reduce(TQ key, Iterable<Text> vals, Context context)
throws IOException, InterruptedException {
int flg=0;
int day=0;
for (Text v : vals) {
if(flg==0){
day=key.getDay();
rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
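rval.set(key.getWd()+"");// convert to Text. One point worth explaining: normally this would be rval.set(v), but here the loop over vals only drives the iteration. The temperature was already set on the TQ key on the map side, and the key is updated as the values are iterated, so key.getWd() can be used
context.write(rkey,rval );
flg++;
}
if(flg!=0 && day != key.getDay()){// check whether this is a different day from the first record
rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
rval.set(key.getWd()+"");
context.write(rkey,rval );
break;// two records have been written, so stop here; return would also work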
}
}
}
}
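The selection logic in reduce() can be restated without the Hadoop types: take the first record of the group, then keep scanning until a record from a different day appears. The sketch below assumes one group of {year, month, day, wd} arrays already sorted by temperature in descending order; TopTwoDaysSketch is a made-up name for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the reducer's "two hottest distinct days" rule.
class TopTwoDaysSketch {
    static List<int[]> topTwoDistinctDays(List<int[]> group) {
        List<int[]> out = new ArrayList<>();
        int firstDay = -1;
        for (int[] r : group) {
            if (out.isEmpty()) {
                // The first record is the hottest reading of the month.
                out.add(r);
                firstDay = r[2];
            } else if (r[2] != firstDay) {
                // The next record from another day is the second hottest day.
                out.add(r);
                break;
            }
            // Further readings from the first day are skipped.
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> group = Arrays.asList(
                new int[] {1949, 10, 1, 38},
                new int[] {1949, 10, 1, 37},
                new int[] {1949, 10, 2, 36},
                new int[] {1949, 10, 3, 35});
        for (int[] r : topTwoDistinctDays(group)) {
            System.out.println(r[0] + "-" + r[1] + "-" + r[2] + "\t" + r[3]);
        }
    }
}
```

If a month only has readings from a single day, the result holds one record, which matches what the reducer above would write.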
Start the cluster
![](https://img.haomeiwen.com/i13219358/be22e688dc23d96b.png)
![](https://img.haomeiwen.com/i13219358/a2f0644b8de7d928.png)
![](https://img.haomeiwen.com/i13219358/b69a5d876b92a275.png)
![](https://img.haomeiwen.com/i13219358/f41513acb2a5d618.png)
![](https://img.haomeiwen.com/i13219358/72028d4348129897.png)
![](https://img.haomeiwen.com/i13219358/75ff35da7159e1d1.png)
![](https://img.haomeiwen.com/i13219358/d630b01ab06cb095.png)
Run the job
![](https://img.haomeiwen.com/i13219358/4ff1fd482c465f69.png)
![](https://img.haomeiwen.com/i13219358/aec563601a9c8d69.png)