美文网首页
自定义InputFormat案例

自定义InputFormat案例

作者: bullion | 来源:发表于2019-01-18 18:06 被阅读0次

需求:

    将多个小文件合并成一个SequenceFile文件(SequenceFile是Hadoop用来存储二进制形式key-value对的文件格式)。合并后的SequenceFile中存储着多个文件的内容,存储形式为:以文件路径+名称作为key,以文件内容作为value。

WholeFileInputformat 

public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {

    @Override

    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        WholeRecordReader recordReader = new WholeRecordReader();

        recordReader.initialize(split, context);

        return recordReader;

    }

}

WholeRecordReader 

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    FileSplit split;

    Configuration configuration;

    Text k = new Text();

    BytesWritable v = new BytesWritable();

    boolean isProgress = true;

    @Override

    public void initialize(InputSplit split, TaskAttemptContext content) throws IOException, InterruptedIOException {

        //初始化

        this.split = (FileSplit) split;

        configuration = content.getConfiguration();

    }

    @Override

    public boolean nextKeyValue() throws IOException, InterruptedIOException {

        if(isProgress){

            //核心业务逻辑处理

            byte[] buf = new byte[(int) split.getLength()];

            // 1 获取fs对象

            Path path = split.getPath();

            FileSystem fs = path.getFileSystem(configuration);

            // 2 获取输入流

            FSDataInputStream fis = fs.open(path);

            // 3 拷贝

            IOUtils.readFully(fis, buf, 0, buf.length);

            // 4 封装v

            v.set(buf, 0, buf.length);

            // 5 封装k

            k.set(path.toString());

            // 6 关闭资源

            IOUtils.closeStream(fis);

            isProgress = false;

            return true;

        }

        return false;

    }

    @Override

    public Text getCurrentKey() throws IOException, InterruptedIOException {

        return k;

    }

    @Override

    public BytesWritable getCurrentValue() throws IOException, InterruptedIOException {

        return v;

    }

    @Override

    public float getProgress() throws IOException, InterruptedIOException {

        return 0;

    }

    @Override

    public void close() throws IOException {

    }

}

SequenceFileMapper 

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override

    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {

        context.write(key,value);

    }

}

SequenceFileReducer 

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override

    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {

        //循环写出

        for (BytesWritable value : values) {

            context.write(key, value);

        }

    }

}

SequenceFileDriver 

public class SequenceFileDriver {

    public static void main(String[] args) throws Exception {

        //输入输出路径需要根据自己电脑上的实际的输入输出路径设置

        args = new String[]{"e:/input/inputformat", "e:/output"};

        // 1 获取job对象

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        // 2 设置jar包存储位置 关联自定义的mapper和reducer

        job.setJarByClass(SequenceFileDriver.class);

        job.setMapperClass(SequenceFileMapper.class);

        job.setReducerClass(SequenceFileReducer.class);

        // 7 设置输入的inputFormat

        job.setInputFormatClass(WholeFileInputformat.class);

        // 8 设置输出的outputFormat

        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 3 设置map输入端的kv类型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(BytesWritable.class);

        // 4 设置最终输出端的kv类型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(BytesWritable.class);

        // 5 设置输入输出路径

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 提交job

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

相关文章

网友评论

      本文标题:自定义InputFormat案例

      本文链接:https://www.haomeiwen.com/subject/atxodqtx.html