Kafka 源代码分析之Message

时间:2022-12-06 21:08:50

这里主要分析一下message的格式.

一条message的构成由以下部分组成

  val CrcOffset = 0    //crc校验部分和字长
val CrcLength = 4
val MagicOffset = CrcOffset + CrcLength //消息协议版本和字长
val MagicLength = 1
val AttributesOffset = MagicOffset + MagicLength //独立版本,压缩类型之类的标识和字长
val AttributesLength = 1
val KeySizeOffset = AttributesOffset + AttributesLength //key标识和字长
val KeySizeLength = 4
val KeyOffset = KeySizeOffset + KeySizeLength
val ValueSizeLength = 4 //消息内容长度标识和字长

由上面可看出一个消息的内容部分的起始位置实际上是keyoffset+keysize+valuesizeLength

def keySize: Int = buffer.getInt(Message.KeySizeOffset)   //获取key大小

private def payloadSizeOffset = Message.KeyOffset + max(0, keySize)  //加上key大小

private def sliceDelimited(start: Int): ByteBuffer = {
val size = buffer.getInt(start) //获取消息内容长度
if(size < 0) {
null
} else {
var b = buffer.duplicate
b.position(start + 4) //加上内容长度的字节数.
b = b.slice()
b.limit(size)
b.rewind
b //返回消息内容对应的buffer
}
}

一条完整的消息由上面几部分组成.

这里是用java实现的一个打印log文件里的offset和对应消息的小工具.

https://github.com/cloud-zhao/java/blob/master/kafka/Kmessages.java