Hadoop MapReduce 学习笔记

作者: LY丶Smile | 来源:发表于2018-07-02 20:07 被阅读22次

    前言

    本文是个人之前纪录的MapReduce学习笔记,主要涉及到MapReduce基本概念、Hadoop 经典示例WordCount的使用解析、hdfs与hbase的简单了解使用。现在整理了一下分享出来,希望对别人有所帮助。

    学习MapReduce一定要理解这种Map、Reduce的编程模型以及Mapper、Reducer数据处理的原理,否则只是一味的复制粘贴可能比较难上手。

    同时学习大数据的知识,一定要将自己对分布式的理解研究透彻。

    一、概念理解

    • MapReduce 是一种线性的可伸缩的编程模型,用于大规模数据集(大于1TB)的并行运算
    • 在MapReduce里,Map处理的是原始数据,每条数据之间互相没有关系(这一点一定要注意)。Reduce阶段,以key为标识,对同一个key下的value进行统计,类似{key,[value1,value2……]}
    • 可以把MapReduce理解为,把一堆杂乱无章的数据按照某种特征归纳起来,然后处理并得到最后的结果。
      Map面对的是杂乱无章的互不相关的数据,它解析每个数据,从中提取出key和value,也就是提取了数据的特征。
      经过MapReduce的Shuffle阶段之后,在Reduce阶段看到的都是已经归纳好的数据了,在此基础上我们可以做进一步的处理以便得到结果。
    • 缺点:不适用于实时计算,实时计算一般最低都是要求秒级响应的,MR很难满足这个要求,实时计算一般采用storm等流式计算系统
    • MapReduce计算流程


      MapReduce计算流程--来源网络

    二、编程模型

    • 每个应用程序称为一个作业(Job),每个Job是由一系列的Mapper和Reducer来完成
    • 任务过程分为两个阶段,map和reduce阶段,两个阶段都是使用键值对(key/value)作为输入输出的
    • 每个Mapper处理一个Split,每个split对应一个map线程。Split中的数据作为map的输入,map的输出一定在map端
    • Map方法:Map(k1,v1) -> list(k2,v2) ,并行应用于每一个输入的数据集,每一次调用都会产生一个(k2,v2)的队列 。
    • Reduce方法:Reduce(k2,list(v2)) -> list(k3,v3)。收集map端输出队列list(k2,v2)中有相同key的数据对,把它们聚集在一起,输出时形成目的数据 list(k3,v3)。
    • 新旧版本API的区别:
      • 新的api放在:org.apache.hadoop.mapreduce,旧版api放在:org.apache.hadoop.mapred
      • 新API使用虚类,旧版使用的是接口,虚类更加利于扩展

    三、运行机制

    1. 输入分片(input split)

      map计算之前,MapReduce会根据输入文件计算输入分片(input -> spliting),每个input split针对一个map任务。split存储的并不是数据,而是一个分片长度和一个记录数据的位置的数组

    2. map阶段

      map阶段的操作一般都是在数据存储节点上操作,所以有时候为了能够减轻数据传输的网络压力,可以先combiner阶段处理一下数据,在进行reduce

    3. combiner阶段

      此阶段是可选的,不是必须经过的一个阶段,combiner其实也是一种reduce操作,可以说combiner是一种本地化的reduce操作,是map运算的后续操作,可以减轻网络传输的压力。但是combiner的使用需要注意不要影响到reduce的最终结果,比如计算平均值的时候如果使用combiner就会影响最终的结果,但是计算总数的话则对最终结果没影响

    4. shuffle阶段

      将map的输出作为reduce的输入,这个过程就是shuffle,是MapReduce优化的重要阶段。

    5. reduce阶段

      reducer阶段,输入是shuffle阶段的输出,对每个不同的键和该键对应的值的数据流进行独立、并行的处理。

    四、WordCount--官方提供的example

    代码

    package com.smile.test;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class WordCount {
    
        private static final String INPUT_PATH = "/user/cdh/yjq/input/words.txt";
        //hdfs输出路径
        private static final String OUTPUT_PATH = "/user/cdh/yjq/output/";
        
        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    
            private final static IntWritable one = new IntWritable(1);
            // Text 实现了BinaryComparable类可以作为key值
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    
                // 解析键值对
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
            
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                
                for (IntWritable val : values) {
                    sum += val.get();
                }
    
                result.set(sum);
                context.write(key, result);
            }
        }
    
        @SuppressWarnings("deprecation")
        public static void main(String[] args) throws Exception {
    
            String[] paths = {INPUT_PATH,OUTPUT_PATH};
            //获得Configuration配置 Configuration: core-default.xml, core-site.xml 
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, paths).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: wordcount <in> <out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCount.class);
            // 设置Mapper类
            job.setMapperClass(TokenizerMapper.class);
            // 设置Combiner类 
            job.setCombinerClass(IntSumReducer.class); 
            // 设置Reduce类
            job.setReducerClass(IntSumReducer.class); 
             // 设置输出key的类型,注意跟reduce的输出类型保持一致
            job.setOutputKeyClass(Text.class);
            // 设置输出value的类型,注意跟reduce的输出类型保持一致
            job.setOutputValueClass(IntWritable.class);
            // 设置输入路径
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
          // 设置输出路径
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    

    解析

    • MapReduce的输出路径一定要保证文件夹不存在,最好的解决方法时在代码中添加判断,执行之前删除output文件夹(具体方法见下面的hdfs操作

    • MapReduce可以没有输出,但必须设置输出路径

    • MapReduce的输入路径可以直接写hdfs的目录路径,然后放在集群下执行,

        hadoop jar **.jar java类名 参数1 参数2 ...
      
    • Mapper

        //map
        public void map(Object key, Text value, Context context)
      

      前面两个参数分别是输入的key,value,Context context可以记录输入的key和value,context也可以记录map运算的状态
      map中的context记录了map执行的上下文,在mapper类中,context可以存储一些job conf的信息,也就是说context是作为参数传递的载体。比如runner中configuration的set信息[conf.set(Str, strValue)],map中可以get到[context.getConfiguration().get(Str)]

        //setup
        protected void setup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
        //cleanup
        protected void cleanup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
      

      MapReduce框架内的setup和cleanup方法只会执行一次,所以一些相关变量或者是资源的初始化和释放最好是在setup中执行,如果放在map中执行,则在解析每一行数据的时候都会执行一次,严重影响程序运行效率。

    • Reducer

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
      

      reduce的输入也是key/value形式,不过是values,也就是一个key对应的一组value,例如key,value1;key,value2...
      reducer不是必须的,如果用不到reducer阶段可以不写

      reduce会接收到不同map传递过来的数据 ,并且每个map传递过来的数据都是有序的。如果reduce端接收到的数据量比较小,那么会存储在内存中,如果超出缓冲区大小一定比例,则会合并后写到磁盘上

    • 调用 runner

        Configuration conf = new Configuration();
        //连接hbase,操作hbase
        Configuration conf = HBaseConfiguration.create();
      

      MapReduce运行之前都要初始化Configuration,主要是读取MapReduce系统配置,如core-site.xml、hdfs-site.xml、mapred-site.xml、hbase-site.xml

        scan.setCaching(500); 
      

      增加缓存读取条数(一次RPC调用返回多行纪录,也就是每次从服务器端读取的行数),加快scanner读取速度,但耗费内存增加,设太大会响应慢、超时或者OOM。

        setBatch(int batch)
      

      设置获取纪录的列个数,默认无限制,也就是返回所有的列。实际上就是控制一次next()传输多少个columns,如batch为5表示每个result实例返回5个columns
      setBatch使用场景为,用客户端的scanner缓存进行批量交互从而提高性能时,非常大的行可能无法放入客户端的内存,这时需要用HBase客户端API中进行batching处理。

        scan.setCacheBlocks(false); 
    

    默认是true,分内存,缓存和磁盘,三种方式,一般数据的读取为内存->缓存->磁盘;

    setCacheBlocks不适合MapReduce工作:
    MR程序为非热点数据,不需要缓存,因为Blockcache is LRU,也就是最近最少访问算法(扔掉最少访问的),那么,前一个请求(比如map读取)读入Blockcache的所有记录在后一个请求(新的map读取)中都没有用,就必须全部被swap,那么RegionServer要不断的进行无意义的swapping data,也就是无意义的输入和输出BlockCache,增加了无必要的IO。而普通读取时局部查找,或者查找最热数据时,会有提升性能的帮助。

    runner方法中可以写定义多个job,job会顺序执行。

    五、常用hadoop fs命令 (类似Linux的文件操作命令,可类比学习使用)

    -help
    功能:输出这个命令参数手册
    
    -ls
    功能:显示目录信息
    示例: hadoop fs -ls /yjq
    
    -mkdir 
    功能:在hdfs上创建目录
    示例:hadoop fs -mkdir -p /yjq/test
    
    -moveFromLocal
    功能:从本地剪切粘贴到hdfs
    示例:hadoop fs -moveFromLocal /home/cdh/a.txt /yjq/test
    
    -moveToLocal
    功能:从hdfs剪切粘贴到本地
    示例:hadoop fs -moveToLocal /yjq/test/a.txt /home/cdh/ 
    
    -copyFromLocal
    功能:从本地文件系统中拷贝文件到hdfs路径去
    示例:hadoop fs -copyFromLocal /home/cdh/a.txt /yjq/test
    
    -copyToLocal
    功能:从hdfs拷贝到本地
    示例:hadoop fs -copyToLocal /yjq/test/a.txt /home/cdh/ 
    
    -get
    功能:等同于copyToLocal,从hdfs下载文件到本地路径(.表示当前路径)
    示例:hadoop fs -get /yjq/test/a.txt .
    
    -getmerge
    功能:合并下载多个文件
    示例:将目录下所有的TXT文件下载到本地,并合并成一个文件
    hadoop fs -getmerge /yjq/test/*.txt /home/cdh/test.txt
    
    -put
    功能:等同于copyFromLocal
    示例:hadoop fs -put /home/cdh/a.txt /yjq/test
    
    -cp
    功能:从hdfs的一个路径拷贝hdfs的另一个路径
    示例: hadoop fs -cp /yjq/test1/a.txt /yjq/test2/
    
    -mv
    功能:在hdfs目录中移动文件
    示例: hadoop fs -mv /yjq/test1/a.txt /yjq/test2/
    
    -appendToFile
    功能:追加一个文件到已经存在的文件末尾(本地文件追加到hdfs)
    示例:Hadoop fs -appendToFile /home/cdh/a.txt /yjq/test1/a.txt
    
    -cat
    功能:显示文件内容
    示例:hadoop fs -cat /yjq/test1/a.txt
    
    -tail
    功能:显示一个文件的末尾
    示例:hadoop fs -tail /yjq/test1/a.txt
    
    -text
    功能:以字符形式打印一个文件的内容
    示例:hadoop fs -text /yjq/test1/a.txt
    
    -chgrp、-chmod、-chown
    功能:修改文件所属权限 
    示例:
    hadoop fs -chmod 666 /yjq/test1/a.txt
    # cdh为用户名,hadoop为用户组
    hadoop fs -chown cdh:group /yjq/test1/a.txt
    
    -rm
    功能:删除文件或文件夹
    示例:hadoop fs -rm -r /yjq/test/a.txt
    
    -df
    功能:统计文件系统的可用空间信息
    示例:hadoop fs -df -h /
    
    -du
    功能:统计文件夹的大小信息
    示例:
    hadoop fs -du -s -h /yjq/*
    
    -count
    功能:统计一个指定目录下的文件节点数量
    示例:hadoop fs -count /yjq/
    

    六、HBase 相关操作

    1. 简介
      • HBase是一个分布式的、面向列的开源数据库
      • 表由行和列组成,列划分为多个列族/列簇(column family)
      • RowKey:是Byte array,是表中每条记录的“主键”,方便快速查找,Rowkey的设计非常重要。
      • Column Family:列族,拥有一个名称(string),包含一个或者多个相关列
      • Column:属于某一个columnfamily,familyName:columnName,每条记录可动态添加
      • Hbase--图片来源网络
    1. 编码

       Configuration conf = HBaseConfiguration.create();
      

      会自动读取hbase-site.xml配置文件

       Scan scan = new Scan();
       scan.setCaching(1000);
       scan.setStartRow(getBytes(startDate));
       scan.setStopRow(getBytes(endDate));
      
       TableMapReduceUtil.initTableMapperJob(HB_TABLE_NAME, scan, NewsStreamUrlMapper.class, Text.class, Text.class, job);
      

      参数:hbase table name,scan,mapper class,outputKeyClass,outputValueClass,job

    七、hdfs操作

    1. 运算之前清除hdfs上的文件夹

       FileSystem fs = FileSystem.get(new Configuration());
       Path outputDir = new Path(OUTPUT_PATH);
       //运算之前如果文件夹存在则清除文件夹
       if(fs.exists(outputDir))
           fs.delete(outputDir, true);
      
    2. HDFS读流程

      • 客户端向NameNode发起读数据请求
      • NameNode找出距离最近的DataNode节点信息
      • 客户端从DataNode分块下载文件
    3. HDFS写流程

      • 客户端向NameNode发起写数据请求
      • 分块写入DataNode节点,DataNode自动完成副本备份
      • DataNode向NameNode汇报存储完成,NameNode通知客户端

    八、多表操作

    MultiTableInputFormat 支持多个mapper的输出混合到一个shuffle,一个reducer,其中每个mapper拥有不同的inputFormat和mapper处理类。
    所有的mapper需要输出相同的数据类型,对于输出value,需要标记该value来源,以便reducer识别

    List<Scan> scans = new ArrayList<Scan>();  
    
    Scan scan1 = new Scan();  
    scan1.setCaching(100);  
    scan1.setCacheBlocks(false);  
    scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inTable.getBytes());  
    scans.add(scan1);  
      
    Scan scan2 = new Scan();  
    scan2.setCaching(100);  
    scan2.setCacheBlocks(false);  
    scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inPhoneImsiTable.getBytes());  
    scans.add(scan2);   
    
    TableMapReduceUtil.initTableMapperJob(scans, ReadHbaseMapper.class, Text.class,Result.class, job);
    

    九、错误处理

    1. ScannerTimeoutException:org.apache.hadoop.hbase.client.ScannerTimeoutException

      这是当从服务器传输数据到客户端的时间,或者客户端处理数据的时间大于了scanner设置的超时时间,scanner超时报错,可在客户端代码中设置超时时间

       Configuration conf = HBaseConfiguration.create()              
       conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,120000) 
      

      如果Mapper阶段对每条数据的处理时间过长,可以将scan.setCaching(1000)的值设置小一点,如果值设置太大,则处理时间会很长就会出现超时错误。

    写在最后

    很久之前写的学习笔记了,资料来源网络及项目组内的讨论,参考文献就不一一标注了,侵删~

    如果您觉得本文对您有帮助,点个赞吧~~

    相关文章

      网友评论

        本文标题:Hadoop MapReduce 学习笔记

        本文链接:https://www.haomeiwen.com/subject/kwtruftx.html