基于StarRocks和腾讯云EMR构建云上Lakehouse

时间:2022-11-07 19:06:44

作者:腾讯云EMR业务负责人陈龙(本文为作者在 StarRocks Summit Asia 2022 上的分享)

我目前负责腾讯云EMR 的研发工作,此前先后在百度、支付宝做后端研发。2011年加入腾讯,先后参与了腾讯云Redis、腾讯云云数据库、Apache HBase(以下简称 HBase)以及 EMR 等多款云产品的开发。我个人也向 Apache Hive(以下简称 Hive)等多个社区贡献过代码。今天主要分享:

1. 云上 Lakehouse 基础架构。如何在云上基于高性能执行引擎 StarRocks 和 EMR 构建 Lakehouse?

2. 云上 Lakehouse 性能优化。在计算存储分离的场景下,如何有效保证 Lakehouse 高性能?

3. 云上 Lakehouse 成本控制。如何利用云的弹性能力进行架构改进,降低 Lakehouse 的资源成本?

4. EMR StarRocks 的产品特性。通过 EMR 的产品化能力,如何让云上的 StarRocks 更加易用好用?

 

#01

云上 Lakehouse 基础架构

 

1、Lakehouse 之我见

对于 Lakehouse,可能大家最直观的理解就是数据湖仓一体,到底什么是湖仓一体,Lakehouse 到底要解决什么样的问题,到目前为止没有统一的标准。我理解的 Lakehouse 是基于现代化云上存储计算分离架构,解决了如下问题:

1. 数据重复。尽可能解决维护多套数据分析系统,比如湖和仓的数据重复。去除数据重复性,真正做到 Single source of truth。

2. 低廉存储成本。在数据分析领域中,一般计算和存储都是不对等的。云上的对象存储大大降低了数据分析的存储成本和运维成本。

3. 在技术形态上有统一的架构。在现阶段数据分析场景中,面向不同的场景及不同的时延需求所需要的技术架构也不尽相同,造成系统复杂、运维成本高昂的问题。

4. 有效地降低数据分析的计算和 IO 成本。现在的数据分析系统大多是基于 Apache Hadoop(以下简称 Hadoop)生态技术栈构建,数据更新和删除需要通过 ETL 来进行,会造成大量的重复计算,从而导致计算和 IO 成本上升。

要解决如上问题,Lakehouse 从技术上要具备什么样的核心能力呢?

1. 事务支持和多版本控制。Lakehouse 需要处理多条不同的数据管道,需要在不破坏数据完整性的前提下支持并发读写事务。

2. 高效的更新。基于数据合规和数据更新是业务客观存在的需求。如何高效对数据更新是 Lakehouse 核心能力之一。

3. 高效数据消费。数据分析场景查询是非常复杂的,Lakehouse 需要具有面对复杂业务高效响应的能力。

4. 便捷的数据管理能力。数据管理在 Lakehouse 里面至关重要。数据管理不仅仅是业务数据上质量管理、数据关系的管理,还包含底层数据的索引、元数据版本管理等。

基于以上分析得出,Lakehouse 需要如下四个技术基础组件:

1. 统一的数据格式。基于这个数据格式,可以完成事务管理、高效更新等。

2. 统一的执行引擎。通过统一的执行引擎完成 ETL 类分析和机器查询类分析。

3. 统一的数据管理。提供完善的数据质量、数据分析、数据科学、数据格式等管理能力。

4. 统一的存储。价格低廉、高稳定性、高可靠性的统一存储。

 

2、Lakehouse 基础技术架构

基于这四个核心技术条件,在云基础平台上,如何一步一步去构建云上 Lakehouse 呢?首先从技术架构上拆解云上 Lakehouse。从技术角度看,可以分为如下五层:

1. 计算资源层。云上的云服务器、裸金属以及容器可以为 Lakehouse 提供海量的计算资源,同时还可以通过弹性实现资源随负载的变化而变化。

2. 云上的存储层。云上的对象存储、云 HDFS 、文件存储提供了面向不同业务场景的低成本、高可靠性、高稳定性的存储解决方案。而且云上存储是使用按量计费,成本更加低廉。

3. Data Lake Storage 层(表格式层)。本层使用开源的技术解决方案比如 Apache Iceberg(以下简称 Iceberg)、Apache Hudi(以下简称 Hudi)。同时云上的 Iceberg 和 Hudi 也针对云存储和计算做过大量的优化和扩展。基于开源的表格式的优势是开源开放、格式透明。

4. 统一的计算引擎层。统一计算引擎需要具备 ETL 和继续查询快速响应的能力。现阶段都是通过 Apache Spark(以下简称 Spark)来做 ETL,通过 StarRocks 做机器分析。

5. 统一的数据管理层。本层需提供完善的数据治理。数据管理可以通过 Wedata 来实现数据质量、血缘、数据科学等基础管理,通过 EMR、Lake Service 可以实现更底层的表格式的快照事务效文件以及索引管理的能力,降低 Lakehouse 使用的门槛。基于StarRocks和腾讯云EMR构建云上Lakehouse

3、构建云上 Lakehouse 

接下来介绍如何从数据角度使用云上的产品,把 Lakehouse 组建起来。数据分析面向的数据类型复杂,数据体积延时各不相同,数据延时大为大致分为三类:

1. 事务型数据库产生的数据,一般为应用系统应用程序产生的数据,比如 CRM、ERP 等。

2. 日志类数据,主要由应用程序产生。

3. 时序数据,由物联网、传感器等产生。

数据分析的第一步,使数据尽可能实时地流入到 Lakehouse 之中。数据的流入即数据集成,在云上可通过多个云产品实现。对于事物类型产生的数据,可以通过 EMR Sqoop 或 Spark 导入 Lakehouse 之中。对于日志类数据,可以通过 EMRFLOW、消息服务及、DataInLong 导入到 Lakehouse 之中。对于流式数据,可以通过 EMR、Spark、Oceanus 导入到 Lakehouse 之中。具体选用哪种方式,可根据实际具体的业务成本以技术栈来综合做选择。例如湖中数据可以根据业务场景统一存储为 TableFormat。那么具体选取哪一种湖格式可以根据自己业务的实际需求。

目前 EMR 支持 Iceberg、Hudi 两种湖格式。其中 Iceberg 做了类索引、SavePoint 等很多功能性的优化。流入湖中的数据可以使用对象存储 HDFS 或 CHDFS 作为底层的存储。同时云上的 Iceberg 和 Hudi 基于对象存储做了很多性能方面的优化。围绕湖格式对小文件快照清理、Clustering 索引管理等一直有着较高的门槛。虽然 Iceberg 和 Hudi 也提供了相应的 Produce,但是使用的门槛还是比较高。因此 EMR 提供 Lake service 可以很方便地自动化小文件合并、快照清理等。

对于湖格式内部的 ETL,比如 Clustering,可以通过 EMR 的离线 ETL 来实现。为了达到较好的性能,建议对湖格式定期的做 Clustering。做好 Clustering 后的数据,可以通过 EMR 里面 StarRocks 来分析这些数据。

StarRocks 在 plan 优化层面引入 CPU 和分区裁剪等功能。StarRocks 在执行层面,通过向量化执行引擎和 Native ORC Reader 来保证每一个 SQL 都会有良好的响应。同时在应用层面,应用层还可以通过 DLC 来查询湖里的数据以实现联邦分析,也可以通过 Oceanus 来实现 CDC 功能。

基于StarRocks和腾讯云EMR构建云上Lakehouse

 

4、基于 StarRocks 的云上 Lakehouse

如何通过多个 EMR 集群来实现上述能力?分别从离线链路和实时链路,EMR 以搭建积木的方式来构建 Lakehouse。在离线链路,离线数据通过 DI 工具进行入湖,数据格式可以是 Iceberg 和 Hudi,也可以基于 Hive 的传统离线模式,使用 ORC 等作为文件格式来存储。

因为目前使用的 Iceberg 和 Hudi 缺乏完善的工具做小文件合并、数据的 Clustering。这些操作又会消耗计算资源,因此 EMR 提供了 Lake Service 服务管理服务来简化对 Iceberg 和 Hudi 的使用。EMR 的 Lake Service 提供了快照合并、事务管理、小文件合并以及索引管理的能力。同时云上 StarRocks 支持访问云上对象存储的能力,可以直接查询 Iceberg 和 Hudi 里的数据。结合云上对湖格式的优化以及 StarRocks、CBO 向量化执行等获得良好的性能。

基于StarRocks和腾讯云EMR构建云上Lakehouse

 

5、构建云上 Lakehouse 面临的挑战

构建 Lakehouse 的目的是在数据分析时实现更好的性能、更低的成本。这里的成本包含技术成本、运维成本、使用的计算成本和存储成本,同时整个系统要有很好的可用性。接下来介绍在性能、成本和可用性等方面面临的问题。

1. 性能。在云上存储为统一的对象存储,对象存储和 Hadoop 生态的 HDFS 还是存在一定的差异性,如何保证数据入湖和查询效率是一个不小的挑战。同时还需要 StarRocks 高效的读写云存储。提高性能传统的方式是扫描,尽可能的扫描少的数据文件和良好的索引,在 IO 层面有很好的 IO 能力。

2. 成本。包含两部分成本,一是硬件成本。在大多数数据分析系统中,存储和计算并不对等,如何实现存储成本量化计算成本量化也需要对现有架构进行改进才能实现。另一方面数据分析系统非常复杂,如何高效运维是一个很大的挑战。

3. 可用性。对象存储保证了数据良好的可用性,但是如何高效稳定的使用湖格式是一个很大的挑战。尽管 Iceberg 和 Hudi 也提供了一定的管理工具,但是可用性依然很低。因此在云上需要提供标准化的产品能力提升湖的可用性。

 

#02

如何实现云上 Lakehouse 高性能

 

1、StarRocks 云上架构优化

在可用性方面,StarRocks 的架构简洁,整个系统核心只有 FE 和 BE 两类进程,不依赖外部任何主线,方便不属于维护。同时 FE 和 BE 模块可以在线水平扩散,元数据和数据都用副本机制,确保整个系统无单点。

FE 是 StarRocks 前端节点,负责管理元数据,管理客户端连接,进行查询规划查询调度。FE 配置根据配置有两种类型的角色, Follow 和观察者。Follow 选出一个 Leader,只有 Leader 会对元数据进行写操作,非 Leader 节点会自动将元数据写入路由到 Leader 节点,每次元数据写入的时候必须有多数的 Follow 写入成功才算成功。那么观察者不参与选组操作,只会异步同步并且回放日志,用于扩展集群的查询并发能力。每个 FE 节点在内存里都会保留一份完整的元数据,每个 FE 节点能提供无差别的服务。FE 为了保证性能,在 SQL 做完语义分析之后,还会进行分区裁剪,已过滤掉不必要的数据,同时还会基于 catalog 里面的统计数据进行 CBO 优化,以此生成的物理执行计划是最优的状态。

BE 是 StarRocks 的后端节点,负责数据存储以及 SQL 执行等工作。BE为了保证良好的性能和扩展性,在执行层面引入了向量化执行器,使用SIMD指令集来获得更高的性能,而同时设备又有着较低的负载。同时BE在IO接口方面有良好的扩展性设计,使得很可以很方便的去扩展实现云存储,比如对象存储、CHDFS等存储。

云侧为了降低应用侧的改造成本、同时拥有更高性能,实现了融合存储等技术,使得传统对象存储在调用文件系统 API 时和传统文件系统有着一样的性能表现。同时为了加速云上 StarRocks,引入了两层 Cache 和物化视图技术。

LocalCache 技术:在 BE 级点缓存底层存储数据块实现就近访问,可以通过良好的淘汰算法保证上层有极致的性能。因为云上的 StarRocks 工作在计算存储模式下,BlockCache 主要是缓存对象;存储数据块到计算节点,通过 BlockCache 来降低访问对象存储的延时,以获得良好的性能。物化视图技术:这里的物化视图,是指对经常使用的结果集进行物化,在查询的时候,通过 FE 的 SQL rewrite 直接提取物化结果来提升性能。

​​​​​​​​基于StarRocks和腾讯云EMR构建云上Lakehouse

 

2、EMR Iceberg Data Layout

要获得良好的性能,除了有良好的架构、执行引擎外,对于数据的组织和索引管理也尤为重要。腾讯云 EMR  Iceberg 做了大量的优化工作。

多维分析是大数据分析的一个经典场景,这种分析一般带有过滤条件。对于此类查询,尤其是在高级字段的过滤查询,理论上,我们只需要对原始数据做合理的布局,结合相关的过滤条件,查询引擎就可以过滤掉大量不相关的数据,只读取很少部分的数据。例如我们在入库之前对相关的字段做排序,这样生成的每个文件相应字段的 minmax 值不存在交叉的。查询引擎下过滤条件跟数据源结合每个文件的  minmax 统计信息,就可以过滤掉大量不相干的数据。

上述技术既是我们通常所说的 Data Clustering 和 Data Skip。直接排序可以在单个字段上产生很好的效果。但是如果多个字段直接排序,那么效果会大打折扣。Z order 可以很好地解决多字段排序问题。Z order 是一种可以将多维数据压缩到一维的技术,在时空索引及图像方面使用较广,Z 曲线可以将一线一条无线长曲线填充到任意维度空间。对于一条数据来说,我们可以将其多个要排序的字段看作是数据的多维 Z 曲线,可以通过一定的规则将多维数据映射到一维数据上,构建 Z value 进而可以基于该一维数据进行排序。我们不用太去理解 Z order 在数学上的意义,重点在它为我们解决问题带来的思路——降维。解决问题其实就是要利用空间填充曲线,对多维数据比如一张表的多个列进行降维处理,以提升相关数据聚集效果以及相应数据 minmix 的使用效率。

基于StarRocks和腾讯云EMR构建云上Lakehouse

以图为例,假设红色块是查询中 where 的条件部分。正常情况下,在数据没有被 clustering 前,这些数据分布在 N 个数据文件之中,这样的一个 SQL 要扫描大量的数据文件才能得到结果集。而我们对数据重新排序并 clustering 生成类索引之后,红色块的数据只会分布在部分文件之中。查询给过滤条件的数据源,结合每个文件 minmix 统计信息即可过滤掉大量不相干的数据,这样就可以大幅提升性能。

腾讯云中的 EMR 的 Iceberg 在 ZO 的特性,领先于社区一年进入生产和环境。下面是 EMR 的 ZO 支持的数据类型。EMR 中的 Iceberg 类索引不仅支持基础数据类型,如 int、string、varchar、date 基础类型之外,还支持复合数据类型,如 map、struct 以简化业务使用类索引,同时还提供了 SQL 语法进行 Z 索引的构建,如 offline table、order by column 进一步降低客户使用 Z 索引的门槛。

​​​​​​​​基于StarRocks和腾讯云EMR构建云上Lakehouse

以下是基于腾讯云EMR Iceberg Z 索引基于 SSD 的性能测试报告。基于 SSD 的性能测试的硬件为 10 台八核 32g 磁盘为云 SSD 的标准型机型。

​​​​​​​​基于StarRocks和腾讯云EMR构建云上Lakehouse

从图中我们可以看到,在查询引擎方面,基于 Z order 的查询性能相比,随机扫描有 10 倍的性能提升。相比小文件合并也有 2-5 倍的性能提升。在数据扫描方面,基于 Z order 的数据扫描量只有随机和小文件合并数据扫描量的 40%。一份来自真实的数据:业务总数据量大小是 1.78TB,总行数是 210 条,使用的计算资源是 200 个 CPU。同样的条件下,未开启 Z order 查询需要 1030 秒,而开启 Z order 只需要29秒。

 

3、EMR  Iceberg 流转批

除了通过 Z order 提升性能之外,EMR 中的 Iceberg 还考虑到离线和数据安全,提供了 SavePoint 功能,以进一步降低使用 Iceberg 的门槛。SavePoint 是指将某些文件组织为组织标记为已保存,以便清理程序,不会将其删除。在发生灾难和速度恢复的情况下,有助于数据集还原的时间轴上的某一个节点。同时 EMR 中的 Iceberg 还实现了 StarRocks、Hive、Spark 在查询的时候按照 SavePoint 查询全量数据,可以做到流批融合,既满足了离线场景,又不对现有业务架构做任何调整。

基于StarRocks和腾讯云EMR构建云上Lakehouse​​​​​​​​

EMR 中的 Iceberg 提供了迁移工具,可以将传统 Hive 中的离线表直接转化为湖格式中的 Iceberg 表。SavePoint 具体技术原理是基于 Iceberg 的快照和 TimeTravel 功能,在原表基础上实现了基于事件时间的快照标记功能。可以理解为在原表基础上查看某个事件时间的全量快照。

不同的是,这里强调的是基于数据的事件时间而不是处理时间。在生成好这些 C point 之后,在 StarRocks、Hive、Spark 中读取时,可以根据 meta 表里面的最新的 SavePoint 信息查询某个时刻的全量数据。在性能方面,腾讯云 EMR 做了大量的优化工作,今天分享只是一小部分。

 

#03

云上 Lakehouse 成本控制

Lakehouse 成本控制既是实现 Lakehouse 数据分析成本量化,要实现成本量化,先从现有的技术架构看问题所在。以 Hadoop 集群为例,常规部署时,data load 和计算进程 load manager 为了保证计算数据的亲和性,正常都是部署在一起的。

StarRocks BE 节点既负责计算又负责存储,但实际情况现是计算和存储并不对等,有的时候计算是平均,有的时候存储存储是平均,特别是基于云的环境下,存储全部为云存储,worker 节点只是负责计算。这时我们只需要保留较少的存储节点用于存储计算的中间临时结果和日志。这时就需要对集群的部署结构和引擎内核做改造,以适配云上的架构。下面我们看如何进行改进。

首先来看 Hadoop 集群。把一个 Hadoop 集群的拓扑分为 master、router、core 和 task 四种类型的节点。router 节点用于水平扩展、无状态比较重要的服务,或者是基于云基础设施,简化组件的高可用。比如可以将 HiveServer2 部署在 router 节点上,也可以将 Presto 的 coordinator 部署在 router 上。通过云的负载均衡来实现故障自动切换。而 core 节点相比传统模式下的存储节点,task 节点作为计算节点,只会部署计算进程。而对于 StarRocks 集群,腾讯内部团队对传统的 BE 节点进行拆分,把存储进程和计算进程拆成了两类进程,原来的 BE 节点还负责存储,而计算由新的节点 CN 来负责。至此我们来看基于这种架构,如何通过云上弹性能力来大幅降低 Lakehouse 数据分析成本。

在资源维度看,对于 core 或者是 BE 节点记为固定成本,而 task 或者 CN 节点既是计算节点,可以使用云上提供的自动伸缩,让 task 节点的规模随着负载的变化而变化,使用的资源可以是云上的 CVM 竞价实力或者是容器。那么这里我单独分享一下 EMR 中离在线混合部署的方案。对于 Hadoop 集群,采用的方案是 Yarn on 容器或者是 Spark negative on 容器。这种方案的好处是业务不用修改任何一行代码。对于容器集群,如果是 tke 则可以通过 tke 直接使用业务在线服务器资源。对于跑批的场景特别适用。

正常情况下,业务服务器在夜晚时低峰,而夜晚则是跑批的高峰,也可以使用云上的 eks,使用 eks 则不用关注具体的机型,易用性更好。同样对 StarRocks 集群,一样可以通过离在线混合部署,将 CN 节点直接部署到容器集群之中,通过自动伸缩竞价实力和复用容器里面的在线资源来降低成本。

接下来介绍 EMR 里提供的弹性化产品能力。在弹性能力上,EMR 提供了三种弹性模式来让开发者便捷使用云上的弹性能力。

第一种是托管伸缩。托管伸缩是指自动调整集群大小的冷功能,能够以最低成本实现最佳性能,只需要设置集群最低与最高限制即可使用。托管伸缩能够自动根据负载对集群资源进行规模化,甚至借此实现最佳性能与最低运营成本。无需预测负载模式或者编写自定义逻辑即可轻松建立弹性集群。在弹性资源上可以选择 CVM 竞价实例容器和裸金属,同时缩容过程中支持优雅缩容,配合内核上的改造,在缩容过程中不会导致 query 失败。

第二种是自动伸缩。自动伸缩相比托管伸缩需要显示去设定规则。自动伸缩提供了两种模式,第一种是按负载伸缩,可以根据引擎里面的负载情况进行自动的扩容或者是缩容。而按时间伸缩则可以通过某个时间点进行扩容或者某个时间点进行缩容。对这两种伸缩模式的资源都是都支持 CVM 容器竞价实力和裸金属,同时不仅支持优雅缩容,还支持将这些资源具体加入到某个资源队列或者是资源标签,以降低业务使用的门槛。

第三种是手动缩容,在这种模式下一般用于手动扩缩容,同样支持优雅缩容。对于存储节点,在缩容的时候会自动搬迁数据,无需关注缩容的细节。以上是 EMR 对 Lakehouse 在数据分析中对针对资源成本层面的优化。

 

#04

EMR StarRocks 产品特性

云上的 StarRocks 从如下四个维度提供了产品化的能力,结合云基础设施,让云上的 StarRocks 更加易用和好用。

第一是集群操作。在集群操作层面提供了创建、扩容、缩容以及执行节点变配的规格变配的能力。同时对于 StarRocks 的管理, web UI 提供了统一安全代理。针对 StarRocks 快速发展和版本变化,EMR 提供集群内版本自助升级和引导程序的能力,以便快速更新和自助开发。

第二是集群管理。集群管理层面提供了配置管理,支持 FE、BE 的基础配置文件管理,同时还支持自定义配置文件,以便访问不同的数据源。同时还支持对不同的机型配置分组和配置分类的能力,让开发者一目了然,了解每个配置项的含义和默认值。第三是服务进程管理。针对 StarRocks 的服务进程, EMR 提供了安全启停、进程监控以及一些高阶操作的产品化能力。

第四是集群 APM。针对 StarRocks 集群的 APM,EMR 提供了从 S 层面到 StarRocks 组件层面各个维度的基础监控告警能力。那么监控指标大盘可见,同时还提供了进程日志搜索能力。针对集群的关键事件提供了集群事件、集群巡检能力,让开发者在第一时间掌握集群的健康情况。

 

关于 StarRocks 

StarRocks 创立两年多来,一直专注打造世界*的新一代极速全场景 MPP 数据库,帮助企业建立“极速统一”的数据分析新范式,助力企业全面数字化经营。

当前已经帮助腾讯、携程、顺丰、Airbnb 、滴滴、京东、众安保险等超过 170 家大型用户构建了全新的数据分析能力,生产环境中稳定运行的 StarRocks 服务器数目达数千台。 

2021 年 9 月,StarRocks 源代码开放,在 GitHub 上的星数已超过 3500 个。StarRocks 的全球社区飞速成长,至今已有超百位贡献者,社群用户突破 7000 人,吸引几十家国内外行业头部企业参与共建。