美文网首页
Shell脚本实现MapReduce统计单词数程序

Shell脚本实现MapReduce统计单词数程序

作者: 鹅鹅鹅_ | 来源:发表于2019-01-02 09:55 被阅读0次

    一、原理介绍


    • 概述
      Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:
      采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer)
    $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper cat \
    -reducer wc
    
    • Hadoop Streaming原理
      Streaming的原理是用Java实现一个包装用户程序的MapReduce程序,该程序负责调用MapReduce Java接口获取key/value对输入,创建一个新的进程启动包装的用户程序,将数据通过管道传递给包装的用户程序处理,然后调用MapReduce Java接口将用户程序的输出切分成key/value对输出。
      mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。
      如果一个文件(可执行或者脚本)作为mapper,mapper初始化时,每一个mapper任务会把该文件作为一个单独进程启动,mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。如果没有tab,整行作为key值,value值为null。
      对于reducer,类似。
      以上是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。
    • Streaming优点
    1. 发效率高,便于移植。只要按照标准输入输出格式进行编程,就可以满足hadoop要求。因此单机程序稍加改动就可以在集群上进行使用。 同样便于测试-只要按照 cat input | mapper | sort | reducer > output 进行单机测试即可。如果单机测试通过,大多数情况是可以在集群上成功运行的,只要控制好内存就好了。

    2. 提高程序效率-有些程序对内存要求较高,如果用java控制内存毕竟不如C/C++。

    • Streaming不足
      1.Streaming中的mapper和reducer默认只能向标准输出写数据,不能方便地处理多路输出

    • Hadoop Streaming用法

    Usage: $HADOOP_HOME/bin/hadoop jar \
    $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar [options]
    

    options:
    (1)-input:输入文件路径
    (2)-output:输出文件路径
    (3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本
    (4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本
    (5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。
    (6)-partitioner:用户自定义的partitioner程序
    (7)-combiner:用户自定义的combiner程序(必须用java实现)
    (8)-D:作业的一些属性(以前用的是-jonconf),具体有:
    1)mapred.map.tasks:map task数目
    2)mapred.reduce.tasks:reduce task数目
    3)stream.map.input.field.separator/stream.map.output.field.separator: map task输入/输出数
    据的分隔符,默认均为\t。
    4)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
    5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task输入/输出数据的分隔符,默认均为\t。
    6)stream.num.reduce.output.key.fields:指定reduce task输出记录中key所占的域数目
    另外,Hadoop本身还自带一些好用的Mapper和Reducer:
    (1) Hadoop聚集功能
    Aggregate提供一个特殊的reducer类和一个特殊的combiner类,并且有一系列的“聚合器”(例如“sum”,“max”,“min”等)用于聚合一组value的序列。用户可以使用Aggregate定义一个mapper插件类,这个类用于为mapper输入的每个key/value对产生“可聚合项”。Combiner/reducer利用适当的聚合器聚合这些可聚合项。要使用Aggregate,只需指定“-reducer aggregate”。
    (2)字段的选取(类似于Unix中的‘cut’)
    Hadoop的工具类org.apache.hadoop.mapred.lib.FieldSelectionMapReduc帮助用户高效处理文本数据,就像unix中的“cut”工具。工具类中的map函数把输入的key/value对看作字段的列表。 用户可以指定字段的分隔符(默认是tab),可以选择字段列表中任意一段(由列表中一个或多个字段组成)作为map输出的key或者value。 同样,工具类中的reduce函数也把输入的key/value对看作字段的列表,用户可以选取任意一段作为reduce输出的key或value。

    二、shell脚本实现实例


    • 首先准备测试文件
      用下面的脚本生成NG大小的test.txt
    #! /bin/sh
    while [ "1" == "1" ]
    do
        echo "noe two three" >> test.txt
    done
    
    • 将测试文件上传到HDFS文件系统
    [hadoop@master ~]$ hdfs dfs -mkdir /test
    [hadoop@master ~]$ hdfs dfs -put test.txt /test/
    [hadoop@master ~]$ hdfs dfs -ls /test
    Found 1 items
    -rw-r--r--   2 hadoop supergroup 4880841000 2017-03-20 10:45 /test/test.txt
    
    • mapper脚本代码
    #! /bin/bash
    while read LINE; do
      for word in $LINE
      do
        #-e使得\t转义(escape)为tab
        echo -e "$word\t1"
      done
    done
    
    • reducer脚本代码
    #! /bin/sh
    count=0
    started=0
    word=""
    while read LINE;do
      newword=`echo $LINE | cut -d ' '  -f 1`
      if [ "$word" != "$newword" ];then
        [ $started -ne 0 ] && echo -e "$word\t$count"
        word=$newword
        count=1
        started=1
      else
        count=$(( $count + 1 ))
      fi
    done
    echo -e "$word\t$count"
    
    • 执行任务
    hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -input test -output output2 -mapper mapper.sh -reducer reducer.sh -file mapper.sh -file reducer.sh
    
    • 可以在slave节点上看到相关的进程,这些mapper进程在master节点上是不存在的
      PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                           
     7822 hadoop    20   0  103m 1628 1076 R 100.0  0.0   2:53.34 mapper.sh                                                         
     7819 hadoop    20   0  103m 1632 1076 R 100.0  0.0   2:53.33 mapper.sh                                                         
     7836 hadoop    20   0  103m 1636 1076 R 100.0  0.0   2:53.21 mapper.sh                                                         
     7833 hadoop    20   0  103m 1628 1076 R 100.0  0.0   2:53.23 mapper.sh                                                         
    29993 root      20   0  164g 346m  10m R 100.2  0.3 113:27.07 gpu_executor                                                      
     7653 hadoop    20   0  912m 224m  18m S 81.4  0.2   2:34.85 java                                                               
     7650 hadoop    20   0  899m 229m  18m S 80.4  0.2   2:33.40 java                                                               
     7651 hadoop    20   0  922m 235m  18m S 79.4  0.2   2:32.60 java                                                               
     7652 hadoop    20   0  915m 237m  18m S 79.1  0.2   2:31.61 java                                                               
    28961 root      20   0  564m   9m 6252 S 18.8  0.0  23:43.42 TaskTracker                                                        
    31606 hadoop    20   0 1838m 338m  18m S  4.3  0.3   0:25.58 java                                                               
     6102 liuhao    20   0  931m  28m  18m S  1.6  0.0 132:25.21 knotify4      
    
    • map完成后,reduce阶段会看到reduce进程
      PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                           
    29993 root      20   0  164g 346m  10m R 97.7  0.3 222:36.66 gpu_executor                                                       
    28961 root      20   0  564m   9m 6252 S 19.7  0.0  46:10.45 TaskTracker                                                        
    17016 hadoop    20   0  103m 1664 1100 S 11.8  0.0   0:33.40 reducer.sh 
    
    • 相关问题
    1. 首先遇到“找不到或无法加载类hadoop-streaming-2.7.3.jar”的问题,后来发现是因为命令丢了“jar”这个参数
    2. 然后又遇到job发不下去,一直卡在
    INFO mapreduce.Job: Running job: job_1489999749396_0004
    

    后来将slaves节点的hostname也修正为IP映射表内对应的名字,解决?

    1. 运行阶段卡在reduce
    17/03/20 19:50:29 INFO mapreduce.Job:  map 74% reduce 0%
    17/03/20 19:50:30 INFO mapreduce.Job:  map 77% reduce 0%
    17/03/20 19:50:33 INFO mapreduce.Job:  map 84% reduce 0%
    17/03/20 19:50:36 INFO mapreduce.Job:  map 91% reduce 0%
    17/03/20 19:50:39 INFO mapreduce.Job:  map 97% reduce 0%
    17/03/20 19:50:40 INFO mapreduce.Job:  map 98% reduce 0%
    17/03/20 19:50:41 INFO mapreduce.Job:  map 100% reduce 0%
    17/03/20 19:50:50 INFO mapreduce.Job:  map 100% reduce 67%
    
    

    根据一位外国友人的说明,在reduce阶段 ,0-33%阶段是 shuffle 阶段,就是根据键值 来讲本条记录发送到指定的reduce,这个阶段应该是在map还没有完全完成的时候就已经开始了,因为我们会看到map在执行到一个百分比后reduce也启动了,这样做也提高了程序的执行效率。
    34%-65%阶段是sort阶段,就是reduce根据收到的键值进行排序。map阶段也会发生排序,map的输出结果是以键值为顺序排序后输出,可以通过只有map阶段处理的输出来验证(以前自己验证过,貌似确有这么回事,大家自己再验证下,免得我误人子弟啊)。
    66%-100%阶段是处理阶段,这个阶段才是真正的处理阶段,如果程序卡在这里,估计就是你的reduce程序有问题了。
    索性等了一晚上,第二天终于有动静了

    17/03/21 07:01:53 INFO mapreduce.Job:  map 100% reduce 77%
    

    和上面的记录对比发现,从%67到%77用了11个小时!这明显是reduce程序效率太慢了。也可能是数据倾斜问题。中间也试过增加reducer的数量,但无果。最终我索性减少了输入文件的行数,使其只有三行:

    one two three
    one two three
    one two three
    

    然后重新运行程序,瞬间得到了结果:

    [hadoop@master ~]$ hdfs dfs -cat output2/part-00002
    one\t3  
    three\t3    
    two\t3  
    

    可见,结果是正确的。


    Python版本

    • mapper脚本
    #!/usr/bin/env python
     
    import sys
     
    # input comes from STDIN (standard input)
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
        # split the line into words while removing any empty strings
        words = filter(lambda word: word, line.split())
        # increase counters
        for word in words:
            # write the results to STDOUT (standard output);
            # what we output here will be the input for the
            # Reduce step, i.e. the input for reducer.py
            #
            # tab-delimited; the trivial word count is 1
            print '%s\t%s' % (word, 1)
    
    • reducer脚本
    #!/usr/bin/env python
     
    from operator import itemgetter
    import sys
     
    # maps words to their counts
    word2count = {}
     
    # input comes from STDIN
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
     
        # parse the input we got from mapper.py
        word, count = line.split()
        # convert count (currently a string) to int
        try:
            count = int(count)
            word2count[word] = word2count.get(word, 0) + count
        except ValueError:
            # count was not a number, so silently
            # ignore/discard this line
            pass
     
    # sort the words lexigraphically;
    #
    # this step is NOT required, we just do it so that our
    # final output will look more like the official Hadoop
    # word count examples
    sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
     
    # write the results to STDOUT (standard output)
    for word, count in sorted_word2count:
        print '%s\t%s'% (word, count)
    
    • 单机测试
    [hadoop@master ~]$ echo "one two three" | ./mapper.py | sort | ./reducer.py 
    one 1
    three   1
    two 1
    
    • 运行
    [hadoop@master ~]$ hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=3 -input test -output output3 -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py 
    

    令人诧异的是很快就执行完了,难道真的是shell脚本不适合做类似统计这样的事情吗?

    [hadoop@master ~]$ hdfs dfs -ls output3
    Found 4 items
    -rw-r--r--   2 hadoop supergroup          0 2017-03-22 10:12 output3/_SUCCESS
    -rw-r--r--   2 hadoop supergroup         14 2017-03-22 10:12 output3/part-00000
    -rw-r--r--   2 hadoop supergroup          0 2017-03-22 10:12 output3/part-00001
    -rw-r--r--   2 hadoop supergroup         24 2017-03-22 10:12 output3/part-00002
    [hadoop@master ~]$ hdfs dfs -cat output3/part-00002
    noe 1166202
    two 1166202
    [hadoop@master ~]$ hdfs dfs -cat output3/part-00000
    three   1166202
    [hadoop@master ~]$ 
    
    

    相关文章

      网友评论

          本文标题:Shell脚本实现MapReduce统计单词数程序

          本文链接:https://www.haomeiwen.com/subject/kkrnlqtx.html