首先,我们需要弄懂为什么要为什么要用RecordReader,还要知道为什么要自定义RecordReader。
在我们对文件切片之后,我们需要将切片后的文件转为key-value的键值对。RecordReader的作用就是这个。
RecordReader是一个抽象类,在实际运用中,常见的子类有LineRecordReader、SequenceFileRecordReader、CombineFileRecordReader、KeyValueLineRecordReader、DBRecordReader1.LineRecordReader:是用每行的偏移量作为map的key,每行的内容作为map的value
2.CombineFileRecordReader:处理CombineInputSplit里的每一个chunk的RecordReader,CombineInputSplit包含不同的小文件chunk信息
3.KeyValueLineRecordReader:根据指定的分割符却切分每一行数据,如果没有指定分割符,那么key就是整行的文本,value就是空
4.DBRecordReader:从数据库表中读取数据
上述的这些内置的RecordReader能解决部分问题,但是在实际运用中还会有许多特殊的使用场景,此时我们就需要自定义RecordReader来处理问题。
1.自定义RecordReader:
继承抽象类RecordReader,实现RecordReader的一个实例;
实现自定义InputFormat类,重写InputFormat中createRecordReader()方法,返回值是自定义的RecordReader实例;
配置job.setInputFormatClass()设置自定义的InputFormat实例;
image.png
2.RecordReader类为一个抽象类,共有下面这些抽象方法,
//实现了java7的新特性AuotCloseable
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
//初始化RecordReader,只能被调用一次。
public abstract void initialize(InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException;
//读取下一个key/value键值对
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
//获取当前的key
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
//获取当前的vlaue
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
//进度
public abstract float getProgress() throws IOException, InterruptedException;
//关闭RecordReader
public abstract void close() throws IOException;
}
3.案例
将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
// 定义类继承FileInputFormat
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
WholeRecordReader recordReader = new WholeRecordReader();
recordReader.initialize(split, context);
return recordReader;
}
}
public class WholeRecordReader extends RecordReader<Text, BytesWritable>{
private Configuration configuration;
private FileSplit split;
private boolean isProgress= true;
private BytesWritable value = new BytesWritable();
private Text k = new Text();
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
//InputSplit类里面没有split这个切片的详细信息,经查看在他的子类FileSplit中有这个属性,所以要强转
this.split = (FileSplit)split;
configuration = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (isProgress) {
// 1 定义缓存区
byte[] contents = new byte[(int)split.getLength()];
FileSystem fs = null;
FSDataInputStream fis = null;
try {
// 2 获取文件系统
Path path = split.getPath();
fs = path.getFileSystem(configuration);
// 3 读取数据
fis = fs.open(path);
// 4 读取文件内容
IOUtils.readFully(fis, contents, 0, contents.length);
// 5 输出文件内容
value.set(contents, 0, contents.length);
// 6 获取文件路径及名称
String name = split.getPath().toString();
// 7 设置输出的key值
k.set(name);
} catch (Exception e) {
}finally {
IOUtils.closeStream(fis);
}
isProgress = false;
return true;
}
return false;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return k;
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
}
}
public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
@Override
protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
@Override
protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, values.iterator().next());
}
}
public class SequenceFileDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "e:/input/inputinputformat", "e:/output1" };
// 1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar包存储位置、关联自定义的mapper和reducer
job.setJarByClass(SequenceFileDriver.class);
job.setMapperClass(SequenceFileMapper.class);
job.setReducerClass(SequenceFileReducer.class);
// 7设置输入的inputFormat
job.setInputFormatClass(WholeFileInputformat.class);
// 8设置输出的outputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 3 设置map输出端的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
// 4 设置最终输出端的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 5 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
网友评论