导读: 近年来,马上消费的业务体量呈飞跃式增长,每天产生数据可达上千亿条,如何更高效挖掘这些数据的价值,成为了其必须要面临的挑战。随着各业务对实时数据分析的需求越来越强烈,马上消费于 2021 年引入 Apache Doris 构建实时数仓,目前已服务 10+业务团队的数据应用,99% 以上的查询响应耗时在 5 秒内,数据时效延迟均不超过 1 分钟,解决了其对于实时数据分析的强烈诉求,最终效果得到各业务部门的认可与肯定。
作者|大数据架构师 陈政
马上消费金融股份有限公司(简称“马上消费”)是一家经中国银保监会批准,持有消费金融牌照的科技驱动型金融机构。秉承“科技让生活更轻松”的使命,马上消费以用户为中心,聚焦普惠金融,通过科技赋能创新,致力于打造成为全球最被信赖的金融服务商,为有金融服务需求的社会各阶层和群体提供小额分散的消费金融服务。
自2015年6月正式开业至今,马上消费的业务体量呈飞跃式增长,截止目前品注册用户数达到一亿多。作为一家面向 C 端用户的互联网金融企业,我们不断在探索数据提升客户体验、数据驱动业务增长的方式,关注数据要素升级对生产关系所带来的深远影响。
业务需求
基于业务需要,马上消费内部自主研发了 400 多套与零售信贷相关的系统,同时与外部合作方多套系统进行对接,来服务整个业务链路的流转。这些系统源源不断的产生数据,每天可达上千亿条。如何更高效挖掘这些数据的价值,成为了我们必须要面临的挑战。
在正式建设实时数仓之前,业务上的数据应用以离线为主,用户使用 T+1 的数据来做数据建模和分析,依托数据进行业务决策,并回流到业务系统中。但随着业务体量的飞跃式增长、以及业务模式的不断优化升级,我们对数据分析的实时性提出了更高诉求,主要包括两类:
- 数据时效的实时。相比过去 T+1 的模式,希望在分析时的数据包含近 10 分钟以前的新增内容。
- 查询响应的实时。 数据的查询响应要足够快,最好能在几秒钟甚至毫米级返回结果。
早期架构
早期公司内统一的大数据平台,构建在传统的 Hadoop 体系之上,以服务离线数据分析为主,缺乏统一的实时数据平台。每天晚上集中抽取业务数据到 Hive 表中,通过离线调度任务完成数据加工后,提供给用户做数据查询和分析。
Hadoop 离线架构的不足
这套技术架构对离线数据业务有较好的支撑,但在应对更广泛的实时分析场景时,痛点也很明显:
- 数据的时效性不足。平台中的 Hive 表,都是 T+1 的数据,没有当日最新数据。用户若想在平台上分析当日最新业务情况,简直是无米之炊,业务决策的准确性和及时性很难得到提升。
- 数据查询效率不足。用户查询 Hive 表的数据时,通常需要等待数十秒甚至数分钟才能返回结果,工作效率受限。虽然也引入了 Presto、Spark 等计算引擎来提升查询效率,但查询效率的提升始终不理想,并且资源消耗也比较大。
- 数据维护成本高。离线数据从采集到加工,链路复杂耗时长,任何一个环节出问题都将影响数据的及时产出。另外,当需要修复历史数据、或者是调整数据计算逻辑时牵涉面广,更是费时费力。除此之外,常规的离线数仓分层模型也带来一些问题,所有数据都是单独一份物理表,导致衍生出来的表数量庞大、存储消耗不低。
- 技术成本较高。围绕 Hadoop 体系构建的平台,组件依赖较高,动辄需要数十种基础组件组合起来,才能满足不同功能需要。每一种组件复杂度都很高,各自难免存在一些缺陷,需投入大量的人力物力去维护。
探索之路
在公司没有统一建设实时数仓之前,部分业务团队为了满足实时数据分析的需要,也曾各自做过一些努力,但受限于实施的复杂度,基本都只局限在极少数特殊业务。
- 有的团队采用过相对粗暴的解决方式,直接在业务系统 MySQL 上面查询最新的数据,虽然查询速度慢但可以保证是实时数据,但对于跨数据库实例的关联分析,使用这种方式很难解决;
- 有的团队采用过相对进阶的方式,将数据实时同步到 Elasticsearch、HBase 中,在其中完成简单的数据分析;
- 有的团队也采用过较前沿的方式,使用 Flink 之类的流式计算框架,对流式数据进行预计算之后写入 HBase、ES 等存储,供用户和系统访问。
以上各业务团队对实时数据分析的分散式初步探索中,也发现了一些问题:
- 对分析的支持很不灵活。MySQL、ES、HBase 等存储组件,各自擅长的领域不同,对大规模数据的 OLAP 查询并不友好。用户要么只能用于简单的单表数据分析场景,要么需要设计很复杂的数据模型和数据处理逻辑来实现复杂的分析需求,这使得研发和维护成本非常高,需求交付时间很长。
- 重复建设带来资源浪费。各业务团队采用零散的方案去解决需求,各自搭建 ES、HBase 小规模集群,资源利用率低,并投入技术学习成本,最终却很难共享复用。另外,不同团队的业务分析中都会用到账务、审批等核心数据,最终造成数据的重复拉取和存储。
- 数据服务质量难以保障。数据通过不同的链路、不同的写入方式到达目标端供查询,服务链路的稳定性难以保障,导致数据的准确性、完整性、一致性等质量问题频发,各团队花费了大量精力也难以很好地保障整体数据服务质量。
架构选型
从我们当前大数据整体架构来看,实质上只需对原有架构做一些扩充和改造即可实现当前的需求。我们已有的平台产品,比如流式计算平台、数据交换平台等,已经足够支撑实时数据的采集和处理,这也为我们构建实时数仓奠定了良好基础。因此,我们当前较为迫切的是需要补充一款综合能力强劲的分析型数据库来解决问题。
对于这类分析型数据库,我们有一些基本诉求:
- 支持分钟级别延迟的数据近实时写入(insert)、更新(update)、删除(delete);
- 支持标准 SQL 语法来查询和分析数据,以及常用的函数;
- 千万级别数据量的多表关联查询,能够在秒级别返回结果;
- 存储容量和并发支持能够横向扩展;
- 既能较好支持 OLAP 查询,也能根据主键查询数据毫秒级响应;
- 具备一定的数据聚合模型或物化视图的能力,加速查询;
围绕以上基本诉求以及业务需求,我们对业界主流的商业和开源组件进行了调研,包括 Clickhouse、Doris、TiDB、Druid、Kudu、Kylin、Hologres 等产品。我们做调研的时间截止在 2020 年底,在这个时期内主流 OLAP 引擎或数据库都有其各自的优势与短板。我们对调研产品的功能特性和原理方面进行了分析对比,也对部分组件做了性能等方面的 POC 测试,最终决定引入 Apache Doris 作为我们实时数仓的基座。
我们对新技术的引入一直比较审慎,Apache Doris 以其全面的功能和极速的性能表现吸引了我们。自 2017年开源之后被小米、美团、链家、搜狐等数十家互联网知名企业引入使用。在 2018 年捐献到 Apache 基金会之后更是一路高歌猛进,各项核心能力大幅增强,并在 2022 年 6 月升级为 Apache *项目,截止 2022 年底,Apache Doris 已被全球 1000 多家企业生产系统使用。它的几大重点能力对我们而言非常适用:
- 简单易用:其架构简洁只有两个进程服务,不依赖其他系统,部署和运维管理简单;使用标准 SQL 做数据查询,兼容 MySQL 协议;
- 极致性能:依托现代化的 MPP 架构、列式存储、预聚合视图、数据索引等实现,在低延迟和高吞吐查询上达到了极速性能;
- 统一数仓服务:一套平台系统,可同时支持实时数据服务、交互式数据分析等场景;
- 支持联邦查询:支持对 Hive、MySQL、Elasticsearch 等数据库的联邦查询分析能力;
新架构演进
技术架构升级
各业务对实时数据分析的需求越来越强烈,但原有技术架构并不能很好的支撑当前的需求。在分析各业务的需求和痛点之后,我们决定对架构进行升级演进,构建公司级统一的实时数仓平台。
在选定 Apache doris 作为我们的实时数仓底座之后,我们规划并构建了统一的实时数据服务链路。一方面我们以 Apache Doris 为核心,将实时增量数据、历史存量数据整合到 Doris 内提供极速查询服务;另外我们对数据交换平台、流式计算平台、消息服务总线、元数据中心、数据服务平台等都做了适配改造;最终达到实时数据统一采集、统一分析、统一服务的目标。
同时对于整个大数据平台而言,我们也在离线数据平台之外扩充了实时数据服务能力,进一步增强了大数据平台对不同业务场景的支撑能力,完善了大数据技术架构蓝图:
架构升级的落地
我们对实时数仓的建设目标绝不仅仅是部署一套 Doris 数据库,当然也不是一朝一夕可建设成熟,只有诸多与之配套的能力逐步完善后,才能真正高效保质的运转起来。只不过,Doris 在我们实时数仓的建设中至关重要。
围绕新的实时数据架构,我们做了大量的配套工作:
- 实时数据的采集交换。 将实时数据采集到 Doris 中去。我们在现有数据交换平台上扩展功能,支持 MySQL Binlog 实时数据的采集和分发,并能在源头上对分库分表数据进行合并,以便下游处理。同时新增 Hive 到 Doris 的数据批量导入功能,解决大量历史数据需要加载到 Doris 库表的问题。
- 实时消息服务总线。 不管是采集自 MySQL 的 Binlog 数据,还是应用埋点上报的数据等,均制定统一的数据格式,同时提供标准 SDK 来保障数据的规范性。另外对于消息数据,我们增加了消息元数据中心服务,来方便上下游对数据 Schema 的解析与演变。源端的实时数据,由专人负责接入到统一的 Kafka 中,共享给下游各类业务使用,避免数据被重复接入。
- 扩展流式数据的处 理。我们的流式计算平台底层基于 Flink,为方便用户解析统一的消息数据,我们扩展了Flink SQL 函数以及 Kafka Connector。由于当时社区并未提供 Flink 跟 Doris 的读写连接器,我们自研了 Flink Doris Connector,并保障数据写入的 Exactly Once语义,性能上也压测到单表最大 20 万 QPS 的吞吐。
- 增强 Doris 的服务能力。我们部署多个 FE 节点提供服务,面向访问端我们基于 Nginx 反向代理来实现不同 FE 节点之间的负载均衡、故障转移。扩展 Bitmap、字符串解析等函数,方便用户分析数据。Doris 各节点上的日志,统一集成到 ELK 供查询,以及将性能指标集成到 Grafana 实现监控告警。期间偶尔触发到 Doris 的缺陷也对其进行修复(或通过升级 Doris 版本的方式解决),比如数据写入过于频繁时,个别表 Tablet 与 Partition 中的版本元信息不一致导致查询异常,以及 FE 节点 HTTP 接口的兼容性 Bug 导致启动时拉取元数据失败等等。
- 完善实时数据研发规范。 在 Doris 之上的数据模型设计、数据开发方式,相比常规的离线数仓都有较大区别。我们对于数据采集、数据表模型选择、数据域划分、分区分桶策略、数据生命周期、数据权限分配、各数据团队分工等方面都制定了规范,一方面期望实时数仓的数据从一开始就能够有序组织和管理,同时也罗列一些技术规范让开发者有据可循,达到性能最优,少走弯路。
业务实践
自 2021 年引入 Apache doris 之后,我们小步快跑,快速在一些业务场景中得到实践,初具效果。目前我们线上 Doris 集群有 50 个 BE 节点,已接入将近 800 个数据表,支撑每天约 5 万余次的查询分析;服务于 10 多个业务团队的数据应用,在其之上产出的数据报表、数据接口大约在 300 个左右;99% 以上的查询响应耗时在 5 秒内,数据时效延迟均不超过 1 分钟。
一些重点业务在对接实时数仓之后,均取得了不错的效果,比如几个典型的业务场景:
- 广告投放:营销团队观察不同广告投放渠道、素材等维度的曝光、花费及用户转化情况,及时调整资金投放策略;
- 风控策略评估:风控团队基于实时提现通过率等审批指标,判断审批策略效果,及时优化策略保障审批质量;
- 消保降诉:运营团队基于话务、客户关系相关实时数据指标,分析电销运营质量及关键事件,保障客户服务质量;
- 资管业务:资管团队基于现场过程实时数据、实时质检合规敏感词、实时催收投诉提供资管质检实时业务监控分析服务;
- 科技大屏:各大部门基于大屏实时展示和监控业务及技术指标,例如风控大屏、资管大屏、人工智能大屏等;
虽然各个业务场景关注的数据指标不同,数据处理逻辑不同,但围绕 Doris 的数据开发模式基本一致。我们通过自研的工具将不同来源的数据实时投递到 Kafka,用户使用 Flink SQL 消费 Kafka 数据后写入 Doris,在 Doris 中按照经典的数仓分层模型来组织数据。Doris 作为我们数据链路的末端,被报表工具、业务系统直接访问,快速查询数据。
在我们围绕 Doris 的实时业务实践过程中,也探索了一些不一样的实时数据开发模式:
- 数据不直接同步到 Doris。我们并没有选择将源头实时数据直接同步到 Doris,而是先把所有数据都集中发送到了 Kafka,启动一些 Flink 任务消费数据写入 Doris。这样做主要有两方面的考虑,一是 Kafka 中的数据可被重复利用,除了消费数据写入 Doris,其他特殊业务也可消费后流转到其他流程中,避免实时数据重复采集;二是数据在写入 Doris 之前,我们的 Flink 任务可预先做一些数据清洗或者聚合,尽量减少在 Doris 中再次处理。
- 上层尽量使用视图表。 我们对公司的核心数据做了主题分类,遵循维度建模的思想把数据也分为了ODS、DWD、DWS、ADS 等层次,每一层的数据的聚合维度不同。区别于离线数仓的数据加工模式,我们围绕 Doris 的实时加工方式有较大不同:我们基本只有 ODS、DWD 层的数据是物理表,DWS、ADS 层中的数据主要为视图表。这样做的初衷也很简单,就是想减少传统方式的层层数据预处理,最大化利用 Doris 的极速分析能力直接从明细表中分析数据。这样做有几个大的收益,一是物理表数量少了,降低了存储开销;二是减少了大量的数据预处理计算任务,资源消耗低、维护成本低、业务交付速度快;三是上层数据基于视图表,当业务规则变化时,快速重建视图逻辑即可,无需把层层数据再重新处理一遍,简单高效。
- 计算逻辑尽量交给 Doris。在没有 Doris 之前,比如 Flink 结合 HBase 的架构,我们通常会在 Flink 任务端做大量的数据预处理(包括复杂的流表关联、维度聚合、窗口计算)。而在引入 Doris 之后,我们将数据计算逻辑尽可能的转移到 Doris 内,Flink 只承担简单的数据读写与转换。这对数据开发者来说无疑会更友好,毕竟对于普通开发者来说 Flink 的门槛较高,比如对延迟数据的处理、历史数据回溯等方面很难控制。当然,我们会对数据的体量和分析复杂度做综合考虑,在必要时仍会在 Flink 端对数据先做聚合,灵活调整。
- 合理利用 Doris 特性。我们期望基于 Doris 自身的极致查询性能,来降低数据加工链路的复杂度,提高需求交付效率。但任何数据库或计算引擎的性能,都受其自身设计、集群规模、数据规模、计算复杂度的综合影响,Doris 性能虽然很优越但也不是万能。我们尽量合理使用 Doris,来让他的性能表现更好,比如合理设计表的分区分桶、设定数据清理周期、主键字段的类型等;ODS 和 DWD 层的数据优先使用 Unique 模型,部分大表配套建立 Rollup 表;查询语句的 Join 关联注意大表在左小表在右,合理使用 Runtime Filter 等。
总结与展望
自 2021 年引入 Doris 构建实时数仓以来,很多以前只能使用头一天数据做离线分析的业务,纷纷尝试以新的方式来完成实时数据分析、制作实时数据报表。在新的实时数据架构下,数据分析的时效性有了极大的提升,不同类型用户均从中获得收益:
- 对于业务人员、数据分析师、决策者,他们能够基于最新的数据,更快速的洞察业务状态,及时调整业务策略;
- 对于数据开发者,实时数据的开发变得更简单,以前需要花费好几天才能完成的工作,现在只需一天甚至半天时间即可完成;
- 对于实时平台管理方,数据的存储成本、系统的运维成本、业务的运营成本均得到一定程度的降低。
技术和数据服务业务的脚步永不停歇,当业务与数据规模持续变化后,新的技术挑战必然又会产生。在当前环境下,不管是解决现有问题、还是为将来做准备,我们还有很多工作需不断深入:
- Doris 能力的增强。由于引入 Doris 较早,我们升级几次之后停留在 0.15.3 版本,对比社区 1.1.4 版本和即将发版的 1.2 版本差距太大。新版本在性能和稳定性上都有了巨大的提升,比如向量化引擎等、Hudi 数据查询等,版本升级是我们近期首要的计划。另外 Doris 即将发布的版本将支持跨集群数据复制、存算分离的能力,我们的核心业务对这些需求也非常强烈,后续我们将联合社区一同完善,推进落地应用。
- 生态系统的完善。Doris 成为了我们实时数仓的主力数据库,但同属于我们大数据生态的一部分,数据安全、数据质量等要素也是需要被共同关注的内容。原有元数据管理、数据质量、数据安全、运维监控等平台侧重对离线 Hadoop 体系数据的管理,需进一步扩展,支持将 Doris 链路的实时数仓纳入统管。另外尤其是面向数据开发者、数据分析用户,优化数据开发流程和工具,让大家获得更好的体验。
- 更多业务的覆盖。一方面源自于业务侧旺盛的需求量,另一方面我们也期待基于 Doris 的实时数仓体系能够发挥更大的作用,在数据的接入数量和数据模型上,能够支撑到更多的业务域。在服务的应用类型上,除了传统的报表应用,也能支撑更多偏中后端的数据产品,比如用户行为分析平台等。