10. 自定义 OutputFormat
(1). 需求
- 现有一批订单评论数据。需求：将订单的好评与其他评论（中评、差评）区分开，并把最终结果分别输出到不同的文件夹下。数据内容参见资料文件夹。每行数据以制表符分隔，其中第九个字段（从 0 开始计数，即下标为 9 的字段）表示评价类型：0 表示好评，1 表示中评，2 表示差评。
(2). 分析
- 程序的关键点是在一个 MapReduce 程序中根据数据的不同，将两类结果输出到不同的目录；这类灵活的输出需求可以通过自定义 OutputFormat 来实现。
(3). 实现
- 实现要点：
1、在 MapReduce 中访问外部资源（通过 FileSystem 创建输出文件）；
2、自定义 OutputFormat，改写其中的 RecordWriter，重写具体输出数据的 write() 方法。
(4). 代码实现
第一步:编写自定义输出类: MyOutputFormat
package top.wangyq.myoutputFormat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Output format that splits order-comment records into two sub-directories of the job output
 * path: {@code mygood/} receives good comments and {@code mybad/} receives every other record.
 *
 * <p>Steps demonstrated by this example:
 * <ol>
 *   <li>Extend the abstract class {@link FileOutputFormat}{@code <K, V>}.</li>
 *   <li>Override {@link #getRecordWriter(TaskAttemptContext)}.</li>
 *   <li>Define a custom {@link RecordWriter} (here a static nested class).</li>
 *   <li>Open one {@link FSDataOutputStream} per output file.</li>
 *   <li>Override {@code write()} to route each record and {@code close()} to release the
 *       streams.</li>
 * </ol>
 *
 * <p>Files are created inside the task's committer work directory and carry the task's unique
 * file name (e.g. {@code part-r-00000}). Therefore several reduce tasks never overwrite each other,
 * and only committed task output is promoted into the job output path.
 */
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {

    /** Sub-directory (under the job output path) that receives good comments. */
    static final String GOOD_DIR = "mygood";
    /** Sub-directory (under the job output path) that receives all other records. */
    static final String BAD_DIR = "mybad";

    /**
     * Opens one "good" and one "other" output file for the current task.
     *
     * @param context the task attempt context
     * @return a writer that routes each record to the matching file
     * @throws IOException if either output file cannot be created
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // getDefaultWorkFile() resolves to <committer work dir>/<unique per-task name>. Reuse its
        // directory and its file name so each task writes distinct files that the committer
        // moves into the job output path on commit (instead of a hard-coded absolute path).
        Path defaultWorkFile = getDefaultWorkFile(context, "");
        Path workDir = defaultWorkFile.getParent();
        String fileName = defaultWorkFile.getName();

        Path goodFile = new Path(new Path(workDir, GOOD_DIR), fileName);
        Path badFile = new Path(new Path(workDir, BAD_DIR), fileName);
        FileSystem fs = goodFile.getFileSystem(context.getConfiguration());

        FSDataOutputStream goodOut = fs.create(goodFile, false);
        FSDataOutputStream badOut;
        try {
            badOut = fs.create(badFile, false);
        } catch (IOException e) {
            // Do not leak the first stream when the second one cannot be opened.
            try {
                goodOut.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return new MyRecorderWriter(goodOut, badOut);
    }

    /**
     * Record writer that sends each line either to the "good" stream or to the "other" stream,
     * based on its comment-status field.
     */
    static class MyRecorderWriter extends RecordWriter<Text, NullWritable> {
        /** Zero-based index of the tab-separated field holding the comment status. */
        private static final int STATUS_FIELD_INDEX = 9;
        /** Status value for a good comment (0 = good, 1 = neutral, 2 = bad per the requirement). */
        private static final String GOOD_STATUS = "0";
        /** Terminator appended after every record (CRLF, as in the original example). */
        private static final byte[] LINE_SEPARATOR = {'\r', '\n'};

        final FSDataOutputStream goodStream;
        final FSDataOutputStream badStream;

        public MyRecorderWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
            this.goodStream = goodStream;
            this.badStream = badStream;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            FSDataOutputStream target = isGoodComment(key.toString()) ? goodStream : badStream;
            // Text already holds UTF-8 bytes. Write them as-is rather than round-tripping through
            // String.getBytes(), which would re-encode with the platform default charset.
            target.write(key.getBytes(), 0, key.getLength());
            target.write(LINE_SEPARATOR);
        }

        /**
         * Returns whether a tab-separated record is a good comment. Records too short to contain
         * the status field are treated as "other" instead of failing the whole task.
         *
         * @param line one input record
         * @return {@code true} if the status field equals {@code "0"}
         */
        static boolean isGoodComment(String line) {
            String[] fields = line.split("\t");
            return fields.length > STATUS_FIELD_INDEX
                    && GOOD_STATUS.equals(fields[STATUS_FIELD_INDEX]);
        }

        /** Closes both streams; the second close still runs if the first one throws. */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            try {
                if (badStream != null) {
                    badStream.close();
                }
            } finally {
                if (goodStream != null) {
                    goodStream.close();
                }
            }
        }
    }
}
第二步:编写Main函数
package top.wangyq.myoutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Driver for the custom-OutputFormat example. It reads text lines from the input path and writes
 * them through {@link MyOutputFormat} with the output path as the job output directory.
 *
 * <p>Usage: {@code MyOwnOutputFormatMain <input path> <output path>}
 */
public class MyOwnOutputFormatMain extends Configured implements Tool {

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     * @return 0 on success, 1 if the job failed, 2 on bad arguments
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: " + MyOwnOutputFormatMain.class.getSimpleName()
                    + " <input path> <output path>");
            ToolRunner.printGenericCommandUsage(System.err);
            return 2;
        }

        Configuration conf = getConf();
        Job job = Job.getInstance(conf, MyOwnOutputFormatMain.class.getSimpleName());
        // Lets the framework locate the jar containing the job classes when submitted to a cluster.
        job.setJarByClass(MyOwnOutputFormatMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(MyOwnMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // No reducer class is set, so Hadoop's default Reducer passes every (key, value) through.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(MyOutputFormat.class);
        MyOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setNumReduceTasks(2);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Emits each input line unchanged as the key; the byte-offset input key is discarded. */
    public static class MyOwnMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /** Entry point: runs the tool via {@link ToolRunner} and exits with its status code. */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new MyOwnOutputFormatMain(), args);
        System.exit(run);
    }
}
网友评论