MapReduce: 一种简化的大规模集群数据处理法

时间:2020-12-15 15:15:07

(只有文字没有图,图请参考http://research.google.com/archive/mapreduce.html

MapReduce: 一种简化的大规模集群数据处理法

翻译:风里来雨里去

原文:MapReduce: Simplified Data Processing on Large Clusters

作者:JeffreyDean and Sanjay Ghemawat

转载请保留以上信息

摘要

MapReduct是一个用于处理与生成大型数据集的编程模型及相关实现。用户分别指定一个map函数与一个reduce函数,由map函数处理一个输入键值对,生成若干中间键值对,再由reduce函数合并具有相同键的中间值。这一模型可以用于表述许多真实世界的问题。

采用这种函数式风格编写的程序能自动地并行运转在廉价计算机构成的大规模集群中。运行系统管理着输入数据的拆分、横跨多机的程序调度、硬件故障的处理,以及多机间的通信。这样一来,就算是没有任何并行计算开发经验的程序员都能轻易地利用一个大型分布式系统的资源。

我们的MapReduce实现是运行于由廉价计算机构成的大规模集群之上,具有很高的伸缩性,一个典型的MapReduce操作往往需要处理数千台计算机上的TB级数据。程序员们认为这一系统易于使用,目前他们已经实现了数百个MapReduce程序,而且每天都要在Google内部的集群上运行1000多个MapReduce作业。

1 简介

在过去的5年中,本文作者与许多Google同事曾经编写了数百个专门用途的程序,这些程序都是对爬虫取回的网页、web请求日志等容量巨大的原始数据进行处理,计算出各种不同的衍生数据,例如反向索引、各种形式的网页结构图、各网站的网页总数、指定日期的频繁查询集,等等。这些程序的算法往往很简单,但由于输入数据量太大,为了要在可以接受的时间内完成,我们不得不将它们放到数千台计算机上去运行。而为了处理并行化、数据分发、硬件故障等难题,又不得不在原本简单的程序中加入大量复杂的代码。

为解决这一难题,我们设计了一个新模型,利用运行库隐藏并行化、故障处理、数据分发和负载均衡等复杂细节,程序员只需表达真正想要的计算逻辑即可。我们从Lips等函数式语言的map与reduce原语中受到启发,发现以前所写的程序大都具备一个共性:对输入的“记录”执行一个map操作,得出若干中间键值对,然后处理中间值,对键相同的中间值执行一个reduce操作,对衍生出的数据加以合并。利用这种由用户指定map/reduce操作的模型,很容易实现并行化,而且可使用重新运行作为容错的主要手段。

这一成果的主要贡献是提供了一个简单而强大的接口,可帮助实现大规模计算的自动并行化,同时还提供了该接口的一个实现,可在由廉价计算机构成的大规模集群上达到很高的性能。

本文的第2节讲述了MapReduce的基本编程模型,并给出了一些例子。第3节介绍了一个专为我们的集群环境度身订造的MapReduce实现。第4节介绍了一些我们认为有用的优化技术。第5节利用几个不同的作业,对我们的MapReduce实现进行了性能评测。第6节介绍了MapReduce在Google内部的使用情况,以及我们利用它重写索引编制系统的一些经验。第7节讨论了一些相关的成果。

2 编程模型

某一计算,获取若干输入键值对,生成若干输出键值对。MapReduce的用户可以通过两个函数表达这一计算:Map与Reduce。

Map是由用户编写,它获取一个输入键值对,生成若干中间键值对。MapReduce将所有具有相同键I的中间值编为一组,交给Reduce函数。

Reduce函数同样是由用户编写,它接受I及I对应的所有值,将它们合并为较小的集合。一次Reduce调用往往只生成0到1个值。在将中间值传递给reduce函数时,系统采用了迭代方式,从而得以处理那些由于数据过多而无法放入内存的情况。

2.1 示例

现在,假设需要统计某一批文档中各个单词出现的次数。用户可能会写出这样的代码:

map(String key, String value):

//key: document name

//value: document contents

foreach word w in value:

EmitIntermediate(w, “1”);

reduce(String key, Iterator values):

//key: a word

//values: a list of counts

intresult = 0;

foreach v in values:

result += ParseInt(v);

Emit(AsString(result));

map函数输出各单词及出现次数(本例中为1)。reduce函数将指定单词的次数累加起来。

用户还需编写一些代码,将输入输出文件的名称及一些可选参数填入mapreducespecification对象,然后将它作为参数,调用MapReduce函数。系统会将用户代码链接到MapReduce库(以C++实现)上。附录A提供了本例的完整代码。

2.2 类型

上一节的伪代码中,输入输出均为字符串。但在概念上,用户提供的map和reduce函数应具有以下相应类型:

map (k1,v1) -> list(k2,v2)

reduce (k2,list(v2) -> list(v2)

也就是说,输入键值与输出键值分属不同域。而中间键值与输出键值属于相同域。

在我们的实现中,map/reduce函数的输入与输出均采用字符串,而字符串与相应类型间的转换交由用户代码负责。

2.3 更多示例

以下是一些很容易采用MapReduce模型的小例子。

分布式grep:如果map函数匹配到指定的模式,即输出一行。reduce函数是一个恒等函数,直接将中间数据复制为输出数据。

URL访问频率统计:map函数处理web请求日志,输出

include”mapreduce/mapreduce.h”

// User’smap function

classWordCounter : public Mapper {

public:

virtual void Map(constMapInput& input) {

const string& text =input.value();

const int n = text.size();

for (int i = 0; i < n; ) {

// Skip past leading whitespace

while ((i < n) &&isspace(text[i]))

i++;

// Find word end

int start = i;

while ((i < n) && !isspace(text[i]))

i++;

if (start < i)

Emit(text.substr(start,i-start),”1”);

}

}

};

REGISTER_MAPPER(WordCounter);

// User’sreduce function

classAdder : public Reducer {

virtual void Reduce(ReduceInput*input) {

// Iterate over all entries withthe

// same key and add the values

int64 value = 0;

while (!input->done()) {

value +=StringToInt(input->value());

input->NextValue();

}

// Emit sum for input->key()

Emit(IntToString(value));

}

};

REGISTER_REDUCER(Adder);

intmain(int argc, char** argv) {

ParseCommandLineFlags(argc, argv);

MapReduceSpecification spec;

// Store list of input files into”spec”

for (int i = 1; i < argc; i++){

MapReduceInput* input =spec.add_input();

input->set_format(“text”);

input->set_filepattern(argv[i]);

input->set_mapper_class(“WordCounter”);

}

// Specify the output files:

// /gfs/test/freq-00000-of-00100

// /gfs/test/freq-00001-of-00100

// …

MapReduceOutput* out =spec.output();

out->set_filebase(“/gfs/test/freq”);

out->set_num_tasks(100);

out->set_format(“text”);

out->set_reducer_class(“Adder”);

// Optional: do partial sumswithin map

// tasks to save network bandwidth

out->set_combiner_class(“Adder”);

// Tuning parameters: use at most2000

// machines and 100 MB of memoryper task

spec.set_machines(2000);

spec.set_map_megabytes(100);

spec.set_reduce_megabytes(100);

// Now run it

MapReduceResult result;

if (!MapReduce(spec, &result))abort();

// Done: ‘result’ structurecontains info

// about counters, time taken,number of

// machines used, etc.

return 0;

}