美文网首页
自定义InputFormat案例

自定义InputFormat案例

作者: bullion | 来源:发表于2019-01-18 18:06 被阅读0次

    需求:

        将多个小文件合并成一个 SequenceFile 文件（SequenceFile 是 Hadoop 用来存储二进制形式 key-value 对的文件格式）。该 SequenceFile 中存储多个文件，存储形式为：以文件路径+名称作为 key，以文件内容作为 value。为此需要自定义 InputFormat，使每个小文件不被切片、作为一条完整记录读入。

    WholeFileInputformat 

    public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {

        @Override

        public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

            WholeRecordReader recordReader = new WholeRecordReader();

            recordReader.initialize(split, context);

            return recordReader;

        }

    }

    WholeRecordReader 

    public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

        FileSplit split;

        Configuration configuration;

        Text k = new Text();

        BytesWritable v = new BytesWritable();

        boolean isProgress = true;

        @Override

        public void initialize(InputSplit split, TaskAttemptContext content) throws IOException, InterruptedIOException {

            //初始化

            this.split = (FileSplit) split;

            configuration = content.getConfiguration();

        }

        @Override

        public boolean nextKeyValue() throws IOException, InterruptedIOException {

            if(isProgress){

                //核心业务逻辑处理

                byte[] buf = new byte[(int) split.getLength()];

                // 1 获取fs对象

                Path path = split.getPath();

                FileSystem fs = path.getFileSystem(configuration);

                // 2 获取输入流

                FSDataInputStream fis = fs.open(path);

                // 3 拷贝

                IOUtils.readFully(fis, buf, 0, buf.length);

                // 4 封装v

                v.set(buf, 0, buf.length);

                // 5 封装k

                k.set(path.toString());

                // 6 关闭资源

                IOUtils.closeStream(fis);

                isProgress = false;

                return true;

            }

            return false;

        }

        @Override

        public Text getCurrentKey() throws IOException, InterruptedIOException {

            return k;

        }

        @Override

        public BytesWritable getCurrentValue() throws IOException, InterruptedIOException {

            return v;

        }

        @Override

        public float getProgress() throws IOException, InterruptedIOException {

            return 0;

        }

        @Override

        public void close() throws IOException {

        }

    }

    SequenceFileMapper 

    public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

        @Override

        protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {

            context.write(key,value);

        }

    }

    SequenceFileReducer 

    public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

        @Override

        protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {

            //循环写出

            for (BytesWritable value : values) {

                context.write(key, value);

            }

        }

    }

    SequenceFileDriver 

    public class SequenceFileDriver {

        public static void main(String[] args) throws Exception {

            //输入输出路径需要根据自己电脑上的实际的输入输出路径设置

            args = new String[]{"e:/input/inputformat", "e:/output"};

            // 1 获取job对象

            Configuration conf = new Configuration();

            Job job = Job.getInstance(conf);

            // 2 设置jar包存储位置 关联自定义的mapper和reducer

            job.setJarByClass(SequenceFileDriver.class);

            job.setMapperClass(SequenceFileMapper.class);

            job.setReducerClass(SequenceFileReducer.class);

            // 7 设置输入的inputFormat

            job.setInputFormatClass(WholeFileInputformat.class);

            // 8 设置输出的outputFormat

            job.setOutputFormatClass(SequenceFileOutputFormat.class);

            // 3 设置map输入端的kv类型

            job.setMapOutputKeyClass(Text.class);

            job.setMapOutputValueClass(BytesWritable.class);

            // 4 设置最终输出端的kv类型

            job.setOutputKeyClass(Text.class);

            job.setOutputValueClass(BytesWritable.class);

            // 5 设置输入输出路径

            FileInputFormat.setInputPaths(job, new Path(args[0]));

            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 6 提交job

            boolean result = job.waitForCompletion(true);

            System.exit(result ? 0 : 1);

        }

    }

    相关文章

      网友评论

          本文标题:自定义InputFormat案例

          本文链接:https://www.haomeiwen.com/subject/atxodqtx.html