本文翻译自:Streaming MySQL tables in real-time to Kafka
这是关于Yelp的实时流数据基础设施系列文章的第二篇。这个系列会深度讲解我们如何用“确保只有一次”的方式把MySQL数据库中的改动实时地以流的方式传输出去,我们如何自动跟踪表模式变化,如何处理和转换流,以及最终如何把这些数据存储到Redshift或Salesforce之类的数据仓库中去。
伴随着我们技术团队的扩张,我们意识到必须将系统从单一庞大的程序向服务转变,这样才会更容易使用,更方便扩展。向这样的架构迁移有许多优点,也有许多问题,最大的问题之一是:我们怎样才能尽快向服务提供它们需要的数据?
设想这样一个系统,里面有数以百计的服务,每个服务又自己管理着MySQL中的数以百万行计的数据,那服务之间为发送数据而进行的通信就会成为一个非常大的技术难题。正如上一篇文章中提到的,很快就会碰到“N+1次查询问题”。对于我们来说,解决方案就是构建一个名为MySQLStreamer的系统,来监控数据库中的数据变更,并将数据变更发送给所有订阅了这些内容的服务。
MySQLStreamer是一个数据库变更捕获和发布系统,它负责捕获数据库的每一条数据变更,将它们打包成消息并发布到Kafka中。“流表二象性”的概念就是讲述可以重放表中的每一条变更操作来将整张表重建为某个时间点的镜像,或者发送表中的每一条变更来重新生成流。
要理解我们是如何捕获和发布数据变更的,就必须了解我们的数据库基础架构。在Yelp,我们把绝大多数数据都保存在MySQL集群中。为了处理海量访问和管理读请求负载,Yelp必须维护几十个在地理上按区域分布的读副本。
数据复制的原理
作为数据复制的基础,MySQL主库上发生的数据变更操作会写到一种特定格式的二进制日志文件中。当一个从库连上主库时,从库会读取主库的二进制文件来获取数据变更信息。这个过程由从库上运行的两个线程完成,一个是IO线程,一个是SQL线程,如下图所示。IO线程主要负责从主库中读回二进制事件,在读到了二进制事件之后,就把它们保存在从库本地的Relay日志文件中。SQL线程就接着再把这些事件从Relay日志中读取出来,按收到的顺序把它们有顺序地重放写入从库中。
MySQL数据复制原理
要注意的是从库上SQL线程执行的事件并不一定是主库二进制文件中的最新事件,这个在上图中表现为复制延迟。Yelp的MySQLStreamer就做为一个从库,把更新操作持久化到Apache Kafka中,而不是象普通的MySQL从库一样持久化到数据表中。
MySQL数据复制类型
MySQL数据库有两种复制类型:
- 基于语句的复制(Statement-based replication,SBR)
- 基于行的复制(Row-based replication,RBR)
在基于语句的复制中,主库写到二进制日志文件中的是SQL语句,从库的SQL线程则直接在从库上重新执行这些语句。使用基于语句的复制有一些缺点,其中最重要的是有可能导致主从库之间的数据不一致。因为从库上的SQL线程只是简单的负责重新执行从主库上拷贝回来的操作语句,但事实上相同的语句在不同的时间不同的数据库上有可能会产生不同的结果。比如下面的语句:
- INSERT INTO places (name, location)
- (SELECT name, location FROM business)
这种情况下你本来是想查询出某些行并将它们插入到另一张表中,但是不带ORDER BY子句的语句在查询多次的情况下,查出的多条结果的顺序不一定每次都相同。而且,如果一个字段有AUTO_INCREMENT属性的话,每次执行相同的语句就可能会产生不同的自增值。另一个明显的例子就是使用RAND()或者NOW()函数,在不同的MySQL实例上运行时就会产生不同的结果。由于这些限制的存在,我们的MySQLStreamer就要求使用基于行的数据复制。在基于行的复制中,每个事件都会表明表中的单条记录是如何被改变的。UPDATE和DELETE语句的日志中都会包含对应数据行在修改之前的原始值。这样,重放这些数据变更记录时就会保持数据一致性。
为什么需要MySQLStreamer?
现在我们知道了数据复制是什么,那又为什么需要MySQLStreamer呢?
Yelp的实时流平台的一个重要用途就是将数据变更流出去,从而让下游的子系统可以处理它们并保持数据更新。有两类我们要知道的SQL变更事件:
- 数据定义语言(Data Definition Language,DDL ):定义或者修改数据库结构或模式;
- 数据操作语言(Data Manipulation Language,DML ):修改数据行;
MySQLStreamer则负责:
- 不断从MySQL二进制文件中查看最新的日志,读取这两种类型的事件;
- 根据事件类型不同而进行相应处理,将DML事件发布到Kafka Topic中;
MySQLStreamer会发布四种不同的事件类型:插入、更新、删除和刷新。前三种对应着相同类型的DML操作。刷新事件由我们的初始化Topic过程产生,在后文中会详细描述。对于每种事件类型,我们都会包含完整的数据行内容。更新事件包括相应数据行的更新前和更新后的全部字段值。这对处理环形操作非常重要。设想一个地理编码服务在处理了业务更新信息后,如果该行数据的地址变了,于是触发了对经度和纬度的更新。如果没有更新前的值,服务就不得不保存非常多的状态来判断该行的地址是不是已经修改完了,从而可以忽略掉经度和纬度的重复更新。可是如果有了更改前和更改后的内容,只需要做一个简单的对比操作就可以打破这样的环形操作了,不必保存任何状态。
事件类型 | 消息内容 |
插入(Insert) | 整行数据 |
更新(Update) | 更新前的整行数据和更新后的整行数据 |
删除(Delete) | 删除前的整行数据 |
刷新(Refresh) | 整行数据 |
MySQLStreamer 事件类型
数据库拓扑
MySQLStreamer由以下三种数据库组成:
数据库拓扑
源数据库
源数据库中存储上游数据的所有变更事件。MySQLStreamer会不断查看它的变更状态,把变更事件以流的形式传送给下游的消费者。MySQLStreamer的二进制日志流解析模块就负责解析二进制日志,找出新事件。我们的流解析模块是个对python-mysql-replication包的BinLogStreamReader 的进一步抽象,这个API主要提供了三个功能:跟踪下一个事件、读取下一个事件、从流的指定位置开始重新读取事件。
模式跟踪数据库
模式跟踪数据库和只有模式没有数据的从库差不多。它最初是只由源库中的模式生成,然后不断的在上面执行源库中的DDL操作来保持更新。这意味着它会略过数据更改,只保存所有表的表结构。我们通过Schematizer服务来在需要时从这个数据库中获取CREATE TABLE语句,并生成Avro模式。模式信息对于把二进制文件中的字段名与值映射起来也是非常必要的。由于存在复制延迟,MySQLStreamer当前处理的复制数据的位置所对应的数据库模式也不一定是与主库的最新状态完全一致的。因此,MySQLStreamer使用的模式定义不能简单地从主库上获取。我们决定用一个数据库来存储这个信息,来避免重新实现MySQL的DDL引擎。
由于DDL操作是非事务型的,因此假如系统执行一条SQL语句失败之后,数据库可能就处于崩溃的状态。为防止这样的事情发生,我们用事务的方式来处理整个数据库。我们在执行任何DDL操作之前先生成检查点,把整个模式跟踪数据库的全部模式创建语句都导出来,并保存在状态数据库中。然后再执行DDL事件。如果成功,就把刚才保存的模式导出文件删掉,再生成一个检查点。一个检查点通常包括保存的二进制文件名和位置,以及相关的Kafka偏移量信息。如果失败了,在MySQLStreamer重启之后,它会检查是否存在模式导出文件。如果有,就执行出错的DDL事件之前的模式导出文件来重建整个模式跟踪数据库。一旦回放完毕,MySQLStreamer就重启日志事件查看服务,从检查点位置开始恢复数据复制,从而最终追上主库,保持数据最新。
状态数据库
状态数据库保存MySQLStreamer的内部状态,它包含三张表,分别保存不同的状态信息:
DATA_EVENT_CHECKPOINT
保存每个Topic及相应的最后推送的偏移量信息。
GLOBAL_EVENT_STATE
这张表中保存的最重要的信息就是位置,具体是这样定义的:
heartbeat_signal | heartbeat_timestamp | log_file | log_position | offset |
位置信息
要保证数据复制在发生故障的情况下仍然是安全的,前提之一就是每个事务都必须有唯一的标识符。它不仅在故障恢复的过程中有用,还在分层复制架构中有用。全局事务标识符(Global Transaction IDentifier,GTID)就是一个这样的标识符,在一套主从复制环境中,它是在所有服务器中全局唯一的。我们的代码已经支持GTID了,但我们使用的MySQL的版本还没支持。所以我们只好寻找其它替代方案来保存这样的状态,要求必须是在整个复制体系中都非常容易解释的,于是我们就想到了Yelp现有的心跳守护进程。这个Python守护进程会负责周期性地向数据库中更新心跳信息,内容是一个序列号和一个时间戳。这个心跳会随后被复制到所有的从库中。MySQLStreamer从心跳信息中提取序列号和时间戳,再附上它现在正在处理的日志文件名和日志内偏移量,把这些信息保存在global_event_state表中。当主库由于某些原因出故障时, 一个脚本程序会利用心跳序列号和时间戳信息从新的主库中找出日志文件名和偏移量。
MYSQL_DUMPS
保存模式跟踪数据库导出的模式文件,用于在出错之后将数据库恢复到一个旧的稳定状态。
MySQLStreamer是怎么工作的?
MySQLStreamer工作机制
在MySQLStreamer进程启动时,它必须先去ZooKeeper上获取一个锁,然后才可以开始消息处理。获取锁的操作目的是避免在一个集群上会运行多个MySQLStreamer实例。在一个集群上运行多实例的问题在于日志复制本身是有顺序的,在某些场景下我们必须保留表内和表间的日志顺序。保证只有一个MySQLStreamer实例在处理日志,就可以保留日志顺序,避免消息被重复处理。
如前文所述,MySQLStreamer利用Binlog Parser从源数据库中获取事件。对于数据事件,就从中提取表的模式信息,并发送给Schematizer服务。Schematizer服务会返回相应的Avro模式和Kafka Topic。Schematizer服务是幂等的,不管你调用多少次,只要你发给它的是相同的建表语句,那它就会返回相同的Avro模式和Kafka Topic。
MySQLStreamer用收到的Avro模式将数据事件打包,然后发布到Kafka Topic中。Yelp数据管道的Kafka生产者会为所有发布到Kafka中的事件维护一个内部队列。当它从Binlog Parser中收到了一个模式事件时,MySQLStreamer就先把内部队列里的所有事件全都刷出去,再生成一个检查点,以备万一发生故障时恢复用。然后它再把模式事件应用到模式跟踪数据库上。
对数据事件的错误处理与模式事件略有不同。我们会在处理任何数据事件之前先生成检查点,然后在成功发布一批消息之后再继续生成检查点。我们信任成功的回复,而不是失败的。如果我们处理失败了,恢复时就去获取最新的检查点和Kafka高水位信息,将刚才没有成功发布的消息再发布一次。对于Kafka一端的设置,我们会要求收到所有处于复制状态的副本的响应消息才认为写入是成功的,而且我们也把min.isr设置得比较高,牺牲了可用性来换取一致性。通过一整套的验证错误和故障恢复手段,我们可以保证消息是确定只发布一次的。
初始化一个Topic
Yelp公司成立于2004年,有趣的是许多表和公司的年纪一样大。我们需要有方法来用表中的现有内容来初始化相应的Kafka Topic。我们设计了一套流程,来一边处理新的复制事件,一边完成一致性的Topic初始化过程。
在讨论具体的初始化过程之前,先看看数据复制拓朴架构。参考上图,有一个主库,它有一个叫做中间主库(Intermediate Master)的从库。中间主库下级还有许多的从库,叫本地主库(Local Master)。MySQLStreamer连接的从库叫做刷新主库(Refresh Primary),它实际上是某一个本地主库的从库。刷新主库的数据复制连接是基于行的,而其它的复制全都是基于语句的。
初始化过程首先要在MySQLStreamer的刷新主库上创建一张与原始表一样的表,区别只是要用MySQL的Blackhole引擎。
- blackhole_table = create_blackhole_table(original_table)
- while remaining_data(original_table)
- lock_table(original_table)
- copy_batch(original_table, blackhole_table, batch_size)
- unlock_table(original_table)
- wait_for_replication()
- drop_table(blackhole_table)
初始化Topic过程伪码
MySQL的Blackhole引擎就象是Linux系统上的’/dev/null’一样。我们使用Blackhole引擎的主要原因是写入这种表的数据都不会被保存下来,但却可以生成二进制日志,可供复制使用。这样我们就可以重现出原始表中的二进制日志,但不必担心数据要多存一份。
创建完Blackhole引擎的表之后,需要把本地主库上的原始表锁住,以避免导数据的过程中会发生数据更新操作。然后我们就把数据一批批的从原始表中拷贝到Blackhole表中。如上图所示,MySQLStreamer是连接到某个叶子节点上的,原因是我们不希望初始化过程中产生的日志会被复制到集群中的其它节点上。但我们还是希望在初始化的过程中原始表可以被更新,因此在批量拷数据的操作之间,我们会把原始表解锁一段时间,让本地主库跟上数据复制进度。锁住原始表会导致从本地主库到刷新主库之间的数据复制操作暂时失效,但这样可以保证我们拷到Blackhole表中的数据在那个时刻是与本地主库一致的。解锁可以让复制跟上进度,自然也会拖慢初始化过程的速度。但这个过程实际上是非常快的,因为数据没有离开过MySQL服务器。每一批导数据操作而导致的复制延迟大概都是微秒级的。
这个过程的所有复杂之处都在于数据库。在MySQLStreamer内部,我们的代码会把Blackhole表上的insert操作当成原始表上的刷新事件来处理。Topic中的多个刷新事件之间会被Insert、Update、Delete等正常的复制事件间隔开,因为在初始化过程中我们会不断地解锁原始表,所以复制过程中的这些操作都会发布出来。从语义上来说,许多消费者程序都把刷新事件当成Update来处理。
关键点
技术团队的时间是非常宝贵的。技术人员应该遵守一个好原则:任何重复性工作都应该自动化。伴随着Yelp的技术团队扩张,我们需要实现一个单一、有弹性的基础架构来方便实现众多应用程序。有了数据管道之后,我们就可以为检索数据方便而创建索引、将数据保存到数据仓库中、与其它的内部服务共享转换后的数据……等等。事实证明数据管道价值巨大,让我们朝着目标中的自动化状态迈进了一大步。MySQLStreamer则是数据管道非常重要的一个组成部分,它从MySQL二进制文件中提取所有的数据变更操作事件,再把这些事件发布到Kafka中。当改动信息都发布到Kafka Topic中之后,下游消费者程序就很容易根据自己的特定用途来使用数据了。
鸣谢
感谢整个业务分析与指标组(Business Analytics and Metrics,B.A.M),数据库项目组和所有为MySQLStreamer做出了贡献的同事们:Abrar Sheikh、Cheng Chen、Jenni Snyder和Justin Cunningham等。