This time, let's try out the Partitioner
Many ID numbers encode information in their first few characters, for example the province. Phone numbers work the same way: the prefix tells you whether the number belongs to China Mobile, China Telecom, or China Unicom. Routing records into different output files according to their prefix is exactly what a Partitioner is for. As last time, we start by creating a pom.xml and a FlowBean class that holds the upload and download traffic. The new part comes next: defining our own Partitioner.
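The FlowBean class itself is not repeated in this post; a minimal sketch, assuming it matches the previous article and the "(up down total)" value format seen in the job output further down, could look like this. The field and method names are taken from how the class is used in the code below.

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Minimal sketch of FlowBean (the real class comes from the previous post).
 * It is only used as a map output value here, so plain Writable is enough.
 */
public class FlowBean implements Writable {
    private long upFlow;
    private long dFlow;
    private long sumFlow;

    // Hadoop needs a no-arg constructor for deserialization
    public FlowBean() {}

    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() { return upFlow; }

    public long getdFlow() { return dFlow; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        // assumed to match the "(up down total)" shape of the output shown later
        return "(" + upFlow + " " + dFlow + " " + sumFlow + ")";
    }
}

With FlowBean in place, here is the custom Partitioner: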
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

/**
 * Partitioner that routes each phone number to a reduce task
 * based on its three-digit prefix.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    public static HashMap<String, Integer> proviceDict = new HashMap<String, Integer>();

    static {
        proviceDict.put("137", 0);
        proviceDict.put("133", 1);
        proviceDict.put("138", 2);
        proviceDict.put("135", 3);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String prefix = key.toString().substring(0, 3);
        Integer province = proviceDict.get(prefix);
        // any prefix not in the dictionary falls into partition 4
        return province == null ? 4 : province;
    }
}
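To see the routing logic in isolation, a tiny standalone check (hypothetical, not part of the original post; the sample numbers are made up) can call getPartition directly:

import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        ProvincePartitioner partitioner = new ProvincePartitioner();
        // made-up phone numbers covering known prefixes and an unknown one
        String[] phones = {"13726230503", "13560439658", "15013685858"};
        for (String phone : phones) {
            int p = partitioner.getPartition(new Text(phone), new FlowBean(100, 200), 5);
            System.out.println(phone + " -> partition " + p);
        }
    }
}

The 137 number lands in partition 0, the 135 number in partition 3, and the unrecognized 150 prefix falls through to partition 4.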
A Partitioner is usually backed by a HashMap like this. Here we split the data into 5 partitions. Why 5 and not 4? Because numbers with any other prefix all fall into the last partition. With that done, we can write the MapReduce driver program:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input line is expected to be: phone \t upFlow \t dFlow
            String line = value.toString();
            String[] fields = line.split("\t");
            String phone = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long dFlow = Long.parseLong(fields[2]);
            context.write(new Text(phone), new FlowBean(upFlow, dFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            // sum up the traffic for one phone number
            long sum_upFlow = 0;
            long sum_dFlow = 0;
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_dFlow += bean.getdFlow();
            }
            context.write(key, new Text(new FlowBean(sum_upFlow, sum_dFlow).toString()));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(FlowCount.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // set the custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // the number of reduce tasks matches the number of partitions
        job.setNumReduceTasks(5);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        // the reducer emits Text values, not FlowBean
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
Note that the driver explicitly sets both the partitioner class and a matching number of reduce tasks. If fewer reduce tasks than partitions are configured, the job fails at map time because the partition index is out of range; if more are configured, the extra reducers simply produce empty output files.
Then package the jar, upload it to the cluster, and submit the job to see how it works.
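Submission is the usual hadoop jar call; the jar name and input path below are placeholders, only the output path matches the listing that follows:

hadoop jar flowcount.jar FlowCount /input/flow.log /output/partitioner1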
ubuntu@hadoop1:~/text$ hdfs dfs -ls /output/partitioner1
Found 6 items
-rw-r--r-- 3 ubuntu supergroup 0 2019-07-31 20:06 /output/partitioner1/_SUCCESS
-rw-r--r-- 3 ubuntu supergroup 28 2019-07-31 20:06 /output/partitioner1/part-r-00000
-rw-r--r-- 3 ubuntu supergroup 84 2019-07-31 20:06 /output/partitioner1/part-r-00001
-rw-r--r-- 3 ubuntu supergroup 28 2019-07-31 20:06 /output/partitioner1/part-r-00002
-rw-r--r-- 3 ubuntu supergroup 28 2019-07-31 20:06 /output/partitioner1/part-r-00003
-rw-r--r-- 3 ubuntu supergroup 80 2019-07-31 20:06 /output/partitioner1/part-r-00004
The output has gone from a single file to five, so the partitioning clearly worked. Looking at one of the files confirms it only contains the records belonging to its partition:
ubuntu@hadoop1:~/text$ hdfs dfs -cat /output/partitioner1/part-r-00004
14838244322 (900 500 1400)
18273723427 (300 800 1100)
19283413241 (500 200 700)