Writing a Global Sort with Hadoop Streaming

Author: sadamu0912 | Published 2019-04-25 23:16

map_allsort.py

#!/usr/bin/python
# Mapper: add a fixed offset to each numeric key so that the framework's
# string-based sort agrees with numeric order.
import sys

base_count = 10000
for line in sys.stdin:
    ss = line.strip().split("\t")
    key = ss[0]
    val = ss[1]
    new_key = base_count + int(key)
    print "%s\t%s" % (new_key, val)
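A quick local check of why the offset matters: Hadoop Streaming compares keys as strings, so without padding, "11" would sort before "3". A minimal Python 3 sketch (the sample keys are made up):

```python
# Hadoop Streaming sorts map output keys lexicographically, as strings.
keys = [1, 3, 11, 21]

# Raw string sort puts "11" and "21" before "3" -- wrong numeric order.
as_strings = sorted(str(k) for k in keys)

# Adding base_count = 10000 pads every key to the same width, so the
# lexicographic order matches the numeric order.
padded = sorted(str(10000 + k) for k in keys)
```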

reduce_allsort.py

#!/usr/bin/python
# Reducer: strip the offset added by the mapper; input arrives already
# sorted by the padded key, so output is in global numeric order.
import sys

base_value = 10000
for line in sys.stdin:
    key, val = line.strip().split('\t')
    print str(int(key) - base_value) + "\t" + val

allsort_run.sh

#!/bin/bash
HADOOP_CMD="/root/software/hadoop/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH=$(pwd)/hadoop-streaming-2.6.1.jar
INPUT_FILE_PATH_A="/input/allsort/a.txt"
INPUT_FILE_PATH_B="/input/allsort/b.txt"
OUTPUT_SORT_PATH="/output/allsort"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH
# Run the streaming job with a single reducer so the output is one globally sorted file.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
    -output $OUTPUT_SORT_PATH \
    -mapper "python map_allsort.py" \
    -reducer "python reduce_allsort.py" \
    -jobconf "mapred.reduce.tasks=1" \
    -file ./map_allsort.py \
    -file ./reduce_allsort.py
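Before submitting, the pipeline can be checked locally. Below is a minimal Python 3 sketch; the sample records and the `sorted()` call standing in for the framework's shuffle/sort are assumptions, not part of the job:

```python
# Simulates map -> shuffle/sort -> reduce for the single-reducer job.
base = 10000
records = [(1, "hadoop"), (3, "hadoop"), (0, "java"),
           (2, "java"), (11, "hadoop"), (10, "java")]

# Mapper: pad each key with the offset.
mapped = ["%s\t%s" % (base + k, v) for k, v in records]

# The framework sorts map output by the string key.
shuffled = sorted(mapped)

# Reducer: strip the offset; output is now in global numeric key order.
out = []
for line in shuffled:
    key, val = line.split("\t")
    out.append("%s\t%s" % (int(key) - base, val))
```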

a.txt

1   hadoop
3   hadoop
5   hadoop
7   hadoop
9   hadoop
11  hadoop
13  hadoop
15  hadoop
17  hadoop
19  hadoop
21  hadoop
23  hadoop
25  hadoop
27  hadoop
29  hadoop
31  hadoop

b.txt

0   java
2   java
4   java
6   java
8   java
10  java
12  java
14  java
16  java
18  java
20  java
22  java
24  java
26  java
28  java
30  java
32  java

In short, this approach leverages the merge sort built into the MapReduce framework:
records are first partitioned into buckets, then sorted within each bucket.

Suppose we want keys 0-16 in one partition and keys 17-32 in another.

The shuffle turns List<K,V> into List<Partition,K,V>, and keys are quick-sorted within each partition.
Inside each partition a further mapping List<K,V> ====> List<K',V'> is applied.
The framework exposes a partition function p(x), a map function m(x), and a combine function c(x).
If none of them is configured, records pass through unchanged: each function is the identity.
For the requirement above, the partition function P(K) is:
P(K)=\begin{cases} 0 & K \le 16 \\ 1 & 16 < K \le 32 \end{cases}
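The partition function can be sketched in plain Python (a hypothetical helper for illustration, not part of the job scripts):

```python
def p(k):
    # P(K) from the text: bucket 0 for K <= 16, bucket 1 for 16 < K <= 32.
    return 0 if k <= 16 else 1

# Keys 0 and 16 land in partition 0; 17 and 32 land in partition 1.
buckets = [p(k) for k in (0, 16, 17, 32)]
```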

map_sort2p.py

#!/usr/bin/python
import sys

base_count = 10000

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]

    new_key = base_count + int(key)

    # Route keys 0-16 to reducer 0 and keys 17-32 to reducer 1,
    # matching the partition function P(K) above.
    red_idx = 1
    if new_key <= base_count + 16:
        red_idx = 0

    print "%s\t%s\t%s" % (red_idx, new_key, val)

reduce_sort2p.py

#!/usr/bin/python
# Reducer: drop the partition index and strip the key offset; input within
# each partition arrives sorted by the padded key.
import sys

base_count = 10000

for line in sys.stdin:
    idx_id, key, val = line.strip().split('\t')

    new_key = int(key) - base_count
    print '\t'.join([str(new_key), val])

sort2p_run.sh

#!/bin/bash
HADOOP_CMD="/root/software/hadoop/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH=$(pwd)/hadoop-streaming-2.6.1.jar
INPUT_FILE_PATH_A="/input/allsort/a.txt"
INPUT_FILE_PATH_B="/input/allsort/b.txt"

OUTPUT_SORT_PATH="/output/sort2p"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Run the streaming job with two reducers. stream.num.map.output.key.fields=2
# treats the first two tab-separated fields as the sort key, and
# num.key.fields.for.partition=1 partitions on the first field (red_idx) only.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
    -output $OUTPUT_SORT_PATH \
    -mapper "python map_sort2p.py" \
    -reducer "python reduce_sort2p.py" \
    -file ./map_sort2p.py \
    -file ./reduce_sort2p.py \
    -jobconf mapred.reduce.tasks=2 \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf num.key.fields.for.partition=1 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
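With two reducers, concatenating part-00000 and then part-00001 yields a globally sorted result, because every key in partition 0 is smaller than every key in partition 1. A minimal Python 3 sketch of that behavior (the sample keys, the dict standing in for the partitioner, and `sorted()` standing in for the per-partition sort are all assumptions):

```python
# Simulates the two-reducer job: partition on red_idx, sort within each
# partition, then read the part files in order.
base = 10000
records = [(17, "hadoop"), (3, "hadoop"), (30, "java"),
           (2, "java"), (16, "java"), (25, "hadoop")]

parts = {0: [], 1: []}
for k, v in records:
    new_key = base + k
    red_idx = 0 if k <= 16 else 1   # P(K) from the text
    parts[red_idx].append("%s\t%s" % (new_key, v))

out = []
for idx in (0, 1):                  # part-00000, then part-00001
    for line in sorted(parts[idx]): # per-partition sort by string key
        key, val = line.split("\t")
        out.append(int(key) - base)
# out is globally sorted across both partitions
```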


Original article: https://www.haomeiwen.com/subject/swntnqtx.html