大家好,我是Iggi。
今天我给大家分享的是MapReduce2-3.1.1版本的Word Count Ver1.0实验示例。
首先用一段文字简介MapReduce:
MapReduce最早是由Google公司研究提出的一种面向大规模数据处理的并行计算模型和方法。Google公司设计MapReduce的初衷主要是为了解决其搜索引擎中大规模网页数据的并行化处理。Google公司发明了MapReduce之后首先用其重新改写了其搜索引擎中的Web文档索引处理系统。但由于MapReduce可以普遍应用于很多大规模数据的计算问题,因此自发明MapReduce以后,Google公司内部进一步将其广泛应用于很多大规模数据处理问题。到目前为止,Google公司内有上万个各种不同的算法问题和程序都使用MapReduce进行处理。
2004年,开源项目Lucene(搜索索引程序库)和Nutch(搜索引擎)的创始人Doug Cutting发现MapReduce正是其所需要的解决大规模Web数据处理的重要技术,因而模仿Google MapReduce,基于Java设计开发了一个称为Hadoop的开源MapReduce并行计算框架和系统。自此,Hadoop成为Apache开源组织下最重要的项目,自其推出后很快得到了全球学术界和工业界的普遍关注,并得到推广和普及应用。
MapReduce的推出给大数据并行处理带来了巨大的革命性影响,使其已经成为事实上的大数据处理的工业标准。尽管MapReduce还有很多局限性,但人们普遍公认,MapReduce是到目前为止最为成功、最广为接受和最易于使用的大数据并行处理技术。MapReduce的发展普及和带来的巨大影响远远超出了发明者和开源社区当初的意料,以至于马里兰大学教授、2010年出版的《Data-Intensive Text Processing with MapReduce》一书的作者Jimmy Lin在书中提出:MapReduce改变了我们组织大规模计算的方式,它代表了第一个有别于冯·诺依曼结构的计算模型,是在集群规模而非单个机器上组织大规模计算的新的抽象模型上的第一个重大突破,是到目前为止所见到的最为成功的基于大规模计算资源的计算模型。
MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:
1)MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。
2)MapReduce是一个并行计算与运行软件框架(Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
3)MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理。
image.png好,下面进入正题。介绍Java操作MapReduce2组件完成Word Count Ver2.0的操作。
首先,使用IDE建立Maven工程,建立工程时没有特殊说明,按照向导提示点击完成即可。重要的是在pom.xml文件中添加依赖包,内容如下图:
image.png待系统下载好依赖的jar包后便可以编写程序了。
展示实验代码:
package linose.mapreduce;
import java.io.IOException;
import java.io.OutputStreamWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
//import org.apache.log4j.BasicConfigurator;
/**
* Hello MapReduce!
* Word Count V1.0
* 本示例演示如何使用MapReduce组件统计单词出现的个数
* 关于示例中出现的API方法可以参考如下连接:http://hadoop.apache.org/docs/r3.1.1/api/index.html
*/
public class AppVer1
{
public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException
{
/**
* 设定MapReduce示例拥有HDFS的操作权限
*/
System.setProperty("HADOOP_USER_NAME", "hdfs");
/**
* 为了清楚的看到输出结果,暂将集群调试信息缺省。
* 如果想查阅集群调试信息,取消注释即可。
*/
//BasicConfigurator.configure();
/**
* MapReude实验准备阶段:
* 定义HDFS文件路径
*/
String defaultFS = "hdfs://master2.linose.cloud.beijing.com:8020";
String inputPath = defaultFS + "/index.dirs/input.txt";
String outputPath = defaultFS + "/index.dirs/output";
/**
* 生产配置,并获取HDFS对象
*/
Configuration conf = new Configuration();
conf.set("fs.defaultFS", defaultFS);
FileSystem system = FileSystem.get(conf);
/**
* 定义输入路径,输出路径
*/
Path inputHdfsPath = new Path(inputPath);
Path outputHdfsPath = new Path(outputPath);
/**
* 如果实验数据文件不存在则创建数据文件
*/
if (!system.exists(inputHdfsPath)) {
FSDataOutputStream inputStream = system.create(inputHdfsPath);
OutputStreamWriter outputStream = new OutputStreamWriter(inputStream);
outputStream.write("芒果 菠萝 西瓜 橘子 草莓 \n");
outputStream.write("草莓 橘子 苹果 荔枝 蓝莓 \n");
outputStream.write("天天 菇娘 释迦 软枣子 癞瓜 蛇皮果 \n");
outputStream.write("香蕉 菠萝 鸭梨 柚子 苹果 \n");
outputStream.write("草莓 橘子 桂圆 荔枝 香蕉 \n");
outputStream.write("苹果 菠萝 草莓 弥猴桃 芒果 \n");
outputStream.write("苹果 香蕉 提子 橘子 菠萝 \n");
outputStream.write("西瓜 苹果 香蕉 橙子 提子 \n");
outputStream.write("香蕉 鸭梨 西瓜 葡萄 芒果 \n");
outputStream.write("苹果 樱桃 香蕉 葡萄 橘子 \n");
outputStream.write("西瓜 葡萄 桃 车厘子 香蕉 榴莲 瓜 火龙果 荔枝 \n");
outputStream.close();
inputStream.close();
}
/**
* 如果实验结果目录存在,遍历文件内容全部删除
*/
if (system.exists(outputHdfsPath)) {
RemoteIterator<LocatedFileStatus> fsIterator = system.listFiles(outputHdfsPath, true);
LocatedFileStatus fileStatus;
while (fsIterator.hasNext()) {
fileStatus = fsIterator.next();
system.delete(fileStatus.getPath(), false);
}
system.delete(outputHdfsPath, false);
}
/**
* 创建MapReduce任务并设定Job名称
*/
JobConf jobConf = new JobConf(conf, WordCountVer1.class);
jobConf.setJobName("Word Count Ver1.0:");
/**
* 指定输入输出的默认格式类
*/
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setOutputFormat(TextOutputFormat.class);
/**
* 设置输入文件与输出文件
*/
FileInputFormat.setInputPaths(jobConf, inputHdfsPath);
FileOutputFormat.setOutputPath(jobConf, outputHdfsPath);
/**
* 指定Reduce类输出类型Key类型与Value类型
*/
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);
/**
* 指定自定义Map类,Reduce类,开启Combiner函数。
*/
jobConf.setMapperClass(WordCountVer1.Map.class);
jobConf.setCombinerClass(WordCountVer1.Reduce.class);
jobConf.setReducerClass(WordCountVer1.Reduce.class);
/**
* 提交作业
*/
RunningJob run = JobClient.runJob(jobConf);
/**
* 然后轮询进度,直到作业完成。
*/
float progress = 0.0f;
do {
progress = run.setupProgress();
System.out.println("Word Count Ver1.0: 的当前进度:" + progress * 100);
Thread.sleep(1000);
} while (progress != 1.0f && !run.isComplete());
/**
* 如果成功,查看输出文件内容
*/
if (run.isSuccessful()) {
RemoteIterator<LocatedFileStatus> fsIterator = system.listFiles(outputHdfsPath, true);
LocatedFileStatus fileStatus;
while (fsIterator.hasNext()) {
fileStatus = fsIterator.next();
FSDataInputStream outputStream = system.open(fileStatus.getPath());
IOUtils.copyBytes(outputStream, System.out, conf, false);
outputStream.close();
System.out.println("--------------------------------------------");
}
}
}
}
展示MapReduce2-3.1.1组件编写Word Count Ver1.0测试类:
package linose.mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
* WordCount示例
* @author Iggi
*
*/
public class WordCountVer1 {
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter report) throws IOException {
int counter = 0;
while(values.hasNext()) {
counter += values.next().get();
}
output.collect(key, new IntWritable(counter));
}
}
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter report) throws IOException {
String[] words = value.toString().split(" ");
for(String word: words){
output.collect(new Text(word), new IntWritable(1));
}
}
}
}
下图为测试结果:
image.png
至此,MapReduce2-3.1.1 Word Count Ver1.0 实验示例演示完毕。
网友评论