美文网首页
离线计算组件篇-MapReduce-自定义InputFormat

离线计算组件篇-MapReduce-自定义InputFormat

作者: CoderInsight | 来源:发表于2022-11-26 16:43 被阅读0次

5.自定义 inputFotmat

通过自定义 inputFormat 来实现“将小文件批量合并成 SequenceFIle 格式的单个文件(文件内容是:文件名 bytes流(二进制流))!”

(1),MyInputFormat

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * 自定义 InputFormat
 * 这里的 key value 类型的数据是 NullWritable, ByteWritable(byte[] 数组)
 */
public class MyInputFormat extends FileInputFormat<NullWritable, ByteWritable> {


    @Override
    public RecordReader<NullWritable, ByteWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // 调用自定义读取文件的类
        MyRecordReader myRecordReader = new MyRecordReader();
        // 初始化 RecordReader
        myRecordReader.initialize(split, context);
        // 将自定义RecorderReader返回
        return myRecordReader;
    }


    /**
     * 注意这个方法,是决定我们的文件是否切分的,如果不切分直接返回false
     * 那么在读取这个文件的时候,就会一次性的将文件中的内容全部读取出来
     * @param context
     * @param filename
     * @return
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}

(2).MyRecordReader

核心实现方法,主要实现思路就是根据依次读取每个文件,而且一次将文件中的内容全部读完;读取的方式是以二进制流的方式进行读取。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyRecordReader extends RecordReader {

    private FileSplit fileSplit;  // 文件切分的类
    private Configuration configuration;
    private BytesWritable bytesWritable;

    // 定义读取文件是否结束的标志位
    private boolean flag = false;


    /**
     * 初始化方法: 只在程序初始化的时候调用一次,只要拿到了文件的切片,就拿到了文件的内容
     * @param split
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // 初始化文件切片对象
        this.fileSplit = (FileSplit) split;
        // 初始化配置文件对象
        this.configuration = context.getConfiguration();
        // 初始化流对象
        bytesWritable = new BytesWritable();
    }

    /**
     * 读取数据
     * 返回值是 boolean 类型,如果返回 true,表示文件已经读完了,不能再继续往下读了
     * 如果返回是 false ,表示文件没有读取完成,继续读取
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (!flag){
            long length = fileSplit.getLength();
            byte[] bytes = new byte[(int) length];

            // 先去获取指定路径下的文件输入流
            Path path = fileSplit.getPath();
            FileSystem fileSystem = path.getFileSystem(configuration);
            FSDataInputStream open = fileSystem.open(path);

            // 将流当中的数据拷贝到字节数组当中(inputStream  -->  bytes[])
            IOUtils.readFully(open, bytes, 0 ,(int)length);
            // bytes[]  --->  BytesWritable
            bytesWritable.set(bytes, 0, (int)length);

            flag = true;
            return true;
        }
        return false;
    }

    /**
     * 获取当前的Key1 : hadoop序列化的空
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public Object getCurrentKey() throws IOException, InterruptedException {

        return NullWritable.get();
    }

    /**
     * 获取当前的value1 : hadoop序列化的字节数组
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public Object getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }

    /**
     * 读取文件的进度,我们反正要么不读,要么全部读完,所以此时没用
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return flag ? 1.0f : 0.0f;
    }

    /**
     * 关闭资源的连接啥的,此时我们也不用关闭,所以直接默认实现一个空方法即可
     * @throws IOException
     */
    @Override
    public void close() throws IOException {
    }
}

(3).MyInputFormatMapper

主要逻辑是 直接采用读取文件切分的类,依次将所有的文件读取到map过程,然后在map过程中实现自定义逻辑,也就是实现需求中,文件和文件内容合并存储为SequenceFile格式的文件

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;


import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyInputFormatMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // 创建文件切分的类
        FileSplit inputSplit = (FileSplit) context.getInputSplit();

        String name = inputSplit.getPath().getName();
        context.write(new Text(name), value);
    }
}

(4).MyInputFormatMain

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyInputFormatMain  extends Configured implements Tool {


    @Override
    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(super.getConf(),"mergeSmallFile");
        // 此时我们调用的输入格式是自定义的读取小文件的格式化类
        job.setInputFormatClass(MyInputFormat.class);
        MyInputFormat.addInputPath(job, new Path("file:///D:\\BigData\\testFileDir\\in"));

        job.setMapperClass(MyInputFormatMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 此时没有Reduce过程,所以没有设置Reduce函数
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job, new Path("file:///D:\\BigData\\testFileDir\\out"));

        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }


    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MyInputFormatMain(), args);

        System.exit(run);
    }
}

相关文章

  • 离线计算组件篇-MapReduce-自定义InputFormat

    5.自定义 inputFotmat 通过自定义 inputFormat 来实现“将小文件批量合并成 Sequenc...

  • 离线计算组件篇-MapReduce基础

    1.mapreduce编程指导思想 本文的核心是带领大家去了解的MapReduce的核心设计思想,以及最基础的编程...

  • Android自定义View之自定义布局

    自定义分为自定义View与自定义布局: 需要掌握的技术:onMeasure 父子组件大小计算onLayout 组件...

  • Vue.js实战初阅

    一、基础篇 包括数据的双向绑定、计算属性、基本指令、自定义指令及组件等。 Vue在设计上使用MVVM(Model-...

  • hadoop 切片

    Hadoop的切片计算是通过调用InputFormat接口的getSplits方法完成的 TextInputFor...

  • 大数据发展历程

    离线计算 实时计算 内存计算

  • 自定义InputFormat案例

    需求: 将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进...

  • Apache Flink 简介

    前言 计算引擎 大数据计算引擎分为离线计算和实时计算,离线计算就是我们通常说的批计算,代表是Hadoop MapR...

  • 离线计算成本节省的神兵利器

    摘要:对于创业成长型的企业来说,离线计算已经必不可少了,通过离线计算我们可以生成复杂的业务报表,通过离线计算我们也...

  • Kyuubi 解锁 Spark SQL on CDH 6

    背景 CDH 最后一个免费版 6.3.2 发布一年有余,离线计算核心组件版本停在了 Hadoop 3.0.0,Hi...

网友评论

      本文标题:离线计算组件篇-MapReduce-自定义InputFormat

      本文链接:https://www.haomeiwen.com/subject/egnffdtx.html