1. Introduction
Files stored in HDFS are spread across the nodes of the cluster, so pulling all of that data to a single place for processing is clearly impractical. With MapReduce, the computation is instead shipped to the nodes that hold the data and run there in parallel, and the results are aggregated afterwards. Put simply, the jar package (or an executable written in another language) that processes the data is distributed to the DataNodes; each DataNode runs it against its local data, and the outputs are finally merged.
2. Process
A MapReduce job runs in three phases: map, shuffle, and reduce. The map phase extracts the fields of interest from the input and sorts its output; the shuffle phase then distributes the map output by a given key for reducing, with all records sharing the same key sent to the same node.
For example, suppose we want to count how many times each URL is called. In the map phase we pull the URL out of each record and attach a count; if you think of it as a HashMap, the key is the URL and the value is the number of occurrences, and the mapper emits the URL as the key. In the reduce phase, all records for the same URL arrive at the same node, and the reducer simply sums the values it receives. The flow looks like this:
![](https://img.haomeiwen.com/i15716504/a1b1c685c27e9bd4.png)
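For instance, if the mappers emit (url1, 1), (url1, 1), and (url2, 1), the shuffle delivers both url1 records to the same reducer, which outputs (url1, 2), while (url2, 1) may be handled by a different reducer.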
3. Example
Create a new Java project that reads the existing HDFS files. The dependencies are as follows:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
        <optional>true</optional>
    </dependency>
</dependencies>
```
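Since the jar is submitted with `hadoop jar` (see below), the cluster already provides the Hadoop libraries at runtime, so hadoop-client can usually be given `provided` scope to keep the jar small.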
Define the output value type; only the number of occurrences needs to be carried:
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import lombok.Data;
import org.apache.hadoop.io.Writable;

@Data
public class UrlWritable implements Writable {
    private int count;

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(count);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.count = dataInput.readInt();
    }

    @Override
    public String toString() {
        // toString() is what TextOutputFormat writes for the value column.
        return String.valueOf(count);
    }
}
```
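To make the Writable contract concrete, here is a minimal round-trip sketch (not part of the job itself, and the class name is just for illustration): Hadoop calls write() to serialize the value when it is sent between tasks and readFields() to rebuild it on the receiving side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class UrlWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        UrlWritable original = new UrlWritable();
        original.setCount(3);

        // Serialize, as the framework does when shipping the value.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance on the "receiving" side.
        UrlWritable copy = new UrlWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getCount()); // prints 3
    }
}
```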
Define the mapper: it extracts the URL from each record, sets the count to 1, and emits the URL as the key.
```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UrlMapper extends Mapper<LongWritable, Text, Text, UrlWritable> {
    private final Text keyText = new Text();
    private final UrlWritable urlWritable = new UrlWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The URL is assumed to be the third tab-separated field of each record.
        String[] fields = value.toString().split("\t");
        String url = fields[2];
        this.urlWritable.setCount(1);
        keyText.set(url);
        context.write(keyText, this.urlWritable);
    }
}
```
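Note that split("\t") and fields[2] assume tab-separated records with the URL in the third column; for a hypothetical record such as `2021-01-01 10:00:00<TAB>127.0.0.1<TAB>/api/user/info<TAB>200`, the mapper emits (/api/user/info, 1). Adjust the field index to match the actual log layout.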
The reducer sums the counts:
```java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class UrlReducer extends Reducer<Text, UrlWritable, Text, UrlWritable> {
    private final UrlWritable urlWritable = new UrlWritable();

    @Override
    public void reduce(Text key, Iterable<UrlWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (UrlWritable val : values) {
            count += val.getCount();
        }
        urlWritable.setCount(count);
        context.write(key, urlWritable);
    }
}
```
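Because addition is associative and commutative, the same class can also run as a combiner (see the driver below): if one mapper emits (url1, 1) three times, the combiner collapses them into (url1, 3) before the shuffle, and the reducer then only has to add up the pre-aggregated counts from each node.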
The main method:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MainClass {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Job name
        Job job = Job.getInstance(conf, "test_job");
        job.setJarByClass(MainClass.class);
        // Mapper
        job.setMapperClass(UrlMapper.class);
        // Combiner: pre-aggregates the map output locally (summing, same logic as the reducer)
        job.setCombinerClass(UrlReducer.class);
        // Reducer
        job.setReducerClass(UrlReducer.class);
        // Output key type
        job.setOutputKeyClass(Text.class);
        // Output value type
        job.setOutputValueClass(UrlWritable.class);
        // Input format, determined by the format of the files in HDFS
        job.setInputFormatClass(SequenceFileInputFormat.class);
        // Output format: plain text files
        job.setOutputFormatClass(TextOutputFormat.class);
        // Output path in HDFS
        FileOutputFormat.setOutputPath(job, new Path("/flume/test/output"));
        // Input path in HDFS; more than one can be added
        FileInputFormat.addInputPath(job, new Path("/flume/test/logs"));
        job.waitForCompletion(true);
    }
}
```
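Two things worth noting: the output directory (/flume/test/output here) must not exist when the job starts, otherwise Hadoop refuses to run it; and SequenceFileInputFormat is used because the files under /flume/test/logs are assumed to be sequence files, so for plain text logs the default TextInputFormat would be used instead.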
Package the project to produce a jar, e.g. test.jar, and submit it on Hadoop:

```bash
/data0/apps/hadoop-3.2.1/bin/hadoop jar test.jar test.com.MainClass
```
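Once the job completes, the result can be inspected directly in HDFS; with a single reducer the counts land in one part-r-* file (the exact file name may vary):

```bash
/data0/apps/hadoop-3.2.1/bin/hdfs dfs -cat /flume/test/output/part-r-00000
```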
Note: if the project depends on local jars, for example mail.jar for sending email, you need to build a fat jar. Add the following to the build configuration:
```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <includeSystemScope>true</includeSystemScope>
            </configuration>
        </plugin>
    </plugins>
</build>
```
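A local jar like mail.jar is typically declared as a system-scoped dependency, which is what includeSystemScope pulls into the repackaged jar; a hypothetical declaration (coordinates and path are illustrative) might look like:

```xml
<!-- Hypothetical local dependency; groupId/artifactId/systemPath are illustrative -->
<dependency>
    <groupId>com.example</groupId>
    <artifactId>mail</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/mail.jar</systemPath>
</dependency>
```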
In the generated target directory, use the origin jar.