阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse

时间:2024-10-18 07:31:09

摘要:本文整理自阿里云计算平台事业部 OLAP 引擎开发工程师焦明烨老师在8月3日 Streaming Lakehouse Meetup Online(Paimon x StarRocks,共话实时湖仓架构)上的分享。主要分为以下四个内容:

  1. StarRocks数据湖能力介绍
  2. 使用阿里云EMR StarRocks构建基于Paimon的极速实时湖仓
  3. StarRocks + Paimon的最新进展
  4. StarRocks + Paimon未来规划

我的分享将分为以下四个部分。首先,会介绍一下 StarRocks 的数据湖能力,也就是回答为什么要用 StarRocks 来进行数据湖分析这个问题;接下来,简单讲一下阿里云上的 EMR Serverless Serverless StarRocks 是什么样的、该怎么用它来构建 Paimon 湖仓;然后介绍近期在 StarRocks 访问 Paimon 这个方向,我们实现了哪些新功能、做了哪些优化,分别起到什么样的作用,会在哪些方面为使用者赋能;最后,我会将我们正在进行中的工作提前透个底,告诉大家哪些新特性将会在未来上线。

一、StarRocks数据湖能力介绍

StarRocks 从 3.x 版本开始,一直在着力打造数据湖分析的新范式,期望能利用好自己在 Warehouse 上的优势,积极拥抱各种湖格式,在湖上为使用者提供不输给数仓的使用体验。StarRocks 社区有一个非常专业的 DLA Team,专门负责开发和优化 StarRocks 的数据湖场景,因此打造了一款非常优秀的数据湖分析引擎。StarRocks 数据湖分析的特点,我总结了两个关键词,也就是极速统一和简单易用。

  • 极速统一

StarRocks 作为一个向量化引擎,具备全面的向量化执行能力;此外StarRocks拥有现代化的CBO,全面兼容多种湖格式,无论是 Hive Paimon 还是 Iceberg Hudi Delta Lake 等等,都可以使用 StarRocks 进行查询;不仅如此,StarRocks 还实现了 IO 合并,优化对象存储的读取,综合下来相对 Trino 能给带来 3 倍以上的性能提升。

  • 简单易用

StarRocks 建立与数据湖的连接非常方便快捷,是能够做到开箱即用的;与此同时,StarRocks 能够兼容 Trino 的语法、关键字、函数转义等,迁移便利快速,具有九成以上的兼容度。最后,StarRocks 能够通过 Audit Log、Profile 等多种方式对执行的查询进行分析和处理,方便业务侧进行治理和优化。下面我会详细介绍这两个方面。

StarRocks 优势:极速统一

前面已经说过,StarRocks 具有自己在 Warehouse 上的诸多优势,因此希望也能将类似的体验带给湖上用户。先来看左边这幅图,StarRocks内部有统一的 Catalog 机制,不论是 StarRocks 的内表,多种格式的湖上外表,还是 JDBC 外表,在逻辑上都是这样一个 Catalog 的概念。也就是说,StarRocks 单一引擎就能够同时访问包括但不限于内表、Paimon 为代表的湖上外表、MySQL 为代表的 JBBC 外表等多种数据源,并且能够支持跨数据源的 Join。只要用户需要,任意一个数据源的数据都可以写入 StarRocks 内表,实现统一的数据加工和处理,非常方便。此外内外表的数据访问还能够通过 StarRocks 的 RBAC 机制进行统一的权限管理。当然,如果还想使用湖上常用的权限管理方案比如 Ranger,StarRocks 也是支持的。上面就体现了 StarRocks “统一”的地方所在。那么极速体现在哪里呢?我们来看右边这张图。右图的上半部分是 StarRocks 的架构,可以看到 FE 的查询优化器、BE 的查询执行器与 IO 引擎等等等等,唯独少了什么?少了存储对吧。在这个场景下,湖就是 StarRocks 的存储。也就是说,我们可以理解为,StarRocks 做到了把数据留在 HDFS 或者对象存储上,在读取的时候从这些文件系统中获取需要的数据,后续的查询与分析过程都和内表别无二致。作为一款实现的很优秀的查向量化引擎,根据我们的实测,即使只替换查询引擎将 Trino 换成 StarRocks,其他什么都不做,速度就能提高3倍。不仅如此,StarRocks 还提供了多种多样的特性,来对查询进行加速,比如内存和磁盘的两级 Data Cache 缓存,物化视图等。也就是说,对于需要经常执行的湖上热数据的查询,StarRocks 能够做到更快。

StarRocks 优势:简单易用

我直接举例子说明吧。先来看上面的两幅图。如果有熟悉 Trino 的观众应该知道 Trino 想要新建一个连接器是怎么做的,我们是不是需要按照一定规范写一个 Properties 文件,把这个文件分发到包括 Cooridinator 和全部worker 在内的所有节点,然后重启整个集群?不仅如此,一旦有一个配置配错了,Trino 直接拒绝服务了,启动都起不来。也就是说,Trino 的 Catalog 管理和配置校验是 Server 启动的时候做的,对系统维护者来说是一个考验。虽然 Trino 最近也做了 Dynamic Catalog,但这个功能默认是关的,还是 Static 的,Trino 的人说他有安全问题。并且,Trino 的 Catalog 是插件形式的,即使用了 Dynamic Catalog,只要之前用这个 Catalog 执行过查询,就还是会出现资源释放不掉的情况,这个体验显然是非常不好的。StarRocks 这边就不一样,外表 Catalog 默认就是动态创建和配置的,可以做到开箱即用,只要我们保证网络是畅通的,就可以在客户端通过写SQL的方式直接创建一个外表 Catalog,我们看到上面我创建了一个 type 是 Paimon 的 Catalog,这个 Catalog 和 Paimon 自己的那个 Catalog 是一一对应的,我设置了 type 是 DLF,然后配置了 DLF 的 ID,上面全部的操作和 show 出来这个 Catalog 里面的全部数据库,加起来总共花了还不到一秒,是非常简单易用的。StarRocks 也支持对查询进行分析,这里就以最常用的 profile 为例,看这幅小图,我故意构造了一个执行可能会比较慢的情境,我们可以看到一个简单的 select count(1) 花了4秒,数据量也只有一万条,这显然是不合理的。但是只要我们拿到这个查询的 ID,并且开启了 profile 功能,我们就可以获取这个查询对应的 profile。拿到 query id 的方式有很多,包括从系统表里查找、翻找 Audit LOG 等等,这里只是其中一个方式,就是调用函数查询。右边的图就是这个 profile 的文本,当然 profile 很长啊,通常会有成百上千行,这里只截取了一个开头,但从开头已经可以看到 Analyze 这个步骤花了2.6秒,占总时间的一半还多了,时间都花在这了,我们就可以看看为什么 Analyze 要用这么久,从而优化我们的查询业务。

数据湖分析常见场景

聊完了 StarRocks 数据湖分析的特点,我们再来说说几个常见的场景,我就挑了这四个,分别是数据湖分析加速、湖仓分层建模、冷热融合以及全链路 ELT。我会以 Paimon 为例,说说这些场景分别是干什么用的、怎么用。

场景一:数据湖分析加速

第一个场景是最基础的数据湖分析加速场景。我们对比的基准都是 Trino。Trino 也是一个非常优秀的 MPP 查询引擎,非常稳定而高效。刚刚也提到过 StarRocks 是能够兼容 Trino 的语法函数和关键字的,也就是我们把 Trino的语法解析器包了起来,Trino 的 SQL 传进来以后被我们转成了 StarRocks 能看懂的的 AST,后面的流程,包括生成执行计划和查询这些就和 StarRocks 原生语法都一样了。开启的方式也很简单,就是设置一个 session 变量sql_dialect =“Trino” 只要开启了这个,StarRocks 一秒变身成 Trino。具体的使用案例可以看阿里云 EMR Serverless StarRocks 的文档,里面有实际的演示,包括开启前怎么提示函数解析错误查询失败,开启后怎么得到结果等等。根据我们多家用户的生产环境实际测试,可以保证兼容度高达 90% 以上,大多数常见的场景是都能兼容的。可能 Trino 有一些特殊的函数,比如正则什么的,StarRocks 这边没有,这些也还在适配,可能有一些已经适配完成了。

可以看到,以 Trino 等引擎为基准,StarRocks 什么都不做,直接平替就能带来3倍的性能提升,如果更进一步开启 Data Cache,在缓存盘足够大、缓存能够大量命中的前提下,提升更是来到了6倍。新版本的 Data Cache 也是先进了很多,增加了主动的缓存预热功能,并且增加了多项可观测性的指标,这个功能之前是关的,现在也是默认打开了。如果觉得还是不够快,还想要再进一步的提升,就需要后面的一些场景用到的手段了,我这里列举了一些,后面都会一一涉及到。

场景二:湖仓分层建模

第二个场景是湖仓分层建模。我们先来思考一个问题,直接查湖和内表相比,慢在哪里呢?我们前面说过,查内表和查湖逻辑上只有存储是不一样的,那么答案也就显而易见,IO 会是天然的第一道瓶颈。因为我们需要把数据从远端,通常是对象存储,拉取到查询引擎本地。在读数据的时候,经常会遇到网络带宽打满的情况,即使带宽没打满,也会有各种各样的IO问题。有人说,我把数据从湖导入到内表不就可以了,但是其一,本地存储是很昂贵的,远远比对象存储要贵,因此 warehouse 中往往不能存放全量的数据;其二,导入毕竟是要花时间花资源的,也需要专人来维护,等到导入完成了,查到的是什么时候的数据就不好说了,这就局限了使用的情境。于是, StarRocks 就打造了这样一个场景,即湖仓分层建模。可以从图上看到,StarRocks 可以用物化视图的形式,对对象存储中存放的湖上数据进行分层。物化视图就相当于内表,查物化视图会比直接查湖快很多。并且,物化视图是可以嵌套的,在物化视图之上又能构建一个物化视图,每层物化视图都是一次聚合,每一层应对不同的场景,越往上的数据越精炼,所有数据都可以访问到。物化视图可以做到在分区粒度刷新,比如我们以天为粒度刷新,刷新的成本也是比较可控的。这样一来,每层之间的依赖关系的维护,数据的异步刷新,以上这些所有工作,都只需要StarRocks 一个引擎就可以完成了。这样,业务侧就可以查到实时与近实时的数据,并且速度也是非常快的。

场景三:冷热融合

下一个场景叫做冷热融合,这个场景也是依赖物化视图的。我们实际业务中,数据可能是会有冷热之分的,冷数据就是那些可能很久才会查一次的数据,热数据就是近期经常要查询的数据,如果数据全都在湖上,那冷热数据可能就没什么区别了,查的速度都差不多。这个时候,我们在创建物化视图的时候就可以指定热数据的周期,比如我这里设置了是三天,这样物化视图里就只会保留最近3天的数据。然后,StarRocks 还支持物化视图改写的功能,也就是说业务侧不用改 SQL,SQL 写的查询的还是原表,但是 StarRocks 会自动判断这个SQL命中了物化视图中的数据。当业务频繁访问这三天的数据的时候,实际*问的都是物化视图,这样速度就会很快。如果查询融合了冷热数据,StarRocks 支持自动将冷热分区合并,可以看右下的这个图,热数据会从MV中获取,冷数据还是从湖上获取,然后 StarRocks 内部会先做一个 Union,然后再执行后续的处理。这也能加快查询速度,但可能加快的没有那么多。

场景四:全链路ETL

今天我想介绍的最后一个场景是全链路ELT。StarRocks 不仅是一个查询引擎,他也是能够写的。换句话说, StarRocks 具有一定的轻量 ETL 能力。首先 StarRocks 近期对 Spill,也就是当内存不足时将查询的中间结果向磁盘溢写这个能力进行了大幅度的优化,这可以说是 ETL 必备的能力了。然后 StarRocks 可以通过 VVP 也就是 Flink进行 CTAS 或 CDAS,导入各种数据,打造适合大批量数据的全链路的实时数仓,不止如此,StarRocks也是可以写湖的,社区对 Iceberg 和 Hive 支持的比较好,也可以直接将 ORC 或者 Parquet 文件写到文件系统中。我们呢,近期也支持了写 Paimon。这样一来,StarRocks 拥有非常完善的 Connector Sink,可以读内表或者湖上的数据,再写回湖,实现降冷操作,写的速度也很快,写 HDFS 的速度端到端能够达到 Trino 的3倍,对象存储由于 IO的影响很大快不了这么多但也是快的。这样一来,就可以由 StarRocks 这一个引擎,完成一个全链路的轻量 ELT。

二、使用阿里云EMR StarRocks构建基于Paimon的极速实时湖仓

阿里云 EMR Serverless StarRocks 的特点我这里列举了很多条,大家可以自己看一下。其中有些是社区版就有的能力,有些是我们自己的增强。我们做的优化一方面是易用性上的,可以看最右边这一列,阿里云的 EMR Serverless StarRocks 是完全云原生的,可以做到免运维的即开即用,弹性能力也很好;另一方面是生态上的,针对我们云上的产品做了各种各样的集成,非常便利。对内核我们根据情况也做了一些改进,这里我就不一一列举了,感兴趣的大家欢迎来体验。

可以看到,我们的产品是分为三个版本的。存算一体就是比较传统的,大家最常遇到的那个版本,数据都存放在云盘或本地盘上,好处是读写性能非常高,适合追求高效率和高并发的大数据分析的用户;存算分离是指存储在 OSS 上,能够节约大量存储成本,并且存算分离集群的缓存做的也是很不错的,对于经常访问的热数据,能够做到查询性能对齐存算一体;前不久我们产品的存算分离版本也是摘掉了 beta 版的标识,意味着基本稳定,可以放心使用了。数据湖分析版本则是纯数据湖查询的版本,和今天我们讲的场景关系比较密切;如果大家想找一个 Trino 集群的平替,就想把一个 Trino 集群平迁过来,选择数据湖分析版本是不会有错的,当然,这并不是说其他版本就不能查外表了,也能查,但毕竟专业的人做专业的事,如果用户只有湖分析的需求,选这个版本是更好的。

接下来我会用几张图来展示一下 EMR Serverless StarRocks 产品层面的一些功能与亮点。可以看到,这是一个即开即用的 SQL Editor,伴随着新建集群会自动创建好。我们想要执行任何 SQL,比如我这里的 Create external Catalog,只需要写在这里然后用鼠标选中,就可以运行了。这个 SQL 也是可以持久保留的,不会丢。这里建好后,左边数据库管理这里就能过看到我们创建的 Catalog,可以看到这个 Catalog 下有哪些数据库哪些数据表,甚至可以展开看到每张表的结构是什么样的。我们如果想创建物化视图也是一样的,可以看到这个 CREATE MATERIALILZED VIEW 语句,我们设置刷新间隔 INTERNAL 为1小时,这个物化视图就会异步的以一小时一次的频率去刷新当然也可以手动刷新,相关的命令有很多,大家可以参考文档。

前面还提到了 StarRocks 的 Profile,这是一个非常常用且好用的工具,但是我们也看到了,打印出来的 profile 是一大长串文本,虽然兼容并包,但里面各种各样的指标实在太多太长了,非常不清晰,可能光是找哪个地方慢都要找半天还不一定能找到。于是我们就产品化了一个分析诊断工具,就是图上的这个,我们 Manager 里是能看到所有查询的,然后我们会把所有慢查询单拎出来,默认是执行了5秒以上的就是慢查询,当然也可以自己调。对于执行缓慢的查询,在开启 profile 后,我们会提供一个可视化的 profile 查看页面,就像这幅图的右半部分,执行越慢的算子就越红。我这里是查了一个 Paimon 外表,然后做了一个两表 join,可以看到这个查询中执行最慢的就是 Connector Scan 也就是扫描表的时候慢,我们如果想要优化就可以考虑加缓存等等。总之这里就可以非常直观的看到查询的瓶颈在什么地方,方便我们进行针对性的优化。

最后再简单看一下监控。监控不止能看到集群的概况,也就是 CPU 内存磁盘使用率这些,也能看到集群使用的相关信息,我图里展示的只是很小的一部分,这是一个我自己的测试集群啊所以可能使用率不是很高,这里对查询数、查询延迟等大家关注的很多指标都进行了统计,并且这个界面是十分简洁的,一眼就能看清楚。

三、StarRocks + Paimon的最新进展

接下来是第三部分,我会介绍一下 StarRocks 集成 Paimon 这块,最近上新了什么东西,也就是一个简短的 Release Note,告诉大家我们支持了这些新特性。

首先一个比较重要的是,我们支持了读 Paimon 的 Deletion Vector,目前还只支持了主键表,这类表相比传统的 MOR 主键表,读性能会好非常多。其次,我们支持了物化视图改写,前面提到的很多场景都是建立在物化视图改写之上的,没有这个功能,我们之前聊的场景二和场景三都没办法实现,所以改写也是非常重要。第三,我们把 Paimon 也接入了 StarRocks 的 unified catalog,关于这个是什么我后面会说。最后,我们之前也提到过, StarRocks 现在也能写 Paimon了,虽然这个功能现在还没发布,但是我可以告诉大家,现在已经具备这个能力了。

说 Deletion Vector 很多人可能一脸懵,不知道这是个什么,接下来简单讲讲。Append 表的 Deletion Vector Paimon 那边也在做,但是现在,Deletion Vector 还是一种特殊的主键表。传统的主键表都是 MOR 的,也就是在读的时候,会根据元数据对 Upsert 了的文件进行 Merge 也就是合并,每次读都要 Merge。而 Deletion Vector则是用 Bitmap 来标记,我数据文件中的哪些列是被 Upsert 掉了需要删除的,也就不需要合并,只要跳过就可以了。右边是我随便找了一个 Deletion Vector 表的文件结构。如果大家对PK表的目录结构比较熟悉应该一眼就能看出来,和传统的 PK 表相比多了什么?多了一个 index 路径,以及 manifest 路径下面多了一个 index manifest。左边这幅图是摘自 Paimon 的官方网站,index 就是哪个 bitmap,用来标记哪些行是需要被删掉的,index meta则是index的元数据,用来表示 index 里的这段 bitmap 对应的是哪个数据文件等等。这和 Iceberg 的 Position Delete 在思想上是差不多的,只是实现方式不太一样。很多人的业务实际使用中,由于查全量PK表太慢,都是查ro 表也就是做了 Full Compaction 的那一部分,这样当然会比较快,但是也就丧失了实时性。改用 Deletion Vector 后会怎么样呢?我们来看实测。

下面的图这是我们的实际测试结果。构造了一个表,共有5亿行数据,其中主键的范围是0到10亿,但由于主键是随机的肯定会出现碰撞,这样构造的表的独立主键数,也就是如果我们进行 FullCompaction 剩下的条数大概 3.9亿多不到4亿。然后为了能看出来差异我故意搞了个最小的集群,一台FE一台BE,并且每个节点都只有8CU,这已经不能再小了,用这个集群,可以看右边,相比传统的 MOR 表,Deletion Vector 执行这种简单的聚合查询会有 6~8 倍不等的性能提升,这可以说是非常可观的。Deletion Vector 的缺点是写速度会比 MOR 慢一些,因为毕竟写的时候要写 Index,做的事情更多了,只要能接受这个缺点,大家使用 PaimonPK 表的时候,我们都会推荐开启 Deletion Vector。

然后是物化视图改写,我们前面说了那么半天其实大家可能已经知道改写是什么了。我这里再描述一下。直接看左边吧,这是一个 TPCH 的例子,我创建了一个 t1 t2 join 的物化视图,之后我执行了一个t1 t2 t3三个表 join 的查询,查的是原表。这种情况下,StarRocks 发现,这个查询中有一个 join 命中了物化视图,然后这一部分的查询就会自动被改写到物化视图上,这个过程用户是不感知的,也就是所谓透明加速。目前来说,支持的改写类型有我右边列出来的这些。分别是完全匹配,部分匹配也就是表命中了但是 join 没命中,以及 query delta 也就是左边的这种情况。

物化视图包含3表 join,查一个两表 join 这个场景也就是 View delta Join 目前还命中不了,我们也在努力支持中。

然后是 Unified Catalog。有人可能会觉得,我所有湖格式的外表都在一个统一的 Metastore 里,我要给每种湖格式都建一个 catalog,我觉得这还是太麻烦了,怎么办?StarRocks提供了 Unified Catalog 这个功能,专门就针对这种情况,这是一个特殊的外表 catalog,里面什么表都有,只要元数据一样,就都能查。Paimon也是支持了 unified catalog,目前支持的元数据类型是 hms 和 dlf,filesystem 算是 Paimon 特有的,目前确实还不太能支持。

最后来说说 Paimon Sink,所谓Sink就是写入,业务中常常用来降冷。目前,我们把Paimon也接入了StarRocks的 Connector Sink 框架,可以说基本的写入功能已经具备了。但是当前版本的限制还比较多,因为我们是用JNI实现了一个 Writer,导致不但速度比较慢,内存消耗还很大。所以当前版本的 Paimon Sink 我们还没有对外正式发布。我们下一步会对接 StarRocks 的 Native Writer,争取将一个完备的 Paimon Sink,在测试充分之后带给用户们。

四、StarRocks + Paimon未来规划

最后一部分,请允许我小小的剧透一下未来的规划,其中每一项都是现在进行时,我们来看看都有什么。

首先是元数据缓存。Paimon 的物化视图改写现在有个痛点,就是每次改写的时候都会去重新确认一下分区的最后更新时间来判断分区是否过期,这个过程是要访问远端文件系统的,这就导致了当数据量很小的时候,可能开了改写比不开还要慢。我们会实现一个 Catalog 级别的缓存,这个据我所知应该是在 Paimon 里实现了的,这能够减少远端文件读取,有效提高响应速度。第二是一个分布式 Plan,这适用于 Big Meta Data 场景,因为 StarRocks 在查询的时候FE是单点的,我们知道 Paimon 的元数据都是文件系统中的实体文件,虽然文件不大但是需要FE这边来进行反序列化和解析,这种多生产者单消费者的场景很容易导致FE成为瓶颈乃至Full GC。所以我们就在想能不能用BE来做这件事。这在 Iceberg 那边已经实现了,Paimon 也正在进行中。第三点是 Paimon 最近支持了索引,有 Bloom Filter 和 Bitmap 两种,这两个我们也都会支持。第四,StarRocks+Paimon 是我们 DLF 2.0 云上统一湖仓的主力,这个场景未来会用的越来越多,希望能够尽快给大家带来喜讯。最后,上面这些工作在 Paimon 和 StarRocks 两边都会有涉及,欢迎有意愿的开发者来共创。

最后大家如果对刚才介绍的阿里云 EMR StarRocks 感兴趣的话,可以登录我们的官网开通,我们有免费试用以及一些资源包供大家开通体验,使用中有什么问题也可以加群和我们交流。