有时会有这样的逻辑需求,一个 map 任务需要处理一个文件中的所有内容或是
把整个文件作为一条记录处理。
即使不分割文件,仍然需要一个 RecordReader 来读取文件的所有内容作为record的值。
Hadoop-version : 2.7.1
Jdk-version : 1.8
Maven-version : 3.3.3
完成此功能需要重写两个类
1. 继承 FileInputFormat 类 重写其中的两个方法以下代码为列子
public class WholeFileInputFormatextends FileInputFormat {
// createRecordReader 方法是返回一个定制的RecordReader实现
@Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {
WholeFileRecordReader reader =new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
// isSplitable 方法是说明此文件是否分割 返回 true 为分割 返回 false 为不分割
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
2.继承 RecordReader 类 重写其中的三个方法
public class WholeFileRecordReader extends RecordReader {
private FileSplitfileSplit;
private Configurationconf;
private Textkey =new Text();
private BytesWritablevalue =new BytesWritable();
private boolean processed =false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
/**
* 这个方法会被调用两次
* 第一次读取整个文件流 将其转换为 BytesWritable对象
* 并且将 processed 设置为 true
* 第二次 在 processed 为true时
* 将不会对数据进行加载
* 并且返回 false 代表这个文件已经加载完成
*
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean nextKeyValue()throws IOException, InterruptedException {
if (!processed){
byte[] contents =new byte[(int)fileSplit.getLength()];
Path file =fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in =null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents,0, contents.length);
key.set(file.toString());
value.set(contents,0 ,contents.length);
}finally {
IOUtils.closeStream(in);
}
processed =true;
return true;
}
return false;
}
@Override
public Text getCurrentKey()throws IOException, InterruptedException {
return key;
}
@Override
public BytesWritable getCurrentValue()throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress()throws IOException, InterruptedException {
return processed ?1.0f :0.0f;
}
@Override
public void close()throws IOException {
}
}
WholeFileRecordReader 类负责将FileSplit转换成一条记录,该记录的键是文件的路径
如果不需要可以不对Key进行初始化返回 Null 即可,值是这个文件的内容,因为只有一条记录
WholeFileRecordReader 要么处理这条数据,要么不处理,所以他维护一个名称为 processed
的布尔变来表示这个文件是否被处理过。如果当 nextKeyValue() 方法被调用时,文件没有被处理过,
就打开文件,产生一个长度是文件长度的字节数组,并用 Hadoop 的 IOUtils 类把文件的内容放入字节数组。
然后在被传递到 next() 方法的 BytesWritable 实例上设置数组,返回 true 则表示成功读取记录。
以上为所有的代码实例以及说明,下面来说一下所遇到的问题
1.此程序无法 Dubug 也就是说无法调试 (至今未解决)
2.在mapper阶段获取 value 时这个值不能直接使用,因为在调用 nextKeyValue() 方法时已经转换为 BytesWritable
类型值的长度变长了,这是由于 Hadoop 里面 BytesWritable 的实现机制造成的,BytesWritable 的实现中,
保存了一个 byte[] 存放内容,一个 int size 表示 byte 数组里面前多少位是有效的,后面的是无效的,但是 ByteWritable 的getBytes() 方法返回的确实 byte 数组的全部内容(长度很可能大于size),所以在 Mapper 中进行处理的时候应该只操纵size大小的内容后面的应该无视掉,
转换为String类型如下:String str = new String(value.getBytes(),0,value.getLength());
或是 value.setCapacity(value.getLength());
如果文件过大程序将不能调试,会卡死文件晓得话可以调试,具体原因不清楚
网友评论