1. 实时需求日趋迫切
目前各大公司的产品需求和内部决策对于数据实时性的要求越来越迫切, 需要实时数仓的能⼒来赋能 。传统离 线数仓的数据时效性是 T+1,调度频率以天为单位,⽆法⽀撑实时场景的数据需求 。即使能将调度频率设置成⼩时,也只能解决部分时效性要求不高的场景,对于实效性要求很高的场景还是⽆法优雅的⽀撑 。因此实时使 用数据的问题必须得到有效解决。
2. 实时技术日趋
实时计算框架已经经历了三代发展,分别是:Storm 、SparkStreaming 、Flink,计算框架越来越成熟。
⼀⽅⾯, 实时任务的开发已经能通过编写 SQL 的⽅式来完成,在技术层⾯能很好地继承离线数仓的架构设计 思想;
另⼀⽅⾯,在线数据开发平台所提供的功能对实时任务开发 、调试 、运维的⽀持也⽇渐趋于成熟, 开发成本逐 步降低,有助于去做这件事。
3、实时数仓建设目的
3.1. 解决传统数仓的问题
从目前数仓建设的现状来看, 实时数仓是⼀个容易让⼈产生混淆的概念,根据传统经验分析,数仓有⼀个重要 的功能, 即能够记录历史 。通常,数仓都是希望从业务上线的第⼀天开始有数据,然后⼀直记录到现在。
但实时流处理技术, ⼜是强调当前处理状态的⼀个技术,结合当前⼀线大⼚的建设经验和滴滴在该领域的建设 现状,我们尝试把公司内实时数仓建设的目的定位为, 以数仓建设理论和实时技术,解决由于当前离线数仓数 据时效性低解决不了的问题。
现阶段我们要建设实时数仓的主要原因是:
公司业务对于数据的实时性越来越迫切, 需要有实时数据来辅助完成决策;
实时数据建设没有规范,数据可用性较差,⽆法形成数仓体系, 资源大量浪费;
数据平台⼯具对整体实时开发的⽀持也⽇渐趋于成熟, 开发成本降低。
3.2. 实时数仓的应用场景
- 实时 OLAP 分析;
- 实时数据看板;
- 实时业务监控;
- 实时数据接⼝服务。
4、实时数仓建设方案
接下来我们分析下目前实时数仓建设比较好的⼏个案例,希望这些案例能够给⼤家带来⼀些启发。
4.1. 滴滴顺风车实时数仓案例
滴滴数据团队建设的实时数仓,基本满足了顺风车业务方在实时侧的各类业务需求,初步建立起顺风车实时数 仓,完成了整体数据分层, 包含明细数据和汇总数据,统⼀了 DWD 层, 降低了⼤数据资源消耗,提高了数据
复用性,可对外输出丰富的数据服务。
数仓具体架构如下图所示:
从数据架构图来看,顺风车实时数仓和对应的离线数仓有很多类似的地方 。例如分层结构; 比如 ODS 层, 明 细层,汇总层, 乃至应用层,他们命名的模式可能都是⼀样的 。但仔细比较不难发现,两者有很多区别:
与离线数仓相比, 实时数仓的层次更少⼀些
从目前建设离线数仓的经验来看,数仓的数据明细层内容会非常丰富,处理明细数据外⼀般还会包含轻度 汇总层的概念, 另外离线数仓中应用层数据在数仓内部,但实时数仓中,app 应用层数据已经落⼊应用系 统的存储介质中,可以把该层与数仓的表分离;
应用层少建设的好处:实时处理数据的时候,每建⼀个层次,数据必然会产生⼀定的延迟;
汇总层少建的好处:在汇总统计的时候,往往为了容忍⼀部分数据的延迟,可能会⼈为的制造⼀些延迟来 保证数据的准确 。举例,在统计跨天相关的订单事件中的数据时,可能会等到 00¸00¸05 或者 00¸00¸ 10
再统计,确保 00¸00 前的数据已经全部接受到位了,再进⾏统计 。所以,汇总层的层次太多的话,就会更 大的加重⼈为造成的数据延迟。
与离线数仓相比, 实时数仓的数据源存储不同
在建设离线数仓的时候, 目前滴滴内部整个离线数仓都是建⽴在 Hive 表之上 。但是,在建设实时数仓的 时候, 同⼀份表,会使用不同的方式进⾏存储 。比如常⻅的情况下, 明细数据或者汇总数据都会存在
Kafka 里⾯ ,但是像城市 、渠道等维度信息需要借助 Hbase, mysql 或者其他 KV 存储等数据库来进⾏存 储。
接下来,根据顺⻛车实时数仓架构图,对每⼀层建设做具体展开:
4.1.1实时数仓分层建设
4.1.1.1 ODS 贴源层建设
根据顺⻛车具体场景, 目前顺⻛车数据源主要包括订单相关的 binlog 日志, 冒泡和安全相关的 public 日志, 流量相关的埋点日志等。
这些数据部分已采集写⼊ kafka 或 ddmq 等数据通道中,部分数据需要借助内部自研同步⼯具完成采集, 最终 基于顺⻛车数仓 ods 层建设规范分主题统⼀写⼊ kafka 存储介质中。
命名规范:ODS 层实时数据源主要包括两种。
⼀种是在离线采集时已经自动生产的 DDMQ 或者是 Kafkatopic, 这类型的数据命名方式为采集系统自动
生成规范为:cn-binlog-数据库名-数据库名 eg: cn-binlog-ihap_fangyuan-ihap_fangyuan ⼀种是需要自⼰进⾏采集同步到 kafkatopic 中,生产的 topic 命名规范同离线类似:ODS 层采 用: realtime_ods_binlog_{源系统库/表名}/ods_log_{日志名} eg:
realtime_ods_binlog_ihap_fangyuan
4.1.1.2. DWD 明细层建设
根据顺⻛车业务过程作为建模驱动,基于每个具体的业务过程特点,构建最细粒度的明细层事实表;结合顺⻛ 车分析师在离线侧的数据使用特点,将明细事实表的某些重要维度属性字段做适当冗余,完成宽表化处理, 之 后基于当前顺⻛车业务方对实时数据的需求重点, 重点建设交易 、财务 、体验 、安全 、流量等⼏大模块;该层 的数据来源于 ODS 层,通过大数据架构提供的 Stream SQL 完成 ETL ⼯作,对于 binlog 日志的处理主要进⾏ 简单的数据清洗 、处理数据漂移和数据乱序, 以及可能对多个 ODS 表进⾏ Stream Join,对于流量日志主要是 做通用的 ETL 处理和针对顺⻛车场景的数据过滤,完成⾮结构化数据的结构化处理和数据的分流;该层的数据 除了存储在消息队列 Kafka 中,通常也会把数据实时写⼊ Druid 数据库中,供查询明细数据和作为简单汇总数 据的加⼯数据源。
命名规范:DWD 层的表命名使用英文⼩写字母, 单词之间用下划线分开,总⻓度不能超过 40 个字符,并且应 遵循下述规则: realtime_dwd_{业务/pub}_{数据域缩写}_ [{业务过程缩写}]_ [{自定义表命名标签缩写}]
{业务/pub}:参考业务命名
{数据域缩写}:参考数据域划分部分
{自定义表命名标签缩写}:实体名称可以根据数据仓库转换整合后做⼀定的业务抽象的名称,该名称应该
准确表述实体所代表的业务含义
样例:realtime_dwd_trip_trd_order_base
4.1.1.3. DIM 层
公共维度层,基于维度建模理念思想, 建⽴整个业务过程的⼀致性维度, 降低数据计算⼝径和算法不统⼀ ⻛险;
DIM 层数据来源于两部分:⼀部分是 Flink 程序实时处理 ODS 层数据得到, 另外⼀部分是通过离线任务 出仓得到;
DIM 层维度数据主要使用 MySQL 、Hbase 、fusion(滴滴自研 KV 存储) 三种存储引擎,对于维表数据比较 少的情况可以使用 MySQL,对于单条数据大⼩比较⼩ ,查询 QPS 比较高的情况,可以使用 fusion 存
储, 降低机器内存资源占用,对于数据量比较大,对维表数据变化不是特别敏感的场景,可以使用 HBase 存储。
命名规范:DIM 层的表命名使用英⽂⼩写字母, 单词之间用下划线分开,总⻓度不能超过 30 个字符,并且应 遵循下述规则: dim_{业务/pub}_{维度定义}[_{自定义命名标签}] :
{业务/pub}:参考业务命名
{维度定义}:参考维度命名
{自定义表命名标签缩写}:实体名称可以根据数据仓库转换整合后做⼀定的业务抽象的名称,该名称应该
准确表述实体所代表的业务含义
样例:dim_trip_dri_base
4.1.1.4. DWM 汇总层建设
在建设顺⻛车实时数仓的汇总层的时候,跟顺⻛车离线数仓有很多⼀样的地⽅ ,但其具体技术实现会存在很大 不同。
第⼀: 对于⼀些共性指标的加⼯, 比如 pv, uv,订单业务过程指标等,我们会在汇总层进⾏统⼀的运算,确 保关于指标的⼝径是统⼀在⼀个固定的模型中完成 。对于⼀些个性指标,从指标复用性的⻆度出发,确定唯⼀ 的时间字段, 同时该字段尽可能与其他指标在时间维度上完成拉齐,例如⾏中异常订单数需要与交易域指标在 事件时间上做到拉齐。
第⼆:在顺⻛车汇总层建设中, 需要进⾏多维的主题汇总, 因为实时数仓本身是⾯向主题的,可能每个主题会 关⼼的维度都不⼀样,所以需要在不同的主题下, 按照这个主题关⼼的维度对数据进⾏汇总, 最后来算业务⽅ 需要的汇总指标 。在具体操作中,对于 pv 类指标使用 Stream SQL 实现 1 分钟汇总指标作为最⼩汇总单位指 标,在此基础上进⾏时间维度上的指标累加;对于 uv 类指标直接使用druid 数据库作为指标汇总容器,根据 业务⽅对汇总指标的及时性和准确性的要求, 实现相应的精确去重和⾮精确去重。
第三: 汇总层建设过程中, 还会涉及到衍生维度的加⼯ 。在顺⻛车券相关的汇总指标加⼯中我们使用 Hbase 的版本机制来构建⼀个衍生维度的拉链表,通过事件流和 Hbase 维表关联的⽅式得到实时数据当时的准确维 度
命名规范:DWM 层的表命名使用英⽂⼩写字母, 单词之间用下划线分开,总⻓度不能超过 40 个字符,并且 应遵循下述规则: realtime_dwm_{业务/pub}_{数据域缩写}_{数据主粒度缩写}_ [{自定义表命名标签缩写}]_{统计时 间周期范围缩写} :
{业务/pub}:参考业务命名
{数据域缩写}:参考数据域划分部分
{数据主粒度缩写}:指数据主要粒度或数据域的缩写,也是联合主键中的主要维度
{自定义表命名标签缩写}:实体名称可以根据数据仓库转换整合后做⼀定的业务抽象的名称,该名称应该 准确表述实体所代表的业务含义
{统计时间周期范围缩写}:1d:天增量;td:天累计(全量);1h:小时增量;th:小时累计(全量);1min:分钟增 量;tmin:分钟累计(全量)
样例: realtime_dwm_trip_trd_pas_bus_accum_1min
4.1.1.5. APP 应用层
该层主要的工作是把实时汇总数据写⼊应用系统的数据库中, 包括用于大屏显示和实时 OLAP 的 Druid 数据库 (该数据库除了写⼊应用数据,也可以写⼊明细数据完成汇总指标的计算)中,用于实时数据接⼝服务的 Hbase 数据库,用于实时数据产品的 mysql 或者 redis 数据库中。
命名规范:基于实时数仓的特殊性不做硬性要求。
顺风车实时数仓建设成果
截止目前,⼀ 共为顺风车业务线建立了增长 、交易 、体验 、安全 、财务五大模块,涉及 40+ 的实时看板,涵盖 顺风车全部核⼼业务过程, 实时和离线数据误差<0.5%, 是顺风车业务线数据分析⽅面的有利补充,为顺风车 当天发券动态策略调整, 司乘安全相关监控, 实时订单趋势分析等提供了实时数据⽀持,提高了决策的时效 性。
同时建立在数仓模型之上的实时指标能根据用户需求及时完成⼝径变更和实时离线数据⼀致性校验,大大提高 了实时指标的开发效率和实时数据的准确性,也为公司内部大范围建设实时数仓提供了有⼒的理论和实践⽀ 持。
4.2. 快手实时数仓场景化案例
4.2.1. 目标
首先由于是做数仓, 因此希望所有的实时指标都有离线指标去对应,要求实时指标和离线指标整体的数据 差异在 1% 以内, 这是最低标准。
其次是数据延迟,其 SLA 标准是活动期间所有核⼼报表场景的数据延迟不能超过 5 分钟, 这 5 分钟包括 作业挂掉之后和恢复时间,如果超过则意味着 SLA 不达标。
最后是稳定性,针对⼀些场景, 比如作业重启后,我们的曲线是正常的,不会因为作业重启导致指标产出 ⼀些明显的异常。
4.2.2. 难点
第⼀个难点是数据量大 。每天整体的⼊口流量数据量级大概在万亿级 。在活动如春晚的场景, QPS 峰值能 达到亿 / 秒。
第⼆个难点是组件依赖比较复杂 。可能这条链路里有的依赖于 Kafka,有的依赖 Flink, 还有⼀些依赖 KV 存储 、RPC 接口 、OLAP 引擎等,我们需要思考在这条链路里如何分布,才能让这些组件都能正常工作。
第三个难点是链路复杂 。目前我们有 200+ 核⼼业务作业, 50+ 核⼼数据源,整体作业超过 1000。
4.2.3 实时数仓 - 分层模型
基于上面三个难点,来看⼀下数仓架构:
如上所示:
最下层有三个不同的数据源,分别是客户端日志 、服务端日志以及 Binlog 日志;
在公共基础层分为两个不同的层次,⼀个是 DWD 层,做明细数据, 另⼀个是 DWS 层,做公共聚合数
据, DIM 是我们常说的维度 。我们有⼀个基于离线数仓的主题预分层, 这个主题预分层可能包括流量 、用 户 、设备 、视频的生产消费 、风控 、社交等。
DWD 层的核⼼工作是标准化的清洗;
DWS 层是把维度的数据和 DWD 层进行关联,关联之后生成⼀些通用粒度的聚合层次。
再往上是应用层, 包括⼀些大盘的数据, 多维分析的模型以及业务专题数据;
最上面是场景。
整体过程可以分为三步:
第⼀步是做业务数据化,相当于把业务的数据接进来;
第⼆步是数据资产化,意思是对数据做很多的清洗,然后形成⼀些规则有序的数据;
第三步是数据业务化,可以理解数据在实时数据层面可以反哺业务,为业务数据价值建设提供⼀些赋能。
4.2.4 实时数仓 - 保障措施
基于上面的分层模型,来看⼀下整体的保障措施:
保障层面分为三个不同的部分,分别是质量保障, 时效保障以及稳定保障。
我们先看蓝色部分的质量保障 。针对质量保障,可以看到在数据源阶段,做了如数据源的乱序监控, 这是
我们基于自⼰的 SDK 的采集做的, 以及数据源和离线的⼀致性校准 。研发阶段的计算过程有三个阶段, 分别是研发阶段 、上线阶段和服务阶段。
研发阶段可能会提供⼀个标准化的模型,基于这个模型会有⼀些 Benchmark,并且做离线的比对验
证,保证质量是⼀致的;
上线阶段更多的是服务监控和指标监控;
在服务阶段,如果出现⼀些异常情况,先做 Flink 状态拉起,如果出现了⼀些不符合预期的场景,我
们会做离线的整体数据修复。
第⼆个是时效性保障 。针对数据源,我们把数据源的延迟情况也纳⼊监控 。在研发阶段其实还有两个事
情:
首先是压测, 常规的任务会拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任务延迟的情况;
⼦ 。
最后⼀个是稳定保障, 这在大型活动中会做得比较多, 比如切换演练和分级保障 。我们会基于之前的压测
结果做限流, 目的是保障作业在超过极限的情况下, 仍然是稳定的,不会出现很多的不稳定或者 CP 失败 的情况 。之后我们会有两种不同的标准,⼀ 种是冷备双机房, 另外⼀种是热备双机房。
冷备双机房是: 当⼀个单机房挂掉,我们会从另⼀个机房去拉起;
热备双机房:相当于同样⼀份逻辑在两个机房各部署⼀次。
以上就是我们整体的保障措施。
4.2.5 快手场景问题及解决方案
4.2.5.1. PV/UV 标准化
4.2.5.1.1 场景
第⼀个问题是 PV/UV 标准化, 这里有三个截图:
第⼀张图是春晚活动的预热场景,相当于是⼀种玩法,第二和第三张图是春晚当天的发红包活动和直播间截 图。
在活动进行过程中,我们发现 60~70% 的需求是计算页面里的信息,如:
这个页面来了多少人,或者有多少人点击进入这个页面;
活动⼀共来了多少人;
页面里的某⼀个挂件,获得了多少点击 、产生了多少曝光。
4.2.5.1.2 方案
抽象⼀下这个场景就是下面这种 SQL:
简单来说,就是从⼀张表做筛选条件,然后按照维度层面做聚合,接着产生⼀些 Count 或者 Sum 操作。基于这种场景,我们最开始的解决方案如上图右边所示。
我们用到了 Flink SQL 的 Early Fire 机制,从 Source 数据源取数据, 之后做了 DID 的分桶 。比如最开始紫色 的部分按这个做分桶,先做分桶的原因是防止某⼀个 DID 存在热点的问题 。分桶之后会有⼀个叫做 Local Window Agg 的东西,相当于数据分完桶之后把相同类型的数据相加 。Local Window Agg 之后再按照维度进 行 Global Window Agg 的合桶,合桶的概念相当于按照维度计算出最终的结果 。Early Fire 机制相当于在 Local Window Agg 开⼀个天级的窗⼝ ,然后每分钟去对外输出⼀次。
这个过程中我们遇到了⼀些问题,如上图左下角所示。
在代码正常运行的情况下是没有问题的,但如果整体数据存在延迟或者追溯历史数据的情况, 比如⼀分钟 Early Fire ⼀次, 因为追溯历史的时候数据量会比较大,所以可能导致 14¸00 追溯历史, 直接读到了 14¸02 的 数据,而 14¸01 的那个点就被丢掉了,丢掉了以后会发生什么?
在这种场景下, 图中上方的曲线为 Early Fire 回溯历史数据的结果 。横坐标是分钟,纵坐标是截止到当前时刻 的页面 UV,我们发现有些点是横着的,意味着没有数据结果,然后⼀个陡增,然后⼜横着的,接着⼜⼀个陡 增,而这个曲线的预期结果其实是图中下方那种平滑的曲线。
为了解决这个问题,我们用到了 Cumulate Window 的解决方案, 这个解决方案在 Flink 1.13 版本里也有涉 及,其原理是⼀样的。
数据开⼀个大的天级窗口,大窗口下又开了⼀个小的分钟级窗口,数据按数据本身的 Row Time 落到分钟级窗 口。
Watermark 推进过了窗口的 event_time, 它会进行⼀次下发的触发,通过这种方式可以解决回溯的问 题,数据本身落在真实的窗口, Watermark 推进,在窗口结束后触发。
此外, 这种方式在⼀定程度上能够解决乱序的问题 。比如它的乱序数据本身是⼀个不丢弃的状态,会记录 到最新的累计数据。
最后是语义⼀致性, 它会基于事件时间,在乱序不严重的情况下, 和离线计算出来的结果⼀致性是相当高 的。
4.2.5.3. DAU 计算
4.2.5.3.1 背景介绍
下面介绍⼀下 DAU 计算:
我们对于整个大盘的活跃设备 、新增设备和回流设备有比较多的监控。
活跃设备指的是当天来过的设备;
新增设备指的是当天来过且历史没有来过的设备;
回流设备指的是当天来过且 N 天内没有来过的设备。
但是我们计算过程之中可能需要 5~8 个这样不同的 Topic 去计算这⼏个指标。
我们看⼀下离线过程中,逻辑应该怎么算。
首先我们先算活跃设备,把这些合并到⼀起,然后做⼀个维度下的天级别去重,接着再去关联维度表, 这个维 度表包括设备的首末次时间,就是截止到昨天设备首次访问和末次访问的时间。
得到这个信息之后,我们就可以进⾏逻辑计算,然后我们会发现新增和回流的设备其实是活跃设备里打的⼀个 ⼦标签 。新增设备就是做了⼀个逻辑处理, 回流设备是做了 30 天的逻辑处理,基于这样的解决方案,我们能 否简单地写⼀个 SQL 去解决这个问题?
其实我们最开始是这么做的,但遇到了⼀些问题:
第⼀个问题是:数据源是 6~8 个, 而且我们大盘的⼝径经常会做微调,如果是单作业的话,每次微调的过 程之中都要改, 单作业的稳定性会非常差;
第⼆个问题是:数据量是万亿级, 这会导致两个情况, 首先是这个量级的单作业稳定性非常差,其次是实 时关联维表的时候用的 KV 存储,任何⼀个这样的 RPC 服务接⼝ ,都不可能在万亿级数据量的场景下保证 服务稳定性;
第三个问题是:我们对于时延要求比较高,要求时延⼩于⼀分钟 。整个链路要避免批处理,如果出现了⼀ 些任务性能的单点问题,我们还要保证高性能和可扩容。
4.2.5.3.2 技术方案
针对以上问题,介绍⼀下我们是怎么做的:
如上图的例⼦ ,第⼀步是对 A B C 这三个数据源,先按照维度和 DID 做分钟级别去重,分别去重之后得到三个 分钟级别去重的数据源,接着把它们 Union 到⼀起,然后再进行同样的逻辑操作。
这相当于我们数据源的⼊⼝从万亿变到了百亿的级别,分钟级别去重之后再进行⼀个天级别的去重,产生的数 据源就可以从百亿变成了⼏⼗亿的级别。
在⼏⼗亿级别数据量的情况下, 我们再去关联数据服务化, 这就是⼀种比较可行的状态,相当于去关联用户画 像的 RPC 接⼝ ,得到 RPC 接⼝之后, 最终写⼊到了目标 Topic。这个目标 Topic 会导⼊到 OLAP 引擎,供给
多个不同的服务, 包括移动版服务,大屏服务,指标看板服务等。
这个⽅案有三个⽅面的优势,分别是稳定性 、时效性和准确性。
首先是稳定性 。松耦合可以简单理解为当数据源 A 的逻辑和数据源 B 的逻辑需要修改时,可以单独修改。
第⼆是任务可扩容, 因为我们把所有逻辑拆分得非常细粒度, 当⼀些地⽅出现了如流量问题,不会影响后 面的部分,所以它扩容比较简单, 除此之外还有服务化后置和状态可控。
其次是时效性,我们做到毫秒延迟,并且维度丰富,整体上有 20+ 的维度做多维聚合。最后是准确性,我们⽀持数据验证 、实时监控 、模型出⼝统⼀等。
此时我们遇到了另外⼀个问题 - 乱序 。对于上⽅三个不同的作业, 每⼀个作业重启至少会有两分钟左右的延 迟,延迟会导致下游的数据源 Union 到⼀起就会有乱序。
4.2.5.3 延迟计算⽅案
遇到上面这种有乱序的情况下, 我们要怎么处理?
我们总共有三种处理方案:
第⼀种解决方案是用“did + 维度 + 分钟”进行去重,Value 设为“是否来过”。比如同⼀个 did, 04:01 来
了⼀条, 它会进行结果输出 。同样的, 04:02 和 04¸04 也会进行结果输出 。但如果 04:01 再来, 它就会 丢弃,但如果 04¸00 来,依旧会进行结果输出。
这个解决方案存在⼀些问题, 因为我们按分钟存,存 20 分钟的状态大⼩是存 10 分钟的两倍,到后面这个 状态大⼩有点不太可控, 因此我们⼜换了解决方案 2。
第⼆种解决方案,我们的做法会涉及到⼀个假设前提,就是假设不存在数据源乱序的情况 。在这种情况 下, key 存的是“did + 维度”,Value 为“时间戳”, 它的更新方式如上图所示。
04¸01 来了⼀条数据, 进行结果输出 。04:02 来了⼀条数据,如果是同⼀个 did,那么它会更新时间戳, 然后仍然做结果输出 。04:04 也是同样的逻辑,然后将时间戳更新到 04:04,如果后面来了⼀条 04:
01 的数据, 它发现时间戳已经更新到 04¸04, 它会丢弃这条数据。
这样的做法大幅度减少了本身所需要的⼀些状态,但是对乱序是零容忍,不允许发生任何乱序的情况, 由 于我们不好解决这个问题, 因此我们⼜想出了解决方案 3。
方案 3 是在方案 2 时间戳的基础之上, 加了⼀个类似于环形缓冲区,在缓冲区之内允许乱序。
比如 04¸01 来了⼀条数据, 进行结果输出;04¸02 来了⼀条数据, 它会把时间戳更新到 04¸02,并且会记 录同⼀个设备在 04¸01 也来过 。如果 04¸04 再来了⼀条数据,就按照相应的时间差做⼀个位移, 最后通 过这样的逻辑去保障它能够容忍⼀定的乱序。
综合来看这三个方案:
方案 1 在容忍 16 分钟乱序的情况下, 单作业的状态大⼩在 480G 左右 。这种情况虽然保证了准确性,但 是作业的恢复和稳定性是完全不可控的状态, 因此我们还是放弃了这个方案;
方案 2 是 30G 左右的状态大⼩ ,对于乱序 0 容忍,但是数据不准确, 由于我们对准确性的要求非常高, 因此也放弃了这个方案;
方案 3 的状态跟方案 1 相比, 它的状态虽然变化了但是增加的不多,而且整体能达到跟方案 1 同样的效
果 。方案 3 容忍乱序的时间是 16 分钟,我们正常更新⼀个作业的话, 10 分钟完全足够重启, 因此最终选 择了方案 3。
3. 运营场景
3.1 背景介绍
运营场景可分为四个部分:
第⼀个是数据大屏支持, 包括单直播间的分析数据和大盘的分析数据, 需要做到分钟级延迟,更新要求比
较高;
第⼆个是直播看板支持, 直播看板的数据会有特定维度的分析,特定⼈群支持,对维度丰富性要求比较
高;
第三个是数据策略榜单, 这个榜单主要是预测热门作品 、爆款,要求的是⼩时级别的数据,更新要求比较
低;
第四个是 C 端实时指标展示,查询量比较大,但是查询模式比较固定。
下面进行分析这 4 种不同的状态产生的⼀些不同的场景。
前 3 种基本没有什么差别, 只是在查询模式上, 有的是特定业务场景,有的是通用业务场景。
针对第 3 种和第 4 种, 它对于更新的要求比较低,对于吞吐的要求比较高,过程之中的曲线也不要求有⼀致 性 。第 4 种查询模式更多的是单实体的⼀些查询, 比如去查询内容,会有哪些指标,而且对 QPS 要求比较 高。
3.2 技术方案
针对上方 4 种不同的场景,我们是如何去做的?
首先看⼀下基础明细层 (图中左方),数据源有两条链路,其中⼀条链路是消费的流, 比如直播的消费信 息, 还有观看 / 点赞 / 评论 。经过⼀轮基础清洗,然后做维度管理 。上游的这些维度信息来源于 Kafka,
Kafka 写⼊了⼀些内容的维度,放到了 KV 存储里边, 包括⼀些用户的维度。
这些维度关联了之后, 最终写⼊ Kafka 的 DWD 事实层, 这里为了做性能的提升,我们做了⼆级缓存的操 作。
如图中上⽅ ,我们读取 DWD 层的数据然后做基础汇总,核⼼是窗⼝维度聚合生成 4 种不同粒度的数据,
分别是大盘多维汇总 topic 、直播间多维汇总 topic 、作者多维汇总 topic 、用户多维汇总 topic, 这些都是 通用维度的数据。
如图中下⽅ ,基于这些通用维度数据,我们再去加⼯个性化维度的数据,也就是 ADS 层 。拿到了这些数
据之后会有维度扩展, 包括内容扩展和运营维度的拓展,然后再去做聚合, 比如会有电商实时 topic,机 构服务实时 topic 和大 V 直播实时 topic。
分成这样的两个链路会有⼀个好处:⼀个地⽅处理的是通用维度, 另⼀个地⽅处理的是个性化的维度 。通 用维度保障的要求会比较高⼀些,个性化维度则会做很多个性化的逻辑 。如果这两个耦合在⼀起的话,会 发现任务经常出问题,并且分不清楚哪个任务的职责是什么,构建不出这样的⼀个稳定层。
如图中右⽅, 最终我们用到了三种不同的引擎 。简单来说就是 Redis 查询用到了 C 端的场景, OLAP 查询 用到了大屏 、业务看板的场景。
5) 未来规划
上⽂⼀共讲了三个场景,第⼀个场景是标准化 PU/UV 的计算,第⼆个场景是 DAU 整体的解决⽅案,第三个场 景是运营侧如何解决 。基于这些内容,我们有⼀些未来规划,分为 4 个部分。
第⼀部分是实时保障体系完善:
⼀⽅面做⼀些大型的活动, 包括春晚活动以及后续常态化的活动 。针对这些活动如何去保障,我们有
⼀套规范去做平台化的建设;
第⼆个是分级保障标准制定, 哪些作业是什么样的保障级别 / 标准,会有⼀个标准化的说明;
第三个是引擎平台能⼒推动解决, 包括 Flink 任务的⼀些引擎,在这上面我们会有⼀个平台,基于这
个平台去做规范 、标准化的推动。
第⼆部分是实时数仓内容构建:
⼀⽅面是场景化⽅案的输出, 比如针对活动会有⼀些通用化的⽅案,而不是每次活动都开发⼀套新的
解决⽅案;
另⼀⽅面是内容数据层次沉淀, 比如现在的数据内容建设,在厚度⽅面有⼀些场景的缺失, 包括内容
如何更好地服务于上游的场景。
第三部分是 Flink SQL 场景化构建, 包括 SQL 持续推⼴ 、SQL 任务稳定性和 SQL 任务资源利用率 。我们 在预估资源的过程中,会考虑比如在什么样 QPS 的场景下, SQL 用什么样的解决⽅案,能⽀撑到什么情
况 。Flink SQL 可以⼤幅减少⼈效,但是在这个过程中,我们想让业务操作更加简单。
第四部分是批流⼀体探索 。实时数仓的场景其实就是做离线 ETL 计算加速,我们会有很多⼩时级别的任
务,针对这些任务,每次批处理的时候有⼀些逻辑可以放到流处理去解决, 这对于离线数仓 SLA 体系的提 升⼗分巨⼤ 。
3. 腾讯看点实时数仓案例
腾讯看点业务为什么要构建实时数仓? 因为原始的上报数据量非常⼤,⼀ 天上报峰值就有上万亿条 。而且上报 格式混乱 。缺乏内容维度信息 、用户画像信息,下游没办法直接使用。
而我们提供的实时数仓, 是根据腾讯看点信息流的业务场景, 进行了内容维度的关联,用户画像的关联,各种 粒度的聚合,下游可以非常⽅便的使用实时数据,而且实时数据仓库可以提供给下游的用户反复的消费使用, 可以⼤量的减少重复的⼯作。
1) 方案选型
那就看下我们多维实时数据分析系统的⽅案选型,选型我们对比了行业内的领先⽅案,选择了最符合我们业务 场景的⽅案。
第⼀块是实时数仓的选型,我们选择的是业界比较成熟的 Lambda 架构,他的优点是灵活性高 、容错性
高 、成熟度高和迁移成本低;缺点是实时 、离线数据用两套代码,可能会存在⼀个⼝径修改了, 另⼀个没 改的问题,我们每天都有做数据对账的⼯作,如果有异常会进行告警。
第⼆块是实时计算引擎选型, 因为 Flink 设计之初就是为了流处理,SparkStreaming 严格来说还是微批处 理,Strom 用的已经不多了 。再看 Flink 具有 Exactly-once 的准确性 、轻量级 Checkpoint 容错机制 、低
延时高吞吐和易用性高的特点,我们选择了 Flink 作为实时计算引擎。
第三块是实时存储引擎,我们的要求就是需要有维度索引 、支持高并发 、预聚合 、高性能实时多维 OLAP
查询 。可以看到, Hbase 、Tdsql 和 ES 都不能满足要求, Druid 有⼀个缺陷, 它是按照时序划分 Segment,无法将同⼀个内容,存放在同⼀个 Segment 上, 计算全局 TopN 只能是近似值,所以我们选 择了最近两年大火的 MPP 数据库引擎 ClickHouse。
2) 设计目标与设计难点
我们多维实时数据分析系统分为三大模块
1. 实时计算引擎
2. 实时存储引擎
3. App 层
难点主要在前两个模块:实时计算引擎和实时存储引擎。
1. 千万级/s 的海量数据如何实时接⼊ ,并且进行极低延迟维表关联。
2. 实时存储引擎如何支持高并发写⼊ 、高可用分布式和高性能索引查询, 是比较难的。这⼏个模块的具体实现,看⼀下我们系统的架构设计。
3) 架构设计
前端采用的是开源组件 Ant Design,利用了 Nginx 服务器,部署静态页面,并反向代理了浏览器的请求到后 台服务器上。
后台服务是基于腾讯自研的 RPC 后台服务框架写的,并且会进⾏⼀些⼆级缓存。
实时数仓部分,分为了接⼊层 、实时计算层和实时数仓存储层。
接⼊层主要是从千万级/s 的原始消息队列中,拆分出不同⾏为数据的微队列,拿看点的视频来说,拆分过 后,数据就只有百万级/s 了;
实时计算层主要负责, 多⾏⾏为流水数据进⾏⾏转列, 实时关联用户画像数据和内容维度数据;
实时数仓存储层主要是设计出符合看点业务的,下游好用的实时消息队列 。我们暂时提供了两个消息队
列,作为实时数仓的两层 。⼀层 DWM 层是内容 ID-用户 ID 粒度聚合的,就是⼀条数据包含内容 ID-用户 ID 还有 B 侧内容数据 、C 侧用户数据和用户画像数据;另⼀层是 DWS 层, 是内容 ID 粒度聚合的,⼀ 条 数据包含内容 ID, B 侧数据和 C 侧数据 。可以看到内容 ID-用户 ID 粒度的消息队列流量进⼀步减小到⼗ 万级/s, 内容 ID 粒度的更是万级/s,并且格式更加清晰,维度信息更加丰富。
实时存储部分分为实时写⼊层 、OLAP 存储层和后台接⼝层。
实时写⼊层主要是负责 Hash 路由将数据写⼊;
OLAP 存储层利用 MPP 存储引擎,设计符合业务的索引和物化视图, 高效存储海量数据;后台接⼝层提供高效的多维实时查询接⼝ 。
4) 实时计算
这个系统最复杂的两块, 实时计算和实时存储。
先介绍实时计算部分:分为实时关联和实时数仓。
1. 实时高性能维表关联
实时维表关联这⼀块难度在于 百万级/s 的实时数据流,如果直接去关联 HBase, 1 分钟的数据,关联完 HBase 耗时是⼩时级的,会导致数据延迟严重。
我们提出了⼏个解决方案:
第⼀个是,在 Flink 实时计算环节,先按照 1 分钟进行了窗⼝聚合,将窗⼝内多行行为数据转⼀行多列的 数据格式,经过这⼀步操作,原本⼩时级的关联耗时下降到了⼗⼏分钟,但是还是不够的。
第二个是,在访问 HBase 内容之前设置⼀层 Redis 缓存, 因为 1000 条数据访问 HBase 是秒级的,而访
问 Redis 是毫秒级的,访问 Redis 的速度基本是访问 HBase 的 1000 倍 。为了防止过期的数据浪费缓 存,缓存过期时间设置成 24 ⼩时, 同时通过监听写 HBase Proxy 来保证缓存的⼀致性 。这样将访问时间
从⼗⼏分钟变成了秒级。
第三个是,上报过程中会上报不少非常规内容 ID, 这些内容 ID 在内容 HBase 中是不存储的,会造成缓存
穿透的问题 。所以在实时计算的时候,我们直接过滤掉这些内容 ID, 防止缓存穿透, ⼜减少⼀些时间 。 第四个是, 因为设置了定时缓存,会引⼊⼀个缓存雪崩的问题 。为了防止雪崩,我们在实时计算中, 进行
了削峰填谷的操作,错开设置缓存的时间。
可以看到,优化前后,数据量从百亿级减少到了⼗亿级,耗时从⼩t时级减少到了数⼗秒,减少 99%。
2. 下游提供服务
实时数仓的难度在于:它处于比较新的领域,并且各个公司各个业务差距比较大,怎么能设计出⽅便,好用, 符合看点业务场景的实时数仓是有难度的。
先看⼀下实时数仓做了什么, 实时数仓对外就是⼏个消息队列,不同的消息队列里面存放的就是不同聚合粒度 的实时数据, 包括内容 ID 、用户 ID 、C 侧行为数据 、B 侧内容维度数据和用户画像数据等。
我们是怎么搭建实时数仓的,就是上面介绍的实时计算引擎的输出,放到消息队列中保存,可以提供给下游多 用户复用。
我们可以看下, 在我们建设实时数据仓库前后, 开发⼀个实时应用的区别 。没有数仓的时候,我们需要消费千 万级/s 的原始队列, 进行复杂的数据清洗,然后再进行用户画像关联 、内容维度关联,才能拿到符合要求格式 的实时数据, 开发和扩展的成本都会比较高,如果想开发⼀个新的应用, ⼜要走⼀遍这个流程 。有了数仓之 后,如果想开发内容 ID 粒度的实时应用,就直接申请 TPS 万级/s 的 DWS 层的消息队列 。开发成本变低很 多, 资源消耗⼩很多,可扩展性也强很多。
看个实际例⼦, 开发我们系统的实时数据大屏,原本需要进行如上所有操作,才能拿到数据 。现在只需要消费 DWS 层消息队列, 写⼀条 Flink SQL 即可,仅消耗 2 个 CPU 核⼼, 1G 内存。
可以看到, 以 50 个消费者为例, 建立实时数仓前后,下游开发⼀个实时应用,可以减少 98%的资源消耗 。包 括计算资源,存储资源,⼈⼒成本和开发⼈员学习接⼊成本等等 。并且消费者越多, 节省越多 。就拿 Redis 存 储这⼀部分来说,⼀个⽉就能省下上百万⼈民币。
5) 实时存储
介绍完实时计算,再来介绍实时存储。
这块分为三个部分来介绍
第⼀是 分布式-高可用
第⼆是 海量数据-写⼊
第三是 高性能-查询
我们这里听取的是 Clickhouse 官方的建议,借助 ZK 实现高可用的方案 。数据写⼊⼀个分片,仅写⼊⼀个副 本,然后再写 ZK,通过 ZK 告诉同⼀个分片的其他副本,其他副本再过来拉取数据,保证数据⼀致性。
这里没有选用消息队列进⾏数据同步, 是因为 ZK 更加轻量级 。而且写的时候,任意写⼀个副本,其它副本都 能够通过 ZK 获得⼀致的数据 。而且就算其它节点第⼀次来获取数据失败了,后⾯只要发现它跟 ZK 上记录的 数据不⼀致,就会再次尝试获取数据,保证⼀致性。
2. 海量数据-写入
数据写⼊遇到的第⼀个问题是,海量数据直接写⼊ Clickhouse 的话,会导致 ZK 的 QPS 太高,解决方案是改 用 Batch 方式写⼊ 。Batch 设置多大呢, Batch 太⼩的话缓解不了 ZK 的压⼒ , Batch 也不能太大,不然上游 内存压⼒太大,通过实验, 最终我们选用了大⼩⼏⼗万的 Batch。
第⼆个问题是, 随着数据量的增⻓, 单 QQ 看点的视频内容每天可能写⼊百亿级的数据,默认方案是写⼀张分 布式表, 这就会造成单台机器出现磁盘的瓶颈,尤其是 Clickhouse 底层运用的是 Mergetree,原理类似于 HBase 、RocketsDB 的底层 LSM-Tree。在合并的过程中会存在写放大的问题,加重磁盘压⼒ 。峰值每分钟⼏ 千万条数据, 写完耗时⼏⼗秒,如果正在做 Merge,就会阻塞写⼊请求,查询也会⾮常慢 。我们做的两个优化 方案:⼀是对磁盘做 Raid,提升磁盘的 IO;⼆是在写⼊之前进⾏分表, 直接分开写⼊到不同的分片上, 磁盘 压⼒直接变为 1/N。
第三个问题是,虽然我们写⼊按照分片进⾏了划分,但是这里引⼊了⼀个分布式系统常⻅的问题,就是局部的 Top 并非全局 Top 的问题 。比如同⼀个内容 ID 的数据落在了不同的分片上, 计算全局 Top100 阅读的内容 ID,有⼀个内容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,导致汇总的时候,会丢失⼀部分 数据,影响最终结果 。我们做的优化是在写⼊之前加上⼀层路由,将同⼀个内容 ID 的记录,全部路由到同⼀ 个分片上, 解决了该问题。
介绍完写⼊,下⼀步介绍 Clickhouse 的高性能存储和查询。
3. 高性能-存储-查询
Clickhouse 高性能查询的⼀个关键点是稀疏索引 。稀疏索引这个设计就很有讲究,设计得好可以加速查询,设 计不好反而会影响查询效率 。我根据我们的业务场景, 因为我们的查询大部分都是时间和内容 ID 相关的, 比 如说,某个内容,过去 N 分钟在各个⼈群表现如何?我按照⽇期,分钟粒度时间和内容 ID 建立了稀疏索引 。 针对某个内容的查询, 建立稀疏索引之后,可以减少 99%的⽂件扫描。
还有⼀个问题就是,我们现在数据量太大,维度太多 。拿 QQ 看点的视频内容来说,⼀ 天流水有上百亿条,有 些维度有⼏百个类别 。如果⼀次性把所有维度进⾏预聚合,数据量会指数膨胀,查询反而变慢,并且会占用大 量内存空间 。我们的优化,针对不同的维度, 建立对应的预聚合物化视图,用空间换时间, 这样可以缩短查询 的时间。
腾讯看点高性能存储
分布式表查询还会有⼀个问题,查询单个内容 ID 的信息,分布式表会将查询下发到所有的分片上, 然后再返 回查询结果进⾏汇总 。实际上, 因为做过路由,⼀个内容 ID 只存在于⼀个分片上, 剩下的分片都在空跑 。针 对这类查询,我们的优化是后台按照同样的规则先进⾏路由, 直接查询目标分片, 这样减少了 N-1/N 的负载, 可以大量缩短查询时间 。而且由于我们是提供的 OLAP 查询,数据满足最终⼀致性即可,通过主从副本读写分 离,可以进⼀步提升性能。
我们在后台还做了⼀个 1 分钟的数据缓存,针对相同条件查询,后台就直接返回了。
4. 扩容
这里再介绍⼀下我们的扩容的⽅案,调研了业内的⼀些常⻅⽅案。
比如 HBase,原始数据都存放在 HDFS 上, 扩容只是 Region Server 扩容,不涉及原始数据的迁移 。但是 Clickhouse 的每个分片数据都是在本地, 是⼀个比较底层存储引擎,不能像 HBase 那样⽅便扩容。
Redis 是哈希槽这种类似⼀致性哈希的方式, 是比较经典分布式缓存的方案 。Redis slot 在 Rehash 的过程中 虽然存在短暂的 ask 读不可用,但是总体来说迁移是比较方便的,从原 h[0]迁移到 h[1], 最后再删除 h[0]。但是 Clickhouse ⼤部分都是 OLAP 批量查询,不是点查,而且由于列式存储,不支持删除的特性,⼀ 致性哈 希的方案不是很适合。
目前扩容的方案是, 另外消费⼀份数据, 写⼊新 Clickhouse 集群,两个集群⼀起跑⼀段时间, 因为实时数据 就保存 3 天,等 3 天之后,后台服务直接访问新集群。
4. 有赞实时数仓案例
1) 分层设计
传统离线数仓的分层设计⼤家都很熟悉,为了规范的组织和管理数据,层级划分会比较多,在⼀些复杂逻辑处 理场景还会引⼊临时层落地中间结果以方便下游加⼯处理 。实时数仓考虑到时效性问题,分层设计需要尽量精
简, 降低中间流程出错的可能性,不过总体而⾔, 实时数仓还是会参考离线数仓的分层思想来设计。
实时数仓分层架构如下图所示 :
ODS ( 实时数据接入层)
ODS 层, 即实时数据接⼊层,通过数据采集⼯具收集各个业务系统的实时数据,对非结构化的数据进⾏结构化 处理,保存原始数据,⼏乎不过滤数据;该层数据的主要来源有三个部分:第⼀部分是业务方创建的 NSQ 消 息,第⼆部分是业务数据库的 Binlog 日志,第三部分是埋点日志和应用程序日志, 以上三部分的实时数据最终 统⼀写⼊ Kafka 存储介质中。
ODS 层表命名规范:部门名称.应用名称.数仓层级主题域前缀数据库名/消息名
例如:接⼊业务库的 Binlog
实时数仓表命名: deptname.appname.ods_subjectname_tablename
例如:接⼊业务方的 NSQ 消息
实时数仓表命名: deptname.appname.ods_subjectname_msgname
DWS ( 实时明细中间层)
DWS 层, 即实时明细中间层,该层以业务过程作为建模驱动,基于每个具体的业务过程事件来构建最细粒度 的明细层事实表; 比如交易过程,有下单事件 、支付事件 、发货事件等,我们会基于这些独立的事件来进行明 细层的构建 。在这层,事实明细数据同样是按照离线数仓的主题域来进行划分,也会采用维度建模的方式组织 数据,对于⼀些重要的维度字段,会做适当冗余 。基于有赞实时需求的场景, 重点建设交易 、营销 、客户 、店 铺 、商品等主题域的数据 。该层的数据来源于 ODS 层,通过 FlinkSQL 进行 ETL 处理, 主要⼯作有规范命 名 、数据清洗 、维度补全 、多流关联, 最终统⼀写⼊ Kafka 存储介质中。
DWS 层表命名规范: 部门名称 .应用名称 .数仓层级_主题域前缀_数仓表命名
例如:实时事件 A 的中间层
实时数仓表命名: deptname.appname.dws_subjectname_tablename_eventnameA
例如:实时事件 B 的中间层
实时数仓表命名: deptname.appname.dws_subjectname_tablename_eventnameB
DIM ( 实时维表层)
DIM 层, 即实时维表层,用来存放维度数据, 主要用于实时明细中间层宽化处理时补全维度使用, 目前该层的 数据主要存储于 HBase 中,后续会基于 QPS 和数据量大⼩提供更多合适类型的存储介质。
DIM 层表命名规范: 应用名称_数仓层级_主题域前缀_数仓表命名
例如:HBase 存储, 实时维度表
实时数仓表命名: appname_dim_tablename
DWA ( 实时汇总层)
DWA 层, 即实时汇总层,该层通过 DWS 层数据进行多维汇总,提供给下游业务方使用,在实际应用过程中, 不同业务方使用维度汇总的方式不太⼀样,根据不同的需求采用不同的技术方案去实现 。第⼀种方式,采用 FlinkSQL 进行实时汇总,将结果指标存⼊ HBase 、MySQL 等数据库,该种方式是我们早期采用的方案,优点 是实现业务逻辑比较灵活,缺点是聚合粒度固化,不易扩展;第⼆种方式,采用实时 OLAP ⼯具进行汇总,该 种方式是我们目前常用的方案,优点是聚合粒度易扩展,缺点是业务逻辑需要在中间层预处理。
DWA 层表命名规范: 应用名称_数仓层级_主题域前缀_聚合粒度_数据范围
例如:HBase 存储,某域当日某粒度实时汇总表
实时数仓表命名: appname_dwa_subjectname_aggname_daily
APP ( 实时应用层)
APP 层, 即实时应用层,该层数据已经写入应用系统的存储中,例如写入 Druid 作为 BI 看板的实时数据集;写入 HBase 、MySQL 用于提供统⼀数据服务接口;写入 ClickHouse 用于提供实时 OLAP 服务 。因为该层非 常贴近业务,在命名规范上实时数仓不做统⼀要求。
2) 实时 ETr
实时数仓 ETL 处理过程所涉及的组件比较多,接下来盘点构建实时数仓所需要的组件以及每个组件的应用场 景 。如下图所示:
具体实时 ETL 处理流程如下图所示:
1. 维度补全
创建调用 Duboo 接口的 UDF 函数在实时流里补全维度是最便捷的使用方式,但如果请求量过大,对 Duboo 接口压力会过大 。在实际应用场景补全维度首选还是关联维度表,但关联也存在⼀定概率的丢失问题,为了弥 补这种丢失,可以采用 Duboo 接口调用兜底的方式来补全 。伪代码如下:
create function call_dubbo as 'XXXXXXX';
create function get_json_object as 'XXXXXXX';
case
when cast( b.column as bigint) is not null
then cast( b.column as bigint)
else cast(coalesce(cast(get_json_object(call_dubbo( 'clusterUrl '
, 'serviceName '
, 'methodName '
,cast(concat( ' [ ',cast(a.column as varchar), '] ') as varchar)
, 'key '
)
, 'rootId ')
as bigint)
,a.column)
as bigint) end
2. 幕等处理
实时任务在运⾏过程中难免会遇到执⾏异常的情况, 当任务异常重启的时候会导致部分消息重新发送和消费, 从而引发下游实时统计数据不准确,为了有效避免这种情况,可以选择对实时消息流做幂等处理, 当消费完⼀ 条消息,将这条消息的 Key 存⼊ KV,如果任务异常重启导致消息重新发送的时候,先从 KV 判断该消息是否 已被消费,如果已消费就不再往下发送 。伪代码如下:
create function idempotenc as 'XXXXXXX';
insert into table
select
order_no
from
(
select
a.orderNo as order_no
, idempotenc( 'XXXXXXX', coalesce( order_no, ' ') ) as rid
from
table1
) t
where
t.rid = 0;
3. 数据验证
由于实时数仓的数据是⽆边界的流,相比于离线数仓固定不变的数据更难验收 。基于不同的场景,我们提供了 2 种验证⽅式,分别是:抽样验证与全量验证 。如下图所示
抽样验证⽅案
该⽅案主要应用在数据准确性验证上, 实时汇总结果是基于存储在 Kafka 的实时明细中间层计算而来,但 Kafka 本身不⽀持按照特定条件检索,不⽀持写查询语句,再加上消息的⽆边界性,统计结果是在不断变化 的,很难寻找参照物进⾏比对 。鉴于此,我们采用了持久化消息的⽅法,将消息落盘到 TiDB 存储,基于 TiDB 的能⼒对落盘的消息进⾏检索 、查询 、汇总 。编写固定时间边界的测试用例与相同时间边界的业务库数据或者 离线数仓数据进⾏比对 。通过以上⽅式,抽样核⼼店铺的数据进⾏指标准确性验证,确保测试用例全部通过。
全量验证⽅案
该⽅案主要应用在数据完整性和⼀致性验证上, 在实时维度表验证的场景使用最多 。大体思路:将存储实时维 度表的在线 HBase 集群中的数据同步到离线 HBase 集群中,再将离线 HBase 集群中的数据导⼊到 Hive 中, 在限定实时维度表的时间边界后,通过数据平台提供的数据校验功能, 比对实时维度表与离线维度表是否存在 差异, 最终确保两张表的数据完全⼀致。
4. 数据恢复
实时任务⼀旦上线就要求持续不断的提供准确 、稳定的服务 。区别于离线任务按天调度,如果离线任务出现 Bug,会有充足的时间去修复 。如果实时任务出现 Bug,必须按照提前制定好的流程,严格按照步骤执行,否 则极易出现问题 。造成 Bug 的情况有非常多, 比如代码 Bug 、异常数据 Bug 、实时集群 Bug,如下图展示了 修复实时任务 Bug 并恢复数据的流程。
5. 腾讯全场景实时数仓建设案例
在数仓体系中会有各种各样的大数据组件,譬如 Hive/HBase/HDFS/S3,计算引擎如 MapReduce 、Spark 、 Flink,根据不同的需求,用户会构建大数据存储和处理平台,数据在平台经过处理和分析,结果数据会保存到 MySQL 、Elasticsearch 等支持快速查询的关系型 、非关系型数据库中,接下来应用层就可以基于这些数据进 行 BI 报表开发 、用户画像,或基于 Presto 这种 OLAP 工具进行交互式查询等。
1) Lambda 架构的痛点
在整个过程中我们常常会用⼀些离线的调度系统,定期的 ( T+1 或者每隔⼏⼩时) 去执行⼀些 Spark 分析任 务,做⼀些数据的输⼊ 、输出或是 ETL ⼯作 。离线数据处理的整个过程中必然存在数据延迟的现象,不管是数 据接⼊还是中间的分析,数据的延迟都是比较大的,可能是⼩时级也有可能是天级别的 。另外⼀些场景中我们 也常常会为了⼀些实时性的需求去构建⼀个实时处理过程, 比如借助 Flink+Kafka 去构建实时的流处理系统。
整体上, 数仓架构中有非常多的组件,大大增加了整个架构的复杂性和运维的成本。
如下图, 这是很多公司之前或者现在正在采用的 Lambda 架构, Lambda 架构将数仓分为离线层和实时层,相 应的就有批处理和流处理两个相互独立的数据处理流程, 同⼀份数据会被处理两次以上, 同⼀套业务逻辑代码 需要适配性的开发两次 。Lambda 架构大家应该已经非常熟悉了,下面我就着重介绍⼀下我们采用 Lambda 架 构在数仓建设过程中遇到的⼀些痛点问题。
例如在实时计算⼀些用户相关指标的实时场景下, 我们想看到当前 pv 、uv 时,我们会将这些数据放到实时层 去做⼀些计算, 这些指标的值就会实时呈现出来,但同时想了解用户的⼀个增⻓趋势, 需要把过去⼀天的数据 计算出来 。这样就需要通过批处理的调度任务来实现, 比如凌晨两三点的时候在调度系统上起⼀个 Spark 调度 任务把当天所有的数据重新跑⼀遍。
很显然在这个过程中, 由于两个过程运⾏的时间是不⼀样的,跑的数据却相同, 因此可能造成数据的不⼀致 。因为某⼀条或⼏条数据的更新, 需要重新跑⼀遍整个离线分析的链路,数据更新成本很⼤, 同时需要维护离线 和实时分析两套计算平台,整个上下两层的开发流程和运维成本其实都是⾮常高的。
为了解决 Lambda 架构带来的各种问题,就诞生了 Kappa 架构, 这个架构⼤家应该也⾮常的熟悉。
2) Kappa 架构的痛点
我们来讲⼀下 Kappa 架构,如下图, 它中间其实用的是消息队列,通过用 Flink 将整个链路串联起来 。Kappa 架构解决了 Lambda 架构中离线处理层和实时处理层之间由于引擎不⼀样,导致的运维成本和开发成本高昂的 问题,但 Kappa 架构也有其痛点。
首先,在构建实时业务场景时,会用到 Kappa 去构建⼀个近实时的场景,但如果想对数仓中间层例如
ODS 层做⼀些简单的 OLAP 分析或者进⼀步的数据处理时,如将数据写到 DWD 层的 Kafka,则需要另外 接⼊ Flink。同时, 当需要从 DWD 层的 Kafka 把数据再导⼊到 Clickhouse, Elasticsearch, MySQL 或 者是 Hive 里⾯做进⼀步的分析时, 显然就增加了整个架构的复杂性。
其次, Kappa 架构是强烈依赖消息队列的,我们知道消息队列本身在整个链路上数据计算的准确性是严格
依赖它上游数据的顺序,消息队列接的越多,发生乱序的可能性就越⼤ 。ODS 层数据⼀般是绝对准确的, 把 ODS 层的数据发送到下⼀个 kafka 的时候就有可能发生乱序, DWD 层再发到 DWS 的时候可能⼜乱序 了, 这样数据不⼀致性就会变得很严重。
第三, Kafka 由于它是⼀个顺序存储的系统,顺序存储系统是没有办法直接在其上⾯利用 OLAP 分析的⼀ 些优化策略,例如谓词下推这类的优化策略,在顺序存储的 Kafka 上来实现是比较困难的事情。
那么有没有这样⼀个架构, 既能够满足实时性的需求, ⼜能够满足离线计算的要求,而且还能够减轻运维开发 的成本,解决通过消息队列构建 Kappa 架构过程中遇到的⼀些痛点?答案是肯定的,后⾯的篇幅会详细论 述。
3) 痛点总结
传统 T+1 任务
海量的TB级 T+ 1 任务延迟导致下游数据产出时间不稳定。
任务遇到故障重试恢复代价昂贵
数据架构在处理去重和 exactly-once语义能⼒⽅面比较吃⼒
架构复杂,涉及多个系统协调, 靠调度系统来构建任务依赖关系
Lambda 架构痛点
同时维护实时平台和离线平台两套引擎, 运维成本高
实时离线两个平台需要维护两套框架不同但业务逻辑相同代码, 开发成本高
数据有两条不同链路,容易造成数据的不⼀致性
数据更新成本大, 需要重跑链路
Kappa 架构痛点
对消息队列存储要求高,消息队列的回溯能⼒不及离线存储
全链路依赖消息队列的实时计算可能因为数据的时序性导致结果不正确
4)实时数仓建设需求
是否存在⼀种存储技术, 既能够⽀持数据高效的回溯能⼒ ,⽀持数据的更新, ⼜能够实现数据的批流读写,并 且还能够实现分钟级到秒级的数据接⼊?
这也是实时数仓建设的迫切需求 。实际上是可以通过对 Kappa 架构进行升级, 以解决 Kappa 架构中遇到的⼀ 些问题,接下来主要分享当前比较火的数据湖技术--Iceberg。
5) 数据湖 Apache Iceberg 的介绍
1.Iceberg 是什么
首先介绍⼀下什么是 Iceberg。官网描述如下:
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.
Iceberg 的官方定义是⼀种表格式,可以简单理解为是基于计算层 ( Flink , Spark) 和存储层 ( ORC, Parqurt,Avro) 的⼀个中间层,用 Flink 或者 Spark 将数据写入 Iceberg,然后再通过其他方式来读取这个 表, 比如 Spark, Flink, Presto 等。
Iceberg 是为分析海量数据准备的,被定义为 table format,table format 介于计算层和存储层之间。
tableformat 主要用于向下管理在存储系统上的⽂件, 向上为计算层提供⼀些接⼝ 。存储系统上的⽂件存储都 会采用⼀定的组织形式,譬如读⼀张 Hive 表的时候, HDFS ⽂件系统会带⼀些 partition,数据存储格式 、数 据压缩格式 、数据存储 HDFS 目录的信息等, 这些信息都存在 Metastore 上, Metastore 就可以称之为⼀种⽂ 件组织格式。
⼀个优秀的⽂件组织格式,如 Iceberg,可以更高效的⽀持上层的计算层访问磁盘上的⽂件,做⼀些 list、 rename 或者查找等操作。
3.Iceberg 的能力总结
Iceberg 目前⽀持三种⽂件格式 parquet,Avro, ORC,⽆论是 HDFS 或者 S3 上的⽂件,可以看到有⾏存也 有列存,后面会详细的去介绍其作用 。Iceberg 本身具备的能⼒总结如下, 这些能⼒对于后面我们利用 Iceberg 来构建实时数仓是非常重要的。
4.Iceberg 的文件组织格式介绍
下图展示的是 Iceberg 的整个⽂件组织格式 。从上往下看:
首先最上层是 snapshot 模块 。Iceberg 里面的 snapshot 是⼀个用户可读取的基本的数据单位,也就是说 用户每次读取⼀张表里面的所有数据,都是⼀个snapshot 下的数据。
其次, manifest。⼀个 snapshot 下面会有多个 manifest,如图 snapshot-0 有两个 manifest,而 snapshot-1 有三个 manifest,每个 manifest 下面会管理⼀个至多个 DataFiles ⽂件。
第三, DataFiles。manifest ⽂件里面存放的就是数据的元信息,我们可以打开 manifest ⽂件,可以看到 里面其实是⼀⾏⾏的 datafiles ⽂件路径。
从图上看到,snapshot-1 包含了 snapshop-0 的数据,而 snapshot-1 这个时刻写⼊的数据只有 manifest2, 这个能⼒其实就为我们后面去做增量读取提供了⼀个很好的⽀持。
5.Iceberg 读写过程介绍
Apache Iceberg 读写
首先,如果有⼀个 write 操作,在写 snapsho-1 的时候,snapshot-1 是虚线框,也就是说此时还没有发生 commit 操作 。这时候对 snapshot-1 的读其实是不可读的, 因为用户的读只能读到已经 commit 之后的 snapshot。发生 commit 之后才可以读 。同理,会有 snapshot-2,snapshot-3。
Iceberg 提供的⼀个重要能力,就是读写分离能力 。在对 snapshot-4 进行写的时候,其实是完全不影响对 snapshot-2 和 snapshot-3 的读 。Iceberg 的这个能力对于构建实时数仓是非常重要的能力之⼀ 。
同理,读也是可以并发的,可以同时读 s1 、s2 、s3 的快照数据, 这就提供了回溯读到 snapshot-2 或者 snapshot-3 数据的能力 。Snapshot-4 写完成之后,会发生⼀次 commit 操作, 这个时候 snapshot-4 变成了
实心,此时就可以读了 。另外,可以看到 current Snapshot 的指针移到 s4,也就是说默认情况下, 用户对⼀ 张表的读操作,都是读 current Snapshot 指针所指向的 Snapshot,但不会影响前面的 snapshot 的读操作。
Apache Iceberg 增量读
接下来讲⼀下 Iceberg 的增量读 。首先我们知道 Iceberg 的读操作只能基于已经提交完成的 snapshot-1,此 时会有⼀个 snapshot-2,可以看到每个 snapshot 都包含前面 snapshot 的所有数据,如果每次都读全量的数 据,整个链路上对计算引擎来说,读取的代价非常高。
如果只希望读到当前时刻新增的数据, 这个时候其实就可以根据 Iceberg 的 snapshot 的回溯机制,仅读取 snapshot1 到 snapshot2 的增量数据,也就是紫色这块的数据可以读的。
同理 s3 也是可以只读黄色的这块区域的数据, 同时也可以读 s3 到 s1 这块的增量数据,基于 Flink source 的 streaming reader 功能在内部我们已经实现这种增量读取的功能,并且已经在线上运行了 。刚才讲到了⼀个非 常重要的问题, 既然 Iceberg 已经有了读写分离,并发读,增量读的功能, Iceberg 要跟 Flink 实现对接,那 么就必须实现 Iceberg 的 sink。
实时小文件问题
社区现在已经重构了 Flink 里面的 FlinkIcebergSink,提供了 global committee 的功能,我们的架构其实跟社 区的架构是保持⼀致的, 曲线框中的这块内容是 FlinkIcebergSink。
在有多个 IcebergStreamWriter 和⼀个 IcebergFileCommitter 的情况下,上游的数据写到 IcebergStreamWriter 的时候,每个 writer 里面做的事情都是去写 datafiles ⽂件。
当每个 writer 写完自己当前这⼀批 datafiles 小文件的时候,就会发送消息给 IcebergmileCommitter,告诉它 可以提交了 。而 IcebergmileCommitter 收到信息的时,就⼀次性将 datafiles 的文件提交, 进行⼀次 commit 操作。
commit 操作本身只是对⼀些原始信息的修改, 当数据都已经写到磁盘了, 只是让其从不可见变成可见 。在这 个情况下, Iceberg 只需要用⼀个 commit 即可完成数据从不可见变成可见的过程。
实时小文件合并
mlink 实时作业⼀般会长期在集群中运行,为了要保证数据的时效性,⼀ 般会把 Iceberg commit 操作的时间周 期设成 30 秒或者是⼀分钟 。当 mlink 作业跑⼀天时,如果是⼀分钟⼀次 commit,⼀ 天需要 1440 个 commit,如果 mlink 作业跑⼀个月commit 操作会更多 。甚至 snapshot commit 的时间间隔越短,生成的 snapshot 的数量会越多 。当流式作业运行后,就会生成大量的小文件。
这个问题如果不解决的话, Iceberg 在 mlink 处理引擎上的 sink 操作就不可用了 。我们在内部实现了⼀个叫做 data compaction operator 的功能, 这个 operator 是跟着 mlink sink ⼀起走的 。当 Iceberg 的 mlinkIcebergSink 每完成⼀次 commit 操作的时候, 它都会向下游 mileScanTaskGen 发送消息,告诉 mileScanTaskGen 已经完成了⼀次 commit。
FileScanTaskGen 里面会有相关的逻辑,能够根据用户的配置或者当前磁盘的特性来进⾏文件合并任务的生成 操作 。FileScanTaskGen 发送到 DataFileRewitre 的内容其实就是在 FileScanTaskGen 里面生成的需要合并的 文件的列表 。同理, 因为合并文件是需要⼀定的耗时操作,所以需要将其进⾏异步的操作分发到不同的task rewrite operator 中。
上面讲过的 Iceberg 是有 commit 操作,对于 rewrite 之后的文件需要有⼀个新的 snapshot 。这里对 Iceberg 来说,也是⼀个 commit 操作,所以采用⼀个单并发的像 commit 操作⼀样的事件。
整条链路下来,小文件的合并目前采用的是 commit 操作,如果 commit 操作后面阻塞了,会影响前面的写⼊ 操作, 这块我们后面会持续优化。
6) Flink+Iceberg 构建实时数仓
1.近实时的数据接入
前面介绍了 Iceberg 既支持读写分离, 又支持并发读 、增量读 、小文件合并, 还可以支持秒级到分钟级的延 迟,基于这些优势我们尝试采用 Iceberg 这些功能来构建基于 Flink 的实时全链路批流⼀体化的实时数仓架 构。
如下图所示, Iceberg 每次的 commit 操作,都是对数据的可⻅性的改变, 比如说让数据从不可⻅变成可⻅, 在这个过程中,就可以实现近实时的数据记录。
2.实时数仓 - 数据湖分析系统
此前需要先进行数据接⼊, 比如用 Spark 的离线调度任务去跑⼀些数据,拉取,抽取最后再写⼊到 Hive 表里 面, 这个过程的延时比较大 。有了 Iceberg 的表结构,可以中间使用 Flink,或者 spark streaming,完成近实 时的数据接⼊ 。
基于以上功能,我们再来回顾⼀下前面讨论的 Kappa 架构, Kappa 架构的痛点上面已经描述过, Iceberg 既然 能够作为⼀个优秀的表格式, 既支持 Streaming reader, 又可以支持 Streaming sink, 是否可以考虑将 Kafka 替换成 Iceberg?
Iceberg 底层依赖的存储是像 HDFS 或 S3 这样的廉价存储,而且 Iceberg 是支持 parquet 、orc 、Avro 这样的 列式存储 。有列式存储的支持,就可以对 OLAP 分析进行基本的优化,在中间层直接进行计算 。例如谓词下推 最基本的 OLAP 优化策略,基于 Iceberg snapshot 的 Streaming reader 功能,可以把离线任务天级别到小时 级别的延迟大大的降低, 改造成⼀个近实时的数据湖分析系统。
在中间处理层,可以用 presto 进行⼀些简单的查询, 因为 Iceberg 支持 Streaming read,所以在系统的中间 层也可以直接接⼊ mlink, 直接在中间层用 mlink 做⼀些批处理或者流式计算的任务,把中间结果做进⼀步计算 后输出到下游。
替换 Kafka 的优劣势:
总的来说,Iceberg 替换 Kafka 的优势主要包括:
实现存储层的流批统⼀
中间层支持 OLAP 分析
完美支持高效回溯
存储成本降低
当然,也存在⼀定的缺陷,如:
数据延迟从实时变成近实时
对接其他数据系统需要额外开发工作
秒级分析 - 数据湖加速:
由于 Iceberg 本身是将数据文件全部存储在 工DmS 上的, 工DmS 读写这块对于秒级分析的场景, 还是不能够完 全满足我们的需求,所以接下去我们会在 Iceberg 底层支持 Alluxio 这样⼀个缓存,借助于缓存的能⼒可以实 现数据湖的加速 。这块的架构也在我们未来的⼀个规划和建设中。