Spark的下一代引擎-Project Tungsten启示录:兼Presto、impala、spark性能根本比较

时间:2022-09-10 00:50:49

感谢优酷土豆大数据应用团队

引自:http://blog.csdn.net/ytbigdata/article/details/50721174

        在过去的一年之中,我们一直在利用Spark做实时交互式分析系统方面的尝试,有兴趣的同学可以看一下我们之前分享的博客《基于Spark的用户分析系统》。我们在不断受到Spark启发的同时,也不得不忍受尚处于青春期的Spark性格中的叛逆。特别是在不断优化系统性能过程中,发现我们实际上是在做与Project Tungsten同样的工作。不知道是该庆幸选对了方向,还是该忧伤重复发明了*。尤其是在对比了Project Tungsten与我们自己的实现,心中五味杂陈。不过也正是由于重复发明*的过程,也让我们对Project Tungsten有了自己的理解,所以在这里聊一下Project Tungsten背后的黑科技。

1. Project Tungsten的野心

        如果你对ProjectTungsten还比较陌生,建议大家看一下这篇博客《Project Tungsten: Bringing Spark Closer to Bare Metal》。在这篇博客中,作者指出ProjectTungsten是为了大幅提升Spark应用使用CPU和Memory的效率,让Spark的性能接近硬件的极限。听着就振奋人心,却不禁引起大家对于Spark性能的疑惑,难道我们看到的比hadoop快几十到几百倍的效率还不是性能的极限吗?答案是远远没到,实际上目前Spark享受的福利还仅仅是将数据放在了内存中,相对于很多其他的框架例如Apache Drill以及Lucene,Spark在调优方面做得工作只能说是刚刚开始。说了这么多,我们首先来分析一下Project Tungsten在捣鼓些什么:

  • 内存管理与二进制处理(Memory Management and Binary Processing)
  • Cache-aware计算(Cache-awareComputation)
  • 代码生成(Code Generation)

看着这三个方面,你可能有很多的疑惑,这完全没有一个主线啊,DataBricks真是不按常理出牌!可是如果你曾经试图利用Java写一个数据库,或者其他数据密集型的应用时,你会发现这三个方向几乎是你必然的选择。

1.1 内存管理与二进制处理

        首先聊一下内存管理以及二进制处理,在相当多的场景中IO是我们程序永恒的瓶颈,我们总是试图做批处理,基于列存储,分区甚至是倒排索引,这一切的努力都是在解决磁盘的IO瓶颈。但是如果数据完全放入了内存之后,我们面临的新问题是什么呢?CPU不够用。其实我们在做实时交互式分析系统的时候就发现了这个问题,我们从来没有如此希望增加CPU的核数,但是集群告诉我们CPU的核数是有限的。那么问题出现在哪儿?其实是我们没有把CPU的资源用在刀刃上。以普通的DDR 3 1666MHz的为例,理论值能够达到10GB/s的读取能力,实际值大概在5GB/s附近,相对于普通机械硬盘30MB/s的连续读取能力,我们可以想象一下,相对于从磁盘读取,从内存直接读取数据对CPU的计算能力诉求有多大。如果再考虑多通道的问题,这个数据量将会按倍数增加。面对这么多的数据,我们可怜的CPU要做哪些工作呢?第一个繁重的工作就是序列化与反序列化,以Java为例这个过程说白了就是完成一堆的对象和一堆二级制数据之间的相互转化。那么为什么Spark需要序列化和反序列化呢,原因很简单就是为了沟通,为了能够将数据从一个实例搬到另外一个实例,这是不是让你想到shuffle的过程。第二个繁重的工作是创建对象,你可能会反驳JVM new一个对象是多么得有效率,而实际上当数据像潮水般涌来,让CPU把他们都包装成一个个的对象写回到内存中,这个创建的时间就不能忽略了。不仅如此,创建过多的对象带来的是大量的内存消耗和GC时间,我们都知道GC也是要消耗CPU时间片的。有时候创建了对象还不是一切的终点,我们需要构建一个有效的数据结构,比如HashMap或者HashSet,构建这些数据结构的时候,大量的比较或者hash值计算都是CPU的天敌。所以大家可能会体会到RDD的join操作是多么得让人着急。第三个工作可能是不停地压缩数据和解压缩数据,如果选择了对序列化后的数据进行压缩编码处理,一旦遇到这种密集型的计算诉求,CPU就会成为绝对的性能瓶颈。

1.2 Cache-aware

        Cache-aware并不是一个新的概念,在上个世纪90年代的时候就有学者在这些方面做过很多的研究,有兴趣的朋友可能看看Cache-aware(《An Overview of Cache Optimization Techniques and Cache{Aware Numerical Algorithms》)以及Cache-oblivious(《Cache-Oblivious Algorithms and Data Structures》)相关的算法方面的研究。单纯从名字就可以看出,这个Cache-aware就是要让大家牢记CPU是有一级和二级缓存的,CPU在读取内存数据的时候不是一个字节一个字节地读取的,而是按照cache line进行读取,也就是我们印象中CPU依次读取8个字节。更进一步我们是不是还朦胧的记得现代CPU的向量计算,也就是SIMD Programming,有兴趣的同学可以看一下《Basics of SIMD Programming》。如果每次CPU都是在随机的地址读取数据,由于CPU的频率比内存的频率高很多,就会造成CPU长时间处于等待的状态。那么我们的Spark是不是都在利用这些特性呢?事实上Spark是构建在JVM上的,JVM基本上都不理会以上介绍的这些CPU技能(当然JIT背后的黑魔法有可能会应用上这些特性,笔者确实没有深入研究过),在一些特殊情况下,比如一个String类型的数据进行排序的时候,实际上首先传入CPU的是对象的引用,在进行比较是需要重新找到对应的字节,这是一个非常耗费CPU资源的操作,产生的cache miss会造成程序慢上数倍。此外由于GC的存在,即使是一开始连续分配的内存,也有可能在随后GC过程中被彻底打散,造成CPU的随机访问内存。如果你需要强行用上CPU这些奥义,那么你就需要打怪升级了。

1.3 代码生成

        这是一个很有意思的领域,作为一个程序员如果了解了解释器、执行计划以及代码生成,你就会发现你下一步想要做的事就是git clone一下mysql的源代码了,高阶的同学们就会考虑写一个自己喜欢的语言了。实际上如果你阅读了spark-catalyst以及DataFrame的实现之后,你会发现他们就是在搞一个内存版的基于列数据库。回到优化这个主题上来,为什么要进行代码生成,不知道各位有没有写过一个sql解析的程序,就是那个典型的visitor模式。你会发现在进行了AST语义解析之后你会生成一个简单的逻辑执行计划,而这个执行计划其实就是一个嵌套了各类操作的方法。实际上这个方法是可以进行直接运行的,但是你会发现这个代码慢的有点惊人。举一个简单的例子,如果用AST表达a+b/c+d*e,你会得到一个三层嵌套的逻辑树,在完成赋值操作(ValueNode)后,首先计算b/c(BinaryNode)、d*e(BinaryNode),然后计算a+(b/c)(BinaryNode),然后再计算(a+b/c)+d*e(BinaryNode),逻辑看着都费劲,这种pop和push栈的操作肯定也是快不起来。有没有一种方法能够生成一个更加高效的执行代码呢?答案就是在优化阶段生成一个以下的函数:

[java] view plain copy Spark的下一代引擎-Project Tungsten启示录:兼Presto、impala、spark性能根本比较 Spark的下一代引擎-Project Tungsten启示录:兼Presto、impala、spark性能根本比较
  1. public int fun(int a,int b, int c ,intd,int e){  
  2.       return a+b/c+d*e;  
  3. }  

        这个代码里面把那些烦人的BinaryNode嵌套替换成了普通的代码,然后利用编译器编译一下上面的代码,这时候你就会发现一切都变得美好了。道理就是这么简单,但是实现过程中可能需要处理SQL协议,执行计划优化等问题,关于SQL解析大家可以看一下《SQL解析过程详解》。生成动态代码用什么编译器会比较快一点,比如大家耳熟能详的LLVM(C++)/JavaCompiler(Java)/Janino(Java)。不少框架为了统一接口,采用JSON的交互方式,比如ElasticSearch,但实际上原理都是一样的。Spark使用了Janino作为代码生成的默认Compiler,其实在编译器上也没有太多的选择余地。

        介绍完Project Tungsten的三个方向之后,你会发现一条清晰的主线,那就是构造一个更加有效地分析引擎,而且这个分析引擎的大部分优化灵感都来自于关系数据库。与此同时我们体会到Spark的无奈,Project Tungsten其实是为了Spark选择Scala而买单。

2. Spark与JVM的缘起缘灭

    在讲ProjectTungsten之前其实应该聊聊Spark的实现,众所周知,Spark选择了Scala作为实现语言。与其说选择Scala是Spark的败笔,还不如说是一种妥协。从语言性质上说,Scala是一个非常易于使用的语言,特别是拥有.map().filter().reduce()这样优雅的接口,当你利用Spark写一个mapreduce程序的时候,你会觉得这才是我想要的语言。但是如果你在阅读akka的源代码时,你可能就会因为晦涩的代码而叫苦不迭。因此Scala是一个为API设计的语言,与此同时为了能够复用Java的开源组件,Scala又选择了JVM,于是Spark的调优就与JVM结下了不解之缘。其实选择JVM并不是一个坏选择,在多数情况下程序会运行地很好。但是当Spark遇到shuffle以及DataFrame时,这就是JVM不太擅长的领域了。FULL GC的长时间停顿将会严重影响一个SQL的执行时间,如果这个系统又是一个分布式的系统,你无法控制让所有的实例都保持相同的GC频率,这个时候根据木桶定律,这个SQL的执行时间取决于运行最慢的那个节点。实际上每一次执行过程中都需要面对GC的问题,根据我们的经验,有时候在进行大规模的数据聚合操作(aggregation)时,GC的时间甚至要比执行的时间多2到3倍。这个时候我们会发现在利用Spark做一个秒级响应的分布式交互分析系统是多么地困难。当然为了解决最短的一块木板的问题,我们可以采取speculation的方式缓解,然而这种方式没有触动一个核心的问题:我们能不能控制GC的回收策略?比如我们需要复用的数据是不需要GC的,临时生成的对象是不需要放到Old Generation的,我们自己或者是Spark比JVM更加了解数据的特性。事实上JVM没有提供方便的接口来实现这些,为了能够稍微控制一下GC的命脉,唯一的救命稻草就是sun.misc.Unsafe。据说Oracle正在讨论在Java 9 中删除Unsafe类,不知道Oracle的这种执着会造成大量的框架开发者投向Golang的怀抱。实际上在Project Tungsten之前Spark已经利用Netty改造了网络层传输问题,而Netty用了多少unsafe的代码,各位同学可以自己研究一下。以上就是JVM与Spark的前世今生,不仅如此,RDD有很多的transform操作比如map(),filter(), join()实际上是在重新生成一个容器,可想而知当数据量非常大的时候,JVM要创建多少的对象,同时这些对象又有多少会进入到Old Generation。有的同学可能会想到复用对象,那么复用对象是不是最终的解决方法呢?答案是看情况,比如你只是做一个SQL的查询操作,复用是一个非常好的思路;如果你是在做一个分类器训练的时候,也许transform后的数据会成为下面几步迭代的输入数据,生成一个新的Array并且cache()一下会让一切变得更快。

3. Project Tungsten的葵花宝典

    这一回我们要回到枯燥的代码上来了,这一章还是尽量多讲大道理,少摆代码,毕竟代码都在那儿,大家有时间可以自己看一遍。道理讲明白了,解决方案也都放在哪里,剩下的事情就变得相对简单了。我们可以来看一下在过去的一年中Project Tungsten针对这些问题修炼了哪些神功。

3.1 二进制处理

       ok,我们首先翻开ProjectTungsten的第一章,欲练此功,必先自宫。这里是不是要抛弃JVM呢?当然如果你有的是时间,并且能够说服所有的Java程序员接受delete,我觉得放弃JVM是美好的。如果无法放弃,我建议大家首先看一下spark-unsafe,你会发现所有的黑科技都在这个工程中。这个工程还只是一个婴儿,前后加起来也不到20个类,而且逻辑都非常清晰,封装一个好用Unsafe工具类Platform,然后抽象了一个MemoryBlock的管理连续内存的类,剩下的就是管理连续内存以及基于连续内存封装的一系列数据结构。前面已经提到了CPU加载数据是以cache line为单位的,那么什么样结构的数据是CPU友好类型的呢?答案就是连续内存,不管是堆上连续内存还是直接连续内存。其实说到这里就非常有意思了,如果需要应用连续内存,我们就需要按照byte来管理我们的数据,仿佛一夜又回到了c的时代。虽然操作二进制数据不太方便,但是带来的好处是非常明显的:可以愉快地拥抱序列化和反序列化了,也可以用上CPU一次加载8个字节的技能了(cache-aware算法),甚至可以用上SIMD向量计算了。不仅如此,你会发现压缩算法比例也会有提升,同时如果用上直接内存(Direct Memory),GC的时间也会跟着减少。剩下来的问题是怎么从连续内存中读取数据?相信大家对于mysql的表的定义非常熟悉,就是为每一个数据的字段设定一个元数据,这样是int类型就读4个字节,是long类型就读8个字节,是String类型的数据可以先记录一个地址和偏移量等。这也是为什么你在加载DataFrame数据的时候需要提供一个json格式元数据定义。笔者测试过,利用unsafe读直接内存的效率与直接操作读byte[]数组的效率相差无几,但是利用unsafe的setLong()来写数据是直接写byte[]速度的8倍。

       Project Tungsten在内存管理上也是有值得借鉴的地方,比如MemoryBlock就是对于连续内存块的封装,记录了字节长度以及引用的位置。一个数据集就可以由List<MemoryBlock>来记录内存page,通过TaskMemoryManager的encodePageNumberAndOffset方法来编码内存地址,是不是有点类似Linux管理内存的方式呢?利用MemoryBlock甚至可以写一个CPU友好的Map,例如org.apache.spark.unsafe.map.BytesToBytesMap。Java中的HashMap实际上是由一个数组和链表实现的,然而这种实现将内存数据彻底打散,CPU的运行效率自然无法跟上。Databricks想到利用一个连续内存记录hash值以及内存地址,同时将key和value都记录到MemoryBlock中,这样构建的Map虽然不具有通用的功能,但是在有些操作比如大量数据aggregation操作时,效率是非常出色的,同时加上使用直接内存避开了GC,想想都有点小激动。

 3.2 Cache-aware算法

        我们可以在spark-core的org.apache.spark.util.collection.unsafe.sort包找到ProjectTungsten在Cache-aware方面做出的努力。例如RecordPointerAndKeyPrefix和UnsafeInMemorySorter这两个类,RecordPointerAndKeyPrefix实际上存储了一个long类型的引用和一个long类型的key prefix,这个类的对象实际上是复用的,一般是从一个连续内存中取出16个字节,前8个字节是key value在连续内存中的地址,而后8个自己是自定义的keyprefix,然后赋值给这个对象。这样就可以利用key prefix做一个初步的比较操作了,而不用再去随机查询pointer对应的实际key value再来进行比较。当然如果发生key prefix相同,就需要比较真正的value值了。

[java] view plain copy Spark的下一代引擎-Project Tungsten启示录:兼Presto、impala、spark性能根本比较 Spark的下一代引擎-Project Tungsten启示录:兼Presto、impala、spark性能根本比较
  1. @Override  
  2. public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {  
  3.   final int prefixComparisonResult = prefixComparator.compare(r1.keyPrefix, r2.keyPrefix);  
  4.   if (prefixComparisonResult == 0) {  
  5.     final Object baseObject1 = memoryManager.getPage(r1.recordPointer);  
  6.     final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + 4// skip length  
  7.     final Object baseObject2 = memoryManager.getPage(r2.recordPointer);  
  8.     final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + 4// skip length  
  9.     return recordComparator.compare(baseObject1, baseOffset1, baseObject2, baseOffset2);  
  10.   } else {  
  11.     return prefixComparisonResult;  
  12.   }  
  13. }  

        这是一个非常巧妙的设计,对比一下String对象的排序,随机查询内存的几率大大减少,又由于所有的key都是存储在连续内存中的,可以大大加速排序的过程。循着这个思路,或者一些经典的Cache-aware的算法,利用Unsafe对于连续内存的操作,可以极大地提升系统的性能,相信Spark将会在这方面做更多的努力。

3.3 Code Generation

    其实如果抛开解释器、逻辑执行计划、物理执行计划以及优化器来讨论代码生成是一个管中窥豹的行为,因为能够用上代码生成的地方一般都是在构造一个查询或者分析引擎的时候。当然如果你是在写一个类似于Spring、Hibernate或者JUnit的框架,你也会发现代码生成也是非常有用的工具。其实代码生成一般都用在代码不好写或者改写复杂的情况下,例如在有一次面试的过程中,就遇到一位候选者利用ASM的动态代码生成完成自动配置的工作,这是非常有趣的一个方向。说了这么多还是希望大家仔细阅读一下spark-catalyst的所有源代码。

    动态代码生成其实网上资料非常多,介绍JavaCompiler、Janino或者是ASM框架使用和Benchmark的文章比比皆是。这里就简单的说说我们在实际应用过程中的体会,JavaCompiler实际上是需要通过一个源代码的字节流来进行编译代码的,生成的class文件会存储到本地磁盘上,然后如果需要使用这个类构建对象或者其中的静态方法,就需要通过ClassLoader来加载字节码,进而就可以利用这个动态生成的类创建对象了。Janino有一个自己的编译器,同时也可以配置成JavaCompiler,提供的结构非常方便,但是相对于JavaCompiler,Janino自身的编译器编译出来的代码运行效率会慢10倍。ASM则是通过直接写字节码来动态生成,编译的速度非常快,但是动态代码的效率与Janino的效率差不多,运行速度都比较慢,特别是当这个动态代码需要运行上亿次,差距非常明显。如果你的代码只是运行一遍,我觉得ASM极快的编译速度会让它成为最好的选择。

4. 总结

        看了上面的介绍,相信各类看官对于Project Tungsten或多或少都有了一定的了解,Project Tungsten其实不是一个非常神秘的计划,其实它的存在是选择JVM的数据引擎必然的优化之路。与其说Project Tungsten将铸就下一次Spark数据引擎的基石,还不如说是长叹一声出身的无奈。Project Tungsten可以说是一个非常简单,同时也还没有成熟的实现方案,对于广大的Java、Scala程序员来说是宝贵的学习材料和案例。希望这篇博客能够帮助大家选择适合自身项目的优化方向,欲知下事如何,且听下回分解。