这里写得是,如果利用mapreduce分布式的计算框架来写一个单词计数的demo。比如说,给出一个文件,然后,输出是统计文件里面所有的单词出现的次数。
map 函数
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* KEYIN 读到的一行数据的偏移量 lONG
* KEYOUT 读到的一行数据的内容 STRING
* keyout 输出一个单词,是一个string的类型
* valueout int值
* hadoop 有自己的一套序列化的机制,他的序列化比jdk的序列化更加精简,
* 可以提高网络的传输效率
* long --》 longwritable
* string --》 text
* integer --》 intwritable
* null --> nullwritable
*/
public class worldcount extends Mapper<LongWritable,Text,Text,IntWritable>{
//重载Mapper类的map方法
//key 其实对应 keyin
// value 就是 keyout
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
// 将拿到的这行数据按照空格切分
String line=value.toString();
String [] linewords = line.split(" ");
for(String word:linewords){
// 所以在context里面写的内容就是 key:word,value 是1
context.write(new Text(word), new IntWritable(1));
}
}
}
reduce 函数
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class worldcountreduce extends Reducer <Text,IntWritable,Text,IntWritable> {
// 一组相同的key,调用一次reduce
//相当于调用一次 ,计算一个key对应的个数
protected void reduce (Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
//统计单词数
int count=0;
for(IntWritable value :values){
count=count+value.get();
}
//将输出的结果放到context 里面
context.write(key,new IntWritable(count));
}
}
jobclient 函数
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*
*job 提交器 是yarn集群的一个客户端,他负责将mr程序需要的信息全部封装成一个配置文件里面
*然后连同我们mr程序所在的一个jar包,一起提交给yarn,有yarn去启动mr程序中的mrappmaster
*/
public class jobclient {
public static void main(String []args) throws IOException, ReflectiveOperationException, InterruptedException{
Configuration conf=new Configuration();
//conf.set("yarn.resoucemanager.hostname", value);
Job job=Job.getInstance(conf);
//job.setJar("~/code/WordCount.jar");
//告知客户端的提交器 mr程序所在的jar包
//这样就不必使用setjar 这样的方法了
job.setJarByClass(jobclient.class);
// 告知mrapp master ,map 和reduce 对应的实现类
job.setMapperClass(worldcount.class);
job.setReducerClass(worldcountreduce.class);
//告知输入,和输出的数据结构的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//告知mrappmaster 我们启动的reduce tash的数量
//启动maptask 的数量 是yarn 会自动的计算
job.setNumReduceTasks(3);
//指定一个目录而不是文件
FileInputFormat.setInputPaths(job, new Path("hdfs://localhost/wordcount/"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://localhost/wordcount/output/"));
// job.submit()
//这个要比job.submit 要好,因为这个client并不会在提交任务之后,就退出,而是创建一个线程去监控 map和reduce的运行
boolean res=job.waitForCompletion(true);
// 执行成功 状态吗 是0,执行失败 状态码是100
// 通过echo $? 显示状态码
System.out.println("wakakka ");
System.exit(res?0:100);
}
}
小结:
其实代码的业务逻辑很简单,并不是很复杂。但是刚刚开始学的时候,不会熟悉,如何在hadoop这样的环境下去运行代码,走了很多弯路。所以,我决定,写成一个每一步的截图的教程。
一个简明的Mapreduce 原理分析:
http://www.jianshu.com/p/6b6a42a0740c
Hadoop入门—基本原理简介:
http://www.jianshu.com/p/a8b08350960f
1.创建一个java项目
然后导入需要的jar包(都是关于hadoop的包),在dfs里面是使用了maven,可以很方便的就需要的包导入进来,但是也可以收到的去导入一下。
hadoop所有的jar包都在hadoop/share/目录下面
hdfs是关于文件系统的包,mapreduce是关于mpreduce计算框架的包
image.png
进入common目录下面,还有之目录lib下面的包
image.png
子目录lib下面的包
image.png
反正,如果为了不出上面错误,可以都导入进来。
image.png
-
step2 然后就可以完成写代码的任务了
image.png
image.png
image.png
step3 :然后将代码打包成一个jar 包
image.png image.png
执行jar包:(input文件已经上传到 dfs上面了)
image.png
image.png
image.png
image.png
image.png
查看输出的文件夹,查看输出结果,因为指定了reduce task ,所以可以看到结果是这样的,有三个输出文件:
mapreduce 的运行流程:
a. runjar 启动客户端
b. 启动mrappmaster (mr的 主控节点)
c. yarnchild 启动,maptask 节点
d. yarnchild 启动,reduce节点
e. 关闭yarnchild
f. 关闭runjar
执行程序的时候:
maptask的节点是程序自己顶的,无法设置。maptask的个数,取决于文件的个数和文件的大小。reducetask的节点是可以自己设定的。
网友评论