5.自定义 inputFotmat
通过自定义 inputFormat 来实现“将小文件批量合并成 SequenceFIle 格式的单个文件(文件内容是:文件名 bytes流(二进制流))!”
(1),MyInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
* 自定义 InputFormat
* 这里的 key value 类型的数据是 NullWritable, ByteWritable(byte[] 数组)
*/
public class MyInputFormat extends FileInputFormat<NullWritable, ByteWritable> {
@Override
public RecordReader<NullWritable, ByteWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// 调用自定义读取文件的类
MyRecordReader myRecordReader = new MyRecordReader();
// 初始化 RecordReader
myRecordReader.initialize(split, context);
// 将自定义RecorderReader返回
return myRecordReader;
}
/**
* 注意这个方法,是决定我们的文件是否切分的,如果不切分直接返回false
* 那么在读取这个文件的时候,就会一次性的将文件中的内容全部读取出来
* @param context
* @param filename
* @return
*/
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
(2).MyRecordReader
核心实现方法,主要实现思路就是根据依次读取每个文件,而且一次将文件中的内容全部读完;读取的方式是以二进制流的方式进行读取。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class MyRecordReader extends RecordReader {
private FileSplit fileSplit; // 文件切分的类
private Configuration configuration;
private BytesWritable bytesWritable;
// 定义读取文件是否结束的标志位
private boolean flag = false;
/**
* 初始化方法: 只在程序初始化的时候调用一次,只要拿到了文件的切片,就拿到了文件的内容
* @param split
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// 初始化文件切片对象
this.fileSplit = (FileSplit) split;
// 初始化配置文件对象
this.configuration = context.getConfiguration();
// 初始化流对象
bytesWritable = new BytesWritable();
}
/**
* 读取数据
* 返回值是 boolean 类型,如果返回 true,表示文件已经读完了,不能再继续往下读了
* 如果返回是 false ,表示文件没有读取完成,继续读取
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!flag){
long length = fileSplit.getLength();
byte[] bytes = new byte[(int) length];
// 先去获取指定路径下的文件输入流
Path path = fileSplit.getPath();
FileSystem fileSystem = path.getFileSystem(configuration);
FSDataInputStream open = fileSystem.open(path);
// 将流当中的数据拷贝到字节数组当中(inputStream --> bytes[])
IOUtils.readFully(open, bytes, 0 ,(int)length);
// bytes[] ---> BytesWritable
bytesWritable.set(bytes, 0, (int)length);
flag = true;
return true;
}
return false;
}
/**
* 获取当前的Key1 : hadoop序列化的空
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public Object getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
/**
* 获取当前的value1 : hadoop序列化的字节数组
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
return bytesWritable;
}
/**
* 读取文件的进度,我们反正要么不读,要么全部读完,所以此时没用
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public float getProgress() throws IOException, InterruptedException {
return flag ? 1.0f : 0.0f;
}
/**
* 关闭资源的连接啥的,此时我们也不用关闭,所以直接默认实现一个空方法即可
* @throws IOException
*/
@Override
public void close() throws IOException {
}
}
(3).MyInputFormatMapper
主要逻辑是 直接采用读取文件切分的类,依次将所有的文件读取到map过程,然后在map过程中实现自定义逻辑,也就是实现需求中,文件和文件内容合并存储为SequenceFile格式的文件
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class MyInputFormatMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
@Override
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
// 创建文件切分的类
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String name = inputSplit.getPath().getName();
context.write(new Text(name), value);
}
}
(4).MyInputFormatMain
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyInputFormatMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(),"mergeSmallFile");
// 此时我们调用的输入格式是自定义的读取小文件的格式化类
job.setInputFormatClass(MyInputFormat.class);
MyInputFormat.addInputPath(job, new Path("file:///D:\\BigData\\testFileDir\\in"));
job.setMapperClass(MyInputFormatMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
// 此时没有Reduce过程,所以没有设置Reduce函数
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job, new Path("file:///D:\\BigData\\testFileDir\\out"));
boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new MyInputFormatMain(), args);
System.exit(run);
}
}
网友评论