2019-04-17 蔡元楠
我有幸几次与来 Google 参观的同行进行交流,当谈起数据处理技术时,他们总是试图打探 MapReduce 方面的经验。
这一点让我颇感惊讶,因为在硅谷,早已没有人去谈论 MapReduce 了。
今天,我们就来聊聊为什么 MapReduce 会被硅谷一线公司淘汰。
我们先来沿着时间线看一下超大规模数据处理的重要技术以及它们产生的年代。
我认为可以把超大规模数据处理的技术发展分为三个阶段:石器时代,青铜时代,蒸汽机时代。
石器时代
我用“石器时代”来比喻 MapReduce 诞生之前的时期。
数据的大规模处理问题早已存在。早在 2003 年的时候,Google 就已经面对大于 600 亿的搜索量
。
但是数据的大规模处理技术还处在彷徨阶段。当时每个公司或者个人可能都有自己的一套工具处理数据。却没有提炼抽象出一个系统的方法。
青铜时代
2003 年,MapReduce 的诞生标志了超大规模数据处理的第一次革命,而开创这段青铜时代的就是下面这篇论文《MapReduce: Simplified Data Processing on Large Clusters》。
杰夫(Jeff Dean)和桑杰(Sanjay Ghemawat)从纷繁复杂的业务逻辑中,为我们抽象出了 Map 和 Reduce 这样足够通用的编程模型
。后面的 Hadoop 仅仅是对于 GFS、BigTable、MapReduce 的依葫芦画瓢,我这里不再赘述。
蒸汽机时代
到了 2014 年左右,Google 内部已经几乎没人写新的 MapReduce 了
。
2016 年开始,Google 在新员工的培训中把 MapReduce 替换成了内部称为 FlumeJava(不要和 Apache Flume 混淆,是两个技术)的数据处理技术
。
这标志着青铜时代的终结,同时也标志着蒸汽机时代的开始。
我跳过“铁器时代”之类的描述,是因为只有工业革命的概念才能解释从 MapReduce 进化到 FlumeJava 的划时代意义
。
Google 内部的 FlumeJava 和它后来的开源版本 Apache Beam
所引进的统一的编程模式,将在后面的章节中为你深入解析。
为什么 MapReduce 会被取代
现在你可能有一个疑问 :为什么 MapReduce 会被取代?今天我将重点为你解答。
高昂的维护成本
使用 MapReduce,你需要严格地遵循分步的 Map 和 Reduce 步骤。当你构造更为复杂的处理架构时,往往需要协调多个 Map 和多个 Reduce 任务
。
然而,每一步的 MapReduce 都有可能出错
。
为了这些异常处理,很多人开始设计自己的协调系统(orchestration)。例如,做一个状态机(state machine)协调多个 MapReduce,这大大增加了整个系统的复杂度。
如果你搜 “MapReduce orchestration” 这样的关键词,就会发现有很多书,整整一本都在写怎样协调 MapReduce。
你可能会惊讶于 MapReduce 的复杂度
。我也经常会看到一些把 MapReduce 说得过度简单的误导性文章。
例如,“把海量的××数据通过 MapReduce 导入大数据系统学习,就能产生××人工智能”。似乎写文的“专家”动动嘴就能点石成金。而现实的 MapReduce 系统的复杂度是超过了“伪专家”的认知范围的。下面我来举个例子,告诉你 MapReduce 有多复杂。
想象一下这个情景,你的公司要预测美团的股价,其中一个重要特征是活跃在街头的美团外卖电动车数量,而你负责处理所有美团外卖电动车的图片。
在真实的商用环境下,为了解决这个问题,你可能至少需要 10 个 MapReduce 任务:
首先,我们需要搜集每日的外卖电动车图片。
数据的搜集往往不全部是公司独自完成,许多公司会选择部分外包或者众包。所以在数据搜集(Data collection)部分,你至少需要 4 个 MapReduce 任务:数据导入(data ingestion)
:用来把散落的照片(比如众包公司上传到网盘的照片)下载到你的存储系统。数据统一化(data normalization)
:用来把不同外包公司提供过来的各式各样的照片进行格式统一。数据压缩(compression)
:你需要在质量可接受的范围内保持最小的存储资源消耗 。数据备份(backup)
:大规模的数据处理系统我们都需要一定的数据冗余来降低风险。
仅仅是做完数据搜集这一步,离真正的业务应用还差得远。
真实的世界是如此不完美,我们需要一部分数据质量控制(quality control)
流程,比如:数据时间有效性验证 (date validation)
:检测上传的图片是否是你想要的日期的。照片对焦检测(focus detection)
:你需要筛选掉那些因对焦不准而无法使用的照片。
最后才到你负责的重头戏——找到这些图片里的外卖电动车。而这一步因为人工的介入是最难控制时间的。你需要做 4 步:
数据标注问题上传(question uploading):上传你的标注工具,让你的标注者开始工作。
标注结果下载(answer downloading):抓取标注完的数据。
标注异议整合(adjudication):标注异议经常发生,比如一个标注者认为是美团外卖电动车,另一个标注者认为是京东快递电动车。
标注结果结构化(structuralization): 要让标注结果可用,你需要把可能非结构化的标注结果转化成你的存储系统接受的结构。
这里我不再深入每个 MapReduce 任务的技术细节,因为本章的重点仅仅是理解 MapReduce 的复杂度。
通过这个案例,我想要阐述的观点是,因为真实的商业 MapReduce 场景极端复杂,像上面这样 10 个子任务的 MapReduce 系统在硅谷一线公司司空见惯。
在应用过程中,每一个 MapReduce 任务都有可能出错,都需要重试和异常处理的机制。所以,协调这些子 MapReduce 的任务往往需要和业务逻辑紧密耦合的状态机
。
这样过于复杂的维护让系统开发者苦不堪言。
时间性能“达不到”用户的期待
除了高昂的维护成本,MapReduce 的时间性能也是个棘手的问题。
MapReduce 是一套如此精巧复杂的系统,如果使用得当,它是青龙偃月刀,如果使用不当,它就是一堆废铁。不幸的是并不是每个人都是关羽
。
在实际的工作中,不是每个人都对 MapReduce 细微的配置细节了如指掌。
在现实中,业务往往需求一个刚毕业的新手在 3 个月内上线一套数据处理系统
,而他很可能从来没有用过 MapReduce。这种情况下开发的系统是很难发挥好 MapReduce 的性能的。
你一定想问,MapReduce 的性能优化配置
究竟复杂在哪里呢?
我想 Google 500 多页的 MapReduce 性能优化手册足够说明它的复杂度了。这里我举例讲讲 MapReduce 的分片(sharding)难题,希望能窥斑见豹,引发大家的思考。
Google 曾经在 2007 年到 2012 年间做过一个对于 1PB 数据的大规模排序实验,来测试 MapReduce 的性能。
从 2007 年的排序时间 12 小时,到 2012 年的排序时间缩短至 0.5 小时。即使是 Google,也花了 5 年的时间才不断优化了一个 MapReduce 流程的效率。
2011 年,他们在 Google Research 的博客上公布了初步的成果。
其中有一个重要的发现,就是他们在 MapReduce 的性能配置上花了非常多的时间。包括了缓冲大小 (buffer size),分片多少(number of shards),预抓取策略(prefetch),缓存大小(cache size)等等。
所谓的分片,是指把大规模的的数据分配给不同的机器 / 工人,流程如下图所示。
选择一个好的分片函数(sharding function)
为何格外重要?让我们来看一个例子。
假如你在处理 Facebook 的所有用户数据,你选择了按照用户的年龄作为分片函数(sharding function)。我们来看看这时候会发生什么。
因为用户的年龄分布不均衡
(假如在 20~30 这个年龄段的 Facebook 用户最多),导致我们在下图中 worker C 上分配到的任务远大于别的机器上的任务量。
这时候就会发生掉队者问题(stragglers)。别的机器都完成了 Reduce 阶段,只有 worker C 还在工作。
当然它也有改进方法。掉队者问题可以通过 MapReduce 的性能剖析(profiling)发现。 如下图所示,箭头处就是掉队的机器。
图片引用:Chen, Qi, Cheng Liu, and Zhen Xiao. “Improving MapReduce performance using smart speculative execution strategy.” IEEE Transactions on Computers 63.4 (2014): 954-967.
回到刚刚的 Google 大规模排序实验。
因为 MapReduce 的分片配置异常复杂,在 2008 年以后,Google 改进了 MapReduce 的分片功能,引进了动态分片技术 (dynamic sharding),大大简化了使用者对于分片的手工调整。
在这之后,包括动态分片技术在内的各种崭新思想被逐渐引进,奠定了下一代大规模数据处理技术的雏型。
小结
这一讲中,我们分析了两个 MapReduce 之所以被硅谷一线公司淘汰的“致命伤”
:高昂的维护成本
和达不到用户期待的时间性能
。
文中也提到了下一代数据处理技术雏型。这就是 2008 年左右在 Google 西雅图研发中心诞生的 FlumeJava,它一举解决了上面 MapReduce 的短板。
另外,它还带来了一些别的优点:更好的可测试性;更好的可监控性;从 1 条数据到 1 亿条数据无缝扩展,不需要修改一行代码,等等。
思考题
如果你在 Facebook 负责处理例子中的用户数据,你会选择什么分片函数,来保证均匀分布的数据分片?
相关讨论:
Codelife
我们最早采用的是哈希算法,后来发现增删节点泰麻烦,改为带虚拟节点的一致性哈希环开处理,稍微复杂点,但是性能还好
作者回复: 谢谢你的答案!应该是一个很有经验的高级工程师了吧。使用Consistent hashing是可以很好地解决平均分配和当机器增减后重新hashing的问题。
maye :
个人愚见:虽然MapReduce引擎存在性能和维护成本上的问题,但是由于Hive的封装使其适用性很广泛,学习成本很低,但是实际使用过程中和Spark等相比性能差太多了。不过对于计算引擎模型的理解方面,MapReduce还是一个很经典的入门模型,对于未来迁移到其他计算引擎也是有很大帮助的。
还有一个个人问题:不知道蔡老师对于流计算和批处理的关系是怎么看待的?流计算有可能完全取代批处理么?
关于思考题:问题的核心店在于Reducer Key是否倾斜,个人认为可以按照update_time之类的时间字段进行分片处理。
作者回复: 你好Maye,谢谢你的留言与提问!
第一问我也说说我的愚见吧。关于流处理和批处理的关系
我更倾向于批处理可以算是流处理的一个子集吧。我们可以这么抽象地看,流计算所处理的都是无限数据集,而我们从中按照时间窗口抽取一小段出来的话,这一小段有边界的数据集其实也就是批处理所处理的数据集了
。所以说批处理算是流处理的一个子集吧。
但是现在流计算中两大问题
:
1)Exactly once delivery 2)message order,还没有非常完美的解决方案,但是我相信可以攻克的。所以未来趋势还是趋于统一
。现在Google所推出的Apache Beam项目其实也是想解决这样一个问题,统一批处理和流处理的编程接口。更详细的内容我会在后面的章节展开讲解。
思考题你也看到了问题的本质,就是能找到趋于平均分配的分片处理方式。
王伟 :
你好!我工作中遇到这样的场景:会员在我们平台注册,信息会保存在对应商家的商家库中,现在需要将商家库中的信息实时的同步到另一台服务的会员库中,商家库是按照商家编号分库,而且商家库和会员库没在同一台服务器部署。想请教一下,像这种我们如何做到实时同步?
作者回复: 你好王伟!首先感谢你的提问!
我不确定你所说的实时同步是想表达Eventual Consistency还是Strong Consistency,那我就争对两个都说说自己的愚见吧。
因为会员信息都会保存在商家库中,所以这里我假设商家库的信息可以作为source of truth。
如果你指的是Eventual Consistency的话,可以在会员更新商家库的同时将会员信息利用Pub/Sub发送给会员库去更新。考虑到Pub/Sub中间有可能会丢包,我们可以再建立一个定时任务每隔一段时间将全部商家库中的信息扫描一遍再更新到会员库中。当然具体的实现可以再作优化,因为商家库是按商家编号分库的,我们可以记录下哪些商家编号的表最近有更新我们就只扫描那些表,而不用扫描全局的表。
如果你指的是Strong Consistency的话,我们可以在中间再创建一个State Machine,记录是否两个库都同时更新了。在读取会员信息的时候,我们需要查询这个State Machine,只有当两个库同时都更新的时候才将会员信息返回。根据第九讲的CAP理论,这样的做法其实会牺牲掉Availability,也就是你的服务可用性。
当然具体的需求你会比我更了解,所以相信你能够从中做出设计上的取舍。也欢迎你继续留言提问,我们可以一起讨论学习进步!
cricket198…
如果不需要按某些字段做聚合分析,只是普通数据处理的话,直接用Round Robin分片即可。我想了解什么是“动态分片”技术?即使不用MR,其他大数据处理框架也需要用到“分片”,毕竟大数据的处理是“分而治之”,如何分才能分得好是关键。日常工作中经常遇到数据倾斜问题
,也是由于分片不合理导致的
。如果对于待处理的数据你了解到好办,知道用哪些字段作分片最合适,但如果遇到不熟悉的数据你又该如何分片?而不是等到出现数据倾斜问题的时候才发现,进行分片修改再重跑呢?谢谢老师指教!
作者回复: Round robin确实能保证均匀但是有个很大的问题是没有容错。因为在分布式处理的时候数据处理顺序是“随机”的,可能是shard 1/2/3也可能是 shard 1/3/2,如果发现shard 2所有任务挂了(机器坏了)需要重试,如果有确定的sharding function很容易找出shard 2的任务,round robin的话就无法还原shard 2任务了。当然你可以说我再搞个数据库把round robin结果保存,但那样就更复杂了。
Keanu
我会考虑使用用户邮箱名首位的字母或数字进行分片。
作者回复: 谢谢你的答案!如果有些首位字母或数字的使用率很高,而其它字母或数字的使用率低的话,那些使用率高的字母或数字会造成Hot Spot的问题噢。欢迎你继续留言,我们一起学习进步!
明翼
一般用户信息表都存在一个id,有的是递增的数字id,有的是类似uuid随机字符串,对于递增的直接对机器数量取余,如果是字符串通过比较均衡的hash函数操作后再对机器数量取余即可。
作者回复: 谢谢你的答案!这个答案不错。不过取余运算在机器有增减的时候会遇到麻烦,所有的用户必须重新取余运算一遍。Consistent Hashing可以很好地解决这个问题
。欢迎你继续留言,我们一起学习进步!
JensonYao
MapReduce是从纷繁复杂的业务逻辑中,为我们抽象出了 Map 和 Reduce这样足够通用的编程模型。
缺点:
1、复杂度高
当你构造更为复杂的处理架构时,往往进行任务划分,而且每一步都可能出错。而且往往比认为的复杂的多。
2、时间性能达不到用户要求
Google500 多页的 MapReduce 性能优化手册
1PB的排序从12小时优化到0.5小时花了5年
思考题:如果你在 Facebook 负责处理例子中的用户数据,你会选择什么分片函数,来保证均匀分布的数据分片?
由于没有过相关的经验,从网上查了下资料,常见的数据分片有
1、hash
2、consistent hash without virtual node
3、consistent hash with virtual node
4、range based
文章中使用的方法就是range based方法,缺点在于区间大小固定,但是数据量不确定,所以会导致不均匀。
其他三种方法都可以保证均匀分布的数据分片,但是节点增删导致的数据迁移成本不同。
1、hash函数节点增删时,可能需要调整散列函数函数,导致大量的数据迁移
consistent hash是将数据按照特征值映射到一个首尾相接的hash环上,同时也将节点映射到这个环上。对于数据,从数据在环上的位置开始,顺时针找到的第一个节点即为数据的存储节点
2、consistent hash without virtual node 增删的时候只会影响到hash环上响应的节点,不会发生大规模的数据迁移。但是,在增加节点的时候,只能分摊一个已存在节点的压力;同样,在其中一个节点挂掉的时候,该节点的压力也会被全部转移到下一个节点
3、consistent hash with virtual node 在实际工程中,一般会引入虚拟节点(virtual node)的概念。即不是将物理节点映射在hash换上,而是将虚拟节点映射到hash环上。虚拟节点的数目远大于物理节点,因此一个物理节点需要负责多个虚拟节点的真实存储。操作数据的时候,先通过hash环找到对应的虚拟节点,再通过虚拟节点与物理节点的映射关系找到对应的物理节点。引入虚拟节点后的一致性hash需要维护的元数据也会增加:第一,虚拟节点在hash环上的问题,且虚拟节点的数目又比较多;第二,虚拟节点与物理节点的映射关系。但带来的好处是明显的,当一个物理节点失效是,hash环上多个虚拟节点失效,对应的压力也就会发散到多个其余的虚拟节点,事实上也就是多个其余的物理节点。在增加物理节点的时候同样如此。
引用blog:http://www.cnblogs.com/xybaby/p/7076731.html
所以这样看具体采用何种方式要结合其他的因素(显示场景,成本?),如何抉择我也不是很清楚。
作者回复: 线下做了研究了很好啊。这三个看起来都可以吧。一般场景我觉得可以选择复杂度低的第一种,后面的对于普通场景可能都有点overkill。
hua168
那现在还在用MapReduce的大数据软件怎么搞了?也会被慢慢淘汰?还需要学习吗?
作者回复: 如果还没开始学,就可以直接开始学我们这里介绍的apache beam吧。如果已经开始学了,肯定也是有收获的,学习永远没有最好的“时机”,因为技术永远在发展更新啊
。就像买不到最时髦的衣服一样。还是如第一篇虽说,看一个技术要看到它怎么解决问题,学习他的思路
。
mjl
个人理解,对于已知数据分布情况的数据,我们大多数情况下能找到合适的一个分区策略对数据进行分片。但实际上这对于数据开发者来说,就需要知道整体数据的一个基本情况。而对于数据倾斜,基本分为分区策略不当导致的倾斜以及单热点key的倾斜,对于后者,无论用户设置什么分区策略,都无法对数据进行分割。
对于数据倾斜问题的话,spark 3.0版本计划合入的AE功能给出了一定的方案。对于倾斜的partition,在shuffleWrite阶段就可以统计每个map输出的各个分区信息,然后根据这些信息来调整下一个stage的并发度。进一步的话,对于两表join,一张表有存在热点key的话,可以广播另外一张表的该partition,最终与其他分区的join结果做union。基于这个思路的话,engine其实是能很灵活的处理数据倾斜类问题,而不用用户去花精力研究选择。
作者回复: 这个思路看起来是做了很多课后研究了!希望后面也能继续参与讨论!
leben kri…
MR的劣势刚好对应了Spark的优势
- 通过DAG RDD进行数据链式处理,最终只有一个job,大大降低了大数量MR的维护成本
- 优先基于内存计算的Spark相对于基于磁盘计算的MR也大幅度提高了计算性能,缩短计算时间
个人觉得,这两点可以作为MR和Spark的主要区别。
作者回复: 谢谢你的留言!我认同你的观点,MR确实在每一步都需要经过磁盘的数据读取和结果的写入,速度上也就比不上Spark了。希望后面能继续看见你留言,一起学习进步!
侵权可联系删除
本文转载于蔡源楠老师在极客时间的《大规模数据处理实战》,为收费课程的免费试读内容,
这个课程很不错,本人在这个的基础上做了自己的理解和扩展加深。