Kafka Source Code Analysis, Part 8: LogManager


8.1 The structure of a Kafka log

class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
  private val logs = new Pool[TopicAndPartition, Log]()
}
class Log(val dir: File,
          @volatile var config: LogConfig,
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler,
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  ...
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
}

When a topic is created, the number of partitions can be specified. Each broker creates a log for every partition assigned to it. Each log consists of multiple LogSegments, and each LogSegment is keyed in the segments map by the offset of its first message (its base offset), as shown in the figure below:

[Figure: a partition log made up of multiple LogSegments, each keyed by its base offset]
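On disk, each LogSegment corresponds to a .log data file and an .index file whose names are the segment's base offset zero-padded to 20 digits, so that files sort lexicographically by offset. Below is a minimal sketch of that naming scheme; the helper names are made up for illustration (the real logic lives in Kafka's Log object):

import java.io.File
import java.text.NumberFormat

object SegmentFiles {
  // Pad the base offset to 20 digits, as Kafka does for segment file names.
  private def filenameFromOffset(offset: Long): String = {
    val nf = NumberFormat.getInstance()
    nf.setMinimumIntegerDigits(20)
    nf.setMaximumFractionDigits(0)
    nf.setGroupingUsed(false)
    nf.format(offset)
  }

  // Data file and index file for a segment whose first message has the given offset.
  def logFile(dir: File, baseOffset: Long): File = new File(dir, filenameFromOffset(baseOffset) + ".log")
  def indexFile(dir: File, baseOffset: Long): File = new File(dir, filenameFromOffset(baseOffset) + ".index")
}

// e.g. logFile(new File("/tmp/kafka-logs/mytopic-0"), 368769) -> .../00000000000000368769.log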

A LogSegment itself is defined as follows:

class LogSegment(val log: FileMessageSet,
                 val index: OffsetIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {
  ...
}

FileMessageSet reads the segment's data file between a given start and end position, while OffsetIndex is the message index for the segment. It does not index every message; instead, an index entry is added roughly every log.index.interval.bytes bytes of appended messages, i.e. it is a sparse index, as shown below:

[Figure: sparse offset index pointing into the segment's data file]
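Each index entry is small: 4 bytes for the offset relative to the segment's baseOffset and 4 bytes for the physical position of that message in the .log file. A rough sketch of reading such entries follows; the IndexEntry case class and readEntry helper are illustrative, not Kafka's API:

import java.nio.ByteBuffer

// One sparse-index entry: (offset - baseOffset, byte position in the .log file).
case class IndexEntry(relativeOffset: Int, position: Int)

object SparseIndex {
  private val EntrySize = 8  // 4 bytes relative offset + 4 bytes file position

  // Read the n-th entry out of a memory-mapped index buffer.
  def readEntry(idx: ByteBuffer, n: Int): IndexEntry =
    IndexEntry(idx.getInt(n * EntrySize), idx.getInt(n * EntrySize + 4))
}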

Given a topic, a partition and an offset, looking up a record therefore takes two steps:

1) Quickly locate the segment file

First determine which segment file the offset falls into. Because segments is a ConcurrentSkipListMap keyed by base offset, it forms a skip list:

[Figure: skip list over segment base offsets]

so the segment file containing the target offset can be located quickly through the skip list.
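The lookup boils down to a floor query on the skip list: the segment whose base offset is the largest one not greater than the target offset. A minimal sketch of that idea (the Segment case class here is a stand-in for kafka.log.LogSegment; Kafka's Log.read does the equivalent with segments.floorEntry):

import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookup {
  // Stand-in for kafka.log.LogSegment, just to keep the sketch self-contained.
  case class Segment(baseOffset: Long)

  val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]()
  segments.put(0L, Segment(0L))
  segments.put(368769L, Segment(368769L))
  segments.put(737337L, Segment(737337L))

  // Greatest base offset <= target, i.e. the segment that may contain the target offset.
  def segmentFor(offset: Long): Option[Segment] =
    Option(segments.floorEntry(offset)).map(_.getValue)
}

// SegmentLookup.segmentFor(500000L) -> Some(Segment(368769))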

2) Find the message within the segment file

Then a binary search over the sparse index finds the index slot whose offset is the largest one not greater than the target:

 
   
class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
  /**
   * Find the slot in which the largest offset less than or equal to the given
   * target offset is stored.
   *
   * @param idx The index buffer
   * @param targetOffset The offset to look for
   *
   * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
   */
  private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
    // we only store the difference from the base offset so calculate that
    val relOffset = targetOffset - baseOffset
    // check if the index is empty
    if(entries == 0)
      return -1
    // check if the target offset is smaller than the least offset
    if(relativeOffset(idx, 0) > relOffset)
      return -1
    // binary search for the entry
    var lo = 0
    var hi = entries - 1
    while(lo < hi) {
      val mid = ceil(hi/2.0 + lo/2.0).toInt
      val found = relativeOffset(idx, mid)
      if(found == relOffset)
        return mid
      else if(found < relOffset)
        lo = mid
      else
        hi = mid - 1
    }
    lo
  }
}
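Note that because the index is sparse, the binary search only narrows the lookup down to the nearest preceding index entry; the final step is a short sequential scan of the data file starting at the physical position stored in that entry (in the source this is roughly what FileMessageSet.searchFor does). This is the usual trade-off of a sparse index: a much smaller, memory-mappable index at the cost of a bounded linear scan at the end.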

8.2 Starting the LogManager

Startup happens in two main steps:

Step 1: rebuild the logs from every configured log directory

class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30*1000
  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicAndPartition, Log]()

  createAndValidateLogDirs(logDirs)
  private val dirLocks = lockLogDirs(logDirs)
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  loadLogs()
  ...
}

recoveryPointCheckpoints records the latest flush point of each log, i.e. the offset up to which each topic-partition's messages have been flushed to disk. LogSegment and OffsetIndex data written after that point may not have been flushed cleanly (for example after a crash) and must be handled specially during recovery.
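The checkpoint file itself is plain text: a format version, the number of entries, then one "topic partition offset" line per partition. Below is a hedged sketch of writing a file in that shape; writeCheckpoint is an illustrative helper, not the OffsetCheckpoint API:

import java.io.{File, PrintWriter}

object CheckpointSketch {
  // Write recovery points in the same plain-text layout as recovery-point-offset-checkpoint:
  //   line 1: format version, line 2: entry count, then "topic partition offset" per entry.
  def writeCheckpoint(file: File, recoveryPoints: Map[(String, Int), Long]): Unit = {
    val writer = new PrintWriter(file)
    try {
      writer.println(0)                      // version
      writer.println(recoveryPoints.size)    // number of entries
      for (((topic, partition), offset) <- recoveryPoints)
        writer.println(s"$topic $partition $offset")
    } finally {
      writer.close()
    }
  }
}

// e.g. writeCheckpoint(new File("/tmp/recovery-point-offset-checkpoint"), Map(("mytopic", 0) -> 368769L))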

 

Step 2: start three scheduled tasks and, optionally, the log cleaner (log compaction)

def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    // delete expired data and data beyond the retention limits
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    // flush dirty data to disk
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)

    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
  }
  // log compaction: merge several small log segments into one larger segment
  if(cleanerConfig.enableCleaner)
    cleaner.startup()
}

cleanupLogs: deletes expired data and data that exceeds the retention limits.

flushDirtyLogs: flushes dirty (not yet persisted) log data to disk.

checkpointRecoveryPointOffsets: checkpoints the recovery point of every log, which speeds up broker restarts because only the data written after the checkpoint needs special handling during recovery.
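All three jobs above are simply periodic tasks handed to the scheduler, which is backed by a scheduled thread pool. A minimal sketch of that scheduling pattern is shown here; the names and periods are illustrative, not the broker's configuration:

import java.util.concurrent.{Executors, TimeUnit}

object SchedulerSketch {
  // Roughly how the periodic LogManager jobs run: a scheduled thread pool
  // firing each task at a fixed rate after an initial delay.
  private val executor = Executors.newScheduledThreadPool(1)

  def schedule(name: String, fun: () => Unit, delayMs: Long, periodMs: Long): Unit = {
    val runnable = new Runnable { def run(): Unit = fun() }
    executor.scheduleAtFixedRate(runnable, delayMs, periodMs, TimeUnit.MILLISECONDS)
  }
}

// e.g. SchedulerSketch.schedule("kafka-log-retention", () => println("cleanup"), 30 * 1000L, 5 * 60 * 1000L)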

cleaner: depending on the log.cleaner.enable setting, the log cleaner is started. Log compaction deduplicates records that share the same key, keeping only the most recent value, as shown below:

[Figure: log compaction keeping the latest record for each key]
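As a rough illustration of the compaction semantics only (not the cleaner's actual implementation), compacting a sequence of keyed records keeps the highest-offset record per key:

object CompactionSketch {
  case class Record(offset: Long, key: String, value: String)

  // Keep only the latest (highest-offset) record for each key, preserving offset order.
  def compact(records: Seq[Record]): Seq[Record] = {
    val latest = records.groupBy(_.key).map { case (_, rs) => rs.maxBy(_.offset) }
    latest.toSeq.sortBy(_.offset)
  }
}

// compact(Seq(Record(0, "k1", "a"), Record(1, "k2", "b"), Record(2, "k1", "c")))
//   -> Seq(Record(1, "k2", "b"), Record(2, "k1", "c"))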