这里主要分析一下message的格式.
一条message的构成由以下部分组成
val CrcOffset = 0 //crc校验部分和字长
val CrcLength = 4
val MagicOffset = CrcOffset + CrcLength //消息协议版本和字长
val MagicLength = 1
val AttributesOffset = MagicOffset + MagicLength //独立版本,压缩类型之类的标识和字长
val AttributesLength = 1
val KeySizeOffset = AttributesOffset + AttributesLength //key标识和字长
val KeySizeLength = 4
val KeyOffset = KeySizeOffset + KeySizeLength
val ValueSizeLength = 4 //消息内容长度标识和字长
由上面可看出一个消息的内容部分的起始位置实际上是keyoffset+keysize+valuesizeLength
def keySize: Int = buffer.getInt(Message.KeySizeOffset) //获取key大小 private def payloadSizeOffset = Message.KeyOffset + max(0, keySize) //加上key大小 private def sliceDelimited(start: Int): ByteBuffer = {
val size = buffer.getInt(start) //获取消息内容长度
if(size < 0) {
null
} else {
var b = buffer.duplicate
b.position(start + 4) //加上内容长度的字节数.
b = b.slice()
b.limit(size)
b.rewind
b //返回消息内容对应的buffer
}
}
一条完整的消息由上面几部分组成.
这里是用java实现的一个打印log文件里的offset和对应消息的小工具.
https://github.com/cloud-zhao/java/blob/master/kafka/Kmessages.java