来源:DataFunTalk
导读 本文主题为 Gluten 向量化引擎,提速 Spark 两倍性能。
内容包括以下三部分:
1. Why and What is Gluten?
2. Gluten 实现原理
3. 当前进展和后续工作
分享嘉宾|张智超 Kyligence 高级大数据架构师
编辑整理|张龙春 HW
出品社区|DataFun
Why and What is Gluten?
首先介绍一下为什么发起 Gluten 这一项目,以及 Gluten 是一个什么类型的项目。
在介绍 Gluten 之前,先探讨一下当前 Spark 在计算性能上遇到的问题。
上图是 Spark 各个版本在 TPCH 下的基础算子性能图。这里抽取的是三个主要的基础算子,HashAgg、HashJoin 和 TableScan。从 Spark 的 1.6 版本以来,逐步引入了诸如钨丝计划、向量化 Parquet Reader 等一系列优化之后,整体的计算性能有两倍左右的提升。但同时我们也发现,在 3.0 版本以后,整体计算性能的提升有所减缓。那么到底是什么成为了 Spark 计算性能提升的瓶颈呢?这里我们做了一些调研。
上面这张图是 Spark 跑 TPCH 过程当中的性能监控指标。从图上可以看到内存的使用率、网卡 IO、磁盘 IO 等,这些指标时高时低,可以说明并不是瓶颈所在。最明显的就是 CPU 利用率,在整个运行过程中,CPU 利用率基本上都高达 80%~90%。显而易见 CPU 是 Spark 计算性能所面临的较大瓶颈。
Spark 是基于 JVM 的,而 JVM 只能利用到一些比较基础的 CPU 指令集。虽然有 JIT 的加持,但相比目前市面上很多的 Native 向量化计算引擎而言,性能还是有较大差距。因此考虑如何将具有高性能计算能力的 Native 向量引擎引用到 Spark 里来,从而提升 Spark 的计算性能,突破 CPU 瓶颈。接下来从两个方面来考虑如何集成向量化引擎。
一方面,Spark 经过多年发展,作为基础的计算框架,不管是在稳定性还是可扩展性方面,以及生态建设都得到了业界广泛认可。所以不考虑去改动基础框架,而是保留它原有的架构,使用 Native 向量化计算引擎替换掉 Spark 原有基于 JVM 的 Task 计算模型,就可以把高性能计算能力带给 Spark,突破 CPU 的瓶颈问题。
另一方面就是近几年来 Native SQL 向量化计算引擎层出不穷,出现多种优秀框架。这些框架大致可以分为两类,一类以产品方式发布,比如 ClickHouse、DuckDB、MongoDB。ClickHouse 使用很广泛,DuckDB 是单机下类似 SQLite 的向量化计算引擎。第二类是以 Library 方式发布,比如 Meta 公司发布的 Velox,其主旨也是想利用 Library 方式替换掉 Presto 基于 JVM 的计算模型,进而提升 Presto 计算能力。还有其他的,如 Arrow Computer Engine 也是以 Library 方式进行发布。
以上提到的几种 Native 向量化计算引擎,它们都有一些共同特点,比如都是使用 C++ 开发,就很容易利用 CPU 原生指令集的优化。另外它们都是基于列式数据格式。结合这两点,这些引擎就很容易可以去做向量化处理,进而达到高性能计算。基于这两点, Gluten 项目也就应运而生。它是一个基于 Spark 的向量化引擎中间件。会把 Spark SQL 整个执行过程当中的计算转移到向量化引擎去执行,来获得指令集的原生加速。
如上图展示,整个框架仍然使用 Spark 原有的 Master/Worker 方式去运行。因为原生 Spark 是在 Task 上做具体的计算,所以这里做了一些改动。在执行 Pipeline 的时候,会先做一个选择,如果 Pipeline 里的 Operator 或 Expression 是 Native 引擎支持的情况,就会交由 Gluten,然后通过 JNI 接口去调用 Native 向量化引擎做计算,从而提升性能。如果存在未支持的 Operator 或者 Expression 的情况,就会做 fallback,让它回到 Spark 原生的 JVM 引擎去执行。
从上图我们也可以看到,目前 Gluten 支持 ClickHouse、Velox、Apache Arrow Computer Engine 这几个 Native 向量化引擎。这里也体现了 Gluten 名字来源的意义,因为 Gluten 在拉丁语里是粘合剂的意思,之所以取这个名称,就是想要利用各种优秀的向量化执行引擎,来把它粘合到 Spark 的整个执行体系中。进而让 Spark 提升计算性能,突破之前发现的 CPU 瓶颈问题。
02
Gluten 实现原理
接下来通过 Gluten 的组件架构、Plan 转换等方面介绍 Gluten 的实现原理。
上图是 Gluten 的整体设计。因为 Gluten 会沿用 Spark 原有的框架。当一个 SQL 进来,会通过 Spark 的 Catalyst 把 SQL 转成 Spark 的物理计划,然后物理计划会传递给Gluten。Gluten 会以 Plugin 的方式集成到 Spark 中。
在 Physical Plan 交给 Gluten Plugin 的时候,会添加一些扩展的规则,然后把 Physical Plan 转换成语言无关的 Substrait Plan。经过这个转换后再交由下面的各种 Native 向量化引擎去执行计算。各自的向量化引擎会根据 Substrait Plan 构建自己的 Execute pipeline,然后读取 Input 数据去做计算,计算完后都会以列式方式返回给 Spark。整个数据流转过程是基于 Spark 原生 Columnar batch 和 Columnar vector 抽象,为 Native 向量化引擎做了一些具体的扩展和实现。
对于 ClickHouse,因为它内部有 block 概念,就会以 block 的方式去扩展 vector。对于 Arrow Computer Engine, 由于 Apache Arrow 本身就定义了内存数据格式,会把引擎计算出来的数据转为 Arrow 格式,去实现一个叫 Arrow Columnar Vector 的格式表达 Arrow 类型的数据,从而让 Spark 能够去识别和读取。目前 Gluten 支持的是 Velox、ClickHouse、Arrow Computer Engine 这三个主要的Native引擎。针对 GPU 方面,Intel 团队之前做过一些调研和研究,计划后续在 Gluten 发展壮大之后,会在 GPU 加速方面做一些扩展。
在整个流转过程当中,Gluten 的 Plugin 层起到承上启下的关系。Gluten Plugin 有哪些组件呢?
① 最为核心的就是 Plan 的 Conversion 组件,它把 Spark Physical Plan 通过 Extension rule inject 的方式转成 Substrait Plan,然后再把 Substrait Plan 传递到底层的 Native Engine 执行。
② 第二就是 Memory 管理。我们知道 Native Engine 完全是脱离 JVM 的。如果不把Native Engine 的内存交给 Spark 统一来管理的话,就很可能出现内存溢出或者直接打爆整台机器内存的情况,所以 Memory 管理也至关重要。
③ 第三就是 Shuffle,我们知道 Shuffle 是整个执行过程中比较重的 Operator,而且 Spark 原生的 Shuffle 是基于 Row 的。所以扩展出了一个叫 Columnar Shuffle Manager 的对象,支持整个 Shuffle 过程当中的列式数据。
④ 第四 Shim Layer 组件,熟悉 Spark 就应该知道,在 Spark 支持 Hive 时,也通过 Shim Layer 方式去支持多版本的 Hive。这里的 Shim Layer 也是为了让 Gluten 能支持多个版本 的Spark。因为 Spark 对外公开的接口,在版本之间变化不会那么大,但内部接口变化还是比较大。所以如果要支持多版本 Spark 的话,就需要通过 Shim Layer 来适配多版本。
⑤ 第五个是 Fallback 组件,这是一个当前比较重要的组件。我们知道 Spark 经过这么多年的发展,目前支持的 Operator 和 Expression 很多,而 Gluten 在发展初期,不可能把所有的 Operator 或 Expression 都支持。当遇到 Native Engine 不支持的情况时,会先通过 fallback 机制做验证,验证完之后如果不支持,就会回退到 Spark 原生 JVM 引擎去执行。
⑥ 第六个是 Metrics,它把 Native Engine 执行过程中的指标统计上报给 Gluten Plugin,然后再由 Plugin上报给 Spark 的 Metrics System 做展示或 API 调用。接下面针对这些组件逐一做简单介绍。
2. 执行计划
如上图,整个 Gluten 的核心重点是执行计划的传递。使用 Substrait 项目来作为 Spark Physical Plan 到 Native 向量化引擎传递的载体。Substrait 是与语言无关的,使用 proto 协议进行表达。在 Spark 经过 Catalyst 解析,规则优化后会输出 Physical Plan,通过 Gluten 一系列 Extension Rules Inject 方式插入规则,这些规则主要是对 Spark Physical Plan 逐一转换成 Substrait Plan。转换过程会涉及到 fallback 机制校验。转换后的 pipeline 会尽可能构造在一个 whole stage 内,然后一次性地调用 Native Engine 执行。Gluten 使用 Java 方式去输出 Substrait Plan 后,底层 Native 向量化引擎,比如 C++ 实现的引擎,会用 C++ 去解析这个 Substrait Plan,解析完后根据这个 Plan 去构造各自的 Execute pipeline,比如 scan、filter、aggregate 等,然后执行,执行完返回数据给 Spark。所以 Substrait 是一个比较重要的切入点。
3. Fallback
如上图,描述的是 fallback 功能的实现, 实现 fallback 功能主要是因为 Spark 已经支持了十几种 operator 和上百种 expression,Gluten 在初期不可能很快把这些 operator 及 expression 全部都支持。在整个 pipeline 的转换过程中,会对每一个 Spark 的 Physical Plan 去做 Validate,Validate 成功说明里面包含的 operator 和 expression 都是支持使用 Native 向量化引擎执行。那么就会尽可能打包成一个 Whole Stage 的 Transformer。这里的 Whole Stage Transformer 可以对标 Spark 的 Whole Stage code gen。当校验到某个 operator 或者 expression 失败后,说明 Native Engine 是无法支持当前的 operator 或者 expression,就会触发 fallback 机制。因为 Spark 的计算是基于行式的,而 Native 向量化引擎是基于列式的。因此就会在 fallback 这个 operator 前后插入行转列、列转行的算子。Spark 执行完后行转列,后继交给 Native Engine 执行。当然要尽可能避免这种 fallback 操作,因为这两次转换对整个执行的过程的性能损耗是很大的。
这里做过一些研究,尝试在整个 Whole Stage 执行过程中,如果发现有一次回退,那么后面的 operator 和 expression 即使 Native Engine 支持,也不再做转换,减少频繁的行列转换。通过对每一个 Whole Stage 都做这样的限制,尽可能减少 fallback,以及插入行转列/列转行的操作。当然最终的目标还是尽可能让 Native Engine 支持 Spark 所有的 operator 和 expression,减少 fallback。
上面描述的 Plan 转换可能会比较抽象。这里举个简单例子,上面的图是 TPCH Q1 做 Plan 转换的过程。左边是 Vanilla Spark 的 Plan,右边是 Gluten 对标的 Plan。因为 DAG 图比较大,所以直接用这种 Plan 的方式展示。可以看出 Vanilla Spark 在 scan 完之后,由于 Parquet Reader 是向量化读取的列式数据,它就会先执行列转行,然后给后面的 filter、project 进行行式计算。交给 Gluten 来执行后,我们可以看到 Scan 之后是列式数据。因为向量化执行引擎本身也是全部基于列式,这里就不需要再做一次列转行的操作。接下来就是执行和左边一一对标的 Filter Execute Transformer,Project Execute Transformer 以及 Aggregate 等一系列操作,包括列式的 Shuffle Exchange,也都是列式执行。另外 Q1 会涉及到 order by,也就是会做 sort 操作。目前 sort 算子还没有支持,可以看到会插入列转行算子进去,把列式数据转为行式。后面就全部由原生 Vanilla Spark 算子 Shuffle Exchange,sort 去执行。
4. 列数据抽象
参照上图,现在介绍整个 Gluten 的执行过程当中,涉及到的数据传递流程。在 Spark 里有 Columnar Batch 和对应的 Columnar Vector 抽象。基于这个抽象把 Native Engine 各自对应的 Native 列式数据表达做具体实现。
这里涉及两种实现方式,第一种,我们知道 Apache Arrow 是想要实现一种与语言无关,规范的内存格式。如果使用 Arrow 这种格式,那么在 Spark 这边会有一个 Arrow Columnar Vector,对标的就是 Arrow 这种内存数据格式。这样做的好处就是如果新接入一个 Native Engine,只要把数据 Export 成为 Arrow 格式,那么 Spark 上层就可以很容易地接入,然后交给 Spark 来做读取操作等。这种方式有一个不利的地方,因为每种 Native Engine 内部都会有各自的内存数据表达方式。为了统一接入,每一次都要做转换成 Arrow 格式的操作,性能上会有所损耗。
还有一种方式,就如 ClickHouse backend 的自定义实现,因为 ClickHouse 内部是 block 格式,那么基于 Spark Columnar Batch 和 Columnar Vector 去实现了 Block Columnar Vector,来代表了一个 block 数据存储。通过这种方式在 Native Engine 引擎执行完后,返回的数据可以由这个 Block Columnar Vector 持有这块数据,进而做后续的数据处理。
Velox 之前则采用的是方式一,因为性能的损耗,目前也在实现基于内部数据格式的 Velox Columnar Vector 的方式来做数据的传递。
5. Shuffle
对于 Spark 来讲比较重的操作是 shuffle。参照上图,在 Vanilla Spark 里,使用的是行式 Shuffle。但是对于向量化引擎,内部数据流转过程当中使用的是列式。因此基于 Spark Shuffle Manager 框架,扩展出 Columnar Shuffle Manager。在整个向量化引擎执行中,涉及到的 Shuffle 操作,都会通过这个 Columnar Shuffle Manager 去做列式数据 split,序列化,压缩,最后输出 Shuffle 数据到磁盘。而读取 Shuffle 数据则交由 Spark 原有的框架进行操作,再传递给 Native Engine 作后续处理。目前的 Columnar Shuffle Manager 只实现了类似于 Spark 的 Bypass Sort Shuffle 的功能,所以当 Shuffle partition 过多的时候,它会生成过多的小文件,所以性能会有问题。有做过一个调研,即直接利用 Spark 原生 Sort Shuffle Manager 的方案,只是在序列化和反序列化的时候直接使用了列式的方式输出 Shuffle 数据。这么做的优点是很容易利用 Spark 原生已经支持的 Sort Shuffle 的方式。在 Shuffle partition 达到几百甚至上千的时候,可以去规避小文件过多的问题。但是在实际测试过程中,也发现了一些问题,例如:在使用 Sort Shuffle Manager 时,整个 Shuffle 数据都会在内存里拷贝一遍。因为 Native Engine 的数据在堆外,Spark Sort Shuffle Manager 在传输过程中,会把 Native 数据拉到 JVM 里,再经由 Spark 的 Shuffle 框架做写入或者读取。这就会涉及一个 memory copy 问题,这个问题也是后面会重点解决的一个点。
6. 内存管理
上图是内存管理机制的一个流程图。Native Engine 完全脱离 JVM,为了保证整个执行过程中,不会让 Native Engine 把整个内存吃完或者是出现 OOM 情况,则基于 Spark 原生的 Unified Memory Manager 机制,集成 Spark Off-heap Memory Manager 对 Native Engine 所用的内存做管理。Spark 的 task 执行时,会使用 Task Memory Manager 对每个 task 的内存做申请释放管理。基于这套机制去实现了 Native Engine 的 memory consumer,每个 task 在调用执行 Native Engine 的时候,会通过这一套机制把 Native Engine 申请的内存上报到 Task Memory Manager,于是在 Spark 侧就可以知道 Native Engine 使用了多少内存,Executor 的内存存量是否满足要申请的内存大小。
但这里有一点与 Spark 原生的 task memory 管理不同,就是 Native Engine 自身会有一个 memory pool,因为在 Native Engine 这一块的内存的申请都是细化到 malloc 这个级别的,也就是在 Native Engine 执行过程中,new 一个对象或者是申请一个小的 byte 数组,都会去调用这个接口来上报内存分配的情况,如果每次 malloc 就要上报的话,就会导致频繁回调接口,进而影响整体的性能。因此在 Native Engine 侧都加了一个 memory pool。比如一次申请 8M 空间。那么在 Native Engine 使用的内存没达到 8M 时,是不会频繁地通过 JNI 的方式调用 Spark 的 Memory Consumer 往 JVM 上报。而 ClickHouse backend 的实现有些不同,它则是如果申请的内存在小于 4M 情况下,并不会向 Spark 申请内存。
不管是以哪种方式,都是为了尽可能的规避频繁通过JNI接口去回调 Spark Memory Consumer 来上报内存,减少对性能的影响。另外 Spark 在内存管理的时候,当 Executor 内存不足时,会去调用 spill 方法让 Memory Consumer 把自己管理的内存释放掉一部分,或者把一部分内存吐出到磁盘上面。当前 spill 功能还未实现,目前全部数据都是 hold 在内存当中,后续计划把这一块补齐。
7. Debug 支持
上图是 JVM 与 Native Engine 调用的关系图。为了方便 Debug,在整个 Plan 转换的过程当中,会以 Substrait Plan 的方式统一地下发给 Native Engine。在出现问题的情况下,可以把出现问题的 Substrait Plan 生成二进制文件。然后就可以把这个 Plan 交由 Native Engine 侧做 UT 或者作为问题重现的输入,达到直接在 Native 侧做 Debug、profile 或者优化等操作。可以避免 Debug 时候同时要启动 JVM 和 Native Engine 的麻烦。
03
当前进展和后续工作
下面介绍一下 Gluten 项目的进展情况,以及后续工作重点。
1. Gluten 当前的状态
当前 TPCH 场景下,Velox 22 条 SQL 已经全部支持。对于 ClickHouse backend,因为 ClickHouse 不支持 not equal join,q21 目前暂时还不支持,其他的 21 条也都已经支持了。目前支持的 operator 包含 Scan、Filter、Project、HashAggregate。Join 支持的是 BroadcastHashJoin、ShuffledHashJoin。关于 Expression 的支持情况是:TPCH 里,SQL 里面涉及的 Expression 都已经支持。
2. 性能
在使用 Gluten+ Velox backend 或者 ClickHouse backend 后,到底能为 Spark 带来多少性能的提升呢?上图描述了 Intel 团队,使用 Gluten + Velox backend,在 TPCH 1000 场景下做的一个测试。我们可以看到在有一些 SQL,比如 Q4 可以达到 3.6 倍的提升。整体也有两倍提升,即整个 TPCH 跑一轮需要的耗时是 Vallina Spark 的一半。这是 6 月份出的数据,一些最新的数据会更新在 Gluten 的 Github 上。后面还做了一些优化,当前该图中的性能已经有了进一步的提升。
而上图是 Gluten + ClickHouse backend 的测试结果,同样也是在 TPCH 1000 下的性能测试,最高的可以达到 3.5 倍的提升。各自跑一轮,整体时间的提升也是达到了两倍多。目前针对 ClickHouse backend,TPCH 的 q21 先 skip 掉,只跑了剩下的 21 条,整体性能提升了两倍。Gluten 团队最近也在性能方面做了进一步优化,整体的性能还会有进一步的提升。
3. 工作计划
后续的一个重点工作就是完成对 TPCDS 的 104 条 SQL 的支持。同时在 data type 方面,会支持 Float、Binary、Decimal、复杂类型等,后面也会再针对这些类型做更进一步的优化。
另外就是 local cache 的支持,如果 Gluten 运行在云上,读取的是对象存储,则需要有 cache 层支持。但这个 cache 层,需要在各自 Native Engine 去做实现,因此需要各自 Native Engine 的 backend 把云端的数据做 cache 管理。这一块也都在计划当中,应该在近期会有实质的进展。
最后是 Shuffle,Shuffle 一直以来都是重点,目前发现 Columnar Shuffle Manager 还存在着一些问题,包括 push based 的 Shuffle Manager 如何支持等,也是后面去调研支持的重点。
以上就是 Gluten 后续的工作重点。
04
回到开始第一张图,在 Spark 发展到 3.2 时,性能提升趋缓。结合 Gluten,不管是 Velox backend 还是 ClickHouse backend,整体的性能提升达到了两倍,甚至某一些场景下面可以达到三倍以上的性能提升。而且这是 Spark 性能整体的提升,并不是只针对某些个别 SQL 的调优。
上图是目前 Gluten 项目参与团队的主要人员名单。因为 Gluten 项目来自于 Intel 的开源软件,所以 Intel 参与的人数最多,而且 Intel 在这一块也深耕了多年。接着就是 Kyligence 的同事们。Kyligence 是从 Gluten 项目开始筹划就一直参与开发的,目前重点 focus 在 ClickHouse backend,也包括整体的 Gluten 功能。接着是 Bigo 的几位小伙伴们也是全职加入到了 Gluten 项目开发当中。
这里还要特别感谢的一位是 Substrait 项目的 owner:Jack,因为 Substrait 项目在发布初期,Gluten 项目就去使用了,在使用过程中得到了 Jack 的很多帮助和建议。
另外还要特别感谢的是 Velox 团队,因为 Intel 团队使用了 Velox 作为 backend 来开发。Velox 团队为 Intel 团队提供了很多支持,也特别感谢他们。
最后欢迎能有更多的小伙伴们加入到 Gluten 开源项目中,Github 地址:。也欢迎小伙伴们加入 Gluten 用户群,一起交流谈论~