配置工程
-
在maven官网上下载最新的maven压缩包并解压。
-
下载IntelliJ IDEA并安装。
-
在IDEA中新建工程,选择maven,sdk选择java jdk的目录,勾选上Create form archetype,选择quickstart,下一步。
-
填写GroupId和ArtifactId,version填写1.0,下一步。
-
User settings file配置选择下载解压后的maven目录下的conf文件夹的settings.xml,然后下一步,完成。
-
IDEA创建工程,创建好了之后,修改根目录下的
pom.xm
文件,设置以下2个内容:<!--配置hadoop的远程仓库--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-cdh5.7.0</version> </dependency> <!--配置hadoop版本,初次配置需要下载,要等一段时间--> <repositories> <repository> <id>cloudera</id> <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories>
配置完成后的文件长这个样子:
然后我们通过View—>ToolWindows—>Maven Projects调出Maven窗口,可以看到hadoop需要的包我们已经导入进来了。
-
配置完成,接下来在src中新建类,开始写我们的wordcount处理程序。
Mapper&Redeucer
写MapReduce不可避免的要用到这两个类:Mapper
和Reducer
,通过IDEA我们可以查看这两个类的代码。
首先是Mapper
:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
/**
* The <code>Context</code> passed on to the {@link org.apache.hadoop.mapreduce.Mapper} implementations.
*/
public abstract class Context
implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
}
/**
* Called once at the beginning of the task.
*/
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* Called once at the end of the task.
*/
protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
*
* @param context
* @throws IOException
*/
public void run(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
Mapper
类中的方法还是不多的,首先是setup方法和cleanup方法,从名字就能看出来这两个方法一个是初始化时调用的,一个是结束的时候调用的,初始化可以处理一些像打开数据库连接这样的操作,结束可以关闭数据库连接,回收一些不用的资源等等。这两个方法都只会执行一次。
然后是map方法,显然这才是我们继承Mapper
类需要重点重写的方法,将map工作的处理逻辑都要放在这个方法中,我们看到map方法有三个参数,这就要看Mapper
类的第一行,它有四个泛型变量,KEYIN
, VALUEIN
, KEYOUT
, VALUEOUT
分别代表输入的键值类型和输出的键值类型,那么这里map参数的变量也不难理解,一个是输入的键,一个是输入的值,context是用于最后的write方法的,这样map方法就说完了。
最后是run方法,看到这个run方法的实现,会有一种似曾相识的感觉,没错就是设计模式中的模板方法模式,Mapper
类中定义了一些方法,用户可以继承这个类重写方法以应对不同的需求,但是这些方法内部的执行顺序是确定好的,它封装了程序的算法,让用户能集中精力处理每一部分的具体逻辑。run方法是程序在执行中会默认调用的,从他的执行流程来看也给常符合我们的预期,先进行初始化,如果还有输入的数据,那么调用map方法处理每一个键值对,最终执行结束方法。这就是整个map任务的流程。
有了Mapper
分析的基础,Reducer
也就不难理解了,下面是Reducer
的源码:
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
/**
* The <code>Context</code> passed on to the {@link org.apache.hadoop.mapreduce.Reducer} implementations.
*/
public abstract class Context
implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
}
/**
* Called once at the start of the task.
*/
protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.
*/
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context
) throws IOException, InterruptedException {
for (VALUEIN value : values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
/**
* Called once at the end of the task.
*/
protected void cleanup(org.apache.hadoop.mapreduce.Reducer.Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if (iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>) iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
Reducer
中跟Mapper
中相同的就不说了,重点在于reduce方法的参数,它将map处理后的结果作为它的输入参数,回想一下wordcount的处理流程,经过map任务的处理后,变成了每个单词对应一个list,每个list是一系列的1,1,1,1,表明这个单词出现的记录。输入的键当然是单词,输入的值实际上是一个列表,java集合中对于列表提供了一个迭代器用于遍历,使用迭代器进行遍历在速度上会快很多,因此这里的参数是一个迭代器也不难理解。而reduce方法的默认实现也是通过迭代器去遍历每个输入的结果。
到这里我们已经看完了Mapper
和Reducer
类,并且也知道了每个方法是干什么的了,写起来自然也就简单了。
写wordcount
首先我们需要分别继承Mapper
和Reducer
,并且要分别重写map和reduce方法用来放map和reduce阶段的逻辑。
考虑map任务中的输入输出键值对,我们之前讨论过,map输入的键为文件的偏移量,值为一行的内容(这里有个有意思的问题,为什么是一行呢?你会发现等我们写完了所有的代码后都没有看到哪里有定义是按行读取的,文章最后我们会讨论这个问题)。文件的偏移量是LongWritable
类型,实现了WritableComparable
接口,我们上篇文章说过每个作为键的类都必须实现这个接口。这个类可以理解成MapReduce框架对long数据的一种封装。至于每行的内容当然是String了,Text同样可以理解成是对String的封装。知道输入后我们不难想出思路,先将读取一行的内容转成字符串,然后按照空格拆分成String数组,再对数组进行遍历,每项就是一个单词,最后按<单词,1>键值对的形式写出去。下面是代码:
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
LongWritable one = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//接收到的每一行数据
String line = value.toString();
//按照指定的分隔符进行拆分
String[] words = line.split(" ");
for (String word : words) {
//通过上下文把map的处理结果输出
context.write(new Text(word), one);
}
}
}
看完了MyMapper
类,下面我们来考虑Reducer
,Reducer
输入的键是单词,Text类不用说了;值是一个迭代器,这个迭代器所对应的列表中是单词出现的记录,也就是很多个1。reduce要做的任务也很简单,就是将这些1求和输出。下面是MyReducer
类
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
//最终统计结果的输出
context.write(key, new LongWritable(sum));
}
最后就是main方法了:
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建Configuration
Configuration configuration = new Configuration();
//准备清理已存在的输出目录
Path outputPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath);
}
//创建Job
Job job = Job.getInstance(configuration, "wordcount");
//设置Job的处理类
job.setJarByClass(WordCountApp.class);
//设置作业处理的输入路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//设置map相关参数
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//设置reduce相关参数
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置作业输出目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
有几点要提一下:
- 清理已存在的输出目录的作用是为了防止程序多次执行时报错,因为如果输出目录已存在的话,程序执行时会报错;
- 创建job的第二个参数“wordcount”是作业名;
- 设置的map参数分别是map处理类,map输出的键值对类型,reduce参数同理;
- 以上所有的代码都是在WordCountApp一个类中的,最后会贴出总的代码。
至此,整个wordcount的例子就写完了,其实还是很简单的,主要是理解MapReduce的框架的思想和写法,以后再写别的例子的时候就可以举一反三了。下面就是运行过程了。
提交运行
-
在shell的
.bash_profile
文件中配置maven根目录,并将maven根目录下的bin目录加入到PATH变量中。这样shell中就可以使用mvn
命令打包了。 -
进入helloworld根目录,或者直接在IDEA最下面打开Terminal,输入
mvn clean package -DskipTests
。将代码打成jar包。
打包完成后会显示
BUILD SUCCESS
,并且会在项目目录下产生一个target
文件夹,jar包就放在这个地方。
-
将jar包放到服务器根目录上去,使用scp命令
scp helloworld-1.0.jar wangsheng@localhost:~/
。
-
运行,这一步是接在上一步进入localhost之后的,运行的命令格式为:
hadoop jar jar包地址 主程序完整类名 输入文件目录 输出目录
所以可以写出来命令为:
hadoop jar /Users/wangsheng/helloworld-1.0.jar com.wangsheng.hadoop.WordCountApp hdfs://localhost:8020/test/hello.txt hdfs://localhost:8020/test/output
如果运行成功,那么在output中会有以下文件,然后我们检验一下,hello.txt中的内容与wordcount计算的结果是否一致。
到这里wordcount的例子就完全完成了。
有意思的问题
前面我们提到,我全篇都没有看到一个按行处理的代码,为什么在map方法中value的值就是一行文本呢?而且怎么知道LongWritable
对应的就是文件的偏移量呢?
我们回头去看上一篇文章说MapReduce执行过程的那张图,文件经过InputFormat
后被拆分成了split,然后有个关键的步骤是通过RecordReader
进行读取,并且我们之前说InputFormat
里面有一个getRecordReader
方法,这个方法就是得到InputFormat
对应的RecordReader
。
于是我们从InputFormat
入手,InputFormat
是个抽象类,我们看看它有哪些实现类:
它的实现类对应各种不同的文件的处理,InputFormat
提供了不同实现类去处理,在这些子类中我们发现了一个TextInputFormat
类,顾名思义应该是处理文本文件的,进入这个类看一下:
我们发现这个类在获取RecordReader
的时候,返回了一个LineRecordReader
对象,这是不是跟我们的目的有些接近了,其实这个类就是从split中按行读取内容,而这个LineRecorderReader
是RecordReader
的实现类,所以针对不同的文件,有不同的InputFormat
实现类来处理拆分文件,同样从拆分后的split中读取内容也对应着不同的RecordReader
,在明白了这个原理后,那么如果我们想在文本文件中一次读取多行的需求是不是也就不难满足了,也就是说我们可以根据自己的策略定义RecordReader
的读取规则。
至于为什么LongWritable
可以作为文件的偏移量,这个在LineRecordReader
中的nextKeyValue
方法中已经很明显体现出来了:
这个小问题也是我在写博客的过程中偶然发现的,通过看源码的方式加深了对MapReduce框架的理解。
网友评论