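Approach:
Goal: for every year and every month, find the 2 days with the highest temperature.
One day may have several records, so each day may appear at most once in the result.
Thinking further:
Group by year and month.
Sort by temperature in descending order.
The key must therefore contain both the date and the temperature.
MR primitive: records with the same key go to the same group.
The grouping rule is set through a GroupingComparator.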
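The difference between the sort rule and the grouping rule can be sketched in plain Java, without Hadoop. This is only an illustration: `CompositeKeySketch` and the `int[]{year, month, day, temperature}` layout are hypothetical stand-ins for the composite key, not part of the job itself.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CompositeKeySketch {
    // Sort rule: year ascending, month ascending, temperature descending.
    public static final Comparator<int[]> SORT = Comparator
            .<int[]>comparingInt(k -> k[0])
            .thenComparingInt(k -> k[1])
            .thenComparing((a, b) -> Integer.compare(b[3], a[3]));

    // Grouping rule: only year and month matter, day and temperature are ignored.
    public static final Comparator<int[]> GROUP = Comparator
            .<int[]>comparingInt(k -> k[0])
            .thenComparingInt(k -> k[1]);

    // Returns a sorted copy so the caller's list is left untouched.
    public static List<int[]> sorted(List<int[]> keys) {
        List<int[]> copy = new ArrayList<>(keys);
        copy.sort(SORT);
        return copy;
    }

    public static void main(String[] args) {
        List<int[]> keys = new ArrayList<>();
        keys.add(new int[]{1949, 10, 1, 34});
        keys.add(new int[]{1949, 10, 1, 38});
        keys.add(new int[]{1949, 10, 2, 36});
        for (int[] k : sorted(keys)) {
            System.out.println(k[0] + "-" + k[1] + "-" + k[2] + " " + k[3]);
        }
        // Different days, same year and month: the grouping rule sees them as equal.
        System.out.println(GROUP.compare(keys.get(0), keys.get(2)) == 0);
    }
}
```

The sort rule puts the hottest record of each month first, while the grouping rule makes all records of that month arrive in a single reduce call.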
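Considerations
1. MapReduce
* Preserve the primitive (same key, same group)
How should the data be split, and how is a group defined?
2. Design of the k:v mapping
Consider the computational complexity of reduce
3. Can multiple reducers be used?
Skew: sample the data
Cluster resource availability
4. Custom data types
Custom key type
Partitioner
Sort comparator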
Sample data
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
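A minimal sketch of how one line of this data breaks down into its fields. It assumes, as the mapper's split on '\t' suggests, that the timestamp and the temperature are separated by a tab; `RecordParseSketch` is a hypothetical helper and uses `java.time` rather than the `SimpleDateFormat` used in the job.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class RecordParseSketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Returns {year, month, day, temperature} for one input line. */
    public static int[] parse(String line) {
        // Assumption: timestamp and temperature are tab-separated.
        String[] parts = line.split("\t");
        LocalDateTime ts = LocalDateTime.parse(parts[0], FMT);
        String temp = parts[1].trim();
        // Drop the trailing unit character, e.g. "34c" becomes 34.
        int wd = Integer.parseInt(temp.substring(0, temp.length() - 1));
        return new int[]{ts.getYear(), ts.getMonthValue(), ts.getDayOfMonth(), wd};
    }

    public static void main(String[] args) {
        int[] r = parse("1949-10-01 14:21:02\t34c");
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
    }
}
```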
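import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyTQ {
    public static void main(String[] args) throws Exception {
        // 1. Configuration
        Configuration conf = new Configuration(true);
        // 2. Job
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyTQ.class);
        // 3. Input and output paths
        Path input = new Path("/data/tq/input");
        FileInputFormat.addInputPath(job, input);
        Path output = new Path("/data/tq/output");
        // Delete a stale output directory; the job fails if it already exists
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // 4. Map side
        job.setMapperClass(TqMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setPartitionerClass(TqPartitioner.class);
        job.setSortComparatorClass(TqSortComparator.class);
        // No combiner: TqReducer emits <Text, IntWritable>, but a combiner must emit
        // the map output types <TQ, IntWritable>, so TqReducer cannot be reused as one.
        // 5. Reduce side
        // Grouping comparator
        job.setGroupingComparatorClass(TqGroupingComparator.class);
        job.setReducerClass(TqReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(2);
        // 6. Submit and wait for completion
        job.waitForCompletion(true);
    }
}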
Custom type
// A data structure that is both comparable and serializable:
// it implements WritableComparable
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TQ implements WritableComparable<TQ> {
    private int year;
    private int month;
    private int day;
    private int wd; // temperature

    public int getYear() {
        return year;
    }
    public void setYear(int year) {
        this.year = year;
    }
    public int getMonth() {
        return month;
    }
    public void setMonth(int month) {
        this.month = month;
    }
    public int getDay() {
        return day;
    }
    public void setDay(int day) {
        this.day = day;
    }
    public int getWd() {
        return wd;
    }
    public void setWd(int wd) {
        this.wd = wd;
    }

    // Serialization: fields must be read back in the same order they are written
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.day = in.readInt();
        this.wd = in.readInt();
    }

    @Override
    public int compareTo(TQ that) {
        // By convention the natural order is chronological (date ascending)
        int c1 = Integer.compare(this.getYear(), that.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(this.getMonth(), that.getMonth());
            if (c2 == 0) {
                // Once the date is compared, nothing else is needed;
                // the temperature order is handled by TqSortComparator
                return Integer.compare(this.getDay(), that.getDay());
            }
            return c2;
        }
        return c1;
    }
}
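The write/readFields contract can be checked outside Hadoop, because `DataOutputStream` and `DataInputStream` implement the same `DataOutput` and `DataInput` interfaces. The sketch below is only an illustration: `RoundTripSketch` is a hypothetical helper that mirrors the field order of TQ (year, month, day, wd) rather than using the TQ class itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class RoundTripSketch {
    /** Writes four ints in TQ's field order and returns the raw bytes. */
    public static byte[] serialize(int year, int month, int day, int wd) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(year);
            out.writeInt(month);
            out.writeInt(day);
            out.writeInt(wd);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the four ints back in the same order: {year, month, day, wd}. */
    public static int[] deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return new int[]{in.readInt(), in.readInt(), in.readInt(), in.readInt()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(1949, 10, 1, 38);
        int[] back = deserialize(data);
        System.out.println(data.length + " bytes: "
                + back[0] + "-" + back[1] + "-" + back[2] + " " + back[3]);
    }
}
```

Each key serializes to 16 bytes (four 4-byte ints). If the read order differed from the write order, the fields would silently swap values.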
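Map phase
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class TqMapper extends Mapper<LongWritable, Text, TQ, IntWritable> {
    // Kept as fields so they are not re-created for every record
    TQ mkey = new TQ();
    IntWritable mval = new IntWritable();

    // An override may not widen the checked exceptions declared by Mapper.map
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // value: 1949-10-01 14:21:02 34c >> TQ
            String[] strs = StringUtils.split(value.toString(), '\t');
            // Only the date part is parsed; the time of day is ignored
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Date date = sdf.parse(strs[0]);
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            mkey.setYear(cal.get(Calendar.YEAR));
            // Calendar.MONTH is zero-based
            mkey.setMonth(cal.get(Calendar.MONTH) + 1);
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
            // Strip the trailing 'c' from the temperature
            int wd = Integer.parseInt(strs[1].substring(0, strs[1].length() - 1));
            mkey.setWd(wd);
            mval.set(wd);
            context.write(mkey, mval);
        } catch (ParseException e) {
            // Skip lines whose date cannot be parsed
            e.printStackTrace();
        }
    }
}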
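Partitioning
Assigns each key to a partition, that is, to a reduce task.
The partitioner is called once for every <key, value> record the map emits,
so it should not be complex, and it must not cause data skew. Sample the data to check!
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class TqPartitioner extends Partitioner<TQ, IntWritable> {
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        // return key.hashCode() % numPartitions;
        // All records of one year go to the same reducer
        return key.getYear() % numPartitions;
    }
}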
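The advice to sample the data can be made concrete with a small check, sketched here in plain Java. `PartitionSkewSketch` is a hypothetical helper; it applies the same `year % numPartitions` rule to the years of the 11 sample records and counts how many land in each partition.

```java
public class PartitionSkewSketch {
    /** Counts how many records each partition receives under year % numPartitions. */
    public static int[] countPerPartition(int[] years, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int year : years) {
            counts[year % numPartitions]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Years of the 11 sample records, in input order.
        int[] years = {1949, 1949, 1949, 1950, 1950, 1951, 1950, 1950, 1951, 1951, 1951};
        int[] counts = countPerPartition(years, 2);
        for (int p = 0; p < counts.length; p++) {
            System.out.println("partition " + p + ": " + counts[p]);
        }
    }
}
```

With 2 partitions, 1950 goes to partition 0 (4 records) while 1949 and 1951 both go to partition 1 (7 records). On a real dataset, a much larger imbalance in such a sample would be a sign to choose a different partitioning rule.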
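Sorting
Map output is sorted when the in-memory buffer reaches its threshold and starts spilling to disk.
// Extends WritableComparator
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TqSortComparator extends WritableComparator {
    public TqSortComparator() {
        // true: create key instances so compare() receives deserialized objects
        super(TQ.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ t1 = (TQ) a;
        TQ t2 = (TQ) b;
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(t1.getMonth(), t2.getMonth());
            if (c2 == 0) {
                // Temperature from high to low (descending)
                return -Integer.compare(t1.getWd(), t2.getWd());
            }
            return c2;
        }
        return c1;
    }
}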
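Grouping comparator
Records with the same year and month form one group; duplicate records for the same day are filtered out in the reducer.
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TqGroupingComparator extends WritableComparator {
    public TqGroupingComparator() {
        super(TQ.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ t1 = (TQ) a;
        TQ t2 = (TQ) b;
        // Only year and month are compared; day and temperature are ignored
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            return Integer.compare(t1.getMonth(), t2.getMonth());
        }
        return c1;
    }
}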
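Reduce phase
Only the first 2 records on distinct days are needed!
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TqReducer extends Reducer<TQ, IntWritable, Text, IntWritable> {
    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int flg = 0;
        int day = 0;
        // Example group (key fields, then value):
        // 1970 01 20 34 34
        // 1970 01 12 28 28
        // v itself is never used: the key object is updated as the iterator advances
        for (IntWritable v : values) {
            if (flg == 0) {
                // First record of the group is the hottest, e.g. 1970-1-20: 34
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rval.set(key.getWd());
                context.write(rkey, rval);
                day = key.getDay();
                flg++;
            }
            // Skip further records from the same day; emit the first record of another day
            if (flg != 0 && day != key.getDay()) {
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rval.set(key.getWd());
                context.write(rkey, rval);
                break;
            }
        }
    }
}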
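The selection logic of the reducer can be sketched on its own, over a list that is already sorted by temperature in descending order. `TopTwoDaysSketch` and the `{day, temperature}` pair layout are hypothetical; the loop follows the same idea as the reducer: take the first record, then the first record whose day differs.

```java
import java.util.ArrayList;
import java.util.List;

public class TopTwoDaysSketch {
    /**
     * group: {day, temperature} pairs of one year-month,
     * already sorted by temperature in descending order.
     */
    public static List<int[]> topTwoDistinctDays(List<int[]> group) {
        List<int[]> result = new ArrayList<>();
        int firstDay = -1;
        for (int[] rec : group) {
            if (result.isEmpty()) {
                // The first record is the hottest of the month.
                result.add(rec);
                firstDay = rec[0];
            } else if (rec[0] != firstDay) {
                // First record from a different day: second result, then stop.
                result.add(rec);
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 1949-10 from the sample data, sorted by temperature descending.
        List<int[]> group = new ArrayList<>();
        group.add(new int[]{1, 38});
        group.add(new int[]{2, 36});
        group.add(new int[]{1, 34});
        for (int[] rec : topTwoDistinctDays(group)) {
            System.out.println("1949-10-" + rec[0] + " " + rec[1]);
        }
    }
}
```

Because the loop breaks after the second result, the rest of the group is never visited, which keeps the reduce cost low even for months with many records.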
Output
1949-10-1 38
1949-10-2 36
1950-1-1 32
1950-10-2 41
1950-10-1 37
1951-7-3 47
1951-7-2 46
1951-12-1 23
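Summary
Custom data type (TQ in the code above)
Contains the date
Contains the temperature
Custom sort comparison rule
Custom grouping comparison
Keys with the same year and month are treated as the same key
So when reduce iterates over a group, several records may come from the same day
reduce therefore has to check whether two records fall on the same day
Watch out for OOM
The data volume can be very large
The full dataset can be divided into units as small as one month of data, each processed on its own
This kind of workload can run with multiple reducers
By implementing a custom partitioner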