美文网首页
Python MapReduce HelloWorld

Python MapReduce HelloWorld

作者: tobrainto | 来源:发表于2018-08-26 00:39 被阅读0次

    1. Hadoop集群环境

    Master 192.168.40.130
    Slave1 192.168.40.131
    Slave2 192.168.40.132

    2. Python环境

    ]# python -V
    Python 2.6.6
    

    3. Python MapReduce函数的编写及测试

    map函数的编写

    ]# pwd
    /demo/pywc
    ]# touch map.py
    ]# vim map.py

    • map.py内容
    #!/usr/bin/env python
    # map.py - Hadoop Streaming mapper for word count.
    # Reads raw text lines from stdin and emits one "<word>\t1" pair per
    # recognized word on stdout (the Streaming contract for a mapper).
    import sys
    import time  # kept from the original; only used by the commented-out sleep
    import re

    # First run of word characters in a token, e.g. "Hello," -> "Hello".
    WORD_RE = re.compile(r'\w+')

    def emit_words(line):
        """Yield each normalized (lower-cased) word found in one input line.

        A token with no \\w+ run (pure punctuation) is skipped entirely;
        otherwise only the token's first \\w+ run is kept, matching the
        original script's behavior.
        """
        for token in line.strip().split(' '):
            # time.sleep(1)  # (debug aid from the original, left disabled)
            matches = WORD_RE.findall(token)
            if len(matches) < 1:
                continue
            word = matches[0].lower()
            if word.strip() != "":
                yield word

    if __name__ == '__main__':
        for line in sys.stdin:
            for word in emit_words(line):
                # print(...) with a single pre-formatted string works
                # identically under Python 2 and Python 3.
                print("%s\t%s" % (word, 1))
    
    • 单机验证 map.py

    cat /data/The_Man_of_Property.txt | python map.py

    reduce函数的编写

    ]# pwd
    /demo/pywc
    ]# touch reduce.py
    ]# vim reduce.py

    • reduce.py内容
    #!/usr/bin/env python
    # reduce.py - Hadoop Streaming reducer for word count.
    # Reads "<word>\t<count>" lines (already sorted by word, as the
    # Streaming shuffle guarantees) from stdin and emits one
    # "<word>\t<total>" line per distinct word.
    import sys

    def reduce_stream(stream):
        """Aggregate a sorted stream of "word\\tcount" lines.

        Yields (word, total) tuples, one per run of equal words.
        Yields nothing for an empty stream (the original printed a
        spurious "None\\t0" line in that case).
        """
        current_word = None
        count = 0  # renamed from 'sum' to avoid shadowing the builtin
        for line in stream:
            word, val = line.strip().split('\t')
            if current_word is None:
                current_word = word
            if current_word != word:
                # Word changed: the previous run is complete.
                yield current_word, count
                current_word = word
                count = 0
            count += int(val)
        if current_word is not None:
            # Flush the final run; guarded so empty input emits nothing.
            yield current_word, count

    if __name__ == '__main__':
        for word, count in reduce_stream(sys.stdin):
            print("%s\t%s" % (word, str(count)))
    
    • 单机验证 reduce.py

    cat /data/The_Man_of_Property.txt | python map.py | sort -k1 | python reduce.py

    4. 准备工作

    • 启动Hadoop集群服务

    ]# cd /usr/local/src/hadoop-2.6.1/sbin/
    ]# ./start-all.sh

    • hdfs上创建pywc目录并上传文件

    ]# hdfs dfs -mkdir -p /demo/pywc
    ]# hdfs dfs -put /data/The_Man_of_Property.txt /demo/pywc/
    ]# hdfs dfs -ls /demo/pywc/
    Found 1 items
    -rw-r--r-- 1 root supergroup 632207 2018-08-25 23:23 /demo/pywc/The_Man_of_Property.txt

    • hdfs上创建output目录(mr运行结果目录)

    ]# hdfs dfs -mkdir -p /demo/pywc/output/

    • 查找hadoop-streaming*.jar类库

    hadoop-streaming*.jar是Python MR运行时需要的支持类库,这个文件一般位于Hadoop安装目录下

    ]# find /usr/local/src/hadoop-2.6.1/ -name hadoop-streaming*.jar
    /usr/local/src/hadoop-2.6.1/share/hadoop/tools/sources/hadoop-streaming-2.6.1-sources.jar
    /usr/local/src/hadoop-2.6.1/share/hadoop/tools/sources/hadoop-streaming-2.6.1-test-sources.jar
    /usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar

    /usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar这个就是我们需要的文件

    5. Python MapReduce在Hadoop集群上运行

    run.sh脚本的编写

    ]# pwd
    /demo/pywc
    ]# touch run.sh
    ]# vim run.sh

    • run.sh内容
    #!/usr/bin/env bash
    # run.sh - submit the Python streaming word-count job to the cluster.
    # Requires: hadoop binary and the hadoop-streaming jar at the paths
    # below; map.py and reduce.py in the current directory.
    set -euo pipefail

    readonly HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
    readonly STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

    readonly INPUT_FILE_PATH="/demo/pywc/The_Man_of_Property.txt"
    readonly OUTPUT_PATH="/demo/pywc/output/"

    # Delete the previous run's output; tolerate "no such path" on the
    # first run (otherwise set -e would abort the script here).
    "$HADOOP_CMD" fs -rm -r -skipTrash "$OUTPUT_PATH" || true

    "$HADOOP_CMD" jar "$STREAM_JAR_PATH" \
        -input "$INPUT_FILE_PATH" \
        -output "$OUTPUT_PATH" \
        -mapper "python map.py" \
        -reducer "python reduce.py" \
        -file ./map.py \
        -file ./reduce.py
    

    Python MR Job运行

    • 执行run.sh提交任务
    ]# ./run.sh
    Deleted /demo/pywc/output
    18/08/25 23:47:08 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
    packageJobJar: [./map.py, ./reduce.py, /tmp/hadoop-unjar1524200123508957206/] [] /tmp/streamjob8469713926615236498.jar tmpDir=null
    18/08/25 23:47:10 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.40.130:8032
    18/08/25 23:47:10 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.40.130:8032
    18/08/25 23:47:12 INFO mapred.FileInputFormat: Total input paths to process : 1
    18/08/25 23:47:12 INFO mapreduce.JobSubmitter: number of splits:2
    18/08/25 23:47:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1535210088819_0002
    18/08/25 23:47:13 INFO impl.YarnClientImpl: Submitted application application_1535210088819_0002
    18/08/25 23:47:13 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1535210088819_0002/
    18/08/25 23:47:13 INFO mapreduce.Job: Running job: job_1535210088819_0002
    18/08/25 23:47:25 INFO mapreduce.Job: Job job_1535210088819_0002 running in uber mode : false
    18/08/25 23:47:25 INFO mapreduce.Job:  map 0% reduce 0%
    18/08/25 23:47:43 INFO mapreduce.Job:  map 50% reduce 0%
    18/08/25 23:47:44 INFO mapreduce.Job:  map 100% reduce 0%
    18/08/25 23:47:53 INFO mapreduce.Job:  map 100% reduce 100%
    18/08/25 23:47:54 INFO mapreduce.Job: Job job_1535210088819_0002 completed successfully
    18/08/25 23:47:54 INFO mapreduce.Job: Counters: 49
        File System Counters
            FILE: Number of bytes read=1026038
            FILE: Number of bytes written=2379600
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=635800
            HDFS: Number of bytes written=92691
            HDFS: Number of read operations=9
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=2
        Job Counters 
            Launched map tasks=2
            Launched reduce tasks=1
            Data-local map tasks=2
            Total time spent by all maps in occupied slots (ms)=30141
            Total time spent by all reduces in occupied slots (ms)=7629
            Total time spent by all map tasks (ms)=30141
            Total time spent by all reduce tasks (ms)=7629
            Total vcore-seconds taken by all map tasks=30141
            Total vcore-seconds taken by all reduce tasks=7629
            Total megabyte-seconds taken by all map tasks=30864384
            Total megabyte-seconds taken by all reduce tasks=7812096
        Map-Reduce Framework
            Map input records=2866
            Map output records=110509
            Map output bytes=805014
            Map output materialized bytes=1026044
            Input split bytes=208
            Combine input records=0
            Combine output records=0
            Reduce input groups=9000
            Reduce shuffle bytes=1026044
            Reduce input records=110509
            Reduce output records=9000
            Spilled Records=221018
            Shuffled Maps =2
            Failed Shuffles=0
            Merged Map outputs=2
            GC time elapsed (ms)=495
            CPU time spent (ms)=4970
            Physical memory (bytes) snapshot=480501760
            Virtual memory (bytes) snapshot=6184808448
            Total committed heap usage (bytes)=259534848
        Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
        File Input Format Counters 
            Bytes Read=635592
        File Output Format Counters 
            Bytes Written=92691
    18/08/25 23:47:54 INFO streaming.StreamJob: Output directory: /demo/pywc/output/
    

    JOB正常运行完成

    • 结果查看
    ]# hdfs dfs -ls /demo/pywc/output/
    Found 2 items
    -rw-r--r--   1 root supergroup          0 2018-08-25 23:47 /demo/pywc/output/_SUCCESS
    -rw-r--r--   1 root supergroup      92691 2018-08-25 23:47 /demo/pywc/output/part-00000
    [root@master pywl]# hdfs dfs -cat /demo/pywc/output/part-00000 | head -100
    #  省去若干行
    accosting   1
    account 12
    accountant  2
    accounts    10
    accuracy    1
    accurately  1
    accustomed  9
    ache    3
    achieve 1
    achievement 1
    achievements    1
    achieves    1
    achilles    1
    aching  2
    acid    1
    

    相关文章

      网友评论

          本文标题:Python MapReduce HelloWorld

          本文链接:https://www.haomeiwen.com/subject/bzyniftx.html