总目录:https://www.jianshu.com/p/e406a9bc93a9
Hadoop - 子目录:https://www.jianshu.com/p/9428e443b7fd
天气案例
经典案例
MyClient.java :客户端
package com.SL.tq;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
/**
 * Driver (client) for the weather MapReduce job.
 *
 * <p>Configures the mapper, partitioner, sort comparator, grouping comparator
 * and reducer, points the job at fixed HDFS input/output directories, and
 * submits it. The process exit status reflects whether the job succeeded.
 */
public class MyClient {

    /** HDFS directory the job reads its input from. */
    private static final String INPUT_DIR = "hdfs://192.168.110.110:9000/data/tq/input";

    /** HDFS directory the job writes its results to; removed first if it already exists. */
    private static final String OUTPUT_DIR = "hdfs://192.168.110.110:9000/data/tq/output";

    /**
     * Builds and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args command-line arguments (unused)
     * @throws IOException            if the file system or job submission fails
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(com.SL.tq.MyClient.class);

        // ----- map side -----
        // An input format could be set here: job.setInputFormatClass(ooxx.class);
        job.setMapperClass(TMaper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setPartitionerClass(TPartitioner.class);
        job.setSortComparatorClass(TsortComparator.class);
        // job.setCombinerClass(TCombiner.class);

        // ----- reduce side -----
        job.setGroupingComparatorClass(TGroupingComparator.class);
        job.setReducerClass(TReducer.class);

        // ----- input / output -----
        Path input = new Path(INPUT_DIR);
        FileInputFormat.addInputPath(job, input);

        Path output = new Path(OUTPUT_DIR);
        // The output directory must not exist when the job starts, so clear a stale one.
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        job.setNumReduceTasks(2);

        // Propagate the job outcome to the caller instead of always exiting with 0.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
TMaper.java : map程序
package com.SL.tq;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.text.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper for the weather job.
 *
 * <p>Each input line is split on tab characters; the first field is parsed as a
 * {@code yyyy-MM-dd} date and the second field is a temperature followed by a
 * one-character unit suffix (for example {@code 34c}). The mapper emits a
 * {@link TQ} key carrying year, month, day and temperature, with the
 * temperature repeated as the value.
 *
 * <p>Lines that cannot be parsed are skipped and counted under the
 * {@code TMaper / MALFORMED_RECORDS} counter instead of failing the task.
 */
public class TMaper extends Mapper<LongWritable,Text,TQ,IntWritable>{

    // Reused for every record; map() is invoked sequentially on one mapper instance,
    // so sharing these non-thread-safe helpers within the instance avoids per-record allocation.
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    private final Calendar cal = Calendar.getInstance();

    // Output key/value objects are reused; context.write serializes them on each call.
    TQ mkey = new TQ();
    IntWritable mval = new IntWritable();

    /**
     * Parses one input line and emits a (TQ, temperature) pair.
     *
     * @param key     position key supplied by the input format (unused)
     * @param value   one line of input, e.g. {@code 1949-10-01 14:21:02<TAB>34c}
     * @param context task context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TQ, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Sample line: 1949-10-01 14:21:02 34c
        String[] strs = StringUtils.split(value.toString(), "\t");

        // Need a date field and a temperature field with at least one digit plus the unit suffix.
        if (strs == null || strs.length < 2 || strs[1].length() < 2) {
            context.getCounter("TMaper", "MALFORMED_RECORDS").increment(1);
            return;
        }

        try {
            Date date = sdf.parse(strs[0]);
            cal.setTime(date);

            // Strip the trailing unit character before parsing the number.
            int wd = Integer.parseInt(strs[1].substring(0, strs[1].length() - 1));

            mkey.setYear(cal.get(Calendar.YEAR));
            mkey.setMonth(cal.get(Calendar.MONTH) + 1); // Calendar months are zero-based
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
            mkey.setWd(wd);
            mval.set(wd);

            context.write(mkey, mval);
        } catch (ParseException | NumberFormatException e) {
            // Bad date or temperature: skip this record but keep it visible in job counters.
            context.getCounter("TMaper", "MALFORMED_RECORDS").increment(1);
        }
    }
}
TPartitioner.java :分区器
package com.SL.tq;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitioner for the weather job that routes records by year.
 *
 * <p>{@code TQ} as declared alongside this class does not override
 * {@code hashCode()}, so partitioning on {@code key.hashCode()} would use the
 * object identity hash and send records of the same date to arbitrary
 * reducers. Partitioning on the year keeps every record of one year (and
 * therefore of one year/month) on the same reducer.
 */
public class TPartitioner extends Partitioner<TQ, IntWritable> {

    /**
     * Returns the partition index for a record.
     *
     * @param key           map output key carrying the record's year
     * @param value         map output value (unused)
     * @param numPartitions number of reduce partitions
     * @return a value in {@code [0, numPartitions)} derived from the key's year
     */
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (key.getYear() & Integer.MAX_VALUE) % numPartitions;
    }
}
TsortComparator.java :排序比较器
package com.SL.tq;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Sort comparator for {@code TQ} keys.
 *
 * <p>Orders keys ascending by year, then by month, then by temperature
 * ({@code wd}). The day field is not part of this ordering.
 */
public class TsortComparator extends WritableComparator {

    /** Registers {@code TQ} as the key class and asks the base class to create key instances. */
    public TsortComparator() {
        super(TQ.class, true);
    }

    /**
     * Compares two {@code TQ} keys by year, month and temperature, in that order.
     *
     * @param a first key; must be a {@code TQ}
     * @param b second key; must be a {@code TQ}
     * @return negative, zero or positive as {@code a} sorts before, with, or after {@code b}
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        final TQ left = (TQ) a;
        final TQ right = (TQ) b;

        final int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }

        final int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }

        return Integer.compare(left.getWd(), right.getWd());
    }
}
TQ.java :序列化
package com.SL.tq;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Map output key for the weather job: a calendar date plus a temperature.
 *
 * <p>Serialized as four ints in the order year, month, day, wd. The natural
 * ordering, {@link #equals(Object)} and {@link #hashCode()} all use the date
 * fields only (year, month, day); the temperature {@code wd} does not take
 * part in them.
 */
public class TQ implements WritableComparable<TQ>{
    private int year;
    private int month;
    private int day;
    /** Temperature reading. */
    private int wd;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getWd() {
        return wd;
    }

    public void setWd(int wd) {
        this.wd = wd;
    }

    /**
     * Writes the fields as four ints: year, month, day, wd.
     *
     * @param out destination stream
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);
    }

    /**
     * Reads the fields in the same order {@link #write(DataOutput)} emits them.
     *
     * @param in source stream
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.day = in.readInt();
        this.wd = in.readInt();
    }

    /**
     * Natural ordering by convention: ascending date (year, then month, then day).
     *
     * @param that key to compare against
     * @return negative, zero or positive as this date is before, equal to, or after {@code that}
     */
    @Override
    public int compareTo(TQ that) {
        int c1 = Integer.compare(this.year, that.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(this.month, that.getMonth());
            if (c2 == 0) {
                return Integer.compare(this.day, that.getDay());
            }
            return c2;
        }
        return c1;
    }

    /**
     * Two keys are equal when their year, month and day match, which keeps
     * equality consistent with {@link #compareTo(TQ)}.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof TQ)) {
            return false;
        }
        TQ that = (TQ) obj;
        return this.year == that.year && this.month == that.month && this.day == that.day;
    }

    /** Hash over the same fields as {@link #equals(Object)} so equal keys hash alike. */
    @Override
    public int hashCode() {
        int h = year;
        h = 31 * h + month;
        h = 31 * h + day;
        return h;
    }

    /** Returns {@code year-month-day<TAB>wd} so the key is readable in logs and text output. */
    @Override
    public String toString() {
        return year + "-" + month + "-" + day + "\t" + wd;
    }
}
TsortComparator.java :自定义排序
package com.SL.tq;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Sort comparator for {@code TQ} keys.
 *
 * <p>Orders keys ascending by year, then by month, then by temperature
 * ({@code wd}). The day field is not part of this ordering.
 */
public class TsortComparator extends WritableComparator {

    /** Registers {@code TQ} as the key class and asks the base class to create key instances. */
    public TsortComparator() {
        super(TQ.class, true);
    }

    /**
     * Compares two {@code TQ} keys by year, month and temperature, in that order.
     *
     * @param a first key; must be a {@code TQ}
     * @param b second key; must be a {@code TQ}
     * @return negative, zero or positive as {@code a} sorts before, with, or after {@code b}
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        final TQ left = (TQ) a;
        final TQ right = (TQ) b;

        final int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }

        final int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }

        return Integer.compare(left.getWd(), right.getWd());
    }
}
网友评论