Both HDFS and MapReduce are very inefficient at handling small files, yet workloads with large numbers of small files are often unavoidable, so a solution is needed. One option is to merge the small files with a custom InputFormat. (Externally the result behaves as one whole file; internally the original small files are preserved, which saves MapTasks.)
The requirement is as follows:
Merge multiple small files into a single SequenceFile (SequenceFile is Hadoop's file format for storing key-value pairs in binary form). The SequenceFile holds several files, stored with the file path plus file name as the key and the file content as the value.
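To make that record layout concrete, here is a minimal sketch (not part of the job below; the class name RecordShapeDemo and the sample file path are only illustrative) of how one small file becomes a single key-value pair: the key is the path plus file name as a Text, and the value is the raw file bytes wrapped in a BytesWritable.
// Illustrative only: the key/value pair that one small file contributes to the merged SequenceFile.
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import java.nio.file.Files;
import java.nio.file.Paths;

public class RecordShapeDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder path; any local small file works
        String file = "C:\\Users\\Administrator\\Downloads\\input\\123\\a.txt";
        byte[] content = Files.readAllBytes(Paths.get(file));
        Text key = new Text(file);                        // key: full path + file name
        BytesWritable value = new BytesWritable(content); // value: the entire file content
        System.out.println(key + " -> " + value.getLength() + " bytes");
    }
}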
1) Input data
2) Expected output file format
Steps
Program implementation
(1) Custom InputFormat
package cn.mark.mrInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
// Records are stored with the file path + name as the key and the file content as the value.
// Reading a whole file requires a byte stream, so the key type is Text and the value type is BytesWritable.
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {
    // Extend FileInputFormat with the custom <Text, BytesWritable> key/value types
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false; // a single small file must never be split further
    }
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);
        return recordReader;
    }
}
(2) Custom RecordReader class (the core)
package cn.mark.mrInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
// RecordReader<Text, BytesWritable>: the key/value types this reader produces
public class WholeRecordReader extends RecordReader<Text, BytesWritable> {
    // State shared across the callbacks
    FileSplit split;
    Configuration configuration;
    Text k = new Text();
    BytesWritable v = new BytesWritable();
    // Flag: true until the single file in this split has been read
    boolean isProgress = true;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Initialization: remember the split and the job configuration
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Core logic: wrap the whole file into one key and one value
        if (isProgress) {
            // split.getLength() is the number of bytes in the file to process,
            // so the buffer can hold the entire file
            byte[] buf = new byte[(int) split.getLength()];
            // 1. Get the FileSystem object
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(configuration);
            // 2. Open an input stream
            FSDataInputStream fis = fs.open(path);
            // 3. Copy the file into the buffer
            //    readFully(InputStream in, byte[] buf, int off, int len)
            //    in:  the stream to read from
            //    buf: the destination buffer
            //    off: offset in the buffer at which to start writing
            //    len: the number of bytes to read
            IOUtils.readFully(fis, buf, 0, buf.length);
            // 4. Fill in v: the value is the file content
            //    set(byte[] newData, int offset, int length) copies the given byte range into the writable
            v.set(buf, 0, buf.length);
            // 5. Fill in k: the key is the path; path.toString() contains both the directory and the file name
            k.set(path.toString());
            // 6. Close the stream
            IOUtils.closeStream(fis);
            // Reaching this point means the file was read. Each split holds exactly one file,
            // and a fresh WholeRecordReader (with isProgress reset to true) is created per split,
            // so the flag only needs to survive within this split.
            isProgress = false; // this file has been fully read
            return true; // only returning true makes the framework hand the record to map()
            /* Mapper.run() drives the loop:
               public void run(Context context) throws IOException, InterruptedException {
                   setup(context);
                   try {
                       while (context.nextKeyValue()) {   // normally one line per call; true while data remains
                           map(context.getCurrentKey(), context.getCurrentValue(), context);
                       }
                   } finally {
                       cleanup(context);
                   }
               }
               Always returning true would loop forever; always returning false would never enter the loop,
               hence the isProgress flag. */
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return v;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}
(3) The SequenceFileMapper class
package cn.mark.mrInputFormat;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Each call receives an entire file, not a single line
        context.write(key, value);
    }
}
(4) The SequenceFileReducer class
package cn.mark.mrInputFormat;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// Know what data arrives here and its types:
// for inputs such as a.txt and b.txt, the output is <path + file name, file content>
public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Write out each value; every value is the full content of one file
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}
(5) The SequenceFileDriver class
package cn.mark.mrInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import java.io.IOException;
public class SequenceFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Input and output paths; adjust them to the actual paths on your machine
        args = new String[] { "C:\\Users\\Administrator\\Downloads\\input\\123",
                "C:\\Users\\Administrator\\Downloads\\input\\output" };
        // 1 Get the Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 Set the jar location and hook up the custom mapper and reducer
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);
        // 7 Set the custom InputFormat
        job.setInputFormatClass(WholeFileInputformat.class);
        // 8 Set the OutputFormat (the default is TextOutputFormat)
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // 3 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        // 4 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        // 5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6 Submit the job
        job.waitForCompletion(true);
    }
}
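To confirm the merge worked, a small reader can dump each key (file path) and value size from the generated SequenceFile. This is a minimal sketch, not part of the original walkthrough; it assumes the output path used by the driver above and the usual reducer output file name part-r-00000.
package cn.mark.mrInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed output file produced by SequenceFileDriver above
        Path path = new Path("C:\\Users\\Administrator\\Downloads\\input\\output\\part-r-00000");
        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Each record is one original small file: key = path + name, value = file content
            while (reader.next(key, value)) {
                System.out.println(key + " : " + value.getLength() + " bytes");
            }
        }
    }
}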