Hadoop是一个十分流行的分布式存储和计算框架,也是业内大数据处理和分析最通用的框架之一。
hadoop_icon.png
Hadoop2.0 由HDFS(Hadoop Distributed File System)、MapReduce和Yarn三部分组成。
Hadoop的设计原型来源于google的三篇论文,即GFS、MapReduce和BigTable,同时作为Lucene的子项目Nutch的一部分在2005年引入Apache,Hadoop的得名是Hadoop之父Doug Cutting儿子的玩具,值得一提的是他的妻子叫Lucene。
Hadoop生态
"永远不把鸡蛋放在一个篮子里"——HDFS
在分布式的文件系统之前,往往使用大型机和存储服务器去做存储,无论大型机和存储服务器都是十分昂贵的,同时也是有瓶颈的,横向扩展能力很差。而分布式文件系统的横向扩展能力以及容错性十分好,也越来越受到人们的青睐。HDFS的定位是用比较廉价的机器,做高可用的海量数据的存储。主要采用多副本的分块存储机制,在部分机器宕机或数据损坏的情况下,依然能提供可靠服务。
- 集群拓扑
-
NameNode:文件元数据存储节点,Hadoop1中存在单点问题,Hadoop2中通过备用节点实现高可用,同时有元数据存储瓶颈,不适于存储小文件。
-
DataNode:数据节点,单文件被分成多块,每一块多副本跨机架、机房存储,保证数据的高可用。
-
Block:文件的存储单位,单个文件可被分成多块,默认为128M。同时也是MapReduce默认的输入块大小。
文件读取流程:
open DistributedFileSystem 去NameNode获取文件的块列表,NameNode根据Client距离各节点的网络距离给出Block列表。
根据Block列表去一次读取文件,读取后在Client进行文件汇总。
- 写入流程:
-
实例化DistributeFileSystem,在NameNode进行写文件的申请,申请成功后创建元数据,并返回数据存放的位置信息。
-
过位置信息,对DataNode进行流式写入,将数据分成多个数据包,作为一个数据队列。写入时每次取一个数据包,写入全部副本,且三个节点均写入成功,则返回ACK信号表名当前buff写入成功,Client内部维护着数据包的Ack队列,收到Ack之后会移除这个数据包。
-
最后想NameNode发送Complete信号,确认文件写入全部成功。
如果中途节点写失败:写入部分数据的节点将在管线中移除,同时后续恢复正常后会删除这部分数据。随后写入后续两个节点,原则上写入一个块的时候就可以写成功,因为namenode发现数据不一致会做复制操作。
"分而治之"——MapReduce###
</br>
分布式计算出现之前,数据的计算往往依靠性能比较好的单机计算。但是单机受限于本身的计算资源,往往计算速度都不如人意。
一天小明接到产品的一个需求:
产品:小明啊,这里有一天的日志信息,大概5个G,我要统计一下一共有多少。
小明:OK啊,就5个G,一个shell搞定,看我 cat * | wc -l,我简直就是个天才。
产品:对不起啊小明,需求变了,一天的看不出来效果,我需要统计1个月的数据,大概有150G。
小明:有点大啊,不怕,我线上服务器内存120G,40核,看我用多线程搞定,过了2个小时,终于搞 定了还有点费劲。
产品:我保证这是我最后一次变更需求,我想要最近一年的数据1800G左右。
小明:数据上T了,搞不定了啊。
上面的例子告诉我们,在大数量的场景下,高性能的单机有时也是解决不了问题。所以我们就需要MapReduce帮助我们。
</br>
MapReduce是一种采用分治和规约的一种并行的批处理框架,先将数据做分割计算,最后汇总结果。看上去和多线程的处理机制一样,但是Hadoop将它封装在了框架中,编程十分简单,吞吐量十分高,目前支持Java、C++、Python等多种API编程。
1.MapReduce运行模型总体概览:
-
InputSplit: InputSplit是单个map任务的输入文件片,默认文件的一个block。
-
Map函数:数据处理逻辑的主体,用开发者开发。
-
Partition:map的结果发送到相应的reduce。
-
Combain:reduce之前进行一次预合并,减小网络IO。当然,部分场景不适合。
-
Shuffle:map输出数据按照Partition分发到各个reduce。
*reduce:将不同map汇总来的数据做reduce逻辑。
2.多reduce:
datatrans.png3.经典wordcount:
wordcountdatatrans.png mapreducedataStream.png4.Map类的实现:
-
必须继承org.apache.hadoop.mapreduce.Mapper 类
-
map()函数,对于每一个输入K/V都会调用一次map函数,逻辑实现(必须)。
-
setup()函数,在task开始前调用一次,做maptask的一些初始化工作,如连接数据库、加载配置(可选)。
-
cleanup()函数,在task结束前调用一次,做maptask的收尾清理工作,如批处理的收尾,关闭连接等(可选)
-
Context上下文环境对象,包含task相关的配置、属性和状态等。
5.Reduce类的实现:
-
必须继承org.apache.hadoop.mapreduce.Reducer类。
-
reduce(key, Iterable<>values,Context context)对于每一个key值调用一次reduce函数。
-
setup():在task开始前调用一次,做reducetask的一些初始化工作。
-
cleanup():在task结束时调用一次,做reducetask的收尾清理工作。
6.作业整体配置:
-
参数解析:String[]otherArgs= new GenericOptionsParser(conf, args).getRemainingArgs();
-
创建job: Jobjob= Job.getInstance(conf, "word count");
-
设置map类,reduce类。
-
设置map和reduce输出的KV类型,二者输出类型一致的话则可以只设置Reduce的输出类型。
-
设置reduce的个数 :默认为1,综合考虑,建议单个reduce处理数据量<10G。不想启用reduce设置为0即可。
-
设置InputFormat
-
设置OutputFormat
-
设置输入,输出路径。
-
job.waitForCompletion(true) (同步提交)和job.submit()(异步提交)
wordcount:
public class WordCountTask { private static final Logger logger = Logger.getLogger(WordCountTask.class); public static class WordCountMap extends Mapper<Object, Text, Text, IntWritable>{ private static final IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void cleanup(Context context) throws IOException, InterruptedException { logger.info("mapTaskEnd....."); } protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { this.word.set(itr.nextToken()); context.write(this.word, one); } } protected void setup(Context context) throws IOException, InterruptedException { logger.info("mapTaskStart....."); } } public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } this.result.set(sum); context.write(key, this.result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCountTask.class); job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileSystem fs = FileSystem.get(conf); for (int i = 0; i < otherArgs.length - 1; i++) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } if(fs.exists(new Path(otherArgs[otherArgs.length - 1]))){ fs.delete(new Path(otherArgs[otherArgs.length - 1])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)])); job.setNumReduceTasks(1); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
6.提交:
hadoop jar hadoop-examples.jar demo.wordcount(主类名) Dmapreduce.job.queuename=XX(系统参数) input output
缺点:无定时调度
- 常用的InputFormat:
-
TextInputFormat key:行偏移 value:文本内容,split计算:splitSize=max("mapred.min.split.size",min("mapred.max.split.size",blockSize)) mapred.min.split.size 在大量文本输入的情况下,需要控制map的数量,可以调此选项。
-
CombineTextInputFormat(集群默认),多个小文件分片送到一个map中处理,主要解决多个小文件消耗map资源的问题。
-
sequenceFileInputFormat,采用自己的序列化方式,通常文件名为key,value为文件内容,可在存储上解决小文件对namenode的影响。
网友评论