导读:在日常线上生产环境中有大量的数据需要被访问,为了保证数据同步以及查询效率,需要耗费较大的资源,同时,很多被查询和访问的数据是重复的,这对数据库系统造成极大压力。
为了解决这一问题,B 站采用 Presto 搭配 Alluxio 的模式来提高系统性能,本次分享从以下四方面展开讲解:
分享嘉宾|杨洋 bilibili 大数据开发工程师
编辑整理|张玮
出品社区|DataFun
01
B 站离线集群架构
B 站 SQL On Hadoop 的架构,整体由 5 个部分组成:
- 最上层服务端,包括 BI 报表、数据质量校验、ADHOC 查询等
- 服务端由 Dispatcher 引擎接入,并提供路由服务
- 支持多种形式提交 ETL 作业到 Yarn 来进行调度的调度平台
Dispatcher 是 B 站团队(下称团队)自研的路由服务。通过 Dispatcher,用户提交Query 作业不需要指定使用哪些执行引擎,Dispatcher 会根据提交的 Query 语句来分析需要读取的 HDFS 的数据量,或结合计算引擎的负载情况,路由给相应的计算引擎执行。大多数情况下的 SQL 的语法是兼容的,但 Presto 语法和 Hive,Spark 冲突,这时如果用户编写 Hive 语句到 Presto 上执行会由于不兼容而导致执行失败。对此,团队采用了 Linkedin 开源的 Coral 组件对语句进行转换。查询量比较小的 SQL,优先转给 Presto 去执行,如果 Presto 执行失败,再依次降级到 Spark 和 Hive 上进行执行。目前该架构的 Presto、Hive、Spark 引擎均接入 Ranger 来实现权限控制,Ranger 可以对计算引擎做表、字段级别的权限控制,以及 Column Masking 和 Row Filter 的权限控制。
用户通过 Client 端提交的查询语句,由 Gateway 来进行路由。Presto-Gateway 是开源项目,它可以为 Presto 提供基础路由服务,我们对其改造使查询路由到不同机房下不同 Presto 集群的 Coordinator 上。此外,团队对 Presto 进行改造,使 Coordinator 支持多活,解决了 Coordinator 的单点问题。目前共有 7 个集群,分布在两个机房,且实现了跨机房功能,单个集群节点最大可达441。平均下来Presto一个月执行约 500 万作业,每天执行作业数约 16 万,读取 HDFS 的数据量约 10PB。
02
Presto 是基于内存做运算,因此查询效率较高,但在使用过程中,仍存在一些问题:
- 计算存储分离架构带来网络开销。一般的数据库,如 MySQL,计算和存储都在同一台节点上,而 Presto 是存算分离的架构,本身不存储数据,只做计算,Presto 通过内部实现多个 Connector 获取远端数据,因此 Presto 可以做联邦查询,但从远端获取数据势必会有网络的性能开销。
- 查询性能不稳定,容易受慢 RPC 或热 DN 影响。Presto 在我们的场景中主要用于查询 Hive 表,需要从 HDFS 获取数据,HDFS 会存在慢 RPC 或者热点 DN 的情况,这会导致 Presto 查询性不稳定,如图,每隔一段时间就有一个比较长时间的 RPC 请求。
- Presto 读 HDFS 缺少 Data Locality,性能方面还待提升。
同时,通过 Presto 收集的血缘信息可以发现,有的表和分区是反复被访问的热数据,而有的表和分区却基本不会被访问,如果能将这些热数据提前进行缓存,查询效率将会大大提升。
因此,团队引入 Alluxio 来缓存这些热数据。整合 Alluxio,需要解决以下 3 个问题:
- Alluxio 与 HDFS 的 Scheme 不同
1. Alluxio 与 HDFS 的 Scheme 不同HDFS 的 Scheme 通常是 HDFS 开头,Alluxio Scheme 是 Alluxio 开头,Presto查询时远程到 HiveMetastore 获取相应的分区信息时,如果要到 Alluxio 里面获取信息,需要将 HMS 里分区的 Schema 信息替换成 Alluxio 的 Scheme,但有些查询引擎,比如 Spark,本身不通过 Alluxio 查询,这种情况会造成线上查询不可用。对于这个问题,Alluxio 社区(下称社区)的做法是在高版本中的 Presto 支持 Alluxio连接器,这个连接器可以直接从 Alluxio 中获取元数据,无需再访问 HiveMetastore 里面获取 Scheme 信息。同时,Alluxio 中有 SDS 模块,会与底层的 HiveMetastore 做一个通信,然后在 Alluxio 中把响应的逻辑封装好返还给 Presto。其他的一些互联网公司也采取类似的做法,通过维护新的一套 HiveMetastore,比如专门用于 ADHOC 的场景,会使新的 HiveMetastore 与原先的 HiveMetastore 保持同步,同时开发了白名单,通过白名单来确定哪些表和分区需要通过 Alluxio来进行缓存。但维护新的一套 HiveMetastore 运维成本高,而且,不同的表和分区被访问的频率差别很大,有的表数据几乎不被访问,Alluxio 的缓存空间是有限的,如果将这些很少被访问的表数据也进行缓存,势必要占用缓存资源。因此 B 站团队没有采用上述的两种做法,而是希望通过打 Tag 方式来控制哪些表走Alluxio 哪些走原来的逻辑。团队对 Hive Connector 进行了改造,通过 HMS 来获取 Partition 信息后,根据 Partition 信息来判断是否走 Alluxio。Alluxio 的缓存空间是有限的,不可能也没必要将所有数据都缓存在 Alluxio 中。那么哪些数据是需要缓存到 Alluxio 中的呢?团队的做法是将获取 Presto Query 血缘信息吐到 Kafka 中进行分析,将符合缓存条件的数据缓存到 Alluxio 中,通过以下方式可以确定哪些数据是需要缓存的:
- 计算访问热度:计算表一周内的访问频率均值,将均值和设定的阈值(比如 10)作比较,如超过阈值,则认为是访问频率较高的热数据,对计算获得的热数据打上 Tag 做标识,然后通过 Kafka 消费程序把血缘信息落地到 Tidb 中。
- 计算 TTL:计算离当前最远的热分区的时间跨度,第二天窗口移动,将超过 TTL 的分区剔除,把最新的分区加载到缓存中。
3. Alluxio 与 HDFS 数据一致性保证当底层 HDFS 发生变动的时候,Alluxio 中缓存的数据就成了旧数据,这时候计算引擎到 Alluxio 中查询数据,获得的计算结果是不准确的。社区对于数据同步已给出了解决办法:通过参数来控制 Alluxio 与 HDFS 的元数据同步。
考虑到查询慢 RPC 的情况,团队并没有在线上生产环境使用这个功能,而是开发了一套缓存失效服务,通过监听 Hive Meta Event 事件来做缓存更新。当监听到事件是 Drop Partition 或 Alter Partition 时,并且这个执行了 Alter 和 Drop 的分区刚好在 Alluxio 缓存中,就会触发缓存失效,Invalidate 这个分区的数据;当监听到的是 Add Partition 事件,这个新增数据的表刚好是 Alluxio 中缓存的热数据,服务会新增一个分区,将新分区的数据 Distribute Load 到 Alluxio 中。团队发现线上的 Alluxio 集群 Master 进程会发生偶发的 Crash。当时 Alluxio 是部署在容器里面,容器重启后相关的日志会被消除掉,因此把 Alluxio 重新部署到物理机上,方便排查问题,当下一次 JVM 崩溃时就可以看到 JVM 打印出来的日志。如图,可以看到具体的调用栈过程:当 Client 端向 Alluxio 发送请求获取 File 的 GetStatus,由于我们线上 Alluxio 集群对元数据的存储开启了 RocksDB,因此会走到 RocksDB中,通过传入的 BlockID 信息获取相关 Location,在这过程中,有 Rocks Object 对象发生 GC 被 JVM 回收了,但 RocksDB 是 C++ 的 JNI 里面还有相关引用,导致 Segment Fault 报错,最终导致 JVM 崩溃。
社区里有一个相关的 Issue,地址:
如果有遇到 Alluxio 集群中的线上 Master 进程偶发的 Crash,并且是用 RocksDB做元数据存储,可以看看是否是这个问题导致的 JVM崩溃。
如图,绿色和蓝色折线分别是 Presto 查询 HDFS 和 Presto 查询 Alluxio 的时间,通过对比可看到 Presto 查询 Alluxio 花费的时间明显比查询 HDFS 的时间要少,平均节省约 20% 的查询时间。
- 引入 Alluxio 之后 Presto 架构的变化
未引入 Alluxio 之前的 Presto 执行流程:① 用户通过 Client 端将 Query 作业提交到 Presto 集群;② Coordinator 利用 SQL 解析器对 SQL 进行解析生成相应的语法树;③ Logicalplanner 对语法树进行解析,生成逻辑执行计划,利用优化器进行优化;④ DistributedPlanner 将执行计划进行切分,生成多个 Stage,内部生成多个 Task;⑤ 通过 Scheduler 调度器将任务调度到不同的 Worker 上,进行任务执行。
引入 Alluxio 之后,Presto 可以直接通过 Alluxio 读取数据,不需要每次都访问HDFS,只有当 Alluxio 里面没有这个数据的缓存时,再到 HDFS 中获取数据。
目前已有约 30% 的线上 BI 业务接入了 Alluxio 缓存,缓存了 20w 分区,约 45TB的数据量,读 HDFS 的稳定性明显提升。
Presto 搭配 Alluxio Local 的使用Presto 在执行计划阶段需要访问 HMS 获取表和分区的信息,HMS 的响应受单点mysql的吞吐影响,存在慢查询。同时,Presto 在构建 Split 以及读数据的情况下需要访问 HDFS。HDFS 作为底层存储对接了许多计算引擎,对 RPC 请求存在 Slow RPC 情况。
RaptorX 是 Prestodb 通过数据缓存进行查询加速的项目(https://prestodb.io/blog/2021/02/04/raptorx),对 HMS 元数据与 HDFS 数据源做了全方面缓存,能够很好地解决上面提到的慢查询等问题,其功能包括:
- Hive meta cache:Presto 访问 HMS 时,在 Coordinator 侧采用版本号的方式缓存 HMS 的元信息,在下一次获取 Hive Meta Cache 时拿 Coordinator 的版本号和 HMS 里面的版本号做匹配,判断是需要进行缓存的新数据;
- File List Cache:在 Presto中缓存 HDFS File 的元信息,避免长时间的 List Status 操作;
- Fragment Result Cache:在 Presto Worker 节点缓存部分查询结果,避免重复计算;
- Orc/Parquet Footer Cache:存储 Orc/Parquet 文件的 Meta 信息,比如 File Footer,提升缓存效率;
- Alluxio Data Cache:Page 级别的缓存,可以只缓存常访问的某些数据,减少缓存开销;
- Soft Affinity Scheduling:搭配 Alluxio Data Cache 使用,将同一个文件的 Split 分发到同一台 Worker 节点上,提高缓存命中率。
这是 Presto on Alluxio 的简图,将 Alluxio 嵌入到 Worker 的进程中,使它整体归于 Presto Cluster 管理,这样相对于集群模式更加轻量级。为了保证缓存的命中率,要使同一个文件的 Split 尽可能分到同一台 Worker 上,实现这一点有两种方式:一是基于 Hash&Mod 的方式,另外一种是基于一致性 Hash。Hash&Mod 方式,是通过 Hash 进行计算,算出 Split 分发到哪台 Worker 节点,但当 Worker 节点发生变动,比如有 Worker 突然挂了,这时所有的 Split 会发生偏移,分发到其他 Worker 上。 为了解决这个问题,社区推出了一致性 Hash 策略。在 Presto 场景中,把 Presto Worker 和 Split 哈希到 Hash 环上,然后选取方向,比如顺时针方向,离这些 Split 最近一台的 Worker 选为需要执行这些 Split 的 Worker。这样处理的好处是,当 Presto 节点发生变动的时候,只有原先分配到这台 Worker 上的 Split 需要重新分发到其他 Worker 节点上。此外还有一个需要改造的点,单台 Worker 会存在负载比较高的情况,因此引入虚拟节点的概念,将单台 Worker 映射成多台 Worker,比如说一台 Worker 映射成 3 台 Worker 分配到 Hash 环上,然后再重新进行 Split 分发,这样可以做到 Split 更加均匀的分发到不同节点上。将改造后的 Local 模式与之前的 Cluster 模式做对比,可以发现:
- Local 模式以 Jar 包的形式嵌入在 Presto 进程中,更加轻量,Cluster 模式需要单独维护一套 Alluxio 集群,有比较大的运维压力;
- Local 模式缓存粒度更加细,可以做到 Page 级别缓存,Cluster 是文件级别的缓存;
- Local 模式离计算节点更加近,Cluster 模式需要部署额外的机器资源。
3. 对 Presto Local Cache 的改造我们知道,当底层 HDFS 数据发生变动时,Alluxio 如果没有及时对新数据进行缓存,Presto 查询使用的就是旧数据,这会影响查询结果的准确性,对此团队分别对 Presto 端和 Alluxio 端进行改造,使 Local Cache 与底层数据保持一致性。基于文件的 LastModifiedTime 来改造。Presto 获取 HDFS 文件元信息的时候可以获取到文件的 LastModifiedTime,然后将文件相关信息封装到 Split 中,通过 Scheduler 把 Split 发送到不同的 Presto Worker 节点中,Presto Worker 节点收到之后将相应的 Split 信息封装到 HiveFileContext 的类中,最后在构建 PageSource 时,将 HiveFileContext 中的相应信息传到本地的文件系统。
OpenFile 方法不是一个标准的 Hadoop API,通过 HiveFileContext 来判断之后的逻辑是要走 Alluxio 还是走原先的代码逻辑,HiveFileContext 里面主要有这几个比较关键的参数:Cacheable:搭配 Soft Affinity Scheduling 使用,Soft Affinity Scheduling 会尽最大可能将文件 Split 分发到一台 Worker,当这台 Worker 负载达到它的 MaxSplit 阈值时,Soft Affinity 会把 Split 分发到其他 Worker,但被分发的 Worker 只暂时处理这个 Split,下次不会再处理这个 Split 了,这个时候将 Cacheable 设置为 False 状态。ModificatitonTime:Presto 从 HDFS 文件里面获取 ModificatitonTime 信息传到Alluxio 中,判断 Alluxio 缓存的是否为新的数据。Alluxio 社区实现了基本的缓存功能,但没有对过期的数据进行处理,因此,团队在Alluxio 端做了几点改造:
- 读数据时校验文件的 LastModifiedTime,通过 CacheManager 里的 Get 方法获取相应数据,比较从 Presto 端传来的 Context 中带有的文件 LastModifiedTime 和 Alluxio内存中所保存的 LastModifiedTime 匹配,比较是否一致,如果一致说明缓存的是比较新的数据;
- 构建内存数据结构,保存文件及时间信息,比如 Map 文件和相关的时间信息;
- 持久化文件信息,在 Alluxio Page Restore 过程中通过读 Metadata 文件将 LastModifiedTime 加载赋到内存里面,并可在 Restore 过程中恢复;
通常 Local Cache 启动时首先会根据路径遍历其下的 Page,将获取到的 Page 信息加载到内存中,但如果遇到 Page 数比较多的情况,该操作会很耗时。
Presto Worker 节点进行 Local Cache Restore 是它第一次 getFileSystem时。然而 Presto Worker 第一次调用 getFileSystem 是在处理 Page Source,此时 Local Data 还没有加载完毕,这时候处理会导致缓存命中率下降。因此需改造为 Presto Worker 启动后同步对 Local Data 进行加载。 (3)Local Cache 支持 HDFS 文件系统 社区的外部文件系统要求 Scheme 为 Alluxio 与WS,B 站线上环境的主要数据还是存在 HDFS 中,加上一些历史原因,HDFS 还有基于 Viewfs 的 Scheme,因此需要对Alluxio 代码改造,添加 HDFS 和 Viewfs 的 Scheme 信息。
单个 Disk 空间是有限的,没办法存储较多 Page,同时单磁盘还有 IO 的限制,社区提供的解决方案是通过 Hash&Mod 的方式来写入多磁盘,但是这个方式没有考虑磁盘容量的问题。因此团队借鉴了 HDFS 选择 Volume 的策略,通过可用空间的大小来做磁盘的选择。团队基于 AvailableSpace 来做磁盘选择改造(借鉴 HDFS)。假设有 5 个 Disk,容量分别为 1g、50g、25g、5g、30g,现在需要基于该策略往某个盘写数据:第一步,校验 5 个盘是否处于 Balanced,给定一个平衡态的阈值(默认 10g),用最大的容量减最小容量,如果小于这个阈值,则认为是处于平衡状态,直接 RoundRobin 进行选择;第二步,判断 Disk 的容量是否大于最小容量和平衡态阈值的总和,将 Disk 划分为HighAvail 与 LowAvail;第三步,给定一个概率值(平衡概率值默认为 75%),选择某列进行 RoundRobin,若数据大小超过 LowAvail 列最大值,则选择 HighAvail 进行轮询。 那么为什么不直接把数据都写到磁盘容量较高的那个 Disk 中呢?如果每次都将数据写到磁盘容量比较高的 Disk,磁盘就接受了很多的 I/O 就可能会发生磁盘 Hang 住的情况,这种时候就不适合再选择这个 Disk 写入 Page 了。在单并发场景下,开启 Local Cache 缓存可以减少 20% 左右的查询时间,相比之下,4 并发场景下有一定的性能损失,但从总体上来看,无论对于简单查询还是复杂查询开启 Local Cache 都能够获得一定的性能提升。
目前 Local Cache 已上线三个 Presto 集群,整体的 Presto 集群的缓存命中率达到40% 左右。
- 改进 Soft-Affinity,用 Path + Start 作为 Key 来 Hash,分散大文件分到单个 Worker Split 的压力
- 改进 Soft-Affinity 排除不开启 Cache 的节点
Q1:Presto SQL 和 Spark SQL 语法不同,如何降级处理?A1:对于用户提交过来的查询语句,首先对 SQL 进行分析,看 SQL 执行涉及到的数据量等,数据量比较小的作业优先分发到 Presto 上去执行,由于 Presto 与其他引擎语法存在兼容性问题,所以会使用 Coral 做层转换,如果 Presto 执行失败,就降级到 Spark 上执行原 SQL,如果 Spark 上再执行失败则降级到 Hive 上执行,如果都失败,查询会直接报错。Q2:Presto 使用的是 PrestoSQL 还是 Presto db?A2:目前是基于 PrestoSQL(社区叫法 Trino)的 330 版本来进行二次开发,刚刚提到的 RaptorX 的功能是 Presto db 内部研发出来的,所以在 Presto db 这个分支中是已经实现了 RaptorX 的一些基本功能。团队基于自己内部的 Presto 分支将其进行改造,实现了 RaptorX 的一些基本功能。Q3:目前 B 站走 Alluxio 表的标准是什么?A3:主要是上文提到的缓存策略服务,对 Presto Query 的血缘信息进行解析,给每个表和分区计算热度,热度计算公式是计算表或分区一周访问次数的均值是否高于某个给定的阈值,比如给定阈值 10,大于 10 就可以认为是比较热的表,然后对比较热的表打 Tag 进行标识,下一次 Presto 执行时通过 HMS 获取分区的 Tag 信息来判断是否走 Alluxio。