MySQL二进制日志分析-代码实现(FORMAT_DESCRIPTION_EVENT)

时间:2021-03-23 16:01:40

如前文概述,MySQL Binlog v3以前版本, 二进制日志文件的第一个事件是START_EVENT_V3, 从v4版本开始第一个事件为FORMAT_DESCRIPTION_EVENT(以下简称FD事件),替代掉START_EVENT_V3。具体到MySQL服务器版本来说,MySQL 5.0以前版本二进制日志的第一个事件是START_EVENT_V3,而后续版本的第一个事件都是FD事件,由于目前大部分MySQL都跑在5.0+,所以这里不讨论START_EVENT_V3事件,FD事件可以看成是START_EVENT_V3的继承和扩展,是相通的,有兴趣可以虫github 克隆(clone)BinlogMiner的源代码下来参考。

FD事件比较简单,顺带介绍一下BinlogMiner的程序实现,本文可能很拖沓。

文件I/O是编程语言的基本功能,也是编程的基本功。具体到Java来说,传统方式是通过I/O流来进行文件的读写,如"BufferedInputStream"。Java 1.4版本的开始引入新的IO访问方式Java NIO (New IO) ,其中的MappedByteBuffer允许将一个文件通过FileChannel.map,映射成一个内存缓存,跟简单的来说,映射成一个字节数组,可以通过直接访问数组的方式访问文件。这大大简化了文件的随机读写,而且相较于流方式,更接近人的思考方式,还有极高的读写效率。总的来说Java还是很全面的,对于底层的开发也支持很好,NO.1 霸榜多年还是有道理的。BinlogMiner基于MappedByteBuffer实现的。下面介绍的并非BinlogMiner的代码(因为实际代码需要处理很多细节,不适合来说原理),下面只是举个简单实现能跑的栗子。

只需要简单的3行就可以创建一个MappedByteBuffer:

String binlogFileName = "D:\\build\\binlogs\\5.7.18\\blog.000016";
RandomAccessFile binlogFile = new RandomAccessFile(binlogFileName, "r");
FileChannel binlogFileChannel = binlogFile.getChannel();
MappedByteBuffer blogFileBuffer = binlogFileChannel.map(MapMode.READ_ONLY, 0, binlogFile.length());

最后一行,创建一个将文件(D:\\build\\binlogs\\5.7.18\\blog.000016)映射到MappedByteBuffer,只读模式,范围从第一个字节开始(0),到文件结束(binlogFile.length())。通过MappedByteBuffer.position()可以获取当前的文件访问的位置,通过MappedByteBuffer.position(int position)可以设置当前文件访问的位置,在概述的时候提到,前4个字节是二进制文件的幻数(magic number),将访问指针直接跳过,就到了FD事件开始了。

blogFileBuffer.position(4);
int starPos = blogFileBuffer.position(); //记录事件开始位置

这里需要注意的是MappedByteBuffer的position是一个整型,最大的值是Integer.MAX_VALUE。也就是2GB,如果超过2GB,那么就需要重新映射,如:

blogFileBuffer = binlogFileChannel.map(MapMode.READ_ONLY, new_start_position , new_end_position);

这时候MappedByteBuffer的position就是一个相对偏移,需要根据new_start_position_position来获得绝对偏移量,当然对于Binlog不需要担心这个问题,因为截至当前的MySQL 8版本,二进制文件的最大限制还是1GB。

MySQL二进制日志分析-代码实现(FORMAT_DESCRIPTION_EVENT)

1. Common header的解析

根据前文,事件的第一部分是通用头,根据官方文档:
https://dev.mysql.com/doc/internals/en/binlog-event-header.html

Binlog header Payload:
4 timestamp
1 event type
4 server-id
4 event-size
if binlog-version > 1:
4 log pos
2 flags

接下来代码获取文件头的内容:

1.1.timestamp

前4个字节是该事件的时间戳:

byte[] rawTimestamp = new byte[4];
blogFileBuffer.get(rawTimestamp);

根据文档,timestamp是一个无符号的4字节长度的整型,这里需要注意的2个细节是:
  - Java没有unsigned关键字,没有无符号数,这里需要用第三放的库或者直接实现无符号数。
  - 不同平台,使用不同字节序的概念,如,一个数值,1234,litte-endian存储的是4321,而big-endian存储的是1234。
  - 在Java中,通过4个字节记录一个整型,8个字节记录一个长整型,但在binlog中,为节省空间,提高性能,可能是1个字节,2个字节,3个字节,。。。

在BinlogMiner中,是自己写函数实现的无符号数读取,具体可以看PaserHelper这个工具类,这里就不列出来:

ByteOrder order = ByteOrder.LITTLE_ENDIAN;  //little-endian,小字节序,x86平台都是。
long timestampValue = ParserHelper.getUnsignedLong(rawTimestamp, order); //获得了存储的值; //这里获得的timestamp是unix timestamp,是基于秒的,而java的Date是基于毫秒(milliseconds)的,所以这里转换要乘以1000.
Date timestamp = new Date(timestampValue * 1000L);
System.out.println(timestamp.toString()); // 这里就获得了可读的事件日期;

1.2. event type

1个字节的无符号数,说明该事件的类型:

byte[] rawEventType = new byte[1];
blogFileBuffer.get(rawEventType);
int eventTypeValue = ParserHelper.getUnsignedInteger(rawEventType, order);
System.out.print(eventTypeValue);

这里输出的事件类型为15,转换成十六进制为0x0F,根据文档:(https://dev.mysql.com/doc/internals/en/binlog-event-type.html)正好是FD事件。

1.3. server-id

MySQL的服务器id,4字节无符号:

byte[] rawServerId = new byte[4];
blogFileBuffer.get(rawServerId);
long serverId = ParserHelper.getUnsignedInteger(rawServerId, order);

1.4. event-size

该事件的长度(字节数),4字节无符号数;

byte[] rawEventSize = new byte[4];
blogFileBuffer.get(rawEventSize);
long eventSize = ParserHelper.getUnsignedInteger(rawEventSize, order);

1.5. log pos

该事件的结束位置,4字节无符号数,v4版本才包含,v4以前的START_EVENT_V3其实也是可以根据event-size计算得log pos的。

byte[] rawLogPos = new byte[4];
blogFileBuffer.get(rawLogPos);
long logPos = ParserHelper.getUnsignedInteger(rawLogPos, order);

1.6. flags

该事件的标志(https://dev.mysql.com/doc/internals/en/binlog-event-flag.html)

byte[] rawFlags = new byte[2];
blogFileBuffer.get(rawFlags);
long flags = ParserHelper.getUnsignedInteger(rawFlags, order);

至此,事件的通用头分析结束。

2. Event Body

Common Header后就是事件内容(Event Body),先看官方定义:

FORMAT_DESCRIPTION_EVENT:A format description event is the first event of a binlog for binlog-version 4. It describes how the other events are layed out.

Payload:
2 binlog-version
string[50] mysql-server version
4 create timestamp
1 event header length
string[p] event type header lengths

注意:这里的文档写的是Payload,但实际分析的时候会发现,应该理解为前文概述中的post header。实际的playload根据版本不同为1个字节,或者0个字节,用于标识CRC算法,先做个引子,后文会详细说明。

2.1.binlog-version

这里自描述了该二进制日志文件的版本,2个字节无符号数:

byte[] rawBinlogVersion = new byte[2];
blogFileBuffer.get(rawBinlogVersion);
int binlogVersion =ParserHelper.getUnsignedInteger(rawBinlogVersion, order);
System.out.print(binlogVersion); //--> 输出4,v4版本

2.2. mysql-server version

记录了服务器的版本,50个字节,字符串类型。

byte[] rawserverVersion = new byte[50];
blogFileBuffer.get(rawserverVersion);
String serverVersion = new String(rawserverVersion).trim();
System.out.print(serverVersion); //--> 输出 5.7.18-15-log

2.3. create timestamp

二进制文件创建的事件戳。4字节无符号数:

byte[] rawCreateTimestamp = new byte[4];
blogFileBuffer.get(rawCreateTimestamp);
long createTimestampValue = ParserHelper.getUnsignedInteger(rawCreateTimestamp, order);
Date createTimestamp = new Date(createTimestampValue * 1000L);

2.4. event header length

通用头(Common header)的长度, 1个字节,无符号数。

byte[] rawCommonHeaderSize = new byte[1];
blogFileBuffer.get(rawCommonHeaderSize);
int commonHeaderSize = ParserHelper.getUnsignedInteger(rawCommonHeaderSize, order);
System.out.print(commonHeaderSize); //--> 输出是19,除了v1版本,其他版本都是固定的19个字节。

2.5. event type header lengths

这里的是每个事件的post header的长度的字节数组,每一个事件一个字节。
首先这个数组长度不是固定的(每个版本包含的事件数很可能是不同的),还可能包含checksum部分,前文说过,在MySQL 5.6.2版本开始引入Checksum, 这里可能还有Checksum的信息。这部分解析就需要些技巧,首先假设“event type header lengths”一直到事件结束。

int pos = blogFileBuffer.position(); //get current position;
byte[] remainBytes = new byte[logPos - pos]; //根据当前的位置和结束位置,计算出该事件剩余的字节数
blogFileBuffer.get(remainBytes);

那么根据这个数组,FD事件的Post Header的长度是多少?根据官方文档,可以根据(事件的编号-1)作为索引获取:

byte rawFdPostHeaderLength = remainBytes[0x0f - 1]; //FD event = 0x0f
int fdPostHeaderLength = rawFdPostHeaderLength & 0xFF; //这里是byte转无符号int
System.out.println(fdPostHeaderLength); //--> FD的post header长度为95个字节。

可以通过log pos和post header来计算出checksum部分的长度,其中根据官方文档,用1个字节记录checksum类型,剩下的为checksum的值:

long crcSize = logPos - (starPos + 19 + fdPostHeaderLength); // 输出结构--> 5 (1 byte crc type + 4 byte crc value)

所以实际的Post header数组为:

byte[] postHeaderSizeArray = Arrays.copyOfRange(remainBytes, 0, remainBytes.length-5);

2.6. checksum算法

所以checksum的类型,和checksum的值为:

int crcType =remainBytes[postHeaderSizeArray.length] & 0xff;
System.out.println(ParserHelper.getHexString(crcValue)); // --> = 1, 根据文档也就是CRC32
byte[] crcValue = Arrays.copyOfRange(remainBytes, remainBytes.length-(5-1), remainBytes.length);
System.out.println(ParserHelper.getHexString(crcValue));
//--> 本案例的输出为F2FABAC5,可以通过mysqlbinlog比对(0xc5bafaf2), 注意,因为是little-endian平台,这里需要倒过来。

至此,第一个事件分析完毕。最后附上完整代码(注意:ParserHelper来自于BinlogMiner):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.Arrays;
import java.util.Date; import org.littlestar.mysql.binlog.parser.ParserHelper; public class T2 {
public static void main(String[] args) throws IOException {
String binlogFileName = "D:\\build\\binlogs\\5.7.18\\blog.000020";
RandomAccessFile binlogFile = new RandomAccessFile(binlogFileName, "r");
FileChannel binlogFileChannel = binlogFile.getChannel();
MappedByteBuffer blogFileBuffer = binlogFileChannel.map(MapMode.READ_ONLY, 0, binlogFile.length()); //skip 4 bytes image number;
blogFileBuffer.position(4);
int starPos = blogFileBuffer.position();
//Common Header..
byte[] rawTimestamp = new byte[4];
blogFileBuffer.get(rawTimestamp); ByteOrder order = ByteOrder.LITTLE_ENDIAN;
long timestampValue = ParserHelper.getUnsignedLong(rawTimestamp, order);
Date timestamp = new Date(timestampValue * 1000L); byte[] rawEventType = new byte[1];
blogFileBuffer.get(rawEventType);
int eventTypeValue = ParserHelper.getUnsignedInteger(rawEventType, order); byte[] rawServerId = new byte[4];
blogFileBuffer.get(rawServerId);
long serverId = ParserHelper.getUnsignedInteger(rawServerId, order); byte[] rawEventSize = new byte[4];
blogFileBuffer.get(rawEventSize);
long eventSize = ParserHelper.getUnsignedInteger(rawEventSize, order); byte[] rawLogPos = new byte[4];
blogFileBuffer.get(rawLogPos);
int logPos = ParserHelper.getUnsignedInteger(rawLogPos, order); byte[] rawFlags = new byte[2];
blogFileBuffer.get(rawFlags);
int flags = ParserHelper.getUnsignedInteger(rawFlags, order); //Event Body byte[] rawBinlogVersion = new byte[2];
blogFileBuffer.get(rawBinlogVersion);
int binlogVersion = ParserHelper.getUnsignedInteger(rawBinlogVersion, order); byte[] rawserverVersion = new byte[50];
blogFileBuffer.get(rawserverVersion);
String serverVersion = new String(rawserverVersion).trim(); byte[] rawCreateTimestamp = new byte[4];
blogFileBuffer.get(rawCreateTimestamp);
long createTimestampValue = ParserHelper.getUnsignedInteger(rawCreateTimestamp, order);
Date createTimestamp = new Date(createTimestampValue * 1000L); byte[] rawCommonHeaderSize = new byte[1];
blogFileBuffer.get(rawCommonHeaderSize);
int commonHeaderSize = ParserHelper.getUnsignedInteger(rawCommonHeaderSize, order); int pos = blogFileBuffer.position(); //get current position;
byte[] remainBytes = new byte[logPos - pos];
blogFileBuffer.get(remainBytes); byte rawFdPostHeaderLength = remainBytes[0x0f - 1]; //FD event = 0x0f
int fdPostHeaderLength = rawFdPostHeaderLength & 0xFF; //
System.out.println(fdPostHeaderLength);
long crcSize = logPos - (starPos + 19 + fdPostHeaderLength); // --> 5 (1 byte crc type + crc value)
byte[] postHeaderSizeArray = Arrays.copyOfRange(remainBytes, 0, remainBytes.length-5);
int crcType =remainBytes[postHeaderSizeArray.length] & 0xff;
byte[] crcValue = Arrays.copyOfRange(remainBytes, remainBytes.length-4, remainBytes.length);
System.out.println(ParserHelper.getHexString(crcValue)); binlogFileChannel.close();
binlogFile.close(); }
}