MapReduce中排序组件与序列化
1、什么是序列化?
将结构化对象转换成字节流以便于进行网络传输或写入持久存储的过程。
2、什么是反序列化?
将字节流转换为一系列结构化对象的过程。
3、序列化作用
1、作为一种持久化格式。
2、作为一种通信的数据格式。
3、作为一种数据拷贝、克隆机制。
3.1. 概述
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信 息(各种校验信息,header,继承体系等),不便于在网络中高效传输;所以,Hadoop 自己开发了一 套序列化机制(参与序列化的对象的类都要实现 Writable 接口),精简,高效 Hadoop 中的序列化框架已经对基本类型和 null 提供了序列化的实现了。分别是:
JAVA | Hadoop |
---|---|
byte | ByteWritable |
short | ShortWritable |
int | IntWritable |
long | LongWritable |
float | FloatWritable |
double | DoubleWritable |
String | Text |
null | NullWritable |
3.2. Java序列化
以案例为例说明:
public class Employee implements java.io.Serializable
{
public String name;
public String address;
public transient int SSN;
public int number;
public void mailCheck()
{
System.out.println("Mailing a check to " + name
+ " " + address);
}
}
3.3. Hadoop自定义序列化
案例
Writable 有一个子接口是 WritableComparable,WritableComparable 是既可实现序列化, 也可以对key进行比较,我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功 能。
下面通过一个案例来看一下具体的排序组件功能
需求:数据格式如下,要求第一列按照字典顺序进行排列,第一列相同的时候, 第二列按照升序进行排 列。

思路:
1、将 Mapper 端输出的<key,value>中的 key 和 value 组合成一个新的
KEY
, value值不变,也就是新的KEY
和value为:<(key,value),value>
2、在针对新的KEY
排序的时候, 如果 key 相同, 就再对value进行排序
代码实现
自定义序列化:MySortBean.java
/**
* 排序的实例化对象
*/
public class MySortBean implements WritableComparable<MySortBean> {
private String word;
private int num;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
// 输出文本的结果可以在这里改
@Override
public String toString() {
return "MySortBean{" +
"word='" + word + '\'' +
", num=" + num +
'}';
}
/**
* 比较器,按照我们自己指定的规则进行排序
* 排序规则:
* 要求第一列按照字典顺序进行排列,第一列相同的时候, 第二列按照升序进行排列。
* @param o
* @return
*/
@Override
public int compareTo(MySortBean o) {
//1、先对第一列进行排序 word
int result = this.word.compareTo(o.word);
//2、第一列相同的时候,num排序
if (result == 0){
return this.num - o.num;
}
return result;
}
//实现序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(word);
out.writeInt(num);
}
//实现反序列化
@Override
public void readFields(DataInput in) throws IOException {
this.word = in.readUTF();
this.num = in.readInt();
}
}
排序的Mapper:SortMapper.java
/**
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* KEYIN:文本偏移量
* VALUEIN:一行的文本
* KEYOUT:MySortBean
* VALUEOUT:NullWritable
*/
public class SortMapper extends Mapper<LongWritable,Text,MySortBean,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、拆分,将一行文本进行拆分
String[] split = value.toString().split(" ");
//2、将相应的值给写入到MySortBean对象中
MySortBean mySortBean = new MySortBean();
mySortBean.setWord(split[0]);
mySortBean.setNum(Integer.parseInt(split[1]));
//3、写入到上下文
context.write(mySortBean,NullWritable.get());
}
}
排序的Reducer:SortReducer.java
/**
* 排序的Reducer
* Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
* KEYIN:MySortBean map阶段的输出的key
* VALUEIN:NullWritable map阶段的输出的value
* KEYOUT:MySortBean
* VALUEOUT:NullWritable
*/
public class SortReducer extends Reducer<MySortBean,NullWritable,MySortBean,NullWritable>{
@Override
protected void reduce(MySortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//1、将k2 v2 转化为k3 v3 进行输出
context.write(key,NullWritable.get());
}
}
排序的Driver:JobMain.java
/**
* 排序的主类,将map和reducer串联起来
*/
public class JobMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
//一、初始化一个job
Job job = Job.getInstance(configuration, "sort");
//二、设置job的相关信息 8个小步骤
//1、设置输入的路径,让程序找到源文件
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("D://input/test3.txt"));
//2、设置Mapper类,并设置k2 v2
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(MySortBean.class);
job.setMapOutputValueClass(NullWritable.class);
//3 4 5 6
//7、设置Reducer类,并设置k3 v3
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(MySortBean.class);
job.setOutputValueClass(NullWritable.class);
//8、设置输出的路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("D://wordoutsort"));
//三、等待完成
boolean b = job.waitForCompletion(true);
System.out.println(b);
System.exit(b ? 0 : 1);
}
}

结果

修改自定义序列化输出
@Override
public String toString() {
return word + ' ' + num ;
}

网友评论