排序
利用MapReduce默认按Key进行排序的特性:Mapper把待排序的数作为Key输出。
继承Partitioner类并重写getPartition方法,按Key的取值范围把Mapper的输出分配到相应的Partition,使各Partition之间整体有序;
每个Partition内的数据在输入Reducer之前由框架按Key排好序。
在Reducer中用一个计数变量(lineNum)记录每个数据的位置。注意它并不是跨Reducer的全局计数:使用多个Reducer时,每个Reducer通常各自从1开始编号。
导入的类(import语句):
. import java.io.IOException;
4.
5. import org.apache.hadoop.conf.Configuration;
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.io.IntWritable;
8. import org.apache.hadoop.io.Text;
9. import org.apache.hadoop.mapreduce.Job;
10. import org.apache.hadoop.mapreduce.Mapper;
11. import org.apache.hadoop.mapreduce.Partitioner;
12. import org.apache.hadoop.mapreduce.Reducer;
13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15. import org.apache.hadoop.util.GenericOptionsParser;
16.
类的实现:
public class Sort {
26. public static class SortMapper extends Mapper<Object, Text, IntWritable, IntWritable>{
27.
28. //直接输出key,value,key为需要排序的值,value任意
29. @Override
30. protected void map(Object key, Text value,
31. Context context)throws IOException, InterruptedException {
32. System.out.println("Key: "+key+" "+"Value: "+value);
33. context.write(new IntWritable(Integer.valueOf(value.toString())),new IntWritable(1));
34.
35. }
36. }
37.
38. public static class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
39. public static IntWritable lineNum = new IntWritable(1);//记录该数据的位置
40.
41. //查询value的个数,有多少个就输出多少个Key值。
42. @Override
43. protected void reduce(IntWritable key, Iterable<IntWritable> value,
44. Context context) throws IOException, InterruptedException {
45.
46. System.out.println("lineNum: "+lineNum);
47.
48. for(IntWritable i:value){
49. context.write(lineNum, key);
50. }
51. lineNum = new IntWritable(lineNum.get()+1);
52. }
53. }
54.
55.
56. public static class SortPartitioner extends Partitioner<IntWritable, IntWritable>{
57.
58. //根据key对数据进行分派
59. @Override
60. public int getPartition(IntWritable key, IntWritable value, int partitionNum) {
61. System.out.println("partitionNum: "+partitionNum);
62. int maxnum = 23492;//输入的最大值,自己定义的。mapreduce 自带的有采样算法和partition的实现可以用,此例没有用。
63. int bound = maxnum/partitionNum;
64. int keyNum = key.get();
65. for(int i=0;i<partitionNum;i++){
66. if(keyNum>bound*i&&keyNum<=bound*(i+1)){
67. return i;
68. }
69. }
70. return -1;
71. }
72.
73. }
74.
75.
76. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
77. Configuration conf = new Configuration();
78. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
79.
80. if(otherArgs.length<2){
81. System.out.println("input parameters errors");
82. System.exit(2);
83. }
84.
85. Job job= new Job(conf);
86. job.setJarByClass(Sort.class);
87. job.setMapperClass(SortMapper.class);
88. job.setPartitionerClass(SortPartitioner.class);//此例不许要combiner,需要设置Partitioner
89. job.setReducerClass(SortReducer.class);
90. job.setOutputKeyClass(IntWritable.class);
91. job.setOutputValueClass(IntWritable.class);
92.
93. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
94. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
95.
96. System.exit(job.waitForCompletion(true)?0:1);
97. }
98.
99. }
网友评论