本篇文章为数禾科技数据开发专家杨涵冰的演讲内容整理。主要内容包括:
- 特征平台概览
- 特征存储服务
- 流批一体方案
- 模型策略调用方案
一、特征平台概览
首先是特征平台的概览,整个特征平台分成四层,分别是数据服务、存储服务、计算引擎、原始存储。数据服务层提供向外的服务,主要包括四种:
- 一是传统的 API 点查;
- 二是圈选查询;
- 三是事件消息;
- 四是同步调用计算。
其中同步调用计算服务是即时计算的,相当于现场进行策略运算,而 API 点查服务是预先计算并存储的。为了提供数据服务,提供特征行存和特征列存两种服务方式,分别支撑 API 点查和圈选查询。计算引擎有两个,分别是离线运算引擎和流批一体运算引擎。特征平台的最底层是原始存储,原始存储是为了支持离线运算功能,而事件存储是为了支持流批一体运算。
下面以 MySQL 为例介绍简化的特征平台数据流转过程。
首先是离线部分,通过 Sqoop 或者其他的抽取工具将 MySQL 数库的数据抽取到 EMR,然后经过 Hive 运算,把最终的运算结果存到 HBase 和 ClickHouse 中,分别对应特征行存和特征列存,以提供 API 点查和圈选查询服务。同时 MySQL 的 Binlog 会实时写入 Kafka,然后 Kafka 的数据会被消费进入 Flink 流批一体运算引擎,同时Kafka的数据也会被消费进入到事件存储 HBase,事件存储 HBase 的数据也会提供给Flink流批一体运算引擎。经过引擎计算以后,数据被写入 HBase 和 ClickHouse 中,此外还会发事件消息。中转到事件存储 HBase 的数据可以提供实时调用服务。
二、特征存储服务
接下来介绍特征存储服务。
我们将特征分为四类,分别是:
- 同步特征:实时写入、离线修正、流批一体。
- 即时计算特征:API 调用时运算、线下批量计算,逻辑一致。
- 实时特征:传统实时链路,实现复杂实时逻辑,一般可使用流批一体代替。
- 离线特征:传统离线链路,实现复杂离线逻辑。
为什么一定要有离线的链路,原因有以下几点:
一是实时链路是一个纯增量的链路,它的链路会很长,并且在任意的环节都有可能发生问题,一旦出错,数据将会在很长的一段时间都无法被自动修正。
二是实时链路对时效性有要求,特别是涉及到多流 join 的时候,一旦有延迟,需要尽快返回一个降级结果。为了控制实时特征的最终错误率,并且将错误限制在一个较小的时间段内,需要进行离线链路修正。特征存储服务会用两种方式来进行修正,一种就是同步特征,其本身自带修正,是流批一体的链路;其他特征一般是通过实时+离线+即时计算的组合方式。
下面以 MySQL 为例,介绍下存储服务的整体数据流。离线部分,通过 Sqoop 抽取工具将 MySQL 数库的数据抽取到 EMR,然后经过 Hive 运算,把最终的运算结果存到 HBase 和 ClickHouse 中。同时 Binlog 会实时写入 Kafka,然后 Kafka 的数据会被消费进入 Flink 流批一体运算引擎,经过引擎计算以后,数据被写入 HBase 和 ClickHouse 中。HBase 和 ClickHouse 提供 API 点查和圈选查询服务。
2.1 实时特征数据流
在实时特征的数据流中,MySQL 通过 Binlog 写入 Kafka,以及其他埋点类的 Kafka 数据,经过运算以后将结果写入另外一个 Kafka,最后消费数据写入 HBase 和 ClickHouse。
2.2 离线特征数据流
在离线特征数据流中,MySQL 通过 Sqoop,OSS 通过 Spark 或者其他方式抽取,Kafka 通过 Flume 抽取进入 EMR,然后用 Hive 或者 Spark 运算,同时写进 HBase 和 ClickHouse。
2.3 同步特征数据流
在同步特征数据流中,MySQL 的 Binlog 会写进实时的 Kafka,然后 Kafka 的数据会被实时写入事件存储,同时 MySQL 也会离线修正和初始化。Flink 同时做流处理和批处理,写进 HBase 和 ClickHouse。
2.4 即时计算特征数据流
在即时计算特征数据流中,依托于 HBase 和 ClickHouse 的数据,提供 API 点查和圈选查询服务。
以上就是整个存储服务的介绍,该部分内容涉及到特征存储服务的大部分,如图中橙色部分所示。
三、流批一体方案
在只提供了特征存储服务的时间里,我们发现了很多问题以及一些业务诉求。首先是一些问题:
- 在对现有模型策略精耕细作之前,还有没有什么数据没有被使用?比如说 MySQL 里面状态变化的时间点数据。
- 输入项离线逻辑是否已经足够完整,为什么实时输入项需要重新梳理与补充逻辑?离线输入项想要编程实时的,需要重新梳理逻辑,有些甚至过于复杂,以至于用传统的方式无法完成实时转换。
- 不确定使用场景,无法区分点查和跑批,能不能同时覆盖?对于很多业务人员来说,并不知道想要的模型和策略最终需要用跑批还是点查,有没有什么办法能同时满足这两种需求。
- 流式处理逻辑难以理解,为什么要流 Join,不能直接“取数”吗?对于模型开发人员,他们不了解流处理过程,因此实时特征的制作难以下沉到模型开发人员。
- 实时模型策略空跑测试需要很长时间,能不能缩短?
- 模型策略开发训练很快,上线开发实时输入项却需要很久,能不能加速?
对于这些问题,我们提出了一些方案:
- 【数据】 存储状态变化数据,支持还原任意时刻的数据切片状态。这样做还有一个额外的好处,通过流批一体方案进行模型训练的时候不会有特征穿越的问题,因为没有办法拿到未来的数。
- 【逻辑】 流批一体,以流为主,逻辑一致,无需验证口径。训练的时候用这份数据作为训练,上线及回测时也是用相同的数据,可以保证最终的结果一致。
- 【执行】 流、批、调用一体化,自适应不同场景。
- 【开发】 使用“取数”而不是流合并,封装实时流特有概念,降低实时开发门槛。
- 【测试】 支持任意时间段回溯测试,增加实时开发测试速度。
- 【上线】 自助式的流批一体模型开发上线,减少沟通环节,增加上线效率。
传统的实时流方案有 Lambda 和 Kappa 两种。
Lambda 提供了实时和离线的两套逻辑,最终在数据库中将两者合并起来。Lambda 的优点是架构简单,很好地结合了离线批处理和实时流处理的优点,稳定且实时计算成本可控,并且离线数据易于订正;缺点是实时、离线数据很难保持⼀致结果,并且需要维护两套系统。
3.1 流批一体方案
Kappa 则是全部都用实时的逻辑,将历史的数据存下来,每次得到一个切片数据,最后合并起来。Kappa 的优点是只需要维护实时处理模块,可以通过消息重放,无需离线实时数据合并;缺点是强依赖消息中间件缓存能力,实时数据处理时存在丢失数据,这个缺点在金融领域是不能容忍的。
由于 Kappa 在抛弃了离线处理模块的同时也抛弃了离线计算更加可靠稳定的特点,而 Lambda 虽然保证了离线计算的稳定,但是双系统的维护成本非常高,并且两套代码的运维非常复杂。
因此,我们提出了 Lambda+Kappa 的流批一体方案。如图所示,数据流转的前半部分是 Lambda 架构,其中心是一个 HBase 的事件存储;后半部分是 Kappa 架构,供用户完成流处理和批处理。
上图以 MySQL 为例展示了整体流批一体方案。 首先是 MySQL 的 Binlog 进入 Kafka,同时通过离线修正以及切片把数据送到事件中心,同时用相同的 Kafka 完成实时流的触发,然后事件中心会提供数据获取及离线跑批服务。最后由元数据中心统一管理数据,统一维护数据,以避免同步的问题。Flink 提供整个逻辑服务。
3.2 事件中心
图中的事件中心使用 Lambda 架构存储所有变化数据,每日修正,通过冷热混存与重加热机制追求最佳性价比。此外,我们参考 Flink 增加水印机制,确保当前值同步完成。最后,事件中心提供消息的转发机制以及异步转同步的的机制,以“取数”代替流 Join,消息转发机制,异步转同步。支持触发——消息接收及触发——轮询式调用,并同时赋予该接口回溯的能力。
下面介绍事件中心的村塾数据流,如图所示,MySQL、Kafka 等多个数据源通过不同的路径转发到 Kafka,然后 Flink 直接消费 Kafka,并会实时的写入 HBase 热存。此外,离线修正的数据通过 EMR 也写入 HBase 热存。另有一套 Replica 机制完成 HBase 热存和 HBase 冷存之间的复制。HBase 冷存的数据也会通过重新加热进入到 HBase 热存中。
整个事件中心的存储结构如图所示,冷存里面只放主体数据,热存里面除了主题数据以外,还有三个表用来做不同的 index 工作热存一般 TTL 为 32 天,有特殊的情况也可以调整。
事件中心的读取数据流中,实时触发是走是 Kafka,回溯和取数都是走 HBase 热存,内部的重加热机制完成 HBase 冷存到 HBase 热存数据的更新,这部分逻辑对于开发人员是透明的,开发人员不需要关注数据来自于哪里。
下面介绍事件中心的水印机制与流 Join。
假设我们要对两个流进行 Join,也可以简单理解为有两张表,通过某外键进行关联。当任何一张表发生变更时我们都需要至少触发一次最终的完整的 Join 后的记录。我们将两个流分别记录为 A 和 B,并且假设 A 流先到。那么在打开事件中心水印机制的情况下,A 流触发时,A 流的当前事件已经被记录在事件中心中。此时分为两种情况:
- 在事件中心中可以取到 B 流的相关数据,那么说明在A流当前事件记录进事件中心到运行至读取 B 流相关数据的时间段内,B 流已经完成了事件中心的记录,此时的数据已经完整。
- 在事件中心中无法取到 B 流的相关数据,那么由于事件中心水印机制,说明此时 B 流相关事件尚未触发。而由于A流当前事件已经被写入事件中心,那么当 B 流相关事件被触发时,一定能获得 A 流的当前事件数据,此时数据也是完整的。由此,通过事件中心水印机制,即可确保用“取数”取代流 Join 后至少会有一次拥有完整数据的计算。
触发消息接收通过消息转发完成。当外部系统发起请求后,会去转发 Kafka,然后 Kafka 的数据同时会进入事件中心,接下来触发相应的计算,最后去用消息队列发送计算结果,外部系统接收这个消息的结果。
同时也提供轮询式的服务,同样也是消息转发,前面与消息接收机制都一样,只是多了一个事件中心,重新存储计算结果,然后提供服务。
3.3 取数一致
在取数的时候还有另外一个问题,那就是不一定能拿到最新的数据,除非直接从元数据库获取数据,但这种操作一般是被禁止的,因为会给主库带来压力。为了保证数据的一致性,我们采取了一些措施。首先我们将取数一致性分为四个级别,分别是:
最终一致:经过一段时间后能访问到更新的数据,整个流批一体方案默认保证最终一致。
触发流强一致(可延迟) :保障触发流中的当前数据及早于当前数据的数据在对触发流的取数过程中能获取到。使用水印方案,水印不满足时进行延迟。
取数强一致(可延迟) :保障取数时早于用户提出的时间要求的数据均能获取到。使用水印方案,水印不满足时进行延迟。
取数强一致(无延迟) :保障取数时早于用户提出的时间要求的数据均能获取到。当水印不满足时直接从数据源增量补足,增量取数会对数据源带来压力。
3.4 流批一体作业
我们使用 PyFlink 实现流批一体作业,使用 python 是因为模型和策略开发人员更加熟悉 Python 语言,而Flink保证了逻辑一致性。基于 PyFlink,我们封装了复杂的触发逻辑、复杂的取数逻辑,并能够复用代码片段。
PyFlink 的代码组织结构如图所示,包含出发、主逻辑、输出三部分。这三部分可以不用自己实现,只需要选择已经封装好的输出。
Flink 整体数据流也简单,最上面是触发逻辑,然后触发主逻辑,主逻辑里面会有取数逻辑去完成取数,最后是输出的逻辑。在这里,触发逻辑、取数逻辑和输出逻辑的底层封装是随流批变化自适应的,所以可以同时确保输入和输出不变,逻辑本身在绝大多数情况下是不需要考虑流批环境变化。
下面介绍一个 PyFlink 典型的使用流程,首先选择触发流,编写取数及预处理逻辑,可引入已发布的取数或处理逻辑代码,设置取样逻辑并试运行,获取试运行结果,在分析平台中进一步分析与训练。训练结束想要发布模型时,可在作业中选择训练完成的模型,如有需要可以设置初始化相关参数。最后是模型发布上线。
四、模型策略调用方案
我们提供了四种调用方案:
- 特征存储服务方案。Flink 作业进行预运算,将运算结果写入特征存储服务平台,通过该数据服务平台对外服务。
- 接口触发——轮询方案。调用并轮询事件中心消息转发接口,直到 Flink 作业返回运算结果。
- 接口触发——消息接收方案。调用事件中心消息转发接口触发 Flink 作业运算,接收 Flink 作业返回的运算结果消息。
- 直接消息接收方案。直接接收 Flink 作业返回的运算结果消息。
4.1 特征存储服务方案
特征存储服务分为三种情况,分别是实时、离线修正和离线初始化。当有新的变量上线或者老的变量发生逻辑变化,需要对全量的数据进行一次刷新,这时候需要离线初始化。实时流是实时触发的,离线修正和离线初始化都是批量触发。如果有取数逻辑则从 HBase 里面取数,当然取数的过程中实时和离线的作业肯定不一样,但是开发人员不用关注,因为已经封装好了。实时的 Flink 作业结果会发到 Kafka,离线修正和离线初始化的结果都会进 EMR,最后写进特征存储,也就是 HBase 和 Clickhouse。
上图展示了特征存储服务方案的时序,Kafka 触发了 Flink,然后Flink运算完写如特征存储。那么在刚触发的时候,如果有外部调用,是无法获取到最新的数据的,必须要等到运算完成写入存储以后才能获取到更新的数据。
4.2 接口触发——轮询方案
在接口触发——轮询方案中,触发调用会触发到消息转发,转发给 Kafka,然后 Flink 将运算的结果吐入 Kafka。如果这个时候没有超过单次请求的时间,就会直接返回,这个时候触发轮询就退化成单词调用了。反之,则会继续进入事件存储 HBase,通过轮询调用获取结果。
接口触发——轮询方案的时序图如上图所示,当有一个外部调用触发以后,会有一个消息转发触发了 Flink 运算。在 Flink 运算及写入数据库的过程中,会有多次轮询,如果在固定的时间还没有办法获取到,则会提示超时;如果下一次轮询的时候,数据已经写入了,则获取成功。
4.3 接口触发——消息接收方案
接口触发——消息接收方案是对轮训的简化。如果业务系统支持支持消息接收,那么整个链路变得比较简单,只需要通过消息转发服务触发计算,然后监听结果消息就可以了。
接口触发——消息接收方案的时序是串行的。触发了以后,进行 Flink 运算,运算完成以后把结果数据通过消息接收机制传输给调用方。
4.4 直接消息接收方案
直接消息接收方案就是纯流式的,通过 Kafka 触发 Flink 计算,计算完数据传入消息队列,然后等对方订阅接收就可以了。整个时序也是非常的简单,如下图所示。
我们把数据的使用情况分成三种,分别是即时调用、实时流、离线批数据,他们的时效性是依次递减的。我们通过事件中心把这三种情况注册到一起,最后只要通过事件中心作为中转给 Flink 提供相关的数据,对于 Flink 来说,不用关心到底是通过哪种方式来调用。
最后,总结下流、批、调用一体化的四种方案:
- 特征存储服务方案:通过特征存储服务,提供持久化的特征存储。提供 API 点查与特征圈选服务。
- 接口触发——轮询方案:通过事件中心的消息转发与消息查询服务,提供同步调用计算服务。
- 接口触发——消息接收方案:通过事件中心的消息转发服务,提供事件消息服务。
- 直接消息接收方案: 支持复杂事件触发,提供事件消息服务。