map_allsort.py
#!/usr/bin/python
import sys

base_count = 10000  # fixed-width offset so string sort order matches numeric order

for line in sys.stdin:
    key, val = line.strip().split("\t")
    new_key = base_count + int(key)
    print "%s\t%s" % (new_key, val)
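Why add base_count? The streaming shuffle sorts keys as strings, so without a fixed-width offset "11" would sort before "3". Adding 10000 makes every key five digits wide, and string order then matches numeric order. A quick illustration:

```python
# Lexicographic vs. offset-padded sort order for numeric keys.
keys = [1, 3, 11]
plain = sorted(str(k) for k in keys)            # string order is wrong
padded = sorted(str(10000 + k) for k in keys)   # numeric order restored
print(plain)   # ['1', '11', '3']
print(padded)  # ['10001', '10003', '10011']
```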
reduce_allsort.py
#!/usr/bin/python
import sys

base_value = 10000

for line in sys.stdin:
    key, val = line.strip().split('\t')
    # strip the offset the mapper added
    print str(int(key) - base_value) + "\t" + val
allsort_run.sh
#!/bin/bash
HADOOP_CMD="/root/software/hadoop/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH=$(pwd)/hadoop-streaming-2.6.1.jar
INPUT_FILE_PATH_A="/input/allsort/a.txt"
INPUT_FILE_PATH_B="/input/allsort/b.txt"
OUTPUT_SORT_PATH="/output/allsort"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH
# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
-output $OUTPUT_SORT_PATH \
-mapper "python map_allsort.py" \
-reducer "python reduce_allsort.py" \
-jobconf "mapred.reduce.tasks=1" \
-file ./map_allsort.py \
-file ./reduce_allsort.py \
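The job above can be sanity-checked without a cluster, since Hadoop streaming is just stdin/stdout plus a sort. This sketch simulates `cat a.txt b.txt | mapper | sort | reducer` in-process, with inline functions mirroring map_allsort.py and reduce_allsort.py:

```python
# Local simulation of the single-reducer streaming job.
base_count = 10000

def mapper(line):
    key, val = line.strip().split("\t")
    return "%d\t%s" % (base_count + int(key), val)

def reducer(line):
    key, val = line.strip().split("\t")
    return "%d\t%s" % (int(key) - base_count, val)

records = ["1\thadoop", "11\thadoop", "0\tjava", "2\tjava"]
shuffled = sorted(mapper(r) for r in records)  # the framework's sort phase
result = [reducer(r) for r in shuffled]
print(result)  # keys come out in global numeric order
```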
a.txt
1 hadoop
3 hadoop
5 hadoop
7 hadoop
9 hadoop
11 hadoop
13 hadoop
15 hadoop
17 hadoop
19 hadoop
21 hadoop
23 hadoop
25 hadoop
27 hadoop
29 hadoop
31 hadoop
b.txt
0 java
2 java
4 java
6 java
8 java
10 java
12 java
14 java
16 java
18 java
20 java
22 java
24 java
26 java
28 java
30 java
32 java
In short, the idea is to exploit the merge sort that the MapReduce framework performs for free:
records are first partitioned into buckets, and each bucket is then sorted.
Suppose we want keys 0-16 from the data above in one bucket and keys 17-32 in another.
List<K,V> =====> List<Partition,K,V>; within each partition the keys are quick-sorted.
Inside each partition a further mapping is applied: List<K,V> ====> List<K',V'>.
Three functions are involved: a partition function p(x), a map function m(x), and a combine function c(x).
If you specify nothing, each of them is a pass-through (the identity function).
To meet the requirement above, the partition function p(x) looks like this:
map_sort2p.py
#!/usr/bin/python
import sys

base_count = 10000

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]
    new_key = base_count + int(key)
    # route keys 0-16 to reducer 0 and keys 17-32 to reducer 1
    red_idx = 1
    if new_key < base_count + 17:
        red_idx = 0
    print "%s\t%s\t%s" % (red_idx, new_key, val)
reduce_sort2p.py
#!/usr/bin/python
import sys

base_count = 10000

for line in sys.stdin:
    idx_id, key, val = line.strip().split('\t')
    new_key = int(key) - base_count
    print '\t'.join([str(new_key), val])
sort2p_run.sh
#!/bin/bash
HADOOP_CMD="/root/software/hadoop/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH=$(pwd)/hadoop-streaming-2.6.1.jar
INPUT_FILE_PATH_A="/input/allsort/a.txt"
INPUT_FILE_PATH_B="/input/allsort/b.txt"
OUTPUT_SORT_PATH="/output/sort2p"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH
# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
-output $OUTPUT_SORT_PATH \
-mapper "python map_sort2p.py" \
-reducer "python reduce_sort2p.py" \
-file ./map_sort2p.py \
-file ./reduce_sort2p.py \
-jobconf mapred.reduce.tasks=2 \
-jobconf stream.num.map.output.key.fields=2 \
-jobconf num.key.fields.for.partition=1 \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
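The two-reducer behavior can likewise be sketched locally. In the real job, KeyFieldBasedPartitioner with num.key.fields.for.partition=1 hashes the first emitted field (red_idx) to pick a reducer; here the partition index is used directly, which is equivalent for this purpose. The 0-16 / 17-32 split below follows the requirement stated above:

```python
# Local sketch of the two-partition sort: the first field picks the
# partition, each partition is sorted independently, and the reducer
# strips the offset, yielding two globally ordered output files.
base_count = 10000

def mapper(line):
    key, val = line.strip().split("\t")
    new_key = base_count + int(key)
    red_idx = 0 if new_key < base_count + 17 else 1
    return red_idx, "%d\t%s" % (new_key, val)

records = ["1\thadoop", "31\thadoop", "0\tjava", "20\tjava"]
partitions = {0: [], 1: []}
for r in records:
    idx, kv = mapper(r)
    partitions[idx].append(kv)

output = []  # (partition, original_key, value), like part-00000 / part-00001
for idx in (0, 1):
    for kv in sorted(partitions[idx]):  # per-reducer sort phase
        key, val = kv.split("\t")
        output.append((idx, int(key) - base_count, val))
print(output)
```

Concatenating partition 0's output before partition 1's reproduces the fully sorted sequence.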