MapReduce 模式、算法和用例

时间:2021-07-28 18:23:21

MapReduce 模式、算法和用例

  在这篇文章中,我整合了一些MapReduce的模式和算法,以便于读者系统化地认识那些在互联网及科学文献中能够找到的不同技术。同时,我也提供了几个实用的案例学习。所有的描述及代码片段使用了标准Hadoop平台的MapReduce模型,包括:Mappers,Reduce,Combiners,Partitions和Sorting。下图描绘了这个框架。

                              图1  MapReduce框架

常规的MapReduce模式

计数和总结

问题陈述:从一批含有术语集合的文档中,计算出各个术语总共出现的次数。亦或是,计算出各个术语任意的函数结果。例如:通过一个日志文件的各个记录(每条记录包含一个响应时间)来计算平均响应时间。

解决方案:让我们先从一些简单的问题着手。下面的代码片段展示了这样一个含义:当每个Mapper处理一个术语时,它只发出一个“1”;而Reduceer则遍历整个术语列表并对其求和。

class Mapper

  method Map(docid id, doc d)

      for all term t in doc d do

         Emit(term t, count 1)

 

class Reducer

  method Reduce(term t, counts [c1, c2,...])

      sum = 0

      for all count c in [c1, c2,...] do

          sum = sum + c

      Emit(term t, count sum)

这种方法明显的缺陷是:Mapper发出了大量虚假计数。通过对每个文档的计数进行总结,Mapper能够减少较多的计数。

class Mapper

  method Map(docid id, doc d)

      H = new AssociativeArray

      for all term t in doc d do

          h{t} = h{t} + 1

      for all term t in H do

         Emit(term t, count H{t})

为了不仅能够对一个文档累加计数,而且能够对一个Mapper节点处理的所有文档进行累加计数,可以补充使用Combiners。

class Mapper

  method Map(docid id, doc d)

      for all term t in doc d do

         Emit(term t, count 1)

 

class Combiner

  method Combine(term t, [c1, c2,...])

      sum = 0

      for all count c in [c1, c2,...] do

          sum = sum + c

      Emit(term t, count sum)

 

class Reducer

  method Reduce(term t, counts [c1, c2,...])

      sum = 0

      for all count c in [c1, c2,...] do

          sum = sum + c

      Emit(term t, count sum)

应用:日志分析,数据查询。

比较

问题陈述:根据一个术语集合及一个术语的相关函数,找出所有具有相同函数值的术语并将其保存到文件中,亦或是根据所有能够共同处理的术语执行其它的计算。最典型的例子就是倒排索引的建立。

解决方案:这个解决方案是简单易懂的。Mapper为每个术语计算指定的函数,并将函数值和术语本身作为结果发出。Reducer获取所有依据函数值分类的术语,并对其进行处理或是保存等。在倒排索引例子中,术语就是单词,而函数值是含有该术语的文芳的编号。

应用:倒排索引,ETL(数据抽取、转换、装载)

过滤,分析及验证

问题陈述:从一些记录中,收集所有满足指定条件的记录,或是将每条记录(区别于其它记录)转换另一种表现形式。类似的案例包括诸如文本分析、数值抽取及格式转换等。

解决方案:这个解决方案相当直接。Mapper一条条录入记录,并发出接收的记录或是该记录转换后的格式。

应用:日志分析,数据查询,ETL,数据验证

执行分布式任务

问题陈述:一个大量数据的计算任务可以被划分成许多子任务,通过将所有子任务的结果进行合并,从而获得最终的结果。

解决方案:在划分一类特定事物时(这些事物被存储为Mapper的输入数据),每个Mapper录入一个事物,执行相应的计算并发出结果。Reducer合并所有被发出的部分结果,并形成最终结果。

案例学习:模拟数字通讯系统

类似于WiMAX那样的数字通讯系统的软件模拟器,它能够通过系统模型传递一组随机数据,并计算传输的错误概率。每一个Mapper模拟指定的数据量(规定样本的1/N),发出错误率。Reducer计算平均错误率。

应用:物理和工程模拟,数值分析,性能试验

排序

问题陈述:对一组记录根据指定的规则进行排序,或是让这些记录形成特定的顺序。

解决方案:简单排序是显而易见的。Mapper只要发出所有的元素及排序关键字(由元素函数值组合形成)。然而,实际上排序都在被巧妙地使用,这也就是为什么说排序是MapReduce(或hadoop)的核心思想之一。尤其是,非常普遍地使用综合的关键字来达到排序和分类的目的。最初,MapReduce中的排序是以根据关键字排序被发出的关键字值对为目的,但是存在着补充使用Hadoop实施细节来完成值排序的技术。如果MapReduce被用来排序原始数据(非中间数据),那将是无意义的,而持续使用BigTable概念维护数据的有序状态才是一个不错的选择。换句话说,在每次插入时排序数据将比在进行MapReduce查询时排序更加有效。

应用:ETL,数据分析

非常规的MapReduce模式

迭代消息传递(图形处理)

问题陈述:从一个实体关系网络结构中,根据其邻接实体的基本属性计算出每个实体的状态。这种状态可以是与其它实体间的距离,或是具备特定属性的邻接实体的表示,或是邻接实体密度的特征等等。

解决方案:上述网络结构被存储为一组节点,每个节点包含一张邻接表。概念上,MapReduce作业以迭代的方式执行,并在每次迭代中发送消息给它的相邻节点。每个邻接点根据接收的消息更新自己的状态。当满足一些特定条件,例如达到固定的最大迭代次数(比如说网络直径),或是两次连续的迭代没有使状态发生改变时,迭代即可终止。从技术角度来说,Mapper通过把邻接点编号作为关键字,来为每个节点发出消息。因此,所有的消息都依据引入的节点进行分类,同时减速器能够根据新状态重新计算状态和节点。下图描绘了这个算法。

class Mapper

  method Map(id n, object N)

      Emit(id n, object N)

      for all id m in N.OutgoingRelations do

         Emit(id m, message getMessage(N))

 

class Reducer

  method Reduce(id m, [s1, s2,...])

      M = null

      messages = []

      for all s in [s1, s2,...] do

          if IsObject(s) then

             M = s

          else               // s is a message

             messages.add(s)

      M.State = calculateState(messages)

      Emit(id m, item M)

需要着重强调的是,一个节点的状态会迅速在整个网络传播。网络不要太稀疏,因为所有的节点会被它的邻接点影响。下图描绘了这个处理过程

案例学习:树形目录的可用性传播

问题陈述:这个问题的灵感来源于现实生活的电子商务。从大的目录(如男人,女人,儿童)中分裂出小的目录(如男士牛仔裤,女士长裙),并最终形成小的终端目录(如男士蓝色牛仔裤)。终端目录是可获得的,也可以是不可获得的。当子树中至少存在一个可获得的终端目录时,则上层目录也可以获得。

解决方案:这个问题可以通过使用前一节描述的框架来解决。接下来我们定义getMessagecalculateState方法。

class N

State in {True= 2, False = 1, null = 0}, initialized 1 or 2 for

end-of-line categories, 0 otherwise

method getMessage(object N)

return N.State

method calculateState(state s, data [d1, d2,...])

return max([d1, d2,...] )

案例学习:广度优先搜索

问题陈述:从一个图形中,计算出从源点到所有其他节点的距离。

解决方案:源点发出0给所有的邻接点,而这些邻接点传递这个计数时不断增加1

class N

  State is distance, initialized 0 for source node, INFINITY for all othernodes

 

method getMessage(N)

  return N.State + 1

 

method calculateState(state s, data[d1, d2,...])

  min( [d1, d2,...] )

案例学习:网页排名和单侧映射数据汇总

这个算法是由谷歌建议,作为一个函数来计算一个网页的相关性。原本的算法相当复杂,但是在它的核心,它仅仅是传播节点之间的权重,其中每个节点利用平均传入的权重计算其自身的权重。

class N

State isPageRank

method getMessage(object N)

return N.State/ N.OutgoingRelations.size()

method calculateState(state s, data [d1, d2,...])

return ( sum([d1, d2,...]) )

值得提及的是,我们使用的架构太宽泛,以至于我们没有利用状态是数值这一特点。在许多实际的案例中,由于这一事实,我们可以在映射器端执行聚集值。这一优化方法可以从以下代码片段了解(网页排名)。

class Mapper

  method Initialize

      H = new AssociativeArray

  method Map(id n, object N)

      s = N.PageRank  / s.OutgoingRelations.size()

      Emit(id n, object N)

      for all id m in N.OutgoingRelations do

         H{m} = H{m} + p

  method Close

      for all id n in H do

         Emit(id n, value H{n})

 

class Reducer

  method Reduce(id m, [s1, s2,...])

      M = null

      for all s in [s1, s2,...] do

          if IsObject(s) then

             M = s

          else

             s = s + p

      M.PageRank = s

      Emit(id m, item M)

应用:图像分析,网站索引

区别重复值(计数独特元素)

问题陈述:从包含字段FG的记录集合中,为每个有相同G的记录子集(根据G分类),计算出其独特F的个数

这个问题可以稍微被归类概括为多面搜索。

问题陈述:现有一个记录集合,每个记录有字段F及任意个数的种类标签G={G1G2…}。为任一标签的子集计算出独特字段F的总数。例如:

Record 1: F=1, G={a, b}

Record 2: F=2, G={a, d, e}

Record 3: F=1, G={b}

Record 4: F=3, G={a, b}

 

Result:

a -> 3   // F=1, F=2, F=3

b -> 2   // F=1, F=3

d -> 1   // F=2

e -> 1   // F=2

解决方案I:第一种方法分两阶段解决问题。第一阶段:Mapper为每对FG发出虚拟计数器;Reducer计算每对出现的总次数。这阶段的主要目标F值的唯一性。第二阶段:根据G来分类形成分组,并计算出每个分组中元素总个数。

Phase I:

class Mapper

  method Map(null, record [value f, categories [g1, g2,...]])

      for all category g in [g1, g2,...]

         Emit(record [g, f], count 1)

 

class Reducer

  method Reduce(record [g, f], counts [n1, n2, ...])

      Emit(record [g, f], null )

Phase II:

class Mapper

  method Map(record [f, g], null)

      Emit(value g, count 1)

 

class Reducer

  method Reduce(value g, counts [n1, n2,...])

      Emit(value g, sum( [n1, n2,...] ) )

解决方案II:第二种方法只需要一个MapReduce作业,但是它不是可扩展的,并且适应性有限。这个算法十分简单。Mapper发出值和种类,Reducer从每个值的种类列表及每个种类的增量计数器列表中去除重复值。最后一步是对Reducer发出的所有计算器求和。如果具有相同F值的记录数不是非常多,并且种类的总数也是有限的,那么这个方法可以应用。例如:这个方法可用于处理网络日志和用户分类——用户总量很多,但单个用户的事件数及种类数都有限。在这个架构中,若在数据传递给Reducer之前,利用Combiners从种类列表中去除重复值将是无效的。

class Mapper

  method Map(null, record [value f, categories [g1, g2,...] )

      for all category g in [g1, g2,...]

          Emit(value f, category g)

 

class Reducer

  method Initialize

      H = new AssociativeArray : category ->count

   methodReduce(value f, categories [g1, g2,...])

      [g1', g2',..] = ExcludeDuplicates( [g1,g2,..] )

      for all category g in [g1', g2',...]

         H{g} = H{g} + 1

  method Close

      for all category g in H do

         Emit(category g, count H{g})

应用:日志分析,独特用户计数

互相关

问题陈述:现有一个元组集合,为每对可能的元素计算其共同出现的元组总数。如果元素的总数是N,则报告N*N。这个问题出现在文本分析(那就是说元素是词语,元组说是句子),市场分析(买这类商品的消费者倾向于也买那类商品)。如果N*N非常小并能够存储在单个机器的内存中,那么方案实施起来将是简单明了的。

成对方法:第一种方法是让Mapper发出所有元素对及虚拟计数器,然后利用Reducer对这些计数器求和。缺陷是:一Combiners的益处有限,而且很有可能所有的元素对都不同;二没有在内存中的积累。

class Mapper

  method Map(null, items [i1, i2,...] )

      for all item i in [i1, i2,...]

         for all item j in [i1, i2,...]

            Emit(pair [i j], count 1)

 

class Reducer

  method Reduce(pair [i j], counts [c1, c2,...])

      s = sum([c1, c2,...])

      Emit(pair[i j], count s)

成条方法:第二种方法是根据元素对中的第一个元素来分类,并在对所有邻接元素累加时维护一个关联数组(“条”)。Reducer为向导元素i接收所有的关联数组,合并它们,并发出同样的结果作为对方法。

l  产生较少的中间键,一次减少排序次数

l  最大化利用Combiners

l  执行内存中的累加。如果应用不当将会导致问题

l  等复杂的实现

l  一般来说,成条方法比成对方法更快

class Mapper

  method Map(null, items [i1, i2,...] )

      for all item i in [i1, i2,...]

         H = new AssociativeArray : item ->counter

         for all item j in [i1, i2,...]

            H{j} = H{j} + 1

         Emit(item i, stripe H)

 

class Reducer

  method Reduce(item i, stripes [H1, H2,...])

      H = new AssociativeArray : item ->counter

      H = merge-sum( [H1, H2,...] )

      for all item j in H.keys()

         Emit(pair [i j], H{j})

应用:文本分析,市场分析

关系MapReduce 模式

在这一节,我们将仔细研究关系运算并讨论在MapReduce中如何实现它们。

选择:

class Mapper

methodMap(rowkey key, tuple t)

if t satisfiesthe predicate

Emit(tuple t,null)

投影:投影比选择稍微复杂,但我们应该用Reducer来消除可能的重复值。

class Mapper

methodMap(rowkey key, tuple t)

tuple g =project(t) // extract required fields to tuple g

Emit(tuple g,null)

class Reducer

methodReduce(tuple t, array n) // n is an array of nulls

Emit(tuple t,null)

并:Mappers由两个待合并集合的所有记录来反馈。Reducer被用来去除重复值。

class Mapper

methodMap(rowkey key, tuple t)

Emit(tuple t,null)

class Reducer

methodReduce(tuple t, array n) // n is an array of one or two nulls

Emit(tuple t,null)

交:Mappers由两个待求交的集合的所有记录来反馈。Reducer只发出出现两次的记录,当且仅当这两个集合都包含该记录,因为记录包含的主键在一个集合中只出现一次。

class Mapper

methodMap(rowkey key, tuple t)

Emit(tuple t,null)

class Reducer

methodReduce(tuple t, array n) // n is an array of one or two nulls

if n.size() =2

Emit(tuple t,null)

差:现有两个记录集合—R和S,计算差集R-S。Mapper发出所有的元组和标签(记录所属集合的名字)。Reducer只发出R集合中记录。

class Mapper

methodMap(rowkey key, tuple t)

Emit(tuple t,string t.SetName) // t.SetName is either 'R' or 'S'

class Reducer

methodReduce(tuple t, array n) // array n can be ['R'], ['S'], ['R','S'], or ['S','R']

if n.size() =1 and n[1] = 'R'

Emit(tuple t,null)

分组和聚集:按以下方式,分组和聚集能够在一个MapReduce作业中执行。Mapper从每个元组中抽取值进行分类、聚集及发送。Reducer接收已分组的值进行聚集,并计算聚合函数。典型的聚合函数如求和、求最值,能够以流媒体的方式计算,因此不必同时处理所有的值。然而,在一些案例中,会用到双相的MapReduce作业,例如DistinctValues模式。

class Mapper

methodMap(null, tuple [value GroupBy, value AggregateBy, value ...])

Emit(valueGroupBy, value AggregateBy)

class Reducer

methodReduce(value GroupBy, [v1, v2,...])

Emit(valueGroupBy, aggregate( [v1, v2,...] ) ) // aggregate() :

sum(), max(),...

联接:在MapReduce框架中,联接能够非常完美的实现,但是存在着一些面向效率和数据量差异的技术。这一节我们学习一些基本的方法。参考文献中有指向联接技术学习的链接。

再分配联接(缩减联接,排序-合并联接)

这个算法根据键K对集合R和L进行联接。Mapper遍历R和L中的所有元组,从元组中取出值,给元组附上只是所属集合的标签,发出k作为键的标签元组。Reducer为一个特定的键K接收所有的元组,并把它们入两个容器中。当两个容器填满时,Reducer运行嵌套循环,并发出这两个容器中元素的交叉联接。每个被发出的元组由R元组,L元组及键K组成。这种方法有如下的缺陷:Mapper发出所有的数据,即使是只在一个集合出现的键。在内存中,Reducer应该为每个键保存所有的数据。如果数据没有存储,Reducer需要通过交换来解决。然而,再分配联接是最通用的技术,尤其是其它优化技术都不适用的时候。

class Mapper

  method Map(null, tuple [join_key k, value v1, value v2,...])

      Emit(join_key k, tagged_tuple [set_nametag, values [v1, v2, ...] ] )

 

class Reducer

  method Reduce(join_key k, tagged_tuples [t1, t2,...])

      H = new AssociativeArray : set_name ->values

      for all tagged_tuple t in [t1,t2,...]     // separate values into 2 arrays

         H{t.tag}.add(t.values)

      for all values r in H{'R'}                 // produce a cross-join of thetwo arrays

         for all values l in H{'L'}

            Emit(null, [k r l] )

复制联接(映射联接,哈希联接)

    实际上,将小集合和大集合联接是很典型的(那就是说带有日志记录列表的用户列表)。假设我们联接两个集合—R和L,R集合很小。如果是这样的话,集合R能够分发给所有的Mappers,并且每个Mapper能够加载并通过联接键为它们建立索引。最普遍和有效的缩印技术是哈希表。之后,Mapper遍历集合L的所有元组,并和存储在哈希表中集合R的元组联接。这种方法非常有效,因为不需要通过网络对集合L进行排序或是传输,但集合R必须足够小并能够分发给所有的Mapper。

class Mapper

methodInitialize

H = new AssociativeArray: join_key -> tuple from R

R = loadR()

for all [join_key k, tuple [r1, r2,...] ] in R

H{k} =H{k}.append( [r1, r2,...] )

methodMap(join_key k, tuple l)

for all tupler in H{k}

Emit(null,tuple [k r l] )

参考文献:

1. Join Algorithms using Map/Reduce

2. Optimizing Joins in a MapReduce Environment

机器学习和数学MapReduce算法

C. T. Chu et alprovides anexcellent description of machine learning algorithms for

MapReduce in the article Map-Reduce forMachine Learning on Multicore.

FFT using  MapReduce:http://www.slideshare.NET/hortonworks/large-scale-math-with-hadoop-mapred

uce

MapReduce for integer factorization:http://www.javiertordable.com/files/MapreduceForIntegerFactorization.pdf

Matrixmultiplication with

MapReduce: http://csl.skku.edu/papers/CS-TR-2010-330.pdfhttp://www.norstad.org/matrix-multiply/index.html





---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

本文译自 Mapreduce Patterns, Algorithms, and Use Cases

在这篇文章里总结了几种网上或者论文中常见的MapReduce模式和算法,并系统化的解释了这些技术的不同之处。所有描述性的文字和代码都使用了标准Hadoop的MapReduce模型,包括Mappers, Reduces, Combiners, Partitioners,和 sorting。如下图所示。

MapReduce 模式、算法和用例

基本MapReduce模式

 

计数与求和

问题陈述: 

有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中的出现次数或者这些字段的其他什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,需要计算出平均响应时间。

解决方案:

让我们先从简单的例子入手。在下面的代码片段里,Mapper每遇到指定词就把频次记1,Reducer一个个遍历这些词的集合然后把他们的频次加和。


1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Reducer
7 method Reduce(term t, counts [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)

这种方法的缺点显而易见,Mapper提交了太多无意义的计数。它完全可以通过先对每个文档中的词进行计数从而减少传递给Reducer的数据量:


1 class Mapper
2 method Map(docid id, doc d)
3 H = new AssociativeArray
4 for all term t in doc d do
5 H{t} = H{t} + 1
6 for all term t in H do
7 Emit(term t, count H{t})

如果要累计计数的的不只是单个文档中的内容,还包括了一个Mapper节点处理的所有文档,那就要用到Combiner了:


1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Combiner
7 method Combine(term t, [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
12
13 class Reducer
14 method Reduce(term t, counts [c1, c2,...])
15 sum = 0
16 for all count c in [c1, c2,...] do
17 sum = sum + c
18 Emit(term t, count sum)
应用:Log 分析, 数据查询

 

整理归类

 

问题陈述:

有一系列条目,每个条目都有几个属性,要把具有同一属性值的条目都保存在一个文件里,或者把条目按照属性值分组。 最典型的应用是倒排索引。

解决方案:

解决方案很简单。 在 Mapper 中以每个条目的所需属性值作为 key,其本身作为值传递给 Reducer。 Reducer 取得按照属性值分组的条目,然后可以处理或者保存。如果是在构建倒排索引,那么 每个条目相当于一个词而属性值就是词所在的文档ID。

应用:倒排索引, ETL

过滤 (文本查找),解析和校验

问题陈述:

假设有很多条记录,需要从其中找出满足某个条件的所有记录,或者将每条记录传换成另外一种形式(转换操作相对于各条记录独立,即对一条记录的操作与其他记录无关)。像文本解析、特定值抽取、格式转换等都属于后一种用例。

解决方案:

非常简单,在Mapper 里逐条进行操作,输出需要的值或转换后的形式。

应用:日志分析,数据查询,ETL,数据校验

 

分布式任务执行

 

问题陈述:

大型计算可以分解为多个部分分别进行然后合并各个计算的结果以获得最终结果。

解决方案:  将数据切分成多份作为每个 Mapper 的输入,每个Mapper处理一份数据,执行同样的运算,产生结果,Reducer把多个Mapper的结果组合成一个。

案例研究: 数字通信系统模拟

像 WiMAX 这样的数字通信模拟软件通过系统模型来传输大量的随机数据,然后计算传输中的错误几率。 每个 Mapper 处理样本 1/N  的数据,计算出这部分数据的错误率,然后在 Reducer 里计算平均错误率。

应用:工程模拟,数字分析,性能测试

排序

问题陈述:

有许多条记录,需要按照某种规则将所有记录排序或是按照顺序来处理记录。

解决方案: 简单排序很好办 – Mappers 将待排序的属性值为键,整条记录为值输出。 不过实际应用中的排序要更加巧妙一点, 这就是它之所以被称为MapReduce 核心的原因(“核心”是说排序?因为证明hadoop计算能力的实验是大数据排序?还是说Hadoop的处理过程中对key排序的环节?)。在实践中,常用组合键来实现二次排序和分组。

MapReduce 最初只能够对键排序, 但是也有技术利用可以利用Hadoop 的特性来实现按值排序。想了解的话可以看这篇博客

按照BigTable的概念,使用 MapReduce来对最初数据而非中间数据排序,也即保持数据的有序状态更有好处,必须注意这一点。换句话说,在数据插入时排序一次要比在每次查询数据的时候排序更高效。

应用:ETL,数据分析

 

非基本 MapReduce 模式

 

迭代消息传递 (图处理)

 

问题陈述:

假设一个实体网络,实体之间存在着关系。 需要按照与它比邻的其他实体的属性计算出一个状态。这个状态可以表现为它和其它节点之间的距离, 存在特定属性的邻接点的迹象, 邻域密度特征等等。

解决方案:

网络存储为系列节点的结合,每个节点包含有其所有邻接点ID的列表。按照这个概念,MapReduce 迭代进行,每次迭代中每个节点都发消息给它的邻接点。邻接点根据接收到的信息更新自己的状态。当满足了某些条件的时候迭代停止,如达到了最大迭代次数(网络半径)或两次连续的迭代几乎没有状态改变。从技术上来看,Mapper 以每个邻接点的ID为键发出信息,所有的信息都会按照接受节点分组,reducer 就能够重算各节点的状态然后更新那些状态改变了的节点。下面展示了这个算法:


1 class Mapper
2 method Map(id n, object N)
3 Emit(id n, object N)
4 for all id m in N.OutgoingRelations do
5 Emit(id m, message getMessage(N))
6
7 class Reducer
8 method Reduce(id m, [s1, s2,...])
9 M = null
10 messages = []
11 for all s in [s1, s2,...] do
12 if IsObject(s) then
13 M = s
14 else // s is a message
15 messages.add(s)
16 M.State = calculateState(messages)
17 Emit(id m, item M)

一个节点的状态可以迅速的沿着网络传全网,那些被感染了的节点又去感染它们的邻居,整个过程就像下面的图示一样:


MapReduce 模式、算法和用例

案例研究: 沿分类树的有效性传递

问题陈述:

这个问题来自于真实的电子商务应用。将各种货物分类,这些类别可以组成一个树形结构,比较大的分类(像男人、女人、儿童)可以再分出小分类(像男裤或女装),直到不能再分为止(像男式蓝色牛仔裤)。这些不能再分的基层类别可以是有效(这个类别包含有货品)或者已无效的(没有属于这个分类的货品)。如果一个分类至少含有一个有效的子分类那么认为这个分类也是有效的。我们需要在已知一些基层分类有效的情况下找出分类树上所有有效的分类。

解决方案:

这个问题可以用上一节提到的框架来解决。我们咋下面定义了名为 getMessage和 calculateState 的方法:


1 class N
2 State in {True = 2, False = 1, null = 0},
3 initialized 1 or 2 for end-of-line categories, 0 otherwise
4 method getMessage(object N)
5 return N.State
6 method calculateState(state s, data [d1, d2,...])
7 return max( [d1, d2,...] )

案例研究:广度优先搜索

问题陈述需要计算出一个图结构中某一个节点到其它所有节点的距离。

解决方案: Source源节点给所有邻接点发出值为0的信号,邻接点把收到的信号再转发给自己的邻接点,每转发一次就对信号值加1:


1 class N
2 State is distance,
3 initialized 0 for source node, INFINITY for all other nodes
4 method getMessage(N)
5 return N.State + 1
6 method calculateState(state s, data [d1, d2,...])
7 min( [d1, d2,...] )

案例研究:网页排名和 Mapper 端数据聚合

这个算法由Google提出,使用权威的PageRank算法,通过连接到一个网页的其他网页来计算网页的相关性。真实算法是相当复杂的,但是核心思想是权重可以传播,也即通过一个节点的各联接节点的权重的均值来计算节点自身的权重。


1 class N
2 State is PageRank
3 method getMessage(object N)
4 return N.State / N.OutgoingRelations.size()
5 method calculateState(state s, data [d1, d2,...])
6 return ( sum([d1, d2,...]) )

要指出的是上面用一个数值来作为评分实际上是一种简化,在实际情况下,我们需要在Mapper端来进行聚合计算得出这个值。下面的代码片段展示了这个改变后的逻辑 (针对于 PageRank 算法):


1 class Mapper
2 method Initialize
3 H = new AssociativeArray
4 method Map(id n, object N)
5 p = N.PageRank / N.OutgoingRelations.size()
6 Emit(id n, object N)
7 for all id m in N.OutgoingRelations do
8 H{m} = H{m} + p
9 method Close
10 for all id n in H do
11 Emit(id n, value H{n})
12
13 class Reducer
14 method Reduce(id m, [s1, s2,...])
15 M = null
16 p = 0
17 for all s in [s1, s2,...] do
18 if IsObject(s) then
19 M = s
20 else
21 p = p + s
22 M.PageRank = p
23 Emit(id m, item M)
应用:图分析,网页索引

 

值去重 (对唯一项计数)

问题陈述: 记录包含值域F和值域 G,要分别统计相同G值的记录中不同的F值的数目 (相当于按照 G分组).

这个问题可以推而广之应用于分面搜索(某些电子商务网站称之为Narrow Search)

  Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}

Result:
a -> 3 // F=1, F=2, F=3
b -> 2 // F=1, F=3
d -> 1 // F=2
e -> 1 // F=2

解决方案 I:

第一种方法是分两个阶段来解决这个问题。第一阶段在Mapper中使用F和G组成一个复合值对,然后在Reducer中输出每个值对,目的是为了保证F值的唯一性。在第二阶段,再将值对按照G值来分组计算每组中的条目数。

第一阶段:


1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...]])
3 for all category g in [g1, g2,...]
4 Emit(record [g, f], count 1)
5
6 class Reducer
7 method Reduce(record [g, f], counts [n1, n2, ...])
8 Emit(record [g, f], null )

第二阶段:


1 class Mapper
2 method Map(record [f, g], null)
3 Emit(value g, count 1)
4
5 class Reducer
6 method Reduce(value g, counts [n1, n2,...])
7 Emit(value g, sum( [n1, n2,...] ) )

解决方案 II:

第二种方法只需要一次MapReduce 即可实现,但扩展性不强。算法很简单-Mapper 输出值和分类,在Reducer里为每个值对应的分类去重然后给每个所属的分类计数加1,最后再在Reducer结束后将所有计数加和。这种方法适用于只有有限个分类,而且拥有相同F值的记录不是很多的情况。例如网络日志处理和用户分类,用户的总数很多,但是每个用户的事件是有限的,以此分类得到的类别也是有限的。值得一提的是在这种模式下可以在数据传输到Reducer之前使用Combiner来去除分类的重复值。


1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...] )
3 for all category g in [g1, g2,...]
4 Emit(value f, category g)
5
6 class Reducer
7 method Initialize
8 H = new AssociativeArray : category -> count
9 method Reduce(value f, categories [g1, g2,...])
10 [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
11 for all category g in [g1', g2',...]
12 H{g} = H{g} + 1
13 method Close
14 for all category g in H do
15 Emit(category g, count H{g})
应用:日志分析,用户计数

互相关

问题陈述:有多个各由若干项构成的组,计算项两两共同出现于一个组中的次数。假如项数是N,那么应该计算N*N。

这种情况常见于文本分析(条目是单词而元组是句子),市场分析(购买了此物的客户还可能购买什么)。如果N*N小到可以容纳于一台机器的内存,实现起来就比较简单了。

配对法

第一种方法是在Mapper中给所有条目配对,然后在Reducer中将同一条目对的计数加和。但这种做法也有缺点:

  • 使用 combiners 带来的的好处有限,因为很可能所有项对都是唯一的
  • 不能有效利用内存

1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 for all item j in [i1, i2,...]
5 Emit(pair [i j], count 1)
6
7 class Reducer
8 method Reduce(pair [i j], counts [c1, c2,...])
9 s = sum([c1, c2,...])
10 Emit(pair[i j], count s)

Stripes Approach(条方法?不知道这个名字怎么理解)

第二种方法是将数据按照pair中的第一项来分组,并维护一个关联数组,数组中存储的是所有关联项的计数。The second approach is to group data by the first item in pair and maintain an associative array (“stripe”) where counters for all adjacent items are accumulated. Reducer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.

  • 中间结果的键数量相对较少,因此减少了排序消耗。
  • 可以有效利用 combiners。
  • 可在内存中执行,不过如果没有正确执行的话也会带来问题。
  • 实现起来比较复杂。
  • 一般来说, “stripes” 比 “pairs” 更快

1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 H = new AssociativeArray : item -> counter
5 for all item j in [i1, i2,...]
6 H{j} = H{j} + 1
7 Emit(item i, stripe H)
8
9 class Reducer
10 method Reduce(item i, stripes [H1, H2,...])
11 H = new AssociativeArray : item -> counter
12 H = merge-sum( [H1, H2,...] )
13 for all item j in H.keys()
14 Emit(pair [i j], H{j})
应用:文本分析,市场分析




参考资料:Lin J. Dyer C. Hirst G. Data Intensive Processing MapReduce

用MapReduce 表达关系模式

在这部分我们会讨论一下怎么使用MapReduce来进行主要的关系操作。

筛选(Selection)


1 class Mapper
2 method Map(rowkey key, tuple t)
3 if t satisfies the predicate
4 Emit(tuple t, null)

投影(Projection)

投影只比筛选稍微复杂一点,在这种情况下我们可以用Reducer来消除可能的重复值。


1 class Mapper
2 method Map(rowkey key, tuple t)
3 tuple g = project(t) // extract required fields to tuple g
4 Emit(tuple g, null)
5
6 class Reducer