美文网首页
Hadoop-3.1.3(九)MapReduce 自定义格式输入

Hadoop-3.1.3(九)MapReduce 自定义格式输入

作者: _大叔_ | 来源:发表于2021-07-23 09:50 被阅读0次

自定义格式输入组件

自定义输入组件会在 MapperTask 之前执行,并作为MapperTask的输入key和value。

定义RecordReader
package com.example.demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.io.InputStream;

/**
 * @author big uncle
 * @date 2021/5/14 16:49
 * @module
 **/
public class LineNumberInputRecordReader extends RecordReader<IntWritable, Text> {

    /**
     * 件切片
    **/
    private FileSplit fs;
    /**
     * 输入key
    **/
    private IntWritable key;
    /**
     * 输入value
    **/
    private Text value;
    /**
     * 行读取器
    **/
    private LineReader reader;
    /**
     * 记录行号
    **/
    private int count;


    /**
     * 初始化方法,用于初始化文件切片和行读取器
    **/
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // 初始化文件切片
        fs = (FileSplit) inputSplit;
        // 通过切片获取切片路径
        Path path = fs.getPath();
        // 获取job的环境变量参数
        Configuration conf = taskAttemptContext.getConfiguration();
        // 获取HDFS文件系统对象
        FileSystem system = path.getFileSystem(conf);
        // 获取切片对应的文件数据的输入流
        InputStream in = system.open(path);
        // 初始化航都读取器
        reader = new LineReader(in);
    }

    /**
     * 此方法会被掉用多次,如果return true会继续被调用,直到return false
     * 一行一行读取数据,直到读完为止
    **/
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        key = new IntWritable();
        value = new Text();
        Text tmp = new Text();
        // 每调一次,会读取一行
        int length = reader.readLine(tmp);
        if(length==0){
            // 文件数据以读完
            return false;
        }
        count++;
        // 将行号复赋值给输入key
        key.set(count);
        // 将每行内容赋值给value
        value.set(tmp);
        return true;
    }

    /**
     * 此方法用于将输入key传给Mapper组件
     * nextKeyValue方法被调用一次,该方法也被调用一次
    **/
    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }
    /**
     * 此方法用于将输入value传给Mapper组件
     * nextKeyValue方法被调用一次,该方法也被调用一次
    **/
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        reader = null;
    }
}
定义FileInputFormat
package com.example.demo.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/14 16:47
 * 自定义格式输入组件,决定Mapper的输入Key和输入Value类型
 * 第一个泛型是Mapper的输入Key
 * 第二个泛型是Mapper的输入Value
 **/
public class LineNumberInputFormat extends FileInputFormat<IntWritable, Text> {

    @Override
    public RecordReader<IntWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new LineNumberInputRecordReader();
    }
}
在job中设置环境
// 设置自定义格式化输入 默认是 TextInputFormat
job.setInputFormatClass(LineNumberInputFormat.class);

自定义格式输出组件

定义RecordWriter
package com.example.demo.mr;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.checkerframework.checker.units.qual.K;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/14 17:52
 * @module
 **/
public class AuthRecodeWriter<K,V> extends RecordWriter<K,V> {

    private FSDataOutputStream outputStream;

    public AuthRecodeWriter(FSDataOutputStream outputStream) {
        this.outputStream = outputStream;
    }

    @Override
    public void write(K k, V v) throws IOException, InterruptedException {
        // 将输入key挟到文件里,如果只有mapper 则是mapper的输出key,
        // 既有 mapper和 reduce,则是reduce的输出key
        outputStream.write(k.toString().getBytes());
        // 输出k v分隔符,默认是Tab制表符
        outputStream.write("|".getBytes());

        outputStream.write(v.toString().getBytes());
        // 输出行于行的分隔符
        outputStream.write("@@@".getBytes());
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        outputStream.close();
    }
}
定义FileOutputFormat
package com.example.demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/14 17:48
 * @module
 **/
public class AuthOutputFormat<K,V>  extends FileOutputFormat<K,V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        Path path = super.getDefaultWorkFile(taskAttemptContext,"");
        Configuration conf = taskAttemptContext.getConfiguration();
        FileSystem system = path.getFileSystem(conf);
        // 获取输出流
        FSDataOutputStream outputStream = system.create(path);

        return new AuthRecodeWriter<K,V>(outputStream);
    }

}

读取多个文件,把文件结果输出到同一个结果文件,结果内容会以交替的形式存在。

在job中设置环境
// 设置自定义数组组件 默认是 TextOutFormat,kv分隔符默认tab制表符,行与行默认换行符
job.setOutputFormatClass(AuthOutputFormat.class);

多输入源

package com.example.demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/11 11:13
 * @module
 **/
public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // 获取job对象1
        Job job = Job.getInstance(configuration);
        // 设置job方法入口的驱动类
        job.setJarByClass(WordCountDriver.class);

        // 读取多个文件
        MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/aa.txt"), TextInputFormat.class,WordCountMapper1.class);
        MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/bb.txt"), TextInputFormat.class,WordCountMapper2.class);

        // 设置Mapper输出的key的类型
        job.setMapOutputKeyClass(Text.class);
        // 输出Mapper输出的value的类型
        job.setMapOutputValueClass(Flow.class);

        // 输出到指定目录,要求结果路径事先不存在
        FileOutputFormat.setOutputPath(job,new Path("hdfs://node113:9000/test1/result"));
        // 提交job
        job.waitForCompletion(true);
    }
}

多输出源

Reduce
package com.example.demo.mr;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/15 10:44
 * @module
 **/
public class MultipleOutputReduce extends Reducer<Text,Text,Text,Text> {

    /**
     * 多输出源
    **/
    private MultipleOutputs<Text,Text> outputs;

    /**
     * 初始化方法
    **/
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        outputs=new MultipleOutputs(context);
    }

    /**
     * 处理内容
    **/
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for(Text text:values){
            if("tom".equals(key)){
                // 输出到 tomFile 文件
                outputs.write("tomFile",key,text);
            }else if("rose".equals(key)){
                outputs.write("roseFile",key,text);
            }
        }
    }
}
job
package com.example.demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/11 11:13
 * @module
 **/
public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // 获取job对象1
        Job job = Job.getInstance(configuration);
        // 设置job方法入口的驱动类
        job.setJarByClass(WordCountDriver.class);

        // 读取多个文件
        MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/aa.txt"), TextInputFormat.class,WordCountMapper1.class);
        MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/bb.txt"), TextInputFormat.class,WordCountMapper2.class);

        // 设置Mapper输出的key的类型
        job.setMapOutputKeyClass(Text.class);
        // 输出Mapper输出的value的类型
        job.setMapOutputValueClass(Flow.class);

        // 设置Reduce
        job.setReducerClass(MultipleOutputReduce.class);
        // 多输出源
        MultipleOutputs.addNamedOutput(job,"tomFile", TextOutputFormat.class,Text.class,Text.class);
        MultipleOutputs.addNamedOutput(job,"roseFile", TextOutputFormat.class,Text.class,Text.class);

        // 输出到指定目录,要求结果路径事先不存在
        FileOutputFormat.setOutputPath(job,new Path("hdfs://node113:9000/test1/result"));
        // 提交job
        job.waitForCompletion(true);
    }
}

相关文章

网友评论

      本文标题:Hadoop-3.1.3(九)MapReduce 自定义格式输入

      本文链接:https://www.haomeiwen.com/subject/lhoqjltx.html