课程主页:https://pdos.csail.mit.edu/6.824/index.html
课程安排:https://pdos.csail.mit.edu/6.824/schedule.html(有资料)
视频: https://www.bilibili.com/video/BV1R7411t71W?p=2
论文:https://pdos.csail.mit.edu/6.824/papers/mapreduce.pdf
MapReduce 组成:
Map 函数:
处理输入的(key,value)对,生成中间结果(key,value)对,并将中间结果传给reduce函数处理
reduce 函数:
处理中间结果(key,value)对,将相同的key进行聚合。
举例:
- 计算大量文档中每个单词的数量:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
- Count of URL Access Frequency(URL的访问频率)
(URL, total count) - Reverse Web-Link Graph(反向网络链接图)
(target, list<source)>) - Term-Vector per Host (主机词向量)
(hostname, term vector)
处理流程:
Execution overview.jpg
- 将输入文档分割为M个(每个16m~64m大小)。
- 选取一个程序副本为master,用于分配map和reduce任务。
- 分配到map任务的worker,读取分割好的输入,分析(key,value)对,传递给map函数处理,并生成中间结果缓存在内存中。
- 缓存的中间结果被定时的写到本地磁盘,这些中间结果在本地磁盘的位置会被传到master,用以分配到相应的reduce worker。
- 分配到reduce任务的worker读取存储在本地磁盘的中间结果,并更具key进行排序。注意:中间结果数据量如果太大,会用到外部排序。
- reduce worker 迭代的遍历排序好的中间结果,将不同的key和相应的value集合传到reduce函数中进行处理。生成的结果被附加写入到最后的输出文件中去。
- 当所有的map和reduce任务都完成,master唤醒用户程序,并返回。
生成多个输出文件,并不会被合并为一个文件,一般情况下会作为下一个MapReduce调用的输入或者应用到下一个分布式应用中。
容错
- worker 失败: 重新执行worker所执行的任务。
- master 失败: 直接取消当前的MapReduce计算。当然也可以周期性的记录master的状态,当master是失败时,从上一个记录点恢复,重新执行。
Locality
这里的Locality可以看做master的一种调度策略。
当文件系统将输入文件分割为大小为64M的数据块,并为了保证高可用性,每个块有多个副本(一般为3个),这些数据块被保存在不同机器上。当master调度map任务时,会优先考虑在存在相应输入数据的机器上进行调度,如果调度失败,则考虑调度到距离数据较近的机器上(例如在同一台交换机,同一个机架上)。
这么考虑的原因是,在这个计算环境中,处理大规模的数据,网络带宽是一种稀缺资源,而这种策略可以减少网络带宽的消耗。
任务粒度
M map任务和 R reduce任务应该比worker的数量大的多。这样可以提高worker的动态负载均衡和其从失败中恢复的速度。
通常情况下,M:以每个单独任务处理16M~64M的输入数据为依据。R:所用机器的较小的倍数关系。
后备任务
当MapReduce操作接近完成时,master调度一个后备执行剩下的任务,无论是主要的任务和还是后备的任务完成,这项任务都会被标记为完成。后备任务的设计是用来解决因为任务执行落后者带来的延长整体任务执行时间的问题。
改进:
- 分区函数:hash(key) mod R => hash(Hostname(urlKey))。
- 保证顺序:对于给定的分区,保证中间键/值对以递增的键顺序处理。
- 合并函数:map任务会产生大量重复的key,使用合并函数可以在这些key被发送到网络中之前进行合并。一般合并函数使用和reduce函数相同的代码,差别只是在处理输出文件上。
- 输入输出类型:支持多种不同的格式,用户只需实现reader 接口就可以添加自己的输入类型。
- 副作用:对于产生额外输出文件的问题,程序会将输出写入到临时文件中,并在全部完成后进行重命名。
- 跳过不好的记录: worker会安装一个handler用来捕获在特定记录上的错误信号。
- 本地执行:为解决分布式系统难以调试的问题,提供了MapReduce的替代实现,用以本地调试。
8.状态信息:master提供了一组状态页面,用以查看计算的进度。
9.计数器:MapReduce库提供一个计数器用以统计各种事件发生的次数。
应用:
- 大规模机器学习问题
- Google 新闻和 Froogle 产品的聚类问题
- 提取产品数据,网页属性
- 大规模图形计算
- Google网页搜索服务的索引系统
#include "mapreduce/mapreduce.h"
// User’s map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
// User’s reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into "spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class("Adder");
// Tuning parameters: use at most 2000
// machines and 100 MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// Done: ’result’ structure contains info
// about counters, time taken, number of
// machines used, etc.
return 0;
}
网友评论