导读:Iceberg 是湖仓一体架构中常用的基础软件框架,已经在许多企业中得到应用。
本次分享将围绕 Iceberg 在华为终端云的探索实践,按照以下三个部分展开:
整体概述
特性应用
未来规划
分享嘉宾|李立伟 华为 大数据高级工程师
编辑整理|吕宴全 浙江大学
出品社区|DataFun
首先和大家交流下数据分析的演进趋势:
最初,数据分析主要集中在数据仓库如 Oracle 一体机等,以结构型数据为主,上层应用主要是 BI 报表;
之后,大数据框架开始成熟起来,这时就有一些非结构型数据出现,但结构型数据仍然是占据主流地位的。上层应用也拓展到了数据科学、机器学习和实时处理等。但这个阶段数据处理仍然是分成两个部分的,一部分数据在数据仓库中处理,一部分数据在大数据集群中处理,如 ETL 等;
直到最近,湖仓一体的架构开始兴起,我们可以将数据放到统一的存储里,用作整体分析。对于上层应用来说,能够享受到统一存储带来的方便。湖仓一体架构下有相当多的组件,终端云做了一些架构上的相关探索。
在传统的 Hive 解决方案里,我们将表定义为一个或者多个目录下的全部数据,每张表会包括多个目录,Hive 在目录级别对数据进行跟踪,这些目录信息被存储在 Hive 元数据中,分区信息也是通过路径来维护的。
这种方式的优点在于:几乎适用于所有的计算引擎,而且到目前为止已几乎成为事实上的标准;能够支持分区级别的原子性;与存储格式无关;提供了整个生态系统中的“元数据描述”。
而这种方式的缺点在于:即便微小的表结构、数据变更仍然效率低下;多分区修改时无法保证安全性,特别是在多个作业同时修改一份数据时无法保证安全性;列出大表的目录列表需要非常长的时间;用户必须了解表格的物理布局,才能写有出效的任务;Hive 的统计信息通常较少且过于陈旧,没有追踪最新的数据变化;在云对象存储上的性能比较差等。
为了解决传统目录级追踪的若干问题,一种形之有效的办法是将目录级别的数据追踪改成文件级别。与此同时,想要达成的目标还包括以下几点:
保证表的正确性与一致性;更快的 Planning 和 Execution;用户无需感知物理结构;支持表结构演进;在大规模数据上实现以上目标。最后一点目标至为重要,因为很多问题在小规模的数据集上无法暴露,或者无关紧要,而随着数据规模的暴涨,逐渐开始成为制约性能、体验的关键点。
2. Iceberg介绍
Iceberg 是一种 Table Format 的规范,它是一组 API 和库,可以用于各种计算引擎和遵循该规范的表进行交互。但要注意它并不是一种存储引擎,也不是一种计算引擎,更没有提供直接给业务使用的服务。
下面的数据流图展示了一种基于 Iceberg 进行数据处理的方式:数据从各种源端入湖之后经过分层处理,输出到目标端分析引擎进行分析。如果我们使用这种方式来完成统一的批、流开发的话,Iceberg 提供了几个优点:
能够实现存储层的批流统一,不需要两套存储和运算;通过中间层支持 OLAP 分析和实时化查询;支持 ACID 语义和高效的回溯;能够实现更低成本的存储。
但是引入这套流程也有一定的缺点,如果之前的业务流处理在 Kafka 等消息队列上,那么业务处理时效将由秒级别的实时处理降低到分钟级别的近实时,与此同时,我们需要对业务系统做一定程度的改造。
其实,仅仅使用 Iceberg 本身是远远不够的,一种方案是否可以商用,还涉及到方方面面,比如能否提供良好的业务使用体验和极低的维护成本,支持底层文件布局的自动优化和对接多种存储、计算引擎等。
所以,在 Iceberg 的基础上,我们构建了一整套的数据湖存储服务,此服务不仅可以提升任务的性能,加快业务对接速度,并且提供了可视化的湖内运维、监控等能力。
整个服务的基础是 Iceberg 提供的事件监听能力,因为我们内部使用HiveMetaStore 统一元数据,所以我们在 HiveMetastore 和 Iceberg 中间增加一个拦截层,对 DDL、Scan、Insert 等操作进行监听,这些数据将实时转发到我们的服务内。
分析上述监听数据的是多套规则引擎:服务规则引擎,用于提供生命周期处理、数据重分布、仓库压实和索引生成等功能;告警规则引擎,包括运维和安全两种,运维引擎主要用于统计 Iceberg 表的数据分布情况,安全规则引擎主要用于配置拦截、转发、权限管理等内容。
数据湖服务提供了基于 Iceberg 的生态支撑,使之更加易用。
1. 类 GIT 数据管理
提供基于 Iceberg 的类 GIT 式数据管理体验。在快照的线性历史之上,我们可以创建快照分支、快照标签,并使用它们进行数据读、写。类似于我们通过 GIT 来管理代码仓库,只不过这里我们管理的是数据。
我们可以周期的生成稳定、压实的快照,并且对重要时间点的历史快照进行标记,这样一些对过往时间数据的修改回填操作就可以在历史快照分支的基础上进行,而不影响当前分支的数据。对于一些探索性业务,如果修改表结构、配置实验数据的代价太大,开发者往往趋于保守,这不利于进行业务创新,而通过 GIT 式数据管理,我们可以创建实验分支,不再需要重新建新表并导入数据,实验分支上可以随意填入数据或者修改表结构以进行新业务测试,避免了对主数据和表结构的影响。当新业务稳定后,还可以把实验分支上的改动合并到主干分支,这就类似于 GIT 的 Merge、Cherry-Pick 等操作。
基于分支的管理可以允许企业基于安全合规要求等原因保留重要节点数据、进行更多实验性的探索。同时,我们可以通过 Iceberg 的增量存储等特性,将一些冗余存储全量数据的场景进行优化,还可以达到降低存储成本的目的。
以一个具体的场景为例:业务每天仅有若干新的数据流入,并对部分旧的数据进行更新,但因为要记录每天的历史快照,或一些其它安全等方面的考虑,每一个天分区都保存了一份全量数据,在这个分区里每天的变更数据和历史数据都存储在一起,所以产生大量的冗余,修改时也有较多合并操作,性能较差,此时开发者可利用类 GIT 的管理方式,并配合 Iceberg 批处理中提供的 Spark Merge Info 语义,只修改当天变更的数据(这会是一个典型的行级操作),在一天的数据处理完成后在当天数据之上创建一个天级分支,得益于 Iceberg 的增量存储的特性,在这个天级分区内,未修改的增量数据不会重复存储,极大的降低了表占用的空间。当需要查询或修改某一天的数据时,可以基于当天的分支快速进行时间旅行,非常方便。
这样做的好处是显而易见的,存储得到了降低,数据布局更加合理,如果配置了分支的 TTL,过期数据的删除也更加容易实现,不再像以前那样,过期数据冗余存于多个分区内部。
2. 实时化
Iceberg 本身是基于文件存储的,在文件存储层面上做到比较好的实时性是比较困难的,如果要让数据对下游业务更快可用可见,那么快照将以分钟甚至秒级来生成,这将导致非常多的快照和小文件,如果拉长快照的生成间隔,那么数据时效性就会变差,下游业务受到影响。
为了提升 Iceberg 整体的时效性,而又不产生过多的副作用,我们尝试在其基础上增加 LogStore 模块,它底层利用消息队列存储数据,并与本身的文件存储协同工作,以达到将端到端的时效降低到秒级的目的。
我们将文件存储和 Log Store 存储统一命名为 Iceberg Table。它为上层业务侧屏蔽了 Kafka、HDFS、OBS 等底层存储的差异,提供了统一的实时数据处理、存储方案,业务侧也不再需要为了批流运算对接不同的组件。
当创建 Iceberg Table 表时,数据可以实时进行写入,并对外提供三个读取模式:一是支持读取(历史)全量数据,这和之前读取文件存储内的数据没太多不同,用户将获取当前快照下的所有数据;二是流式读取增量数据,这部分将实时从 Iceberg Table 中获取变更的数据,它将数据从 LogStore 中读取出来,并支持 CDC;三是混合读取模式,先从底层文件存储中,把当前快照下的文件数据全量读取,再从 LogStore 中实时流式读取最近的数据。
为了实现实时化,有若干挑战点需要解决,比如:
更细粒度的快照冲突解决。在数据持续流式写入的同时,往往业务在 T+1 时刻会进行一些批式操作,此时我们不能仅仅使用快照 ID 进行冲突判定以重试提交,它会产生非常多的提交异常。因为实时写入的数据与批式写入的数据往往隶属于不同的分区甚至分支,所以此处采用了更细粒度的判断,只要写入任务不发生分支\分区冲突,就允许任务重新拿到最新快照 ID 进行提交,极大的避免了冲突的产生,使数据写入更加稳定。而在表存在主键的场景下,可以开启基于主键的行级冲突检测,即通过主键索引判断此数据是否同时进行了修改,但因为这种方式会频繁比较数据,过于昂贵,在测试中极为影响吞吐,所以并不推荐开启。
容灾。当数据写入失败时,数据需要同时从流式 LogStro 与文件存储进行恢复,这就对容灾提出了更高的要求,两者需要更好的配合,才能避免数据丢失、重复的问题。
3. 加速层
利用 Alluxio 或者其他内存加速技术,可以打造一个数据湖加速层,通过对近期和远期的数据进行冷/热分层,放到不同的存储中,能够对查询进行加速,并且降低集群的压力。
通过分层,冷、热数据按照各自的优先级被放置到不同的存储中。
4. Flink Unify Sink
我们基于 Flink 的 FLIP-143 和 FLIP-191 对 Iceberg 的 Flink Sink 做了重构,使 Sink 可以在任务提交之后生成索引信息,用于查询加速。在 Unify Sink 中还提供了小文件合并能力,用于实时地对小文件进行合并(当没有开启 Log Store 的情况下。
5. 其他增强举例
在 Iceberg 里对二级索引的实现还不是很完善,但是很多场景下对主键索引等是有要求的,比如使用 Iceberg 表作为关联表进行 Lookup 查询时,如果没有主键索引,查询代价就会很高;在一些需要根据主键查询出旧数据以判断是否需要进行更新的流式场景下,索引也是必要条件之一。
Iceberg 提供的快照管理、数据重写等功能,在 Spark Procedure 上已经有了比较多的开箱即用的语法,但是在 Flink 方面是缺失的,所以还要补齐 Flink 的相关支持,避免多引擎切换,这部分涉及到 Flink 与 Iceberg 两边的协同工作。
03
我们对未来的规划主要有以下三个方向:
更快速
使用包括索引等更多手段,强化读写性能,尤其是探索如何在实时的场景下提高实效性。
更丰富
在广告、推荐和特征工程等场景有更多使用,不同的场景对 Iceberg 的诉求是不同的,例如特征工程对部分列更新的诉求比较强烈,而目前 Iceberg 的 MOR 和 COW 模式都是针对行数据设计的,并无法满足诉求,而且 Flink Upsert 在没有部分列更新的能力前,也必须将全部列重新插入,这个消耗也是比较大的。
更易用
以产品化的角度,简化使用和运维等多个操作,不需要用户直接调用 Iceberg提供的 API,就能够完成快照管理和文件治理的目标。
最后是华为终端云和 Iceberg 社区的一些联系方式,欢迎大家加入我们。
04
A1: 当存在删除操作时,无论是业务需要高吞吐的写入还是高时效的查询,我们都推荐用户使用 Merge On Read 的模式,因为数据湖服务提供了异步数据压实的能力,可以对删除文件、小文件进行合并重写,并不会遗留大量的待合并删除文件,查询速度得以优化,所以 COW 的优势在 MOR 下也同样有效,唯一需要留意的点是,在读取的时候对于没有被后台服务压实过的部分数据,可能仍然会有性能损耗。
Q2:LogStore 里的数据在全量查询的时候也会被查询到吗?
A2:LogStore 里的数据会有一个生命周期,默认是三天,在全量读取的时候需要从 HDFS 上读取这之前的历史数据,再从 LogStore 中读取这个生命周期里的增量数据部分,如果数据已全部写入文件存储,则不需要从 LogStore 查询。
Q3:今天分享的特性有在商业上使用吗,或者有没有应用到生产环境?
A3:类 GIT 式数据管理和 Flink Unify Sink 的功能都有在商业上应用了,而实时化、查询加速等功能还在小批量范围的测试验证。
Q4:冷/热数据的标签是怎么判断的?
A4:根据用户在建表的时候指定的,用户在建表时可以指定某个时间段或者多少个分区周期的数据是热数据,过期数据将从热存储转移到冷存储介质。
Q5:如何支持按列(部分列)更新?
A5:我们会加入一种按列(部分列)插入的文件类型,在读取到这个文件时,对相同列值的数据进行聚合更新。
Q6:展开说一下 Iceberg 怎么对整列(部分列)进行添加删除?
A6:新构造的列(部分列)更新文件和 Iceberg 本身的按位置、按值删除行的文件类似,但它还包含了数据的类主键信息以指明需要更新的列。读取到部分列更新文件后会根据这个类主键值进行匹配,对匹配到的数据再进行部分列更新。