Writing a Global Sort with Hadoop Streaming


Author: sadamu0912 | Published 2019-04-25 23:16

    map_allsort.py

    #!/usr/bin/python
    # map_allsort.py: shift every key by base_count so all keys have the same
    # width; the framework's lexicographic shuffle sort then matches numeric order.
    import sys

    base_count = 10000
    for line in sys.stdin:
        ss = line.strip().split('\t')
        key = ss[0]
        val = ss[1]
        new_key = base_count + int(key)
        print "%s\t%s" % (new_key, val)
    

    reduce_allsort.py

    #!/usr/bin/python
    # reduce_allsort.py: undo the mapper's offset to recover the original keys.
    import sys

    base_value = 10000
    for line in sys.stdin:
        key, val = line.strip().split('\t')
        print str(int(key) - base_value) + "\t" + val
    
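The offset trick works because the streaming shuffle compares keys as strings: lexicographic order disagrees with numeric order for variable-width numbers, but adding base_count pads every key to the same width, making the two orders coincide. A minimal local check (plain Python 3, separate from the Python 2 job scripts above):

```python
keys = [1, 11, 3, 0, 21, 2]
base_count = 10000

# Lexicographic sort of raw string keys breaks numeric order: '11' < '3'.
naive = sorted(str(k) for k in keys)

# After the offset every key is five digits wide, so string order == numeric order.
padded = sorted(str(base_count + k) for k in keys)
recovered = [int(k) - base_count for k in padded]

print(naive)      # ['0', '1', '11', '2', '21', '3']
print(recovered)  # [0, 1, 2, 3, 11, 21]
```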

    allsort_run.sh

    #!/bin/bash
    HADOOP_CMD="/root/software/hadoop/hadoop-2.6.1/bin/hadoop"
    STREAM_JAR_PATH=$(pwd)/hadoop-streaming-2.6.1.jar
    INPUT_FILE_PATH_A="/input/allsort/a.txt"
    INPUT_FILE_PATH_B="/input/allsort/b.txt"
    OUTPUT_SORT_PATH="/output/allsort"
    $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH
    # Step 3.
    $HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
        -output $OUTPUT_SORT_PATH \
        -mapper "python map_allsort.py" \
        -reducer "python reduce_allsort.py" \
        -jobconf "mapred.reduce.tasks=1" \
        -file ./map_allsort.py \
        -file ./reduce_allsort.py
    

    a.txt

    1   hadoop
    3   hadoop
    5   hadoop
    7   hadoop
    9   hadoop
    11  hadoop
    13  hadoop
    15  hadoop
    17  hadoop
    19  hadoop
    21  hadoop
    23  hadoop
    25  hadoop
    27  hadoop
    29  hadoop
    31  hadoop
    

    b.txt

    0   java
    2   java
    4   java
    6   java
    8   java
    10  java
    12  java
    14  java
    16  java
    18  java
    20  java
    22  java
    24  java
    26  java
    28  java
    30  java
    32  java
    

    In short, the idea is to piggyback on the merge sort the MapReduce framework
    performs during the shuffle: records are first partitioned into buckets, and
    each bucket is then sorted.

    Suppose we want keys 0-16 from the data above in one bucket and keys 17-32 in another.

    List<K,V> =====> List<Partition,K,V>; within each partition the keys are quicksorted.
    Inside each partition a further mapping may be applied: List<K,V> ====> List<K',V'>.
    Three functions are involved: the partition function p(x), the map function m(x), and the combine function c(x).
    If none of them is configured, records pass straight through (each function is the identity).
    For the bucketing above, the partition function p(x) is:
    P(K)=\begin{cases} 0 & K \le 16 \\ 1 & 16 < K \le 32 \end{cases}
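Written out as code, the partition function P(K) above is just a threshold test (a sketch for clarity; in the actual job the partitioning happens on the mapper-emitted bucket index):

```python
def p(k):
    """Partition function P(K): bucket 0 for K <= 16, bucket 1 for 16 < K <= 32."""
    return 0 if k <= 16 else 1

print([p(k) for k in (0, 16, 17, 32)])  # [0, 0, 1, 1]
```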

    map_sort2p.py

    #!/usr/bin/python
    # map_sort2p.py: prepend a reducer index so that original keys 0-16 go to
    # reducer 0 and keys 17-32 go to reducer 1.
    import sys

    base_count = 10000

    for line in sys.stdin:
        ss = line.strip().split('\t')
        key = ss[0]
        val = ss[1]

        new_key = base_count + int(key)

        # Bucket boundary: offset keys up to base_count + 16 belong to reducer 0.
        red_idx = 1
        if new_key <= base_count + 16:
            red_idx = 0

        print "%s\t%s\t%s" % (red_idx, new_key, val)
    
    

    reduce_sort2p.py

    #!/usr/bin/python
    # reduce_sort2p.py: discard the reducer index and undo the key offset.
    import sys

    base_count = 10000

    for line in sys.stdin:
        idx_id, key, val = line.strip().split('\t')

        new_key = int(key) - base_count
        print '\t'.join([str(new_key), val])
    
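This yields a global sort with two reducers because every key in partition 0 is smaller than every key in partition 1: each reducer's output is sorted internally, and concatenating part-00000 with part-00001 gives the fully sorted sequence. A local Python 3 simulation of the shuffle with hypothetical sample rows, mirroring the logic of the two scripts above:

```python
base_count = 10000
rows = [(31, 'hadoop'), (4, 'java'), (17, 'hadoop'), (0, 'java'), (16, 'java')]

# Mapper: emit (red_idx, offset_key, val); keys 0-16 -> reducer 0, 17-32 -> reducer 1.
mapped = [(0 if k <= 16 else 1, base_count + k, v) for k, v in rows]

# Shuffle: route each record by red_idx.
partitions = {0: [], 1: []}
for idx, key, val in mapped:
    partitions[idx].append((key, val))

# Reducers: each partition arrives sorted by key; strip the offset.
# Concatenating reducer 0's output and reducer 1's output is globally sorted.
output = []
for idx in (0, 1):
    for key, val in sorted(partitions[idx]):
        output.append((key - base_count, val))

print(output)  # [(0, 'java'), (4, 'java'), (16, 'java'), (17, 'hadoop'), (31, 'hadoop')]
```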
    

    sort2p_run.sh

    #!/bin/bash
    HADOOP_CMD="/root/software/hadoop/hadoop-2.6.1/bin/hadoop"
    STREAM_JAR_PATH=$(pwd)/hadoop-streaming-2.6.1.jar
    INPUT_FILE_PATH_A="/input/allsort/a.txt"
    INPUT_FILE_PATH_B="/input/allsort/b.txt"
    
    OUTPUT_SORT_PATH="/output/sort2p"
    
    $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH
    
    # Step 3.
    $HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
        -output $OUTPUT_SORT_PATH \
        -mapper "python map_sort2p.py" \
        -reducer "python reduce_sort2p.py" \
        -file ./map_sort2p.py \
        -file ./reduce_sort2p.py \
        -jobconf mapred.reduce.tasks=2 \
        -jobconf stream.num.map.output.key.fields=2 \
        -jobconf num.key.fields.for.partition=1 \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
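With stream.num.map.output.key.fields=2 the first two tab-separated fields form the key, and num.key.fields.for.partition=1 tells KeyFieldBasedPartitioner to hash only the first field, the red_idx. Hadoop assigns a reducer as (hashCode & Integer.MAX_VALUE) % numReduceTasks; reimplementing Java's String.hashCode in Python for illustration (assuming ASCII key fields), the one-character ids '0' and '1' happen to land on reducers 0 and 1 respectively:

```python
def java_string_hashcode(s):
    """Java's String.hashCode with 32-bit signed overflow semantics."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def partition_for(key_field, num_reducers=2):
    # Hadoop's reducer assignment: (hashCode & Integer.MAX_VALUE) % numReduceTasks
    return (java_string_hashcode(key_field) & 0x7FFFFFFF) % num_reducers

print(partition_for('0'), partition_for('1'))  # 0 1
```

Note that this mapping is a property of the hash values ('0' hashes to 48, '1' to 49), not a guarantee of the partitioner; with different bucket ids or reducer counts the buckets may land on different reducers.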
    

    Source: https://www.haomeiwen.com/subject/swntnqtx.html