美文网首页
自定义输入格式

自定义输入格式

作者: 苏坡闷 | 来源:发表于2019-03-11 20:59 被阅读0次

    首先,我们需要弄懂为什么要为什么要用RecordReader,还要知道为什么要自定义RecordReader。

    在我们对文件切片之后,我们需要将切片后的文件转为key-value的键值对。RecordReader的作用就是这个。
    RecordReader是一个抽象类,在实际运用中,常见的子类有LineRecordReader、SequenceFileRecordReader、CombineFileRecordReader、KeyValueLineRecordReader、DBRecordReader

    1.LineRecordReader:是用每行的偏移量作为map的key,每行的内容作为map的value
    2.CombineFileRecordReader:处理CombineInputSplit里的每一个chunk的RecordReader,CombineInputSplit包含不同的小文件chunk信息
    3.KeyValueLineRecordReader:根据指定的分割符却切分每一行数据,如果没有指定分割符,那么key就是整行的文本,value就是空
    4.DBRecordReader:从数据库表中读取数据
    上述的这些内置的RecordReader能解决部分问题,但是在实际运用中还会有许多特殊的使用场景,此时我们就需要自定义RecordReader来处理问题。

    1.自定义RecordReader:

    继承抽象类RecordReader,实现RecordReader的一个实例;
    实现自定义InputFormat类,重写InputFormat中createRecordReader()方法,返回值是自定义的RecordReader实例;
    配置job.setInputFormatClass()设置自定义的InputFormat实例;


    image.png

    2.RecordReader类为一个抽象类,共有下面这些抽象方法,

    //实现了java7的新特性AuotCloseable
    public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    
        //初始化RecordReader,只能被调用一次。
        public abstract void initialize(InputSplit split,
                                      TaskAttemptContext context
                                      ) throws IOException, InterruptedException;
        //读取下一个key/value键值对
        public abstract boolean nextKeyValue() throws IOException, InterruptedException;
    
        //获取当前的key
        public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
      
        //获取当前的vlaue
        public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
      
        //进度
        public abstract float getProgress() throws IOException, InterruptedException;
      
        //关闭RecordReader
        public abstract void close() throws IOException;
    }
    

    3.案例

    将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。

    // 定义类继承FileInputFormat
    public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{
        
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
    
        @Override
        public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)   throws IOException, InterruptedException {
            
            WholeRecordReader recordReader = new WholeRecordReader();
            recordReader.initialize(split, context);
            
            return recordReader;
        }
    }
    
    
    public class WholeRecordReader extends RecordReader<Text, BytesWritable>{
    
        private Configuration configuration;
        private FileSplit split;
        
        private boolean isProgress= true;
        private BytesWritable value = new BytesWritable();
        private Text k = new Text();
    
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            //InputSplit类里面没有split这个切片的详细信息,经查看在他的子类FileSplit中有这个属性,所以要强转
            this.split = (FileSplit)split;
            configuration = context.getConfiguration();
        }
    
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            
            if (isProgress) {
                // 1 定义缓存区
                byte[] contents = new byte[(int)split.getLength()];
                
                FileSystem fs = null;
                FSDataInputStream fis = null;
                
                try {
                    // 2 获取文件系统
                    Path path = split.getPath();
                    fs = path.getFileSystem(configuration);
                    
                    // 3 读取数据
                    fis = fs.open(path);
                    
                    // 4 读取文件内容
                    IOUtils.readFully(fis, contents, 0, contents.length);
                    
                    // 5 输出文件内容
                    value.set(contents, 0, contents.length);
    
                    // 6 获取文件路径及名称
                    String name = split.getPath().toString();
    
                     // 7 设置输出的key值
                     k.set(name);
    
                } catch (Exception e) {
                    
                }finally {
                    IOUtils.closeStream(fis);
                }
                
                isProgress = false;
                
                return true;
            }
            
            return false;
        }
    
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return k;
        }
    
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }
    
        @Override
        public void close() throws IOException {
        }
    }
    
    
    public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
        
        @Override
        protected void map(Text key, BytesWritable value,           Context context)        throws IOException, InterruptedException {
    
            context.write(key, value);
        }
    }
    
    
    public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<BytesWritable> values, Context context)        throws IOException, InterruptedException {
    
            context.write(key, values.iterator().next());
        }
    }
    
    
    public class SequenceFileDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            
           // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
            args = new String[] { "e:/input/inputinputformat", "e:/output1" };
    
           // 1 获取job对象
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
           // 2 设置jar包存储位置、关联自定义的mapper和reducer
            job.setJarByClass(SequenceFileDriver.class);
            job.setMapperClass(SequenceFileMapper.class);
            job.setReducerClass(SequenceFileReducer.class);
    
           // 7设置输入的inputFormat
            job.setInputFormatClass(WholeFileInputformat.class);
            // 8设置输出的outputFormat
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
           
            // 3 设置map输出端的kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BytesWritable.class);
            
           // 4 设置最终输出端的kv类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
    
           // 5 设置输入输出路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
           // 6 提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    
    

    相关文章

      网友评论

          本文标题:自定义输入格式

          本文链接:https://www.haomeiwen.com/subject/oknjpqtx.html