网上找到一篇非常好的mapReduce的文字,摘录于此:https://www.cnblogs.com/aijianiula/p/3976562.html
1.MapReduce框架简介
要设计倒排索引这个算法,那么我们首先得知道MapReduce框架中的InputFormat类,Mapper类,Partition过程,sort过程,Combine类,Reduce类的设计原理。
1.1InputFormat类
InputFormat类的作用是什么呢?其实就是把输入的数据(就是你上传到hdfs的文件)切分成一个个的split,然后将split分拆成<key,value>对做为map函数的输入。hadoop里默认使用TextInputFormat类处理输入(这个类只处理文本文件)。
TextInputFormat类将文本文件的多行分割成多个split,并通过LineRecorderReader将其中的每一行解析成<key,value>对,key为该行在文本中的偏移量,value值为这一行的内容。例如下面一个text.txt文本,经过InputFormat类处理之后
image如上图text1.txt文档,经过InputFormat处理之后,形成了3个<key,value>对,这3对之中,第一对中key=0,是因为hello单词位于文本头。第二对中key=31,是因为下一行的首单词if相对整个文本,它位于31的位置。类似第三对也如此。您可以自己数下并知道。
这3个<key,value>对做为map函数的输入,然后你可以根据需要在对输入的<key,value>对进行处理。
1.2Mapper类
实现map函数,根据输入的<key,value>对生成中间结果。这里以wordcount例子来说,对于上面的text.txt文档经过InputFormat处理之后,上面3对<key,value>对输入map之后,输出过程如下:
image 图2 map处理过程
1.3Partition过程
Partition就是分区。为什么要分区呢?因为大多时候有多个Reducer,分区的作用就是对map的输出,redcue的输入进行预处理。最重要的就是处理到在输入reduce的数据,保证同一个key值被分到同一个reduce。MapReduce程序中,Partition决定Map节点的输出将分区到那个Reduce节点上。默认是使用HashPartition,根据key值进行Hash操作。
image 图3 Partition分区过程
仔细观察Partition分区过程,text1.txt和text2.txt的partition1分区中有相同的key。这就是分区的效果,把相同的key放在同一个分区里。
1.4 sort过程
sort过程就是把键值对按照key值的首字母进行排序。
image1.5 Combin类
Combine过程是Mapper的一部分,在map函数之后执行。实现combine函数,合并中间结果具有相同key值的键值对。
image1.6 Reduce类
reduce过程是整合的过程,当然像wordcount的例子,我们只需要统计单词个数而已,只需一个简单的统计循环。
这里对reduce的介绍有点简略,可以有多个reducer, 同一个partition的数据只会被同一个reducer来处理,反之不成立。
image结论
mapper一定在reduce之前完成运行,多进程的mapper和多进程的reducer,以及多进程(我的理解哈)的partition等步骤,可以充分利用多核cpu,而且数据显然和cpu是在一起的,相当于cpu需要迁就数据,编程范式也有了相当的改变。
python代码实现
# https://pymotw.com/3/multiprocessing/mapreduce.html
from multiprocessing import Pool
class SimpleMapReduce:
def __init__(self, map_func, reduce_func, num_workers=None):
self.map_func = map_func
self.reduce_func = reduce_func
self.pool = Pool(num_workers)
def partition(self, mapped_values):
partitioned_data = collections.defaultdict(list)
for key, value in mapped_values:
partitioned_data[key].append(value)
return partitioned_data.items()
def __call__(self, inputs, chunksize=1):
# map是多进程的
map_responses = self.pool.map(
self.map_func,
inputs,
chunksize=chunksize,
)
# map全部完成后再做partition, partition是单进程的
partitioned_data = self.partition(
itertools.chain(*map_responses)
)
# reduce也是多进程的
reduced_values = self.pool.map(
self.reduce_func,
partitioned_data,
)
return reduced_values
应用:统计金庸小说中最常使用的单词, 目录结构如下
novels
├── 1.txt
├── 2.txt
├── 3.txt
└── 4.txt
map做的事情是每本小说都有一个进程去处理
import collections
import itertools
from multiprocessing import Pool
import codecs
import jieba
import string
import zhon.hanzi
import glob
import time
TRANSLATE_TABLE = str.maketrans({p: ' ' for p in string.punctuation + zhon.hanzi.punctuation}) # 把标点符号变成空格
STOP_WORDS = set()
with codecs.open('stopwords.txt', encoding='gbk') as f:
for word in f:
STOP_WORDS.add(word.strip())
STOP_WORDS.add('')
STOP_WORDS.add('ixdzs')
class SimpleMapReduce:
def __init__(self, map_func, reduce_func, num_workers=None):
self.map_func = map_func
self.reduce_func = reduce_func
self.pool = Pool(num_workers)
def partition(self, mapped_values):
partitioned_data = collections.defaultdict(list)
for key, value in mapped_values:
partitioned_data[key].append(value)
return partitioned_data.items()
def __call__(self, inputs, chunksize=1):
map_responses = self.pool.map(
self.map_func,
inputs,
chunksize=chunksize,
)
partitioned_data = self.partition(
itertools.chain(*map_responses)
)
reduced_values = self.pool.map(
self.reduce_func,
partitioned_data,
)
return reduced_values
def file_to_word(input_file):
"""map函数, 返回[(word, 1), ...]"""
word_count_list = []
with codecs.open(input_file, encoding='gbk') as f:
for line in f:
new_line = line.translate(TRANSLATE_TABLE)
# for word in new_line.split():
for word in jieba.cut_for_search(new_line):
if word in STOP_WORDS:
continue
elif len(word) <= 3:
continue
word_count_list.append((word.strip(), 1))
return word_count_list
def sum_count(item):
"""输入(word, [1,1,1]), 输出(word, 3)"""
return item[0], sum(item[1])
def try_file_to_word():
input_file = 'test_file.txt'
l = file_to_word(input_file)
print(l)
def count_jinyong(num_workers=None):
mapper = SimpleMapReduce(file_to_word, sum_count, num_workers=num_workers)
reduced_values = mapper(glob.glob('novels/*.txt'))
word_counts = sorted(reduced_values, key=lambda item: item[1], reverse=True)
top100 = word_counts[:100]
print(top100)
for word, count in top100:
print(word, count)
if __name__ == '__main__':
# print(zhon.hanzi.punctuation)
# try_file_to_word()
assert sum_count(('word', [1, 1, 1])) == ('word', 3)
assert glob.glob('novels/*.txt') == ['novels/4.txt', 'novels/3.txt', 'novels/2.txt', 'novels/1.txt']
start_time = time.time()
num_workers = 4
count_jinyong(num_workers=num_workers)
end_time = time.time()
print(f'num_workers = {num_workers}, time elapse {end_time - start_time}')
start_time = time.time()
num_workers = 1
count_jinyong(num_workers=num_workers)
end_time = time.time()
print(f'num_workers = {num_workers}, time elapse {end_time - start_time}')
输出为, 可以看出多进程运算时间比单进程要短一半多,这就证明了MapReduce的有效性:
[('哈哈大笑', 529), ('成吉思汗', 373), ('完颜洪烈', 346), ('东方不败', 320), ('桃谷六仙', 309), ('微微一笑', 297), ('令狐大哥', 276), ('胡说八道', 262), ('大吃一惊', 255), ('灭绝师太', 247), ('英雄好汉', 194), ('九阴真经', 192), ('令狐冲笑', 173), ('定静师太', 171), ('费要多罗', 169), ('方证大师', 164), ('非同小可', 156), ('动弹不得', 147), ('又惊又喜', 141), ('原来如此', 135), ('令狐公子', 135), ('清清楚楚', 129), ('恭恭敬敬', 124), ('天下第一', 114), ('回过头来', 112), ('令狐冲见', 110), ('如何是好', 108), ('四十二章', 107), ('放在心上', 106), ('一灯大师', 106), ('令狐冲心', 105), ('片刻之间', 100), ('灵智上人', 99), ('令狐掌门', 98), ('意料之外', 96), ('江南七怪', 93), ('降龙十八掌', 93), ('面面相觑', 92), ('无论如何', 92), ('干干净净', 91), ('抬起头来', 90), ('吓了一跳', 89), ('金花婆婆', 84), ('天门道人', 83), ('令狐冲听', 83), ('霎时之间', 81), ('一时之间', 78), ('独孤九剑', 78), ('不戒和尚', 73), ('武当七侠', 72), ('大不相同', 70), ('放声大哭', 69), ('玄贞道人', 69), ('日月神教', 68), ('不知去向', 67), ('躬身行礼', 67), ('建宁公主', 67), ('顷刻之间', 63), ('不动声色', 62), ('阿弥陀佛', 62), ('英雄豪杰', 61), ('图尔布青', 61), ('一言不发', 60), ('笑了起来', 60), ('天下英雄', 60), ('破口大骂', 59), ('一模一样', 59), ('不由自主', 58), ('小小年纪', 58), ('冲虚道长', 58), ('尊姓大名', 57), ('从未见过', 57), ('反清复明', 56), ('迷迷糊糊', 55), ('支持不住', 55), ('自言自语', 54), ('长叹一声', 54), ('胡里胡涂', 54), ('自然而然', 53), ('心花怒放', 53), ('胡言乱语', 53), ('恍然大悟', 53), ('葵花宝典', 53), ('千秋万载', 53), ('一统江湖', 53), ('打狗棒法', 53), ('无根道人', 53), ('无可奈何', 52), ('这么一来', 51), ('亲眼见到', 51), ('令狐冲大', 50), ('惊喜交集', 50), ('如此说来', 49), ('大喝一声', 49), ('手下留情', 49), ('感激不尽', 48), ('莫名其妙', 48), ('粉身碎骨', 47), ('大功告成', 47), ('默不作声', 46)]
哈哈大笑 529
成吉思汗 373
完颜洪烈 346
东方不败 320
桃谷六仙 309
微微一笑 297
令狐大哥 276
胡说八道 262
大吃一惊 255
灭绝师太 247
英雄好汉 194
九阴真经 192
令狐冲笑 173
定静师太 171
费要多罗 169
方证大师 164
非同小可 156
动弹不得 147
又惊又喜 141
原来如此 135
令狐公子 135
清清楚楚 129
恭恭敬敬 124
天下第一 114
回过头来 112
令狐冲见 110
如何是好 108
四十二章 107
放在心上 106
一灯大师 106
令狐冲心 105
片刻之间 100
灵智上人 99
令狐掌门 98
意料之外 96
江南七怪 93
降龙十八掌 93
面面相觑 92
无论如何 92
干干净净 91
抬起头来 90
吓了一跳 89
金花婆婆 84
天门道人 83
令狐冲听 83
霎时之间 81
一时之间 78
独孤九剑 78
不戒和尚 73
武当七侠 72
大不相同 70
放声大哭 69
玄贞道人 69
日月神教 68
不知去向 67
躬身行礼 67
建宁公主 67
顷刻之间 63
不动声色 62
阿弥陀佛 62
英雄豪杰 61
图尔布青 61
一言不发 60
笑了起来 60
天下英雄 60
破口大骂 59
一模一样 59
不由自主 58
小小年纪 58
冲虚道长 58
尊姓大名 57
从未见过 57
反清复明 56
迷迷糊糊 55
支持不住 55
自言自语 54
长叹一声 54
胡里胡涂 54
自然而然 53
心花怒放 53
胡言乱语 53
恍然大悟 53
葵花宝典 53
千秋万载 53
一统江湖 53
打狗棒法 53
无根道人 53
无可奈何 52
这么一来 51
亲眼见到 51
令狐冲大 50
惊喜交集 50
如此说来 49
大喝一声 49
手下留情 49
感激不尽 48
莫名其妙 48
粉身碎骨 47
大功告成 47
默不作声 46
num_workers = 4, time elapse 20.70866894721985
[('哈哈大笑', 529), ('成吉思汗', 373), ('完颜洪烈', 346), ('东方不败', 320), ('桃谷六仙', 309), ('微微一笑', 297), ('令狐大哥', 276), ('胡说八道', 262), ('大吃一惊', 255), ('灭绝师太', 247), ('英雄好汉', 194), ('九阴真经', 192), ('令狐冲笑', 173), ('定静师太', 171), ('费要多罗', 169), ('方证大师', 164), ('非同小可', 156), ('动弹不得', 147), ('又惊又喜', 141), ('原来如此', 135), ('令狐公子', 135), ('清清楚楚', 129), ('恭恭敬敬', 124), ('天下第一', 114), ('回过头来', 112), ('令狐冲见', 110), ('如何是好', 108), ('四十二章', 107), ('放在心上', 106), ('一灯大师', 106), ('令狐冲心', 105), ('片刻之间', 100), ('灵智上人', 99), ('令狐掌门', 98), ('意料之外', 96), ('江南七怪', 93), ('降龙十八掌', 93), ('面面相觑', 92), ('无论如何', 92), ('干干净净', 91), ('抬起头来', 90), ('吓了一跳', 89), ('金花婆婆', 84), ('天门道人', 83), ('令狐冲听', 83), ('霎时之间', 81), ('一时之间', 78), ('独孤九剑', 78), ('不戒和尚', 73), ('武当七侠', 72), ('大不相同', 70), ('放声大哭', 69), ('玄贞道人', 69), ('日月神教', 68), ('不知去向', 67), ('躬身行礼', 67), ('建宁公主', 67), ('顷刻之间', 63), ('不动声色', 62), ('阿弥陀佛', 62), ('英雄豪杰', 61), ('图尔布青', 61), ('一言不发', 60), ('笑了起来', 60), ('天下英雄', 60), ('破口大骂', 59), ('一模一样', 59), ('不由自主', 58), ('小小年纪', 58), ('冲虚道长', 58), ('尊姓大名', 57), ('从未见过', 57), ('反清复明', 56), ('迷迷糊糊', 55), ('支持不住', 55), ('自言自语', 54), ('长叹一声', 54), ('胡里胡涂', 54), ('自然而然', 53), ('心花怒放', 53), ('胡言乱语', 53), ('恍然大悟', 53), ('葵花宝典', 53), ('千秋万载', 53), ('一统江湖', 53), ('打狗棒法', 53), ('无根道人', 53), ('无可奈何', 52), ('这么一来', 51), ('亲眼见到', 51), ('令狐冲大', 50), ('惊喜交集', 50), ('如此说来', 49), ('大喝一声', 49), ('手下留情', 49), ('感激不尽', 48), ('莫名其妙', 48), ('粉身碎骨', 47), ('大功告成', 47), ('默不作声', 46)]
哈哈大笑 529
成吉思汗 373
完颜洪烈 346
东方不败 320
桃谷六仙 309
微微一笑 297
令狐大哥 276
胡说八道 262
大吃一惊 255
灭绝师太 247
英雄好汉 194
九阴真经 192
令狐冲笑 173
定静师太 171
费要多罗 169
方证大师 164
非同小可 156
动弹不得 147
又惊又喜 141
原来如此 135
令狐公子 135
清清楚楚 129
恭恭敬敬 124
天下第一 114
回过头来 112
令狐冲见 110
如何是好 108
四十二章 107
放在心上 106
一灯大师 106
令狐冲心 105
片刻之间 100
灵智上人 99
令狐掌门 98
意料之外 96
江南七怪 93
降龙十八掌 93
面面相觑 92
无论如何 92
干干净净 91
抬起头来 90
吓了一跳 89
金花婆婆 84
天门道人 83
令狐冲听 83
霎时之间 81
一时之间 78
独孤九剑 78
不戒和尚 73
武当七侠 72
大不相同 70
放声大哭 69
玄贞道人 69
日月神教 68
不知去向 67
躬身行礼 67
建宁公主 67
顷刻之间 63
不动声色 62
阿弥陀佛 62
英雄豪杰 61
图尔布青 61
一言不发 60
笑了起来 60
天下英雄 60
破口大骂 59
一模一样 59
不由自主 58
小小年纪 58
冲虚道长 58
尊姓大名 57
从未见过 57
反清复明 56
迷迷糊糊 55
支持不住 55
自言自语 54
长叹一声 54
胡里胡涂 54
自然而然 53
心花怒放 53
胡言乱语 53
恍然大悟 53
葵花宝典 53
千秋万载 53
一统江湖 53
打狗棒法 53
无根道人 53
无可奈何 52
这么一来 51
亲眼见到 51
令狐冲大 50
惊喜交集 50
如此说来 49
大喝一声 49
手下留情 49
感激不尽 48
莫名其妙 48
粉身碎骨 47
大功告成 47
默不作声 46
num_workers = 1, time elapse 37.74268126487732
网友评论