浅谈Hadoop

作者: 一只小哈 | 来源:发表于2016-07-02 11:19 被阅读711次

    Hadoop是一个十分流行的分布式存储和计算框架,也是业内大数据处理和分析最通用的框架之一。


    hadoop_icon.png

    Hadoop2.0 由HDFS(Hadoop Distributed File System)、MapReduce和Yarn三部分组成。
    Hadoop的设计原型来源于google的三篇论文,即GFS、MapReduce和BigTable,同时作为Lucene的子项目Nutch的一部分在2005年引入Apache,Hadoop的得名是Hadoop之父Doug Cutting儿子的玩具,值得一提的是他的妻子叫Lucene。
    Hadoop生态

    hbase生态.png

    "永远不把鸡蛋放在一个篮子里"——HDFS

    在分布式的文件系统之前,往往使用大型机和存储服务器去做存储,无论大型机和存储服务器都是十分昂贵的,同时也是有瓶颈的,横向扩展能力很差。而分布式文件系统的横向扩展能力以及容错性十分好,也越来越受到人们的青睐。HDFS的定位是用比较廉价的机器,做高可用的海量数据的存储。主要采用多副本的分块存储机制,在部分机器宕机或数据损坏的情况下,依然能提供可靠服务。

    1. 集群拓扑
    hdfs.jpg
    • NameNode:文件元数据存储节点,Hadoop1中存在单点问题,Hadoop2中通过备用节点实现高可用,同时有元数据存储瓶颈,不适于存储小文件。

    • DataNode:数据节点,单文件被分成多块,每一块多副本跨机架、机房存储,保证数据的高可用。

    • Block:文件的存储单位,单个文件可被分成多块,默认为128M。同时也是MapReduce默认的输入块大小。
      文件读取流程:

    hdfsread.png

    open DistributedFileSystem 去NameNode获取文件的块列表,NameNode根据Client距离各节点的网络距离给出Block列表。
    根据Block列表去一次读取文件,读取后在Client进行文件汇总。

    1. 写入流程:
    hdfs_write.png
    • 实例化DistributeFileSystem,在NameNode进行写文件的申请,申请成功后创建元数据,并返回数据存放的位置信息。

    • 过位置信息,对DataNode进行流式写入,将数据分成多个数据包,作为一个数据队列。写入时每次取一个数据包,写入全部副本,且三个节点均写入成功,则返回ACK信号表名当前buff写入成功,Client内部维护着数据包的Ack队列,收到Ack之后会移除这个数据包。

    • 最后想NameNode发送Complete信号,确认文件写入全部成功。
      如果中途节点写失败:写入部分数据的节点将在管线中移除,同时后续恢复正常后会删除这部分数据。随后写入后续两个节点,原则上写入一个块的时候就可以写成功,因为namenode发现数据不一致会做复制操作。

    "分而治之"——MapReduce###

    </br>
    分布式计算出现之前,数据的计算往往依靠性能比较好的单机计算。但是单机受限于本身的计算资源,往往计算速度都不如人意。
    一天小明接到产品的一个需求:
    产品:小明啊,这里有一天的日志信息,大概5个G,我要统计一下一共有多少。
    小明:OK啊,就5个G,一个shell搞定,看我 cat * | wc -l,我简直就是个天才。
    产品:对不起啊小明,需求变了,一天的看不出来效果,我需要统计1个月的数据,大概有150G。
    小明:有点大啊,不怕,我线上服务器内存120G,40核,看我用多线程搞定,过了2个小时,终于搞 定了还有点费劲。
    产品:我保证这是我最后一次变更需求,我想要最近一年的数据1800G左右。
    小明:数据上T了,搞不定了啊。
    上面的例子告诉我们,在大数量的场景下,高性能的单机有时也是解决不了问题。所以我们就需要MapReduce帮助我们。
    </br>
    MapReduce是一种采用分治和规约的一种并行的批处理框架,先将数据做分割计算,最后汇总结果。看上去和多线程的处理机制一样,但是Hadoop将它封装在了框架中,编程十分简单,吞吐量十分高,目前支持Java、C++、Python等多种API编程。
    1.MapReduce运行模型总体概览:

    mapreduceAllGraph.png
    • InputSplit: InputSplit是单个map任务的输入文件片,默认文件的一个block。

    • Map函数:数据处理逻辑的主体,用开发者开发。

    • Partition:map的结果发送到相应的reduce。

    • Combain:reduce之前进行一次预合并,减小网络IO。当然,部分场景不适合。

    • Shuffle:map输出数据按照Partition分发到各个reduce。

    *reduce:将不同map汇总来的数据做reduce逻辑。

    2.多reduce:

    datatrans.png

    3.经典wordcount:

    wordcountdatatrans.png mapreducedataStream.png

    4.Map类的实现:

    • 必须继承org.apache.hadoop.mapreduce.Mapper 类

    • map()函数,对于每一个输入K/V都会调用一次map函数,逻辑实现(必须)。

    • setup()函数,在task开始前调用一次,做maptask的一些初始化工作,如连接数据库、加载配置(可选)。

    • cleanup()函数,在task结束前调用一次,做maptask的收尾清理工作,如批处理的收尾,关闭连接等(可选)

    • Context上下文环境对象,包含task相关的配置、属性和状态等。

    5.Reduce类的实现:

    • 必须继承org.apache.hadoop.mapreduce.Reducer类。

    • reduce(key, Iterable<>values,Context context)对于每一个key值调用一次reduce函数。

    • setup():在task开始前调用一次,做reducetask的一些初始化工作。

    • cleanup():在task结束时调用一次,做reducetask的收尾清理工作。

    6.作业整体配置:

    • 参数解析:String[]otherArgs= new GenericOptionsParser(conf, args).getRemainingArgs();

    • 创建job: Jobjob= Job.getInstance(conf, "word count");

    • 设置map类,reduce类。

    • 设置map和reduce输出的KV类型,二者输出类型一致的话则可以只设置Reduce的输出类型。

    • 设置reduce的个数 :默认为1,综合考虑,建议单个reduce处理数据量<10G。不想启用reduce设置为0即可。

    • 设置InputFormat

    • 设置OutputFormat

    • 设置输入,输出路径。

    • job.waitForCompletion(true) (同步提交)和job.submit()(异步提交)

    wordcount:
    public class WordCountTask { private static final Logger logger = Logger.getLogger(WordCountTask.class); public static class WordCountMap extends Mapper<Object, Text, Text, IntWritable>{ private static final IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void cleanup(Context context) throws IOException, InterruptedException { logger.info("mapTaskEnd....."); } protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { this.word.set(itr.nextToken()); context.write(this.word, one); } } protected void setup(Context context) throws IOException, InterruptedException { logger.info("mapTaskStart....."); } } public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } this.result.set(sum); context.write(key, this.result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCountTask.class); job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileSystem fs = FileSystem.get(conf); for (int i = 0; i < otherArgs.length - 1; i++) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } if(fs.exists(new Path(otherArgs[otherArgs.length - 1]))){ fs.delete(new Path(otherArgs[otherArgs.length - 1])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)])); job.setNumReduceTasks(1); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

    6.提交:
    ​hadoop jar hadoop-examples.jar demo.wordcount(主类名) Dmapreduce.job.queuename=XX(系统参数) input output
    ​缺点:无定时调度

    1. 常用的InputFormat:
    • TextInputFormat key:行偏移 value:文本内容,split计算:splitSize=max("mapred.min.split.size",min("mapred.max.split.size",blockSize)) mapred.min.split.size 在大量文本输入的情况下,需要控制map的数量,可以调此选项。

    • CombineTextInputFormat(集群默认),多个小文件分片送到一个map中处理,主要解决多个小文件消耗map资源的问题。

    • sequenceFileInputFormat,采用自己的序列化方式,通常文件名为key,value为文件内容,可在存储上解决小文件对namenode的影响。

    相关文章

      网友评论

        本文标题:浅谈Hadoop

        本文链接:https://www.haomeiwen.com/subject/dijqjttx.html