摘要:超大规模数据挖掘和数据分析需求的日益增长,引领着工业和学术界设计大数据计算平台新模式。MapReduce和Dryad是两种流行的平台,数据流在这两种平台上采用操作符的有向非循环图形式。迭代程序在数据挖掘、网页排名、图像分析、模型拟合等许多应用领域中自然而然地出现了,而这两种平台缺乏对迭代程序的内嵌支持。在这篇文章中会呈现HaLoop,这是一种改进的用来服务于各种应用的Hadoop MapReduce框架。HaLoop不仅在编程上支持迭代的应用,而且为了动态地提高其运行效率,采取了调度任务程序感知循环和增加各种缓存的机制。我们在真正的查询和真实的数据集上评价HaLoop。HaLoop减少了1.85倍的查询时间,而且在mapper和reducers之间shuffles的数据只有4%。
介绍
大规模数据密集型应用的探索,使高度可扩展的并行数据处理平台的需求日益上升。工业上的探索有网页数据分析、点击流量分析、网络管理日志分析等,科学上的探索有大规模模拟产生的数据分析,传感器部署,高通量实验室设备。
MapReduce是一种有名的计算框架,它可以对商品计算机集群进行编程,实现在单程中执行大规模数据处理。MapReduce集群可以以容错的方式扩展到上千个结点。虽然并行数据库系统可能也服务这些数据分析应用,但他们比较昂贵、难以管理或者对于长时间的查询缺乏容错性。Hadoop是一个开源的MapReduce实现, Yahoo!、FaceBook和其它厂商已经将它拿来做大数据分析。有了MapReduce框架,程序员通过实现map和reduce函数来转化和合并数据从而简单地并行化他们的应用。很多算法可以很自然地适用于MapReduce模型,比如单词统计,等值连接查询和倒序列表构建。
然而,许多数据分析技术要求迭代计算,包括网页排名、超文本诱导的主题搜索、递推关系查询、聚类、神经网络分析、社交网络分析以及网络流量分析。这些技术有一个共性:数据的计算在满足收敛或停止条件前是被迭代处理的。MapReduce框架并没有直接支持这些迭代的数据分析应用。程序员必须通过手动发布多个MapReduce作业并使用驱动程序来编写执行程序从而实现迭代程序。
在MapReduce中手动编写一个迭代程序会存在两个关键的问题。第一个问题是即使大多数的数据可能在迭代过程中不会改变,但数据在每次迭代中必定会被重复加载和处理,这样会浪费I/O、网络带宽和CPU资源。第二个问题是终止条件涉及到何时到达固定点——或者说是当应用的输出在连续两次迭代中不改变时。这种情况可能每次迭代本身都需要额外的MapReduce作业,卷入额外任务方面的开销,读取磁盘的数据,以及通过网络移动数据。为了显示这些问题,请考虑以下两个例子。
EXAMPLE 1. 网页排名(PageRank),PageRank是一种链接分析算法,通过根据入链的权重迭代计算每个顶点的权重,为图中每个顶点分配权重(等级)。在关系代数中,PageRank算法可以表示为一个连接,连接步骤之后紧跟的是两个合并的更新。这些步骤必定会被驱动程序重复执行直到满足终止条件(比如,每个页面的排名收敛或到达了一个指定的迭代次数)。
图1展示了一个具体的例子。R0(图1a)是初始等级表,L(图1b)是联系表。MapReduce作业(MR1和MR2在图1c中)需要实现RageRank的循环体。第一个MapReduce作业连接等级表和联系表。Mappers将连接两者关系的列作为key,将剩下的列作为value。Reducers计算每个单独的源URL连接,以及每个出链的排名贡献(new_rank)。第二个MapReduce作业计算每个单独目标URL的排名总和:map函数是个体函数,reduce函数把每个入链的排名贡献求和。每次迭代中,都是从Ri更新到Ri+1。举个例子,可以通过迭代计算R1,R2而得到R3。
在PageRank算法中,联系表L是始终不变的。然而由于MapReduce框架没有意识到这个属性,L会在每次迭代中处理和shuffle。更糟的是,不变的联系表可能常常比结果的等级表要大得多。最后,决定排名是否收敛要求每次迭代都有一个额外的MapReduce作业去检测。
EXAMPLE 2. 后代查询Descendant Query. 给定社会网络关系如图2a所示,谁跟Eric是跳两级关系的朋友?为了查询这个问题,我们首先可以找到Eric直接的朋友,然后找所有朋友的朋友。一个相关的查询就是找出所有朋友关系表F中能到达Eric的朋友。这些查询可以通过执行两个MapReduce作业(图2b的MR1和MR2)的驱动程序来实现,要么两次迭代,要么直到达到固定点为止。第一个MapReduce作业通过连接上一次迭代后的朋友表找出新的一级朋友
假设
就像PageRank的例子,一些重要的数据(朋友表F)在查询的整个过程中保持不变,会在每次迭代中被处理和shuffled。
许多其它的数据分析应用都有与上述两个例子相似的特性:一部分重要的被处理数据在迭代过程中保持不变,并且分析需要不断进行直到到达fixpoint。有些例子包括大多数迭代的模型拟合算法(比如k-means聚类和神经网络分析)、大多数网页/图像排名算法(如HITS)、递推图像或网络查询。
这篇文章介绍了一个新的系统叫做HaLoop,它被设计用来高效地处理上述类型的应用。HaLoop扩展了MapReduce,并基于两个简单的直觉。首先,一个MapReduce集群可以缓存第一次迭代的固定数据,这些数据在之后的迭代中会重复利用。第二,一个MapReduce集群可以缓存reducer的输出,这些输出使得检查fixpoint变得更加高效,而不需要额外的MapReduce作业。
这篇文章做了以下贡献:
1、 新编程模型和结构用来解决迭代程序:HaLoop处理本应该手动编程的循环控制。它提供了编程接口来表示迭代的数据分析应用(第2节)。
2、 感知循环任务计划:HaLoop的任务计划通过在物理上共定位任务使迭代过程中数据再利用,共定位任务是指在不同的迭代中处理相同的数据(第3节)。
3、 循环不变数据缓存:在application的第一次迭代期间,HaLoop在集群结点上缓存和索引迭代不变的数据。缓存不变的数据减少了在随后的迭代过程中的loading和shuffling的I/O开销(4.1节和4.3节)。
4、 缓存支持Fixpoint评估:HaLoop 缓存和索引一个reducer的本地输出。这避免了对于fixpoint或收敛点检查的专用步骤的需要。
5、 实验学习:我们评估了运行迭代程序的系统,该系统处理了合成的和现实世界的数据。HaLoop在所有指标中都胜过Hadoop;平均地讲,HaLoop减少1.85倍的查询运行时间,而且在mappers和reducers之间的shuffles只有4%(第5节)。
HaLoop概述
这节介绍了HaLoop的结构和它的应用编程模型。
2.1 结构
图3阐述了HaLoop的结构,一个改进版本的开源MapReduce实现的Hadoop。
HaLoop继承了基本的分布式计算模型和Hadoop架构。HaLoop依靠分布式文件系统(HDFS)存储每个作业的输入和输出数据。该系统被分成两部分:一个master结点和多个slave结点。客户端提交作业到master结点上。每个被提交的作业,master结点调度一些并行的任务在slave结点上跑。每个slave结点有一个任务跟踪守护进程来与master结点通信和管理每个任务的执行。这些任务要么是map任务(map任务通常是转换数据块,并通过一个键值对调用用户定义的map函数),要么是reduce任务(reduce任务通常复制mapper输出的对应分区,对输入keys进行分组,并通过一个key和相关联的values唤醒一个用户定义的reduce函数)。举个例子,图3中有三个正在系统中运行的作业:
作业1,作业2和作业3。每个作业有三个任务同时运行在slave结点上。
为了满足迭代数据分析应用的要求,我们对基本的Hadoop MapReduce框架做了几个改变。首先,HaLoop给用户提供了新的应用编程接口,该接口可以简化迭代MapReduce程序的表示。第二,HaLoop的master结点包含了一个新的循环控制模块,该模块可以重复地启动新的组成循环体的map-reduce步骤,直到满足一个用户指定的停止条件。第三,HaLoop使用一个新的任务调度器来迭代应用程序(第3节)。第四,HaLoop在slave结点上缓存和索引应用数据(第4节)。如图3所示,与Hadoop一样,HaLoop依靠相同的文件系统并且有相同的任务队列结构,但是任务调度器和任务跟踪模块被改变了,循环控制、缓存和索引模块都是新的。任务跟踪器不仅管理任务的执行,还管理在slave结点上的缓存和索引,并重定向每个任务缓存和索引访问本地文件系统。
2.2 编程模型
PageRank和descendant query的例子是HaLoop支持的具有代表性的迭代程序。这里,我们展示了递推程序的一般形式和详细的API。
HaLoop程序所支持的迭代程序可以提取出以下的核心结构:
R0是初始的结果,L是不变的关系。当达到固定点时——当结果在相邻两次迭代都不改变时,如Ri+1=Ri,此形式的程序将终止。这个形式足够表示递推程序的广泛的类了。
典型地,fixpoint被定义为连续两次迭代恰恰相等的点,但是HaLoop也支持近似fixpoint的概念,就是说当连续两次迭代的差小于用户设定的阈值或达到最大的迭代次数时,计算停止。在机器学习和复杂的分析中,两种近似fixpoints对于表示收敛条件是非常有用的。比如,对于PageRank,使用用户设定的阈值或固定的迭代次数作为循环终止的条件是很常见的。
虽然我们的递推公式描述了我们打算支持的迭代程序类,但是这项工作没有开发用于表达递推查询的高级声明语言。不过,我们把焦点放在提供一个针对迭代MapReduce程序的有效的基础API上;我们断定各种各样的高级语言(如,Datalog)可以在这个基础上实现。
为了编写一个HaLoop程序,程序员需要定制循环体(一个或多个map-reduce对)和选择性地定制一个终止条件和循环不变的数据。我们现在讨论HaLoop的API(见附件图16的总结)。Map和Reduce与标准的MapReduce相似而且也是按要求要有的;剩下的API是新的也是可选的。
为了制定循环体,程序员通过下列函数构造多步MapReduce作业:
1、 Map 转换输入的
<key,value>
元组到中间的<in_key,in_value>
元组 2、 Reduce 处理中间的元组,合并相同的key,输出
<out_key,out_value>
元组。接口包括用来缓存不变值的新参数,这个不变值与in_key关联。 3、 AddMap和AddReduce表示由超过一个MapReduce步骤组成的循环体。AddMap(AddReduce)通过一个整数表示步骤的顺序来关联一个Map(Reduce)函数。
HaLoop默认测试两次迭代是否相等来决定是否要停止计算。为了指定一个近似fixpoin的终止条件,程序员可以使用下面的函数:
1、 SetFixedPointThreshold在连续两次迭代的距离上设置一个界限。如果这个界限被超越,近似fixpoint还没有到达,计算还要继续。
2、 ResultDistance函数计算了两个具有相同out_key的out_value集合的距离。一个out_value集合
3、 SetMaxNumOfIterations提供了循环终止条件的进一步控制。不管这次和上次迭代输出的距离,如果已经执行到最大的迭代次数,HaLoop就会终止一个作业。SetMaxNumOfIterations也可以被用来实现简单的for循环。
为了指定和控制输入,程序员使用:
4、 SetIterationInput关联一个输入源与一个特定的迭代,因为输入文件在不同的迭代可能会不同。比如,Example 1中,每个i+1次迭代的输入为
5、 AddStepInput关联循环体中额外的输入源和一个中间map-reduce对。前面的map-reduce对的输出总是在下一个map-reduce对的输入中。
6、 AddInvariantTable指定一个循环不变的输入表(一个HDFS文件)。在作业执行期间,HaLoop将在集群结点上缓存这张表。
这个编程接口足够表示多种迭代应用了。附件记录了PageRank(9.2节)、descendant query(9.3节)、k-means(9.4节)在这种接口上的实现。图4展示了HaLoop与Hadoop的区别,从应用的角度看:在HaLoop中,用户程序指定循环设置和框架来控制循环执行,而在Hadoop中,控制循环是application的责任。
3 感知循环任务调度Loop-Aware Task Scheduling
这节介绍了HaLoop任务调度器。该调度器潜在地给迭代程序提供比Hadoop调度器更好的调度。3.1节和3.2节分别阐述了所需的调度和调度算法。
3.1 Inter-Iteration Locality
HaLoop调度器的高级目标是把这些map和reduce任务放在同样的物理机器上,这些任务处理着相同的数据但执行着不同的迭代。通过这个方法,数据可以更加容易被缓存并在迭代中再利用。比如,图5是一个样本在连接步骤的调度。该步骤是来自例1的PageRank应用的MR1(图1 c)。作业中有两个迭代和三个slave结点。
第一个迭代的调度跟Hadoop没区别。在第一个迭代的连接步骤中,输入表为L和R0.三个map任务被执行,每个map任务从一个或其它的输入数据文件中加载一部分数据。对于普通的Hadoop,mapper的输出key(在这个例子中是连接属性)被hash,来决定分配给哪个reduce任务。然后,三个reduce任务会被执行,每个reduce任务加载收集的mapper输出的一个分区。在图5中,reducer R00处理key的hash值为0的mapper输出,reducer R10处理hash值为1的,reducer R20处理hash值为2的。
迭代2的连接步骤的调度可以利用迭代之间的局部性:处理指定数据分区D的任务(mapper或reducer)在物理结点上被调度,其中D在迭代1中被处理。注意L和R1是输入到迭代2的连接步骤中的两个文件。
图5中的调度提供了从过去的迭代再利用循环不变数据的可行性。因为L是循环不变的,mappers M01和M11会计算出与M00和M10相同的结果。完全没有必要再计算这些mapper的输出,也没有必要把它们传输给reducers。在迭代1中,如果reducer输入分区0、1和2被分别存储在节点n3、n1和n2上,那么在迭代2中,L不需要再加载、处理和shuffle。在那种情况下,在迭代2中,只有一个mapper M21需要启动用于R1-split0,因此三个reducers只会从M21中复制中间数据。有了这个策略,虽然reducer的输入是没有区别的,但是来源却有两类:mappers的输出(跟通常一样)和本地磁盘。
在图5中,我们把调度的特性叫做inter-iteration locality。令d为一个文件拆分(mapper 输入分区)或一个reducer输入分区, 为第i次迭代中使用d的任务。然后我们说所谓的inter-iteration locality就是如果对于所有的i>1, 和 被分配到同样的物理结点上,前提是假设 存在的话。在HaLoop中任务调度的目标是实现inter-iteration locality。为了实现这个目标,唯一的限制是HaLoop要求reduce任务的数量需要在迭代期间保持不变,以至于分配mapper输出给reducer结点的hash函数可以保持不变。
3.2 调度算法
HaLoop的调度器跟踪每个物理机器上的被每个map和reduce任务处理的数据分区。它使用这个信息来调度随后的任务,这些任务正是要考虑到inter-iteration locality。
更具体的说,HaLoop调度器的工作如下。当收到来自slave结点的heartbeat时,master结点尝试去分配给slave结点一个还未分配的任务,这个任务使用缓存在该结点上的数据。为了支持这种分配,master结点维护从每个slave结点到该结点在上一次迭代中处理的数据分区的映射。如果slave结点已经满载,master再次分配任务给相邻的slave结点。
图6给出了调度算法的伪代码。在每个迭代之前,把previous设置成current,然后current设置成一个新的空的HashMap对象。在作业的第一次迭代中,调度器与Hadoop是一样的(line 2)。第一次调度之后,master结点记住了数据和结点的关联(lines 3 and 13)。在随后的迭代中,调度器尝试保持之前的数据-结点关联(lines 11 and 12)。如果由于负载的原因,这种关联不能再维持住了,master结点将和另一个结点关联数据(lines 6-8)。
4 缓存和索引
由于任务调度器提供的inter-iteration locality,一个物理结点通常只需访问特定的循环不变数据分区。为了减少I/O代价,HaLoop在物理结点的本地磁盘上缓存了这些数据分区,以便随后的再利用。为了加快处理速度,HaLoop索引了缓存的数据。如果一个缓存变得不可用了,它会自动从map任务所在的物理结点或HDFS重新加载。HaLoop支持三种类型的缓存:reducer输入缓存,reducer输出缓存,和mapper输入缓存。这每一个都适合一定数量的Application场景。Application程序员可以通过HaLoop API选择开启或关闭一个缓存类型。(见附件9.1)
4.1 Reducer输入缓存
如果一张中间表被指定循环不变(通过HaLoop API AddInvariantTable)和reducer输入缓存被开启,HaLoop将缓存所有reducer的输入并创建缓存数据的索引。注意reducer输入在每个reduce函数调用前被缓存,以至于reducer输入缓存的元组可以通过reducer输入key被排序和分组。
让我们考虑社会网络的例子(Example 2)来知道reducer输入缓存是如何工作的。作业中有三个物理结点n1、n2和n3,reducers的数量设置成2。在第一次迭代的连接步骤中,有三个mappers:一个处理F-split0,一个处理F-split1,一个处理∆S0-split0。三个拆分在图7中展示。两个reducer的输入分区在图8中展示。n1上的reducer对应hash值为0,n2上的reducer对应的hash值为1。由于表F(表ID为“#1”)被程序员使用AddInvariantTable函数设置成不变的,每个reducer将缓存表ID为“#1”的元组到本地文件系统。
在后面的迭代中,当一个reducer传递已shuffle好的key和关联的values到用户定义好的Reduce函数时,也会在本地reducer输入缓存中搜索key来找出关联的values,并一起传递给Reduce函数(注意HaLoop调整的Reduce接口接受这种参数;详见附件9.1)。而且,如果reducer输入缓存被打开,在第一次迭代中的mapper输出会被缓存到相应的mapper本地磁盘供未来的reducer缓存加载。
在缓存的物理层,keys和values被分到两个文件,每个key有关联到它对应的values的指针。有时在缓存不变数据中的选择性是很低的,因此,在reducer输入数据被缓存到本地磁盘时,HaLoop在keys之上创建索引,也把它存储到本地文件系统里。由于reducer输入缓存排好序然后被reducer输入key以同样的排序获得,磁盘搜索操作只会以前面的方式执行,更差的情况是,在每次迭代中,输入缓存从本地磁盘按顺序被扫描一遍。
Reducer输入缓存适合PageRank,HITS,各种递推关系的查询,任何其它与大量不变数据重复连接的算法。Reducer输入缓存要求分区函数f对于每个mapper输出元组t满足:(1)f必须确定的,(2)f必须始终保持不变,(3)除了元组t,f必须没有其它任何输入。在HaLoop中,reduce任务的数量在迭代过程中没有改变,因此默认的hash分区满足这些条件。
4.2 Reducer输出缓存
Reducer输出缓存在每个reducer结点上存储和索引最近的本地输出。这个缓存被用来减少评估fixpoint终止条件的开销。就是说,如果application必须通过比较当前的迭代输出与上一次迭代的输出来测试收敛的条件,reducer输出缓存使框架能够用分布式的方式去比较。
Reducer输出缓存被用在需要在每次迭代后对fixpoint进行评估的应用上。比如,在PageRank中,用户可能会设置这样一个收敛条件,指定整个排名在相邻两次迭代的差别低于设定好的阈值。有了reducer输出缓存,不需要分离的MapReduce步骤,fixpoint就可以用分布式的方式去评估。在所有Reduce函数完成之后,每个reducer在reduce过程中评估fixpoint的条件并把这个本地的评估结果报告给master结点,master结点算出最后的应答。
Reducer输出缓存要求,在循环体中的最后一个map-reduce对的mapper输出分区函数和reduce函数要满足下述条件:如果
4.3 Mapper输入缓存
Hadoop尝试将map任务与他们的输入数据协同定位。在现实世界的Hadoop集群中,数据本地mappers的速度大约为70%-95%,这依靠运行时的环境。HaLoop的mapper输入缓存是为了避免在非初始迭代期间mapper的非本地化数据读取。在第一次迭代中,如果一个mapper非本地化读取了一个拆分的输入,该输入将会被缓存到mapper所在的物理结点上的本地磁盘。然后,随着循环感知任务的调度,在后续的迭代中,所有mappers只从本地磁盘中读取数据,或来自HDFS或来自本地文件系统。Mapper输入缓存可以用在模型拟合的应用如k-means聚类,神经网络分析,其它任何用到mapper输入的迭代算法,其中mapper输入在迭代期间不能改变。
4.4 缓存重新加载
存在一小部分缓存必须重新构造的情况:(1)主机结点失败,(2)主机结点满载,map或reduce任务必须调度到不同的替代结点上。Reducer通过从第一次迭代的mapper输出复制所需的分区构造reducer输入缓存。为了重新加载mapper输入缓存或reducer输出缓存,mapper/reducer只需从分布式文件系统中读取对应块,分布式文件系统存储了缓存数据的副本。缓存重新加载对用户程序完全透明的。
5 实验评估
我们比较了HaLoop和Hadoop在迭代数据分析应用上的性能。由于reducer输入缓存,reducer输出缓存和mapper输入缓存是独立可选的,我们在5.1-5.3节把它们各自评估。
5.1 Reducer输入缓存评估
这套实验在Amazon的弹性云(EC2)上使用了50个虚拟机器集群和90个slave结点。Master结点总是只有一个。Application选择了PageRank和descendant query。两个application都在HaLoop(使用新的编程模型)和Hadoop(使用传统的驱动方法)上实现。
我们使用半合成和现实世界的数据集:Livejournal(18GB,社交网络数据),Triples(120GB,语义网站数据)和Freebase(12GB,概念联动图)。硬件和数据集的描述详见9.6节。
我们在Livejournal和Freebase上执行PageRank查询,在Livejournal和Triples数据集上执行descendant query。图9-12展示了Hadoop和HaLoop的结果。Reduce任务的数量设置成slave结点数。失败的性能没有被量化;所有的实验结果是在没有结点失败的情况下获得的。
总体上,如图所示,对于一个迭代10次的作业,当reducer输入缓存使用时HaLoop降低了1.85倍的运行时间。我们在后面会讨论,reducer输出缓存在Hadoop与HaLoop之间创建了一个额外的gap,但在总的运行时间上并没有太大影响。我们现在就更加细节地展示这些结果。
总体运行时间.在这个实验中,我们使用SetMaxNumOfIterations,而不是fixedPointThreshold和ResultDistance,来指定循环终止条件。结果见图9a,图10a,图11a和图12a。
在PageRank算法中,每次迭代都有连接与合并两个步骤。图9a和图10a的运行时间是所有迭代的连接时间与合并时间的总和。在descendant query算法中,也有两个步骤:连接和去重。图11a图12a的运行时间是所有迭代的连接时间和去重时间的总和。
HaLoop总是比Hadoop好。Descendant query在Triples数据集上提升最好,PageRank在Livejournal和Freebase的提升效果居中,但是descendant query在Livejournal数据集的提升效果最差。Livejournal是一个高扇出和高可达性的社交网络数据集。descendant query在之后的迭代(>3)中产生了如此多的重复以至于去重开销占了绝大多数,而且HaLoop的缓存机制并没有很大程度上减少运行时间。相反,Triples数据集连接较少,因此连接步骤是主导开销,缓存是至关重要的。
连接步骤运行时间. HaLoop任务调度和reducer输入缓存潜在地减少连接步骤时间,但是没有减少descendant query去重步骤的开销,也没有减少PageRank最后合并步骤的开销。因此,为了部分解释为什么HaLoop整个作业运行时间会更短,我们比较了每次迭代的连接步骤的性能。图9b,图10b,图11b和图12b画出了每次迭代的连接时间。HaLoop的表现大大超过Hadoop。
在第一次迭代中,HaLoop比Hadoop慢,在四个图的(a)和(b)中可以看到。原因是HaLoop在第一次迭代时做了额外的工作:HaLoop缓存了在每个reducer的本地磁盘中排好序分好组的数据,创建了缓存数据的索引,并存储了索引。那就是说,在第一次迭代时,HaLoop做了与Hadoop同样的事,还写了缓存到本地磁盘。
连接步骤的代价分布.为了更好的了解HaLoop在每个阶段的提升,我们比较了在Map和Reduce阶段上的连接步骤的代价分布。图9c,图10c,图11c,和图12c展示了确定的迭代上(这里是第三次迭代)的连接步骤代价分布。衡量的是在每个阶段上的时间花费。在HaLoop和Hadoop中,reducers在第一次mapper完成之后立马开始复制数据。“Shuffle time”一般是发生在reducer开始拷贝map输出数据与对拷贝后的数据开始排序之间;shuffling与未完成的mappers是并发的。第一个完成的mapper的运行时间在两个算法里是最短的,如,1-5秒从一个64MB的HDFS块读取数据。如果我们画出第一个mapper的运行时间作为“map阶段”,时间将会很短从而难以可见地与shuffle阶段和reduce阶段进行比较。因此我们令图中的“shuffle time”变成通常的shuffle time加上第一次mapper完成的时间。图中的“reduce time”是一个reducer在shuffle阶段后花费的总时间,包括排序和分组,以及Reduce函数调用时间。注意,图中的“shuffle time”加上“reduce time”组成了我们谈到的“连接步骤”。考虑到所有四个图,我们总结HaLoop在两个阶段的表现都比Hadoop好。
“Reduce”虽然有,但在图11c中不可见。“reduce time”并不是0,只是相比较“shuffle”来说非常短。它利用了HaLoop为缓存数据创建的索引。那么∆Si和F的连接将使用索引搜索来搜索缓存F中的合格元组。而且,每次迭代会创建几乎很少的新纪录,因此F的选择性非常低。因此开销变得微不足道了。对比PageRank,索引并没有帮到太多,因为选择性很高。对于Livejournal(图12)的decendants query,在迭代>3时,索引也没有帮忙,因为选择性开始变得高了。
连接步骤的shuffle阶段上的I/O.为了看出节省了多少shuffling I/O,我们比较了每个迭代的连接步骤的shuffled数据的总数。因为HaLoop缓存了循环不变数据,可以完全避免shuffling这些数据的开销。这些节省的开销为整个性能的提升做出了重要的贡献。图9d,图10d,图11d,和图12d画出了shuffled数据的大小。平均HaLoop连接步骤shuffle的数据只有Hadoop的4%。
5.2 reducer输出缓存评估
跟reducer输入缓存的实验一样,这个实验共享相同的硬件和数据集。为了知道HaLoop的reducer输出缓存有多有效,我们比较了每个迭代的fixpoint评估的代价。因为descendant query有一个不重要的fixpoint评估,该评估要求测试文件是否为空,所以我们在Lovejournal和Freebase上运行PageRank(9.2节)。在Hadoop实现中,fixpoint评估是用一个额外的MapReduce作业实现的。与Hadoop比较,HaLoop通过利用reducer输出缓存和一个内嵌的分布式fixpoint评估减少了这个步骤平均40%的开销。图13a和b展示了每个迭代的fixpoint评估的时间花费。
5.3 Mapper输入缓存的评估
由于mapper输入缓存是为了减少slave结点间的数据传输,但我们不知道EC2虚拟机的磁盘I/O的实现,这套实验使用一个8结点的物理机集群。PageRank和descendant query不能利用mapper输入缓存因为它们的输入在迭代过程中会改变。因此,用于评估的应用是k-means聚类算法。我们使用两个现实世界的天文学数据集(多维元组):cosmo-dark(46GB)和cosmo-gas(54GB)。硬件和数据集描述详见9.6节。我们改变总的迭代次数,并在图14中画出算法的运行时间。在试验的HaLoop集群上,因为没有并发的作业,Mapper的locality rate大约为95%。通过避免非本地化数据的加载,HaLoop表现比Hadoop稍微好一点。
6 相关工作
并行数据库系统分离数据存储和并行查询工作负载来实现更好的性能。然而,它们对失败非常敏感,还不能扩展到上千个结点。对于评估递推的查询,各种优化技术已经在文献中提出了。现有工作尚未得到大规模运作。而且,大多数技术与我们的研究是正交的;我们提供了一个执行数据密集型迭代程序的低级基础。
最近,MapReduce已经成为无共享集群中大规模并行数据分析的流行替代方案。Hadoop是一个开源的MapReduce实现。MapReduce后面跟着一系列相关的系统包括Dryad,Hive,Pig,和HadoopDB等。就像Hadoop,这些系统都没有对迭代或递推类型的分析提供明确的支持和优化。
Mahout是一个在Hadoop顶层建立一套可扩展的机器学习库的项目。由于大多数机器学习算法是模型拟合应用,几乎所有的算法都要涉及迭代程序。Mahout使用一个外部驱动程序来控制循环,新的MapReduce作业在每次迭代中启动。该方法的缺点已经在第1节讨论了。类似Mahout,我们正在努力帮助迭代数据分析算法在可扩展架构上工作,但是我们在修改基本系统方面有所不同:我们将迭代功能注入到MapReduce引擎中。
Twister是一个基于流的MapReduce框架,它支持mappers和reducers长时间运行着分布式内存缓存的迭代程序。他们的建立为了避免mapper数据从磁盘重复地加载。然而,Twister的mappers和reducers之间的流结构对失败很敏感,长时间的mappers/reducers加上内存缓存对商品机器集群来说不是一个可扩展的解决方案,该集群的每个结点都有内存和资源的限制。
最后,Pregel是一个用来处理大型图像数据集的分布式系统,但是它不支持普通的迭代程序。
7 结论和未来的工作
这篇文章展示了HaLoop的设计、实现和评估,HaLoop是一个支持大规模迭代数据分析应用的并行和分布式系统。HaLoop建立在Hadoop之上,并通过一个新的编程模型和一些重要的优化来扩展它。这些优化包括(1)一个感知循环的任务调度器,(2)循环不变的数据缓存,和(3)对于有效验证fixpoint的缓存。我们在一些大型数据集和迭代查询上评估了HaLoop原型。结果现实把迭代程序支持到MapReduce引擎可以大大提高迭代数据分析应用的整体性能。在未来的工作上,我们会在HaLoop之上实现一个简化的Datalog评估引擎,以实现在声明式中编程的大规模迭代数据分析。
8 参考文献
本文翻译自《HaLoop:Efficient Iterative Data Processing on Large Clusters》,查看具体参考文献请下载该文献。
9附件
该附件介绍了HaLoop系统和示例的其他实现细节,实验设置细节和讨论。
9.1 HaLoop实现细节
我们首先提供一些另外的关于HaLoop在Hadoop上的扩展细节。
9.1.1 Hadoop背景
在Hadoop中,客户端程序必须以集中的方式或通过额外的MapReduce作业自行实现顶点评估。他们也必须决定什么时候开启新的MapReduce作业。Mahout项目通过这样的方法已经实现了多迭代的机器学习和数据挖掘算法。图15描述了一个迭代程序是如何在Hadoop中执行的。图中也展示了以下类在Hadoop系统中是如何组合在一起的。
Hadoop master node.在Hadoop中,接口TaskScheduler和类JobInProgress扮演了master结点的角色:他们接收来自slave结点的heartbeats并管理任务调度。
Hadoop slave node.类TaskTracker是每个slave结点上的守护进程。它发送heartbeats包括任务完成的信息给master结点。它从master结点收到任务执行的命令。
User-defined map and reduce functions.类MapTask和ReduceTask是用户定义Mapper和Reducer的容器。这些包装类加载、预处理并将数据传递给用户代码。一旦一个TaskTracker得到来自TaskScheduler的任务执行命令,它启动一个进程来开始MapTask或ReduceTask线程。
9.1.2 HaLoop对Hadoop的扩展
我们扩展和改进了Hadoop,如下:
Hadoop master node: loop control and new API.我们通过实现TaskScheduler和改进JobInProgress类来执行HaLoop的循环控制和任务调度。
另外,HaLoop提供扩展API来促成客户端程序,用函数来建立循环体,关联每个迭代的输入文件,指定一个循环终止条件,开启/关闭缓存,告知HaLoop循环不变的数据。JobConf类代表一个客户端作业和承载这些APIs。图16介绍API的描述。
Hadoop slave nodes: caching. 我们通过更改类MapTask,ReduceTask和TaskTracker来实现HaLoop的缓存机制。在map/reduce任务中,HaLoop在本地文件系统创建了一个目录用来存储缓存数据。该目录是在任务的工作目录下并被标记为迭代数。因此,之后想获取缓存的任务就可以知道该数据时从哪次迭代产生的。迭代作业完成后,跟该作业有关的整个缓存会被擦除。
User-defined map and reduce functions:iterations. 在Hadoop中,我们增加抽象类MapperIterative和ReducerIterative来包裹Mapper/Reducer接口。他们给用户定义的map/reduce提供了一个空的实现,并增加了新的map/reduce函数来接受两种参数,一个是普通的map/reduce函数的参数,一个是迭代关系的参数比如当前迭代数。ReduceIterative的新reduce函数也增加了另一个新的参数,该参数存储了缓存的reducer输入values,该values与key相关联。
User-defined map and reduce functions: fixpoint evaluation. HaLoop用分布式的方式评估fixpoint。一个迭代的reduce阶段结束之后,ReduceTask通过执行用户定义的距离函数来计算前后两次迭代的距离总和。然后TaskTracker传回合并后的值给JobInProgress。JobInProgress把每个TaskTracker的预合并的值求和,并把该值与fixedPointThreshold比较。如果距离小于fixedPointThreshold或当前迭代数已经是maxNumOfIterations,JobInProgress会唤醒“作业完成”事件去终止作业的执行。否则,JobInProgress会把一些任务放进任务队列去开始新的迭代。图15也介绍了HaLoop如何执行一个作业。特别是,我们可以看到TaskScheduler管理一个迭代作业执行的生命周期。
9.2 PageRank实现
详见:HaLoop:大集群上高效的迭代数据处理(下)
9.3 Descendant Query实现
详见:HaLoop:大集群上高效的迭代数据处理(下)
9.4 k-means实现
详见:HaLoop:大集群上高效的迭代数据处理(下)