美文网首页我爱编程
MapReduce如何处理不可分割文件

MapReduce如何处理不可分割文件

作者: IT_小白 | 来源:发表于2018-06-21 10:19 被阅读0次

    有时会有这样的逻辑需求,一个    map   任务需要处理一个文件中的所有内容或是

    把整个文件作为一条记录处理。

    即使不分割文件,仍然需要一个    RecordReader    来读取文件的所有内容作为record的值。

    Hadoop-version    :    2.7.1

    Jdk-version            :    1.8

    Maven-version      :    3.3.3

    完成此功能需要重写两个类

    1.    继承    FileInputFormat    类  重写其中的两个方法以下代码为列子

    public class WholeFileInputFormatextends FileInputFormat {

    //    createRecordReader    方法是返回一个定制的RecordReader实现

    @Override

        public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {

    WholeFileRecordReader reader =new WholeFileRecordReader();

    reader.initialize(split, context);

    return reader;

    }

    //    isSplitable    方法是说明此文件是否分割    返回 true    为分割    返回 false 为不分割

    @Override

        protected boolean isSplitable(JobContext context, Path filename) {

    return false;

    }

    }

    2.继承    RecordReader    类    重写其中的三个方法

    public class WholeFileRecordReader extends RecordReader {

    private FileSplitfileSplit;

    private Configurationconf;

    private Textkey =new Text();

    private BytesWritablevalue =new BytesWritable();

    private boolean processed =false;

    @Override

        public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {

    this.fileSplit = (FileSplit) split;

    this.conf = context.getConfiguration();

    }

    /**

    *  这个方法会被调用两次

    *  第一次读取整个文件流 将其转换为    BytesWritable对象

    *  并且将 processed  设置为 true

    *  第二次 在  processed 为true时

    *  将不会对数据进行加载

    *  并且返回    false 代表这个文件已经加载完成

    *

        * @return

        * @throws IOException

        * @throws InterruptedException

    */

        @Override

        public boolean nextKeyValue()throws IOException, InterruptedException {

    if (!processed){

    byte[] contents =new byte[(int)fileSplit.getLength()];

    Path file =fileSplit.getPath();

    FileSystem fs = file.getFileSystem(conf);

    FSDataInputStream in =null;

    try {

    in = fs.open(file);

    IOUtils.readFully(in, contents,0, contents.length);

    key.set(file.toString());

    value.set(contents,0 ,contents.length);

    }finally {

    IOUtils.closeStream(in);

    }

    processed =true;

    return true;

    }

    return false;

    }

    @Override

        public Text getCurrentKey()throws IOException, InterruptedException {

    return key;

    }

    @Override

        public BytesWritable getCurrentValue()throws IOException, InterruptedException {

    return value;

    }

    @Override

        public float getProgress()throws IOException, InterruptedException {

    return processed ?1.0f :0.0f;

    }

    @Override

        public void close()throws IOException {

    }

    }

    WholeFileRecordReader    类负责将FileSplit转换成一条记录,该记录的键是文件的路径

    如果不需要可以不对Key进行初始化返回    Null    即可,值是这个文件的内容,因为只有一条记录

    WholeFileRecordReader 要么处理这条数据,要么不处理,所以他维护一个名称为    processed

    的布尔变来表示这个文件是否被处理过。如果当    nextKeyValue()    方法被调用时,文件没有被处理过,

    就打开文件,产生一个长度是文件长度的字节数组,并用    Hadoop    的    IOUtils    类把文件的内容放入字节数组。

    然后在被传递到    next()    方法的    BytesWritable    实例上设置数组,返回    true    则表示成功读取记录。

                    以上为所有的代码实例以及说明,下面来说一下所遇到的问题

    1.此程序无法    Dubug    也就是说无法调试    (至今未解决)

    2.在mapper阶段获取    value    时这个值不能直接使用,因为在调用    nextKeyValue()    方法时已经转换为    BytesWritable

      类型值的长度变长了,这是由于    Hadoop    里面    BytesWritable    的实现机制造成的,BytesWritable    的实现中,

    保存了一个    byte[]    存放内容,一个    int size    表示    byte    数组里面前多少位是有效的,后面的是无效的,但是    ByteWritable    的getBytes()    方法返回的确实    byte    数组的全部内容(长度很可能大于size),所以在    Mapper    中进行处理的时候应该只操纵size大小的内容后面的应该无视掉,

    转换为String类型如下:String str = new String(value.getBytes(),0,value.getLength());   

    或是                                value.setCapacity(value.getLength());

    如果文件过大程序将不能调试,会卡死文件晓得话可以调试,具体原因不清楚

    刚刚开始写简书,有很多地方还不是很懂,有不好的地方还请您指出!

    在此感谢您能看完此篇文章!!!

    相关文章

      网友评论

        本文标题:MapReduce如何处理不可分割文件

        本文链接:https://www.haomeiwen.com/subject/mxtqyftx.html