Offline Computing Components - MapReduce - Custom OutputFormat

Author: CoderInsight | Published 2022-12-02 21:44

10. Custom OutputFormat

(1). Requirement

  • We have a set of order-comment records. The task is to separate the good reviews from the rest (neutral and bad reviews) and write the two groups to different directories. The sample data files are in the course materials; the rating field (the tenth column, index 9 when split on tabs) holds 0 for a good review, 1 for a neutral one, and 2 for a bad one.
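The routing rule is easy to check in isolation before wiring up the full job. The sketch below uses invented sample records (every field value is hypothetical) and applies the same split-and-compare logic on the rating field:

```java
// A minimal, self-contained check of the routing rule.
// The records below are invented sample data: ten tab-separated fields,
// with the rating flag ("0" = good review) at index 9.
public class RatingRouteDemo {
    static String route(String line) {
        String rating = line.split("\t")[9];
        return rating.equals("0") ? "good" : "bad";
    }

    public static void main(String[] args) {
        String goodLine = "300\t鞋子\t2022-12-02\t1\t5\t正品\t物流快\t会回购\t1\t0";
        String badLine  = "301\t鞋子\t2022-12-02\t1\t1\t一般\t物流慢\t不回购\t1\t2";
        System.out.println(route(goodLine)); // good
        System.out.println(route(badLine));  // bad
    }
}
```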

(2). Analysis

  • The crux of the program is that a single MapReduce job must emit two classes of results to different directories depending on the data. This kind of flexible output requirement can be implemented with a custom OutputFormat.

(3). Implementation

  • Key points:

    1. Access external resources (here, file output streams) from within MapReduce.

    2. Subclass an OutputFormat, replace its RecordWriter, and override the write() method that emits each record.

(4). Code

Step 1: write the custom output class MyOutputFormat

package top.wangyq.myoutputFormat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 1. Extend the abstract class FileOutputFormat<K, V>.
 * 2. Override getRecordWriter().
 * 3. Define a custom RecordWriter, e.g. as an inner class, extending RecordWriter.
 * 4. Open one file output stream (FSDataOutputStream) per output file to be produced.
 * 5. Override write() and close() to control what gets written and to release the streams.
 */
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {


    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {

        FileSystem fs = FileSystem.get(context.getConfiguration());


        // Hardcoded output paths; in practice these would come from the job configuration.
        Path goodComment = new Path("D:\\BigData\\开课吧\\9-第九章-Hadoop\\1-MapReduce\\mr-录播\\5、自定义outputFormat\\mygood\\1.txt");
        Path badComment = new Path("D:\\BigData\\开课吧\\9-第九章-Hadoop\\1-MapReduce\\mr-录播\\5、自定义outputFormat\\mybad\\2.txt");

        FSDataOutputStream goodOutputStream = fs.create(goodComment);
        FSDataOutputStream badOutputStream = fs.create(badComment);

        return new MyRecorderWriter(goodOutputStream, badOutputStream);
    }



    static class MyRecorderWriter extends RecordWriter<Text, NullWritable>{

        FSDataOutputStream goodStream = null;
        FSDataOutputStream badStream = null;

        public MyRecorderWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
            this.goodStream = goodStream;
            this.badStream = badStream;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            if (key.toString().split("\t")[9].equals("0")){
                // good review
                goodStream.write(key.toString().getBytes());
                goodStream.write("\r\n".getBytes());
            } else {
                // neutral or bad review
                badStream.write(key.toString().getBytes());
                badStream.write("\r\n".getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (badStream != null){
                badStream.close();
            }
            if (goodStream != null){
                goodStream.close();
            }
        }
    }
}

Step 2: write the driver class with the main() method

package top.wangyq.myoutputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MyOwnOutputFormatMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, MyOwnOutputFormatMain.class.getSimpleName());

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(MyOwnMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(MyOutputFormat.class);
        MyOutputFormat.setOutputPath(job, new Path(args[1]));

        // One reduce task: the RecordWriter writes to fixed file paths,
        // so multiple parallel tasks would overwrite each other's output.
        job.setNumReduceTasks(1);
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }


    public static class MyOwnMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new MyOwnOutputFormatMain(), args);
        System.exit(run);

    }
}
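With the two classes packaged into a jar, the job can be submitted to a cluster. The jar name and HDFS paths below are placeholders, not from the original article; the two positional arguments are the input and output paths consumed by run() above.

```shell
# Submit the job; substitute your own jar name and HDFS paths.
hadoop jar mapreduce-demo.jar top.wangyq.myoutputFormat.MyOwnOutputFormatMain \
    /data/comments/input /data/comments/output
```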
