所谓的二次排序就是对文件中先对第一个字段排序,如果第一个字段相同,则根据第一个字段再对第二个字段进行排序(即先根据键排序,然后在根据相同的键对其值进行排序),先看最基础的效果图就明白了:
image.png一、自定义一个实现 WritableComparable 接口的类型,用于对数据的排序:
public class SortWritable implements WritableComparable<SortWritable> {
//分别代表第一个字段和第二个字段
private String first;
private int second;
public SortWritable() {
}
public SortWritable(String first, int second) {
this.set(first,second);
}
//为方便调用我们创建一个方法
public void set(String first, int second){
this.first = first;
this.second = second;
}
public String getFirst() {
return first;
}
public void setFirst(String first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
//先根据第一个字段比较排序,如果相同在根据第二个比较排序
public int compareTo(SortWritable o) {
int comp = this.getFirst().compareTo(o.getFirst());
if(0 == comp){
return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond()));
}
return comp;
}
//序列化
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(first);
dataOutput.writeInt(second);
}
//反序列化
public void readFields(DataInput dataInput) throws IOException {
this.first = dataInput.readUTF();
this.second = dataInput.readInt();
}
//一下三个方法都是快捷生成
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SortWritable that = (SortWritable) o;
if(second != that.second) return false;
return first != null ? first.equals(that.first) : that.first == null;
}
@Override
public int hashCode() {
int result = first != null ? first.hashCode() : 0;
result = 31 * result + second;
return result;
}
@Override
public String toString() {
return "SortWritable{" +
"first='" + first + '\'' +
", second=" + second +
'}';
}
}
二、创建驱动类基础框架:
public class MySecondSortMR extends Configured implements Tool {
public int run(String[] args) throws Exception {
//驱动
//1) 获取 Configuration
Configuration configuration = this.getConf();
//2) 创建 job
Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
job.setJarByClass(this.getClass());
//3.1) 输入
Path inputPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inputPath);
//3.2 设置 map
job.setMapperClass(SecondMapper.class);
job.setMapOutputKeyClass(SortWritable.class);
job.setMapOutputValueClass(IntWritable.class);
//1.分区
//job.setPartitionerClass(FirstPartitioner.class);
//2.压缩
//configuration.set("mapreduce.map.output.compress","true");
//configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
//3.分组
//job.setGroupingComparatorClass(FirstGrouping.class);
//3.3 设置 reduce
job.setReducerClass(SecondReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置 reduce 的任务个数
//job.setNumReduceTasks(2);
//3.4 设置输出
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
//4.提交
boolean sucess = job.waitForCompletion(true);
return sucess ? 0 : 1;
}
public static void main(String[] args) {
//当打包成 jar 之前 记得注释掉
args = new String[]{
"hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/secondsort.txt",
"hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output9"
};
Configuration configuration = new Configuration();
try {
//先判断文件夹是否存在
Path fileOutPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if(fileSystem.exists(fileOutPath)){
fileSystem.delete(fileOutPath, true); //删除
}
int status = ToolRunner.run(configuration, new MySecondSortMR(), args);
System.exit(status);
} catch (Exception e) {
e.printStackTrace();
}
}
}
三、创建一个Mapper的子类,用于对数据的切分及逻辑的的操作(这里值得注意的是输出的键是我们自定义的类型SortWritable):
public static class SecondMapper extends Mapper<LongWritable, Text, SortWritable, IntWritable>{
private SortWritable outputKey = new SortWritable();
private IntWritable outputValue = new IntWritable();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split(" ");
if(2 != values.length) return;
outputKey.set(values[0], Integer.valueOf(values[1]));
outputValue.set(Integer.valueOf(values[1]));
context.write(outputKey, outputValue);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
}
}
四、创建 Reducer 的子类(数据类型的输入要和mapper的输出类型要一致):
public static class SecondReduce extends Reducer<SortWritable, IntWritable, Text, IntWritable>{
private Text outputKey = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
}
@Override
protected void reduce(SortWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for(IntWritable value : values){
outputKey.set(key.getFirst());
context.write(outputKey, value);
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
}
}
五、运行程序如果没有问题的话即可直接在网页 /user/hdfs/output9 查看生成的数据,接下来我们使用命令查看排序的结果:
bin/hdfs dfs -text /user/hdfs/output9/part*
效果图如下:
image.png
六、为达到优化效果,我们可做如下设置:
image.png由于键值的组合,为保原有的分区与分组原有的结构,我们需要去自定义分区与分组类。
七、自定义一个实现RawComparator接口的分组类:
public class FirstGrouping implements RawComparator<SortWritable> {
//通过字节数组进行对比
public int compare(byte[] bytes1, int i, int i1, byte[] bytes2, int i2, int i3) {
//int有四个字节,因此从 0 开始 到 i - 4
return WritableComparator.compareBytes(bytes1,0,i1 - 4,bytes2,0,i3-4);
}
//通过对象进行对比
public int compare(SortWritable o1, SortWritable o2) {
return o1.getFirst().compareTo(o2.getFirst());
}
}
八、自定义一个继承自Partitioner的分区类:
public class FirstPartitioner extends Partitioner<SortWritable, IntWritable> {
public int getPartition(SortWritable key, IntWritable intWritable, int i) {
return (key.getFirst().hashCode() & 2147483647) % i;
}
}
接下来我们我们再去掉驱动类的 run() 方法中的 分组和分区的注释语句再运行程序,同样得到我们所需要的效果。。。
image.png
感谢老师与各位大神的指点,感恩一切。。。
网友评论