如何基于 Apache Doris 与 Apache Flink 快速构建极速易用的实时数仓时间:2021-11-04 01:04:05随着大数据应用的不断深入,企业不再满足离线数据加工计算的时效,实时数据需求已成为数据应用新常态。伴随着实时分析需求的不断膨胀,传统的数据架构面临的成本高、实时性无法保证、组件繁冗、运维难度高等问题日益凸显。为了适应业务快速迭代的特点,帮助企业提升数据生产和应用的时效性、进一步挖掘实时数据价值,实时数仓的构建至关重要。 本文将分享如何基于 Apache Doris 和 Apache Flink 快速构建一个极速易用的实时数仓,包括数据同步、数据集成、数仓分层、数据更新、性能提升等方面的具体应用方案,在这之前,我们先可以先了解一下传统的数据架构如何设计的、又存在哪些痛点问题。 # **# 实时数仓的需求与挑战** ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d867393e45be425191b067aea5360d38~tplv-k3u1fbpfcp-zoom-1.image) 上图所示为传统的数据架构,如果我们**从数据流的⻆度分析传统的数据处理架构**,会发现从源端采集到的业务数据和日志数据主要会分为实时和离线两条链路: - 在实时数据部分,通过 Binlog 的⽅式,将业务数据库中的数据变更 (CDC,Change Data Capture)采集到实时数仓。同时,通过 Flume-Kafka-Sink 对日志数据进⾏实时采集。当不同来源的数据都采集到实时存储系统后,便可以基于实时存储系统来构建实时数仓。在实时数仓的内部,我们仍然会遵守传统数仓分层理论,将数据分为 ODS 层、DWD 层、DWS 层、 ADS 层以实现模型复用。 - 在离线数据部分,通过 DataX 定时同步的⽅式,批量同步业务库 RDS 中的数据。当不同来源的数据进⼊到离线数仓后,便可以在离线数仓内部,依赖 Spark SQL 或 Hive SQL 对数据进⾏定时处理,分离出不同层级 (ODS 、DWD 、ADS 等)的数据,并将这些数据存在⼀个存储介质上,⼀般会采用如 HDFS 的分布式文件系统或者 S3 对象存储上。通过这样的⽅式,离线数仓便构建起来了。与此同时,为了保障数据的⼀致性,通常需要数据清洗任务使⽤离线数据对实时数据进⾏清洗或定期覆盖,保障数据最终的⼀致性。 **从技术架构的⻆度对传统数据技术栈进行分析**,我们同样会发现,为了迎合不同场景的需求,往往会采用多种技术栈,例如在湖仓部分通常使用的是 Hive 、Iceberg 、Hudi 等数据湖;面向湖上数据的 Ad-hoc 查询一般选择 Impala 或 Presto;对于 OLAP 场景的多维分析,一般使⽤ Doris 或 Kylin、 Druid。除此之外,为应对半结构化数据的分析需求,例如日志分析与检索场景,通常会使⽤ ES 进行分析;面对高并发点查询的 Data Serving 场景会使⽤ HBase;在某些场景下可能还需要对外提供统⼀的数据服务,这时可能会使⽤基于 Presto/Trino 的查询⽹关,对⽤户提供统一查询服务。其中涉及到的数据组件有数十种,高昂的使用成本和组件间兼容、维护及扩展带来的繁重压力成为企业必须要面临的问题。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/8d507d4bbfa04729ae06090d2bb6f49c~tplv-k3u1fbpfcp-zoom-1.image) 从上述介绍即可知道,传统的数据架构存在几个核心的痛点问题: - 传统数据架构组件繁多,维护复杂,运维难度非常高。 - 计算、存储和研发成本都较高,与行业降本提效的趋势背道而驰。 - 同时维护两套数据仓库(实时数仓和离线数仓)和两套计算(实时数据量和实时计算任务),数据时效性与一致性无法保证。 在此背景下,**我们亟需⼀个“极速、易用、统一、实时”的数据架构来解决这些问题**: - 极速:更快的查询速度,最大化提升业务分析人员的效率; - 易用:对于用户侧的使用和运维侧的管控都提供了极简的使用体验; - 统⼀:异构数据与分析场景的统一,半结构化和结构化数据可以统⼀存储,多分析场景可以统一技术栈; - 实时:端到端的高时效性保证,发挥实时数据的价值。 # **# 如何构建极速易用的实时数仓架构** 基于以上的需求,我们采取 Apache Doris 和 Apache Flink 来构建极速易用的实时数仓,具体架构如下图所示。多种数据源的数据经过 Flink CDC 集成或 Flink Job 加⼯处理后,⼊库到 Doris 或者 Hive/Iceberg 等湖仓中,最终基于 Doris 提供统⼀的查询服务。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ff7fb44a13da46e5ab3b9fca783a6dca~tplv-k3u1fbpfcp-zoom-1.image) 在数据同步上,通过 Flink CDC 将 RDS 的数据实时同步到 Doris;通过 Routine Load 将 Kafka 等消息系统中的数据实时同步到 Doris 。在数仓分层上,ODS 层通常选择使用明细模型构建,DWD 层可以通过 SQL 调度任务对 ODS 数据抽取并获取,DWS 和 ADS 层则可以通过 Rollup 和物化视图进行构建。在数据湖上, Doris ⽀持为 Hive、Iceberg 、Hudi 以及Delta Lake(todo)提供联邦分析和湖仓加速的能⼒。在数据应用上,Apache Doris 既可以承载批量数据加工处理的需求,也可以承载高吞吐的 Adhoc 和高并发点查询等多种应⽤场景。 # **# 解决方案** ### **如何实现数据的增量与全量同步** **1. 增量及全量数据同步** 在全量数据和增量的同步上,我们采取了 Flink CDC 来实现。其原理非常简单,Flink CDC 实现了基于 Snapshot 的全量数据同步、基于 BinLog 的实时增量数据同步,全量数据同步和增量数据同步可以⾃动切换,因此我们在数据迁移的过程中,只需要配置好同步的表即可。当 Flink 任务启动时,优先进⾏历史表的数据同步,同步完后⾃动切换成实时同步。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/cc9d16b0c8c14571a19f0283f362c821~tplv-k3u1fbpfcp-zoom-1.image) **2. 数据一致性保证** 如何保证数据一致性是大家重点关注的问题之一,那么在新架构是如何实现的呢? > 数据⼀致性⼀般分为“最多⼀次” 、“⾄少⼀次”和“精 确⼀次”三种模型。 > > - 最多⼀次(At-Most-Once):发送⽅仅发送消息,不期待任何回复。在这种模型中,数据的⽣产和消费过程中可能出现数据丢失的问题。 > - ⾄少⼀次(At-Least-Once):发送⽅不断重试,直到对⽅收到为⽌。在这个模型中,⽣产和消费过程都可能出现数据重复。 > - 精 确⼀次(Exactly-Once):能够保证消息只被严格发送⼀次,并且只被严格处理⼀次。这种数据模型能够严格保证数据⽣产和消费过程中的准确⼀致性。 > - Flink CDC 通过 Flink Checkpoint 机制结合 Doris 两阶段提交可以实现端到端的 Exactly Once 语义。具体过程分为四步: - 事务开启(Flink Job 启动及 Doris 事务开启):当 Flink 任务启动后, Doris 的 Sink 会发起 Precommit 请求,随后开启写⼊事务。 - 数据传输(Flink Job 的运⾏和数据传输):在 Flink Job 运⾏过程中, Doris Sink 不断从上游算⼦获取数据,并通过 HTTP Chunked 的⽅式持续将数据传输到 Doris。 - 事务预提交:当 Flink 开始进⾏ Checkpoint 时,Flink 会发起 Checkpoint 请求,此时 Flink 各个算⼦会进⾏ Barrier 对⻬和快照保存,Doris Sink 发出停⽌ Stream Load 写⼊的请求,并发起⼀个事务提交请求到 Doris。这步完成后,这批数据已经完全写⼊ Doris BE 中,但在 BE 没有进⾏数据发布前对⽤户是不可⻅的。 - 事务提交:当 Flink 的 Checkpoint 完成之后,将通知各个算⼦,Doris 发起⼀次事务提交到 Doris BE ,BE 对此次写⼊的数据进⾏发布,最终完成数据流的写⼊。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1734b546703741a68245f72811f27c3b~tplv-k3u1fbpfcp-zoom-1.image) 综上可知,我们利用 Flink CDC 结合 Doris 两阶段事务提交保证了数据写入一致性。需要注意的是,在该过程中可能遇到一个问题:如果事务预提交成功、但 Flink Checkpoint 失败了该怎么办?针对该问题,Doris 内部支持对写⼊数据进⾏回滚(Rollback),从⽽保证数据最终的⼀致性。 **3. DDL 和 DML 同步** 随着业务的发展,部分⽤户可能存在 RDS Schema 的变更需求。当 RDS 表结构变更时,⽤户期望 Flink CDC 不但能够将数据变化同步到 Doris,也希望将 RDS 表结构的变更同步到 Doris,⽤户则无需担⼼ RDS 表结构和 Doris 表结构不⼀致的问题。 **Light Schema Change** 目前,[Apache Doris 1.2.0 已经实现了 Light Schema Change 功能](http://mp.weixin.qq.com/s?__biz=Mzg3Njc2NDAwOA==&mid=2247509133&idx=1&sn=a1c5a66404c0ceb0cd3015a4f2e13b9a&chksm=cf2faa8af858239c761a46e4f729191cce45cd8cd0e5e1dfd78e8a7263f32661814c590306af&scene=21#wechat_redirect),可满⾜ DDL 同步需求,快速⽀持 Schema 的变更。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7256b03cb46b4b37b37376feb0539d00~tplv-k3u1fbpfcp-zoom-1.image) Light Schema Change 的实现原理也比较简单,对数据表的加减列操作,不再需要同步更改数据文件,仅需在 FE 中更新元数据即可,从而实现毫秒级的 Schema Change 操作,且存在导入任务时效率的提升更为显著。在这个过程中,由于 Light Schema Change 只修改了 FE 的元数据,并没有同步给 BE。因此会产⽣ BE 和 FE Schema 不⼀致的问题。为了解决这种问题,我们对 BE 的写出流程进⾏了修改,具体包含三个⽅⾯。 - 数据写⼊:FE 会将 Schema 持久化到元数据中,当 FE 发起导⼊任务时,会把最新的 Schema 一起发给 Doris BE,BE 根据最新的 Schema 对数据进⾏写⼊,并与 RowSet 进⾏绑定。将该 Schema 持久化到 RowSet 的元数据中,实现了数据的各⾃解析,解决了写⼊过程中 Schema 不⼀致的问题。 - 数据读取:FE ⽣成查询计划时,会把最新的 Schema 附在其中⼀起发送给 BE,BE 拿到最新的 Schema 后对数据进⾏读取,解决读取过程中 Schema 发⽣不⼀致的问题。 - 数据 Compaction:当数据进⾏ Compaction 时,我们选取需要进⾏ Compaction 的 RowSet 中最新的 Schema 作为之后 RowSet 对应的 Schema,以此解决不同 Schema 上 RowSet 的合并问题。 经过对 Light Schema Change 写出流程的优化后, 单个 Schema Chang 从 **310 毫秒降低到了 7 毫秒**,整体性能有**近百倍的提升**,彻底的解决了海量数据的 Schema Change 变化难的问题。 **Flink CDC DML 和 DDL 同步** 有了 Light Schema Change 的保证, Flink CDC 能够同时⽀持 DML 和 DDL 的数据同步。那么是如何实现的呢? ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f8aabf72a8ff43cda130c51a4aaa6980~tplv-k3u1fbpfcp-zoom-1.image) - 开启 DDL 变更配置:在 Flink CDC 的 MySQL Source 侧开启同步 MySQL DDL 的变更配置,在 Doris 侧识别 DDL 的数据变更,并对其进⾏解析。 - 识别及校验:当 Doris Sink 发现 DDL 语句后,Doris Sink 会对表结构进⾏验证,验证其是否⽀持 Light Schema Change。 - 发起 Schema Change :当表结构验证通过后,Doris Sink 发起 Schema Change 请求到 Doris,从⽽完成此次 Schema Change 的变化。 解决了数据同步过程中源数据⼀致性的保证、全量数据和增量数据的同步以及 DDL 数据的变更后,一个完整的数据同步⽅案就基本形成了。 ### **如何基于 Flink 实现多种数据集成** ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/886352082b324c0786840dc0e039ba60~tplv-k3u1fbpfcp-zoom-1.image) 除了上文中所提及的基于 Flink CDC 进行数据增量/全量同步外,我们还可以基于 Flink Job 和 Doris 来构建多种不同的数据集成方式: - 将 MySQL 中两个表的数据同步到 Flink 后,在 Flink 内部进⾏多流 Join 完成数据打宽,后将⼤宽表同步到 Doris 中。 - 对上游的 Kafka 数据进⾏清洗,在 Flink Job 完成清洗后通过 Doris-Sink 写⼊ Doris 中。 - 将 MySQL 数据和 Kafka 数据在 Flink 内部进⾏多流 Join,将 Join 后的宽表结果写⼊ Doris中。 - 在 Doris 侧预先创建宽表,将上游 RDS 中的数据根据 Key 写入, 使⽤ Doris 的部分列更新将多列数据分别写⼊到 Doris 的⼤宽表中。 ### **如何选择数据模型** Apache Doris 针对不同场景,提供了不同的数据模型,分别为聚合模型、主键模型、明细模型。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c8e0f65bb61e4982b743fa400a3568c1~tplv-k3u1fbpfcp-zoom-1.image) **AGGREGATE 聚合模型** 在企业实际业务中有很多需要对数据进行统计和汇总操作的场景,如需要分析网站和 APP 访问流量、统计用户的访问总时长、访问总次数,或者像厂商需要为广告主提供广告点击的总流量、展示总量、消费统计等指标。在这些不需要召回明细数据的场景,通常可以使用聚合模型,比如上图中需要根据门店 ID 和时间对每个门店的销售额实时进行统计。 **UNIQUE KEY 主键模型** 在某些场景下用户对数据更新和数据全局唯 一性有去重的需求,通常使用 UNIQUE KEY 模型。在 UNIQUE 模型中,会根据表中的主键进⾏ Upsert 操作:对于已有的主键做 Update 操作,更新 value 列,没有的主键做 Insert 操作,比如图中我们以订单id为唯 一主键,对订单上的其他数据(时间和状态)进行更新。 **DUPLICATE 明细模型** 在某些多维分析场景下,数据既没有主键,也没有聚合需求,Duplicate 数据模型可以满足这类需求。明细模型主要用于需要保留原始数据的场景,如日志分析,用户行为分析等场景。明细模型适合任意维度的 Ad-hoc 查询。虽然同样无法利用预聚合的特性,但是不受聚合模型的约束,可以发挥列存模型的优势(只读取相关列,而不需要读取所有 Key 列)。 ### **如何构建数仓分层** 由于数据量级普遍较大,如果直接查询数仓中的原始数据,需要访问的表数量和底层文件的数量都较多,体现在日常工作中就是 SQL 异常复杂、计算耗时增高。而分层要做的就是对原始数据重新做归纳整理,在不同层级对数据或者指标做不同粒度的抽象,通过复用数据模型来简化数据管理压力,利用血缘关系来定位数据链路的异常,同时进一步提升数据分析的效率。在 Apache Doris 可以通过以下多种思路来构建数据仓库分层: **微批调度** 通过 INSERT INTO SELECT 可以将原始表的数据进行处理和过滤并写入到目标表中,这种 SQL 抽取数据的行为一般是以微批形式进行(例如 15 分钟一次的 ETL 计算任务),通常发生在从 ODS 到 DWD 层数据的抽取过程中,因此需要借助外部的调度工具例如 DolphinScheduler 或 Airflow 等来对 ETL SQL 进行调度。 **Rollup 与物化视图** 物化视图本质是一个预先计算的过程。我们可以在 Base 表上,创建不同的 Rollup 或者物化视图来对 Base 表进行聚合计算。通常在明细层到汇总层(例如 DWD 层到 DWS 层或从 DWS 层到 ADS 层)的汇聚过程中可以使用物化视图,以此实现指标的高度聚合。同时物化视图的计算是实时进行的,因此站在计算的角度也可以将物化视图理解为一个单表上的实时计算过程。 **多表物化视图** Apache Doris 2.0 将实现多表物化视图这一功能,可以将带有 Join 的查询结果固化以供用户直接查询,支持定时自动或手动触发的方式进行全量更新查询结果,未来还将进一步支持更加完善的自动增量刷新。基于多表物化视图这一功能的实现,我们可以做更复杂的数据流处理,比如数据源侧有 TableA、TableB、TableC,在多表物化视图的情况下,用户就可以将 TableA 和 TableB 的数据进行实时Join 计算后物化到 MV1 中。在这个角度上来看,多表物化视图更像一个多流数据实时 Join 的过程。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/db0a04bd858f48b391df4118006bc4ec~tplv-k3u1fbpfcp-zoom-1.image) ### **如何应对数据更新** 在实时数据仓库构建的过程中,还需要面临高并发写入和实时更新的挑战。如何在亿级数据中快速找到需要更新的数据,并对其进⾏更新,⼀直都是⼤数据领域不断追寻的答案。 **1. 高并发数据更新** 在 Apache Doris 中通过 Unique Key 模型来满足数据更新的需求,同时通过 MVCC 多版本并发机制来实现数据的读写隔离。当新数据写入时,如果不存在相同 Key 的数据则会直接写⼊;如果有相同 Key 的数据则增加版本,此时数据将以多个版本的形式存在。后台会启动异步的 Compaction 进程对历史版本数据进⾏清理,当⽤户在查询时 Doris 会将最新版本对应的数据返回给⽤户,这种设计解决了海量数据的更新问题。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/215634a775a745a3b7b9c35e8b50f54e~tplv-k3u1fbpfcp-zoom-1.image) **在 Doris 中提供了 Merge-on-Read 和 Merge-on-Write 两种数据更新模式。** ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6a91a561a8a24420a5c996d4008020f2~tplv-k3u1fbpfcp-zoom-1.image) 在此我们以订单数据的写入为例介绍 Merge-on-Read 的数据写入与查询流程,三条订单数据均以 Append 的形式写⼊ Doris 表中: - 数据 Insert:首先我们写入 ID 为 1,2,3 的三条数据; - 数据 Update:当我们将订单 1 的 Cost 更新为 30 时,其实是写⼊⼀条 ID 为 1,Cost 为 30 的新版本数据,数据通过 Append 的形式写⼊ Doris; - 数据 Delete:当我们对订单 2 的数据进⾏删除时,仍然通过 Append ⽅式,将数据多版本写⼊ Doris ,并将 _DORIS_DELETE_SIGN 字段变为 1 ,则表示这条数据被删除了。当 Doris 读取数据时,发现最新版本的数据被标记删除,就会将该数据从查询结果中进⾏过滤。 Merge-on-Read 的特点是写⼊速度比较快,但是在数据读取过程中由于需要进⾏多路归并排序,存在着大量非必要的 CPU 计算资源消耗和 IO 开销。 因此在 1.2.0 版本中,[Apache Doris 在原有的 Unique Key 数据模型上增加了 Merge-on-Write 的数据更新模式](http://mp.weixin.qq.com/s?__biz=Mzg3Njc2NDAwOA==&mid=2247512201&idx=1&sn=13e743781165eb3b28cef0a98aa09902&chksm=cf2fbe8ef858379879dcb3769285759c74515b6a7d069cea8e445c5200e053e4da44e8ff4b17&scene=21#wechat_redirect)。Merge-on-Write 兼顾了写入和查询性能。在写⼊的过程中引⼊了 Delete Bitmap 数据结构,使⽤ Delete Bitmap 标记 RowSet 中某⼀⾏是否被删除,为了保持 Unique Key 原有的语义, Delete Bitmap 也⽀持多版本。另外使⽤了兼顾性能和存储空间的 Row Bitmap,将 Bitmap 中的 MemTable ⼀起存储在 BE 中,每个 Segment 会对应⼀个 Bitmap。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/82225810487a4bf0a7d2e75b8d744257~tplv-k3u1fbpfcp-zoom-1.image) - **写入流程:** - DeltaWriter 先将数据 Flush 到磁盘 - 批量检查所有 Key,在点查过程中经过区间树,查找到对应的 RowSet。 - 在 RowSet 内部通过 BloomFilter 和 index 进行⾼效查询。 当查询到 Key 对应的 RowSet 后,便会覆盖 RowSet Key 对应的 Bitmap,接着在 Publish 阶段更新 Bitmap,从⽽保证批量点查 Key 和更新 Bitmap 期间不会有新的可⻅ RowSet,以保证 Bitmap 在更新过程中数据的正确性。除此之外,如果某个 Segment 没有被修改,则不会有对应版本的 Bitmap 记录。 - **查询流程:** - 当我们查询某⼀版本数据时, Doris 会从 LRU Cache Delete Bitmap 中查找该版本对应的缓存。 - 如果缓存不存在,再去 RowSet 中读取对应的 Bitmap。 - 使⽤ Delete Bitmap 对 RowSet 中的数据进⾏过滤,将结果返回。 该模式不需要在读取的时候通过归并排序来对主键进行去重,这对于高频写入的场景来说,大大减少了查询执行时的额外消耗。此外还能够支持谓词下推,并能够很好利用 Doris 丰富的索引,在数据 IO 层面就能够进行充分的数据裁剪,大大减少数据的读取量和计算量,因此在很多场景的查询中都有非常明显的性能提升。在真实场景的测试中,**通过 Merge-on-Write 可以在保证数万 QPS 的高频 Upset 操作的同时实现性能 3-10 倍的提升。** **2. 部分列更新** 部分列更新是一个比较普遍的需求,例如广告业务中需要在不同的时间点对同一个广告行为(展示、点击、转换等)数据的更新。这时可以通过 Aggregate Key 模型的`replace_if_not_null`实现。具体建表语句如下: ```sql CREATE TABLE IF NOT EXISTS request_log ( `session_id` LARGEINT NOT NULL COMMENT "id", `imp_time` DATE REPLACE_IF_NOT_NULL COMMENT "展示", #展示数据更新 `imp_data` VARCHAR(20) REPLACE_IF_NOT_NULL COMMENT "", `click_time` DATE REPLACE_IF_NOT_NULL COMMENT "点击",#点击数据更新 `click_data` VARCHAR(20) REPLACE_IF_NOT_NULL COMMENT "", `conv_time` DATE REPLACE_IF_NOT_NULL COMMENT "转化",#转换数据更新 `conv_data` VARCHAR(20) REPLACE_IF_NOT_NULL COMMENT "" ) AGGREGATE KEY(`session_id`) DISTRIBUTED BY HASH(`session_id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1" ); ``` 具体更新过程如下: (1)更新展示数据 ```sql mysql> insert into request_log(session_id,imp_time,imp_data)VALUES(1,'2022-12-20','imp'); Query OK, 1 row affected (0.05 sec) {'label':'insert_31a037849e2748f6_9b00b852d106eaaa', 'status':'VISIBLE', 'txnId':'385642'} mysql> select * from request_log; +------------+------------+----------+------------+------------+-----------+-----------+ | session_id | imp_time | imp_data | click_time | click_data | conv_time | conv_data | +------------+------------+----------+------------+------------+-----------+-----------+ | 1 | 2022-12-20 | imp | NULL | NULL | NULL | NULL | +------------+------------+----------+------------+------------+-----------+-----------+ 1 row in set (0.01 sec) ``` (2)更新点击数据 ```sql mysql> insert into request_log(session_id,imp_time,imp_data)VALUES(1,'2022-12-20','imp'); Query OK, 1 row affected (0.05 sec) {'label':'insert_31a037849e2748f6_9b00b852d106eaaa', 'status':'VISIBLE', 'txnId':'385642'} mysql> select * from request_log; +------------+------------+----------+------------+------------+-----------+-----------+ | session_id | imp_time | imp_data | click_time | click_data | conv_time | conv_data | +------------+------------+----------+------------+------------+-----------+-----------+ | 1 | 2022-12-20 | imp | NULL | NULL | NULL | NULL | +------------+------------+----------+------------+------------+-----------+-----------+ 1 row in set (0.01 sec) ``` (3)更新转化数据 ```sql ysql> insert into request_log(session_id,click_time,click_data)VALUES(1,'2022-12-21','click'); Query OK, 1 row affected (0.03 sec) {'label':'insert_2649087d8dc046bd_a39d367af1f93ab0', 'status':'VISIBLE', 'txnId':'385667'} mysql> select * from request_log; +------------+------------+----------+------------+------------+-----------+-----------+ | session_id | imp_time | imp_data | click_time | click_data | conv_time | conv_data | +------------+------------+----------+------------+------------+-----------+-----------+ | 1 | 2022-12-20 | imp | 2022-12-21 | click | NULL | NULL | +------------+------------+----------+------------+------------+-----------+-----------+ 1 row in set (0.01 sec) mysql> ``` 同时部分列更新还可用于支持画像场景的宽表列实时更新。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/50a1fb0a20a94b659413e58dce93509a~tplv-k3u1fbpfcp-zoom-1.image) 另外值得期待的是 Apache Doris 的 Unique Key 模型也即将实现部分列更新的功能,可以通过 Apache Doris GitHub 代码仓库及官网,关注新版本或新功能的发布(相关地址可下滑至文章底部获取)。 ### 如何进一步提升查询性能 **1. 智能物化视图** 物化视图除了可以作为高度聚合的汇总层外,其更广泛的定位是加速相对固定的聚合分析场景。物化视图是指根据预定义的 SQL 分析语句执⾏预计算,并将计算结果持久化到另一张对用户透明但有实际存储的表中,在需要同时查询聚合数据和明细数据以及匹配不同前缀索引的场景,命中物化视图时可以获得更快的查询性能。在使用物化视图时需要建⽴ Base 表并基于此建⽴物化视图,同⼀张 Base 表可以构建多个不同的物化视图,从不同的维度进⾏统计。物化视图在查询过程中提供了智能路由选择的能力,如果数据在物化视图中存在会直接查询物化视图,如果在物化视图中不存在才会查询 Base 表。对于数据写入或更新时,数据会在写入 Base 表的同时写入物化视图,从⽽让 Doris 保证物化视图和 Base 表数据的完全⼀致性。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/68df972f6b7a4befa64247d961245402~tplv-k3u1fbpfcp-zoom-1.image) 智能路由选择遵循最⼩匹配原则,只有查询的数据集⽐物化视图集合⼩时,才可能⾛物化视图。如上图所示智能选择过程包括选择最优和查询改写两个部分: **选择最优** - 在过滤候选集过程中,被执行的 SQL 语句通过 Where 条件进⾏判断,Where 条件为advertiser=1。由此可⻅,物化视图和 Base 表都有该字段,这时的选集是物化视图和 Base 表。 - Group By 计算,Group By 字段是 advertiser 和 channel,这两个字段同时在物化视图和 Base 表中。这时过滤的候选集仍然是物化视图和 Base 表。 - 过滤计算函数,⽐如执⾏ count(distinctuser_id),然后对数据进⾏计算,由于 Count Distinct 的字段 user_id 在物化视图和 Base 表中都存在,因此过滤结果仍是物化视图和 Base 表。 - 选择最优,通过⼀系列计算,我们发现查询条件⽆论是 Where 、Group By 还是 Agg Function 关联的字段,结果都有 Base 表和物化视图,因此需要进⾏最优选择。Doris 经过计算发现 Base 表的数据远⼤于物化视图,即物化视图的数据更⼩。 由此过程可⻅,如果通过物化视图进行查询,查询效率更⾼。当我们找到最优查询计划,就可以进⾏⼦查询改写,将 Count Distinct 改写成 Bitmap ,从⽽完成物化视图的智能路由。完成智能路由之后,我们会将 Doris ⽣成的查询 SQL 发送到 BE 进⾏分布式查询计算。 **2. 分区分桶裁剪** Doris 数据分为两级分区存储, 第一层为分区(Partition),目前支持 RANGE 分区和 LIST 分区两种类型, 第二层为 HASH 分桶(Bucket)。我们可以按照时间对数据进⾏分区,再按照分桶列将⼀个分区的数据进行 Hash 分到不同的桶⾥。在查询时则可以通过分区分桶裁剪来快速定位数据,加速查询性能的同时实现高并发。 **3. 索引查询加速** 除了分区分桶裁剪, 还可以通过存储层索引来裁剪需要读取的数据量,仅以加速查询: - 前缀索引:在排序的基础上快速定位数据 - Zone Map 索引:维护列中 min/max/null 信息 - Bitmap 索引:通过 Bitmap 加速去重、交并查询 - Bloom Filter 索引:快速判断元素是否属于集合; - Invert 倒排索引:支持字符串类型的全文检索; **4. 执行层查询加速** 同时 Apache Doris 的 MPP 查询框架、向量化执行引擎以及查询优化器也提供了许多性能优化方式,在此仅列出部分、不做详细展开: - 算子下推:Limit、谓词过滤等算子下推到存储层; - 向量化引擎:基于 SIMD 指令集优化,充分释放 CPU 计算能力; - Join 优化:Bucket Shuffle Join、Colocate Join 以及 Runtime Filter 等; # 行业最 佳实践 截止目前,Apache Doris 在全球范围内企业用户规模已超过 **1500 家**,广泛应用于数十个行业中。在用户行为分析、AB 实验平台、日志检索分析、用户画像分析、订单分析等方向均有着丰富的应用。在此我们列出了几个基于 Doris 构建实时数据仓库的真实案例作为参考: ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/287272f50a614a9cbb96798bc1cf211d~tplv-k3u1fbpfcp-zoom-1.image) 第 1 个案例是较为典型的基于 Doris 构建实时数仓,下层数据源来自 RDS 业务库、⽂件系统数据以及埋点日志数据。在数据接⼊过程中通过 DataX 进⾏离线数据同步以及通过 Flink CDC 进⾏实时数据同步,在 Doris 内部构建不同的数据分层;最后在上层构建不同的数据应⽤,⽐如⾃助报表、⾃助数据抽取、数据⼤屏。除此之外,它还结合了⾃⼰的应⽤平台构建了数据开发与治理平台,完成了源数据管理、数据分析等操作。 **使用收益:** - 业务计算耗时从之前的两⼩时降低到三分钟。 - 全链路的更新报表的时间从周级别更新到⼗分钟级别。 - Doris ⾼度兼容 MySQL,报表迁移无压力,开发周期从周级别降至⾄天级别。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d75a5d8876ae40599ca733894ec36298~tplv-k3u1fbpfcp-zoom-1.image) 第 2 个案例是在某运营服务商的应用,其架构是通过 Flink CDC 将 RDS 的数据同步到 Doris 中,同时通过 Routine Load 直接订阅 Kafka 中接入的日志数据,然后在 Doris 内部构建实时数仓。在数据调度时, 通过开源 DolphinScheduler 完成数据调度;使⽤ Prometheus+Grafana 进⾏数据监控。 **使用收益:** 采⽤ Flink+Doris 架构体系后,架构简洁、组件减少,解决了多架构下的数据的冗余存储,服务器资源节省了 30%,数据存储磁盘占⽤节省了 60%,运营成本⼤幅降低。该案例每天在⽤户的业务场景上,⽀持数万次的⽤户的在线查询和分析。 ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/5151cce6dd7b4fac8c0f313dbaadf305~tplv-k3u1fbpfcp-zoom-1.image) 第 3 个应用是在供应链企业,在过去该企业采取了 Hadoop 体系,使用组件⽐较繁多,有 RDS、HBase、Hive、HDFS、Yarn、Kafka 等多个技术栈,在该架构下,查询性能无法得到有效快速的提升,维护和开发成本一直居高不下。 **使用收益:** 引入 Doris 之后,将 RDS 的数据通过 Flink CDC 实时同步到 Doris ⾥,服务器资源成本得到了很⼤的降低。数据的查询时间从 Spark 的 2~5 ⼩时,缩短到⼗分钟,查询效率也⼤⼤提升。在数据的同步过程中,使⽤了 Flink CDC+MySQL 全量加增量的数据同步⽅式,同时还利⽤ Doris 的 Light Schema Change 特性实时同步 Binlog ⾥的 DDL 表结构变更,实现数据接⼊数仓零开发成本。 # **# 总结** 凭借 Apache Doris 丰富的分析功能和 Apache Flink 强大的实时计算能力,已经有越来越多的企业选择基于 Apache Doris 和 Flink 构建极速易用的实时数仓架构,更多案例欢迎关注 SelectDB 公众号以及相关技术博客。后续我们仍会持续提升 Apache Doris 在实时数据处理场景的能力和性能,包括 Unique 模型上的部分列更新、单表物化视图上的计算增强、自动增量刷新的多表物化视图等,后续研发进展也将在社区及时同步。在构建实时数据仓库架构中遇到任何问题,欢迎联系社区进行支持。同时也欢迎加入 Apache Doris 社区,一起将 Apache Doris 建设地更加强大! **作者介绍:** 王磊, SelectDB 资 深大数据研发专家、Apache Doris Contributor、阿里云 MVP,具有超 10 年大数据领域工作经验,对数据治理、数据湖和实时数仓有深入理解和实践,人气技术畅销书《图解 Spark 大数据快速分析实战》、《offer 来了:Java 面试核心知识点精讲(原理篇&架构篇)》作者。 **# 相关链接:** **SelectDB 官网**: https://selectdb.com **Apache Doris 官网**: http://doris.apache.org **Apache Doris Github**: https://github.com/apache/doris