数据库相关中间件(下)

时间:2022-01-22 23:41:38

数据增量订阅与消费

基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql.

有关数据增量订阅与消费的中间件回顾一下:

数据库相关中间件(下)

  • 增量订阅和消费模块应当包括binlog日志抓取,binlog日志解析,事件分发过滤(EventSink),存储(EventStore)等主要模块。

  • 如果需要确保HA可以采用Zookeeper保存各个子模块的状态,让整个增量订阅和消费模块实现无状态化,当然作为consumer(客户端)的状态也可以保存在zk之中。

  • 整体上通过一个Manager System进行集中管理,分配资源。

Canal

Canal架构图:

数据库相关中间件(下)

说明:

  • server代表一个canal运行实例,对应于一个jvm

  • instance对应于一个数据队列 (1个server对应1..n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)

  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)

  • eventStore (数据存储)

  • metaManager (增量订阅&消费信息管理器)

说明:一台机器下部署一个canal,一个canal可以运行多个instance(通过配置destinations等), 一般情况下一个client连接一个instance(每个instance可以配置standby功能), 可以多个client连接同一个instance,但是同一时刻只能有一个client消费instance的数据,这个通过zookeeper控制。

数据库同步

Otter

背景:alibaba B2B因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了杭州和美国异地机房的需求,同时为了提升用户体验,整个机房的架构为双A,两边均可写,由此诞生了otter这样一个产品。

otter第一版本可追溯到04~05年,此次外部开源的版本为第4版,开发时间从2011年7月份一直持续到现在,目前阿里巴巴B2B内部的本地/异地机房的同步需求基本全上了otter4。

基于数据库增量日志解析,准实时同步到本地机房或异地机房的mysql/oracle数据库,一个分布式数据库同步系统。

工作原理

数据库相关中间件(下)


原理描述:

1. 基于Canal开源产品,获取数据库增量日志数据。

2. 典型管理系统架构,manager(Web管理)+node(工作节点)

  • manager运行时推送同步配置到node节点

  • node节点将同步状态反馈到manager上

3. 基于zookeeper,解决分布式状态调度的,允许多node节点之间协同工作。

Otter的作用

1. 异构库

  • mysql->mysql、oracle. (目前开原版只支持mysql增量,目标库可以是mysql或者oracle,取决于canal的功能)

2. 单机房同步(数据库之间RTT(Round-Trip Time)<1ms)

  • 数据库版本升级

  • 数据表迁移

  • 异步二级索引

3. 跨机房同步(比如阿里巴巴国际站就是杭州和美国机房的数据库同步,RTT>200ms)

  • 机房容灾

4. 双向同步

  • 避免回环算法(通用的解决方案,支持大部分关系型数据库)

  • 数据一致性算法(保证双A机房模式下,数据保证最终一直性)

5. 文件同步

  • 站点镜像(进行数据复制的同时,复制关联的图片,比如复制产品数据,同事复制产品图片)

单机房复制示意图

数据库相关中间件(下)

说明:

- 数据On-Fly, 尽可能不落地,更快的进行数据同步。(开启node load balance算法, 如果Node节点S+ETL落在不同的Node上,数据会有个网络传输过程)

- node节点可以有failover/loadBalancer.

SETL

S: Select

为解决数据来源的差异性,比如接入canal获取增量数据,也可以接入其他系统获取其他数据等。

E: Extract

T: Transform

L: Load

类似于数据仓库的ETL模型,具体可为数据join,数据转化,数据加载。

跨机房复制示意图

数据库相关中间件(下)

数据涉及网络传输,S/E/T/L几个阶段会分散在2个或者更多Node节点上,多个Node之间通过zookeeper进行协同工作(一般是Select和Extract在一个机房的Node, Transform/Load落在另一个机房的Node)

node节点可以有failover/loadBalancer。(每个机房的Node节点,都可以是集群,一台或者多台机器)

More:

  • Otter调度模型:batch处理+双节点部署。

  • Otter数据入库算法

  • Otter双向回环控制

  • Otter数据一致性

  • Otter高可用性

  • Otter扩展性

异地双活数据架构基础设施DRC

所谓DRC,就是Data Replication Center的缩写,数据复制中心。这种复制是同步的,支持异构的,高可用的(有严格容灾系统,实时性好),支持订阅分发的。项目期初是为了淘宝异地容灾而成立的,用于数据库之间主备同步,后来采用这套技术方案衍生出了DRC-TAIR, DRC-DUMP等项目。

所谓异地双活主要关注两件事,一个数据同步,一个数据分发。

到底怎样的应用会需要异地的双活?比较常见的场景有三个:

  1. 两个地域或多个地域都有大量用户的场景,比如在中国的用户希望他们用杭州的RDS服务,在美国的用户用美国的RDS服务,这就需要数据在异地同步。很多游戏,金融,传媒,电商业务都有这种需求。满足这个需求的难点在于跨地域的网络,比如网络延时长,丢包多,而且数据在公网传输会有数据泄露风险。

  2. 数据来源较多,需要介入各种异构数据的场景。比如一个应用需要从ODPS, RDS, OTS, OceanBase, PostgreSQL这几个服务介入数据,他们的数据结构和接口都不同,这种接入的成本会比较高。因此另一个可用的方法是数据写入的时候就一份多谢为不同数据结构

  3. 下游订阅很多的情况,比如一份数据,备份系统、通知系统、大数据分析系统、索引系统等等都要来取,如果用上面一份数据多写的方案是可以应对的,但这里还有其他难点,就是数据一致性、可扩展性、跨网同步稳定性、以及同步的实时性。

DRC支持读取集团MySQL, RDS, OceanBase, HBase, Oracle等多种不同的数据源的实时增量数据,支持写入数据库、MetaQ, ODPS等多种存储媒介.

数据库相关中间件(下)


数据库相关中间件(下)

以前在一个城市做双机房主备,两个机房是数据对等的,写入是随机分布,然后通过主备HA进行数据同步。这样机房对等的思路会导致业务增长、数据增长只能通过两个机房不停堆机器来解决。另一方面,如果整个城市断电,那么双活就成了双死。下一个思路是做跨城市,早期常用的做法是一个城市写,另一个城市冷备,就是晚上做同步,但这就意味着白天如果发生了什么事儿,这一天的数据就比较危险。另一个思路是两个城市多写,数据落两边,这样的问题是应用调用次数频繁的话,如果调用异地数据多来那么一两次,整个应用的延时就很长。这个思路再进一步发展,就是做单元内封闭以减少异地调用,这就涉及到业务上的改造。

顺着这个思路,阿里的异地双活重点做了几件事。一个是热插拔,可以做到在业务高峰时增加节点,高峰过了把增加的节点关闭。做到这个的一个关键是流量实时切换 ,DRC可以在20秒以内把一个单元(region)的流量迁移到另一个单元。另一个是数据实时恢复,就是通过一定的冗余设计,一旦一个单元挂掉了,可以在另一个单元做全量恢复。

数据库相关中间件(下)

异地多活在数据方面的挑战是非常大的。双十一期间,交易会激增,所以交易链路做了单元化。交易链路的数据分为三个维度:买家、卖家、商品。买家之间通常没有太多交叉,天然的适应这种隔离,而且卖家对延迟的敏感度非常高,所以按照卖家维度切分,在单元内封闭,而卖家和商品都是在中心写入。

数据方面的两个核心要求:

  1. 一致性,要求卖家和商品一致,单元和中心一致,也就是数据同步不能丢数据,不能错数据,还要保证事务。

  2. 实时性,需要做到秒级别的延迟。

双单元的同步架构有两种:

一种是读写分离的方式,中心写,单元读。单元需要的数据如果没有从中心及时同步过来,或者同步错了,那有问题这段时间的交易会全部收到影响。这里的核心是,保证秒级延迟,同时保证一致性。(JD的多中心交易系统就采用了这种方式)

数据库相关中间件(下)

第二种同步架构是单元封闭的方式。中心和单元各有写入,我们通过冗余是的中心和单元随时可以各自接管。(类似Otter)

数据库相关中间件(下)

这里的关键是:

  • 避免循环复制:通过在DB透传打标事务的方式实现。

  • 限流:峰值的压力,我们单元化本来就选取了流量激增业务,两边都实时同步100%全量数据,峰值对每个系统的压力有增无减。DRC的store和congo都可以根据TPS或者流量限流。限速算法的核心思想分为批量采样,奖惩时间,平滑变速。

Otter与DRC的区别:

- Otter是阿里B2B的产品,DRC是阿里技术保障团队的产品

- Otter是针对MySQL的,DRC可以支持多种类型的数据源

- DRC从业务上进行了划分,可以实现单元内封闭,Otter的实现不涉及业务,而是在纯数据库层打通的技术

- Otter是双写,DRC是中心写、分中心读,或者都部分写,相互同步。

- Otter所处的网络环境较DRC差,解决一致性问题也较复杂(基于trusted source的单向环回的补救,基于时间交集的补救),DRC有两种实现方式,具体参考上面。

异地多活中DRC的核心能力就是在低延迟,一致性和高可用。

数据库相关中间件(下)

  • 一致性:基于日志流式抓取、回放库表结构变更、基于事务的冲突检测。

  • 低延迟:最大延迟不超过1s, 消息协议优化,三级数据存储,预读优化IO, 多连接复用和传输压缩,高效的并发复制算法。

  • 高可用:主备切换,拓扑变化,心跳跟踪,多维度容灾。

JD多中心交易系统

JD数据复制中间件考察和借鉴了开源社区的实现,例如Databus、Canal/Otter、OpenReplicator等,解析部分使用了Canal的DBSync。

多中心交易本质上是一个更大的分布式系统,交易流程中依赖和产生的数据和服务有不同的特点,必然涉及到数据分区、路由、复制、读写一致性、延迟等分布式领域的常见问题。

其中,数据一致性是电商网站需要面临的首要问题,越是流量增大的时候越要保证数据更新的即时性和准确性。在多中心之间需要同步卖家数据和商品数据,如果同步的延时太长,买家、卖家都不可接受。比如,卖家改了价格或库存,用户不能过很久才看到。同样,数据正确性也是很大的挑战,卖掉的商品能够及时减少,退货的商品能够及时增加。这都时刻考验着后端系统和数据库平台的健壮性。

除了数据一致性之外,如何保证路由规则的一致性也是关键性的问题。从技术角度来说,要保障单一用户从登录到访问服务、到访问数据库,全链路的路由规则都是完全一致的。如果路由错误,看到的数据不正确,也会影响到最终用户的体验。

架构

数据库相关中间件(下)

系统包括一个主中心和多个分中心,主中心与分中心之间通过数据总线交换数据。数据流向中,主数据(商品数据、商家数据、用户数据等)的流向从主中心通过数据总线实时同步到分中心,分中心只读;而交易数据(订单数据)的流向从分中心实时同步到主中心;在故障时,会从分中心转移到主中心。

在这个系统中,有多处体现分流的概念。首先,买家访问京东网站下单时,会被优先分流到附近的交易中心;其次,根据交易系统的特点,接单前(包括购物车、结算页等),多中心交易按用户维度分流,如下图所示。用户登录时,查询用户与区域的映射关系表(类似你是哪个片区的),标识此用户属于哪个分中心,并保存标识到cookie中,然后将用户路由到指定的分中心。用户访问其他系统,如购物车和结算页时,从cookie中读取标识,重定向到相应分中心页面。

通过分流,将用户分配到相应的分中心,一方面响应速度快,用户体验更好,不用跨地域访问数据中心了;另一方面,每个中心服务一定数量的用户,水平扩展性好,也能支撑更大的交易规模了。当然,多数据中心不能盲目干活,还考虑到容灾备份的问题。(支付宝光纤事件)

交易系统包括应用和数据部分,应用部分是无状态的,就是说,这些工作是无差别的,一台服务器出问题,我换一台服务器来处理就是了,较容易实现多机房多活。但是数据不一样,多中心交易本质上是一个更大的分布式系统,必然涉及到数据分区、路由、复制、读写一致性、延迟等分布式领域的常见问题。

另外,交易流程中依赖和产生的数据和服务有不同的特点。比如商品、促销和价格、库存的读服务,我们可以将之称为基础主数据,它们在用户下单流程中是无法分区的,否则无法实现单机房内流量闭环,也就是说,不能因为分区数据的不一致,导致同一用户在单一流程中看到不同的数据(假如你加入购物车时是促销20块,结账是25块,你会不会表情扭曲?)而商品、促销和价格的写服务,是给采销、第三方POP商家应用调用的,这种业务场景的可用性目标,主机房部署和冷备模式即可满足,而且业务人员的操作流程会抵消写复制延迟。

简单来说,数据的问题表现在以下几个方面:一、 如何保证数据的即时性和准确性,多中心之间需要同步卖家数据和商品数据,如果同步的延时太长,买家、卖家都不可接受,由于是异地部署,最好延时能控制在1秒内。比如,卖家改了价格或库存,用户不能过很久才看到。同样,数据正确性也是很大的挑战,因为数据故障跟应用层故障不一样,应用出故障了,可能只影响用户访问;数据写错了无法恢复。2、如何保证路由规则的一致性,要保障这个用户从进来到访问服务,到访问数据库,全链路的路由规则都是完全一致的;如果路由错误,看到的数据不正确。

从同城双机房的分布转变为异地多机房的分布,给数据同步带来了新的挑战,因此如何设计数据总线也是项目能否实现的关键因素。京东的多中心交易系统通过数据总线JingoBus进行快速数据交换,同步性能是mysql的3倍以上,而且可用性高,架构灵活。其中,全新的总线设计解决了多中心交易跨机房的数据库复制和多数据源间的数据异构同步等难题,实现了高性能、低延时、健壮的数据同步机制。

数据库相关中间件(下)

如图所示,数据总线主要分Relay、Snapshot和Replicator三部分构成,其中Relay从来源数据库抽取事务日志,并对Replicator提供日志订阅服务,角色上相当于Mysql Slave IO Thread。Snapshot从Relay订阅所有事务日志,写入持久存储作为快照,同时向Replicator提供批量日志订阅服务,角色上相当于Mysql Slave Relay Log。Replicator:事务日志的消费端,从Relay或Snapshot拉取事务日志将事务日志按配置的一致性应用到目标数据库,角色上相当于Mysql Slave SQL Thread。(参考下面MySQL主备复制原理图)

数据库相关中间件(下)

正常情况下,Replicator直接连接Relay,消费Relay内存队列中的事务日志。但有些情况下,因为网络抖动、目标库的负载过高等因素,可能导致Replicator相对Relay落后很多。另外,当新的消费端加入同一数据源的订阅者时,新消费端有冷启动的问题。为了避免重新从数据源做全量快照,Snapshot作为Relay的一个特殊消费端,通过一种高吞吐的消费方式,从Relay源源不断的消费在线事务日志,通过对事务日志的有效处理,最终保存了数据源的一份一致快照(Consistent Snapshot),即包括了数据源库表中每一行的最新状态的快照,同时保留了一段比Relay buffer更旧的事务日志(Log Store)。由此看来,数据总线作为一个数据层的通用CDC组件,对于多中心交易项目以及异步复制场景提供了整体解决方案,奠定了项目的核心内容。

跨数据库(数据源)迁移

yugong

去Oracle数据迁移同步工具。定位:数据库迁移(目前主要支持Oracle->mysql/DRDS)

08年左右,阿里巴巴开始尝试MySQL的相关研究,并开发了基于MySQL分库分表技术的相关产品,Cobar/TDDL(目前为阿里云DRDS产品),解决了单机Oracle无法满足的扩展性问题,当时也掀起一股去IOE项目的浪潮,愚公这项目因此而诞生,其要解决的目标就是帮助用户完成从Oracle数据迁移到MySQL上,完成去IOE的第一步.

概述

整个数据迁移过程,分为两个部分:

  • 全量迁移

  • 增量迁移

数据库相关中间件(下)

过程描述:

  1. 增量数据收集(创建Oracle表的增量物化视图)

  2. 进行全量复制

  3. 进行增量复制(可并行进行数据校验)

  4. 原库停写,切换到新库

Oracle全量基于JDBC拉取数据,增量基于物化视图来实现。

架构

数据库相关中间件(下)

说明:

  1. 一个JVM Container 对应多个instance,每个instance对应于一张表的迁移任务

  2. instance分为三部分

  • extractor (从数据源库上提取数据,可分为全量/增量实现)

  • translator (将源库上的数据按照目标库的需求进行自定义转化)

  • applier(将数据更新到目标库,可分为全量/增量/对比的实现)

自定义数据转换

如果要迁移的Oracle和mysql的表结构不同,比如表名,字段名有差异,字段类型不兼容,需要使用自定义数据转换。如果完全相同则可以跳过。

整个数据流为:DB->Extractor->DataTranslator->Applier->DB, 本程序预留DataTranslator接口(仅支持Java),允许外部用户自定义数据处理逻辑。比如:

  1. 表名不同

  2. 字段名不同

  3. 字段类型不同

  4. 字段个数不同

  5. 运行过程join其他表的数据做计算等

运行模式介绍

1.MARK模式(MARK)

开启增量日志模式,如果是Oracle就是创建物化视图(materialized view)。

2.CLEAR模式(CLEAR)

清理增量日志的几率,如果是Oracle就是删除物化视图

3.全量模式(FULL)

全量模式,顾名思议即为对源表进行一次全量操作,遍历源表所有的数据后,插入目标表.

全量有两种处理方式:

  1. 分页处理:如果源表存在主键,只有一个主键字段,并且主键字段类型为Number类型,默认会选择该分页处理模式. 优点:支持断点续做,对源库压力相对较小。 缺点:迁移速度慢

  2. once处理:通过select * from访问整个源表的某一个mvcc版本的数据,通过cursor.next遍历整个结果集. 优点:迁移速度快,为分页处理的5倍左右。 缺点:源库压力大,如果源库并发修改量大,会导致数据库MVCC版本过多,出现栈错误. 还有就是不支持断点续做.

4.增量模式(INC)

全量模式,顾名思议即为对源表增量变化的数据插入目标表,增量模式依赖记录日志功能.

目前增量模式的记录日志功能,是通过oracle的物化视图功能。

5.自动模式(ALL)

自动模式,是对全量+增量模式的一种组合,自动化运行,减少操作成本.

自动模式的内部实现步骤:

  1. 开启记录日志功能. (创建物化视图)

  2. 运行全量同步模式. (全量完成后,自动进入下一步)

  3. 运行增量同步模式. (增量模式,没有完成的概念,所以也就不会自动退出,需要业务判断是否可以退出,可以看一下切换流程)

6.对比模式(CHECK)

对比模式,即为对源库和目标库的数据进行一次全量对比,验证一下迁移结果. 对比模式为一种可选运行,做完全量/增量/自动模式后,可选择性的运行对比模式,来确保本次迁移的正确性.

DataX

DataX是一个在异构的数据库/文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。

目前成熟的数据导入导出工具比较多,但是一般都只能用于数据导入或者导出,并且只能支持一个或者几个特定类型的数据库。

这样带来的一个问题是,如果我们拥有很多不同类型的数据库/文件系统(Mysql/Oracle/Rac/Hive/Other…),并且经常需要在它们之间导入导出数据,那么我们可能需要开发/维护/学习使用一批这样的工具(jdbcdump/dbloader/multithread/getmerge+sqlloader/mysqldumper…)。而且以后每增加一种库类型,我们需要的工具数目将线性增长。(当我们需要将mysql的数据导入oracle的时候,有没有过想从jdbcdump和dbloader上各掰下来一半拼在一起到冲动?)这些工具有些使用文件中转数据,有些使用管道,不同程度的为数据中转带来额外开销,效率差别很非常大。很多工具也无法满足ETL任务中常见的需求,比如日期格式转化,特性字符的转化,编码转换。另外,有些时候,我们希望在一个很短的时间窗口内,将一份数据从一个数据库同时导出到多个不同类型的数据库。DataX正是为了解决这些问题而生。

数据库相关中间件(下)

  • 左图:新增第n+1个数据源,是不是需要开发n个数据同步工具?

  • 右图:只需要针对新增的数据源开发一套Reader/Writer插件,即可实现任意数据的互导。

设计理念

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。

框架设计

数据库相关中间件(下)

DataX本身作为离线数据同步框架,采用Framework+plugin架构构建。将数据源读取和写入抽象称为Reader/Writer插件,纳入到整个同步框架中。

  • Reader: Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework.

  • Writer:Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端

  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓存,流控,并发,数据转换等核心技术问题。

DataX框架内部通过双缓冲队列、线程池封装等技术,集中处理了高速数据交换遇到的问题,提供简单的接口与插件交互,插件分为Reader和Writer两类,基于框架提供的插件接口,可以十分便捷的开发出需要的插件。比如想要从oracle导出数据到mysql,那么需要做的就是开发出OracleReader和MysqlWriter插件,装配到框架上即可。并且这样的插件一般情况下在其他数据交换场合是可以通用的。

核心架构

DataX3.0 开源版本支持单机多线程模式完成同步作业运行,这里按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

数据库相关中间件(下)

核心模块介绍:

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

DataX调度流程:

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

1. DataXJob根据分库分表切分成了100个Task。

2. 根据20个并发,DataX计算共需要分配4个TaskGroup。

3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

Datax插件开发:https://github.com/alibaba/DataX/wiki/DataX%E6%8F%92%E4%BB%B6%E5%BC%80%E5%8F%91%E5%AE%9D%E5%85%B8