8.1 The Structure of a Kafka Log
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
  private val logs = new Pool[TopicAndPartition, Log]()
}
class Log(val dir: File,
          @volatile var config: LogConfig,
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler,
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  ……
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
}
When a topic is created, Kafka lets you specify its number of partitions; each broker then creates a log for every partition of that topic assigned to it. Each log in turn consists of multiple LogSegments, and each LogSegment is registered in the segments map under the offset of its first message, as shown in the figure:
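This keying also shows up in the on-disk file names: a segment's data and index files are named after the segment's base offset, zero-padded to 20 digits. A minimal sketch of that convention (the helper below is illustrative, not Kafka's own code):

// Build the file names of a segment from its base offset,
// zero-padded to 20 digits (e.g. 00000000000000368769.log).
def segmentFileNames(baseOffset: Long): (String, String) = {
  val prefix = f"$baseOffset%020d"
  (prefix + ".log", prefix + ".index")
}

// segmentFileNames(368769L) == ("00000000000000368769.log", "00000000000000368769.index")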
A LogSegment itself is composed as follows:
class LogSegment(val log: FileMessageSet,
                 val index: OffsetIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {
  ……
}
FileMessageSet reads the segment's data by specifying a start and end position within the segment file, while OffsetIndex is the message index of the segment. The index is sparse: rather than indexing every message, an index entry is added after roughly every log.index.interval.bytes bytes of appended messages, as shown in the figure below:
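To make the sparse spacing concrete, here is an abridged sketch of a LogSegment's append path, which only writes an index entry once indexIntervalBytes bytes have accumulated since the previous one (field and method names follow the snippets above; the body is simplified, not the exact Kafka source):

// Inside LogSegment: track how many bytes were written since the last index entry.
private var bytesSinceLastIndexEntry = 0

def append(offset: Long, messages: ByteBufferMessageSet) {
  // Add a sparse index entry only after indexIntervalBytes bytes of messages.
  if (bytesSinceLastIndexEntry > indexIntervalBytes) {
    index.append(offset, log.sizeInBytes())  // map offset -> physical file position
    bytesSinceLastIndexEntry = 0
  }
  log.append(messages)                       // append the messages themselves
  bytesSinceLastIndexEntry += messages.sizeInBytes
}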
Therefore, given a topic, partition and offset, looking up a record takes two steps:
1) Quickly locate the segment file
First determine which segment file the offset falls into. Because segments is a ConcurrentSkipListMap, i.e. a skip list:
the skip list quickly finds the segment whose base offset is the largest one not exceeding the target offset.
2) Find the message chunk within the segment file
Then a binary search over the sparse index locates the index entry closest to, but not beyond, the target offset, and the data file is scanned forward from there (a combined sketch of both steps follows the OffsetIndex code below):
class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
  /**
   * Find the slot in which the largest offset less than or equal to the given
   * target offset is stored.
   *
   * @param idx The index buffer
   * @param targetOffset The offset to look for
   *
   * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
   */
  private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
    // we only store the difference from the base offset so calculate that
    val relOffset = targetOffset - baseOffset

    // check if the index is empty
    if(entries == 0)
      return -1

    // check if the target offset is smaller than the least offset
    if(relativeOffset(idx, 0) > relOffset)
      return -1

    // binary search for the entry
    var lo = 0
    var hi = entries-1
    while(lo < hi) {
      val mid = ceil(hi/2.0 + lo/2.0).toInt
      val found = relativeOffset(idx, mid)
      if(found == relOffset)
        return mid
      else if(found < relOffset)
        lo = mid
      else
        hi = mid - 1
    }
    lo
  }
}
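Putting the two steps together, the sketch below shows how a read for a given offset could be served: floorEntry on the ConcurrentSkipListMap picks the segment (step 1), then the segment's sparse index and data file narrow the search down to the exact message (step 2). The lookup and searchFor calls mirror the OffsetIndex/FileMessageSet API of this Kafka version, but treat the whole sketch as illustrative rather than the actual Log.read implementation:

import java.util.concurrent.ConcurrentSkipListMap

// Step 1: skip-list lookup. floorEntry returns the entry with the largest
// base offset <= targetOffset, i.e. the segment that may hold the message.
def segmentFor(segments: ConcurrentSkipListMap[java.lang.Long, LogSegment],
               targetOffset: Long): Option[LogSegment] =
  Option(segments.floorEntry(targetOffset)).map(_.getValue)

// Step 2: within that segment, the sparse index gives the closest preceding
// (offset, physical position) pair, and the data file is scanned forward from
// that position until the exact target offset is reached.
def findMessage(segments: ConcurrentSkipListMap[java.lang.Long, LogSegment],
                targetOffset: Long) =
  segmentFor(segments, targetOffset).map { segment =>
    val startingPosition = segment.index.lookup(targetOffset)       // binary search over the sparse index
    segment.log.searchFor(targetOffset, startingPosition.position)  // linear scan within the chunk
  }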
8.2 Starting the LogManager
Startup happens in two main steps:
Step 1: rebuild the logs from each of the broker's configured log directories.
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30*1000
  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicAndPartition, Log]()

  createAndValidateLogDirs(logDirs)
  private val dirLocks = lockLogDirs(logDirs)
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  loadLogs()
  ……
}
recoveryPointCheckpoints records the most recent flush position of each log, i.e. the offset up to which the messages of each topic and partition have been flushed to disk. LogSegment and OffsetIndex data written after this point may not have been flushed cleanly and therefore needs special handling during recovery.
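As a rough sketch of what loadLogs does with those checkpoints (heavily abridged: the real method also uses the ioThreads pool, handles the clean-shutdown marker file, and more; parseTopicPartitionName stands for the helper that maps a directory name such as mytopic-0 back to a TopicAndPartition):

// For every log directory, read the recovery-point checkpoint and build a Log per
// topic-partition subdirectory; segments beyond the recovery point are the ones
// that may need to be re-validated on startup.
private def loadLogsSketch() {
  for (dir <- this.logDirs) {
    // TopicAndPartition -> last flushed offset recorded for this directory
    val recoveryPoints: Map[TopicAndPartition, Long] = recoveryPointCheckpoints(dir).read()
    for (logDir <- dir.listFiles if logDir.isDirectory) {
      val topicPartition = parseTopicPartitionName(logDir.getName)
      val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
      val log = new Log(logDir,
                        config,
                        recoveryPoints.getOrElse(topicPartition, 0L), // recoveryPoint
                        scheduler,
                        time)
      logs.put(topicPartition, log)
    }
  }
}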
Step 2: start three scheduled tasks and, depending on configuration, the log-compaction (cleaner) feature.
def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    // delete expired data and data beyond the retention limits
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    // flush dirty data to disk
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
  }
  // log compaction: merge several small log segments into one larger segment
  if(cleanerConfig.enableCleaner)
    cleaner.startup()
}
cleanupLogs: deletes data that has expired as well as data exceeding the configured retention size
flushDirtyLogs: flushes dirty log data to disk
checkpointRecoveryPointOffsets: checkpoints each log's recovery point, which speeds up broker restarts because only the data written after the checkpoint needs special handling
cleaner: the log.cleaner.enable setting controls whether log compaction is enabled; log compaction keeps only the most recent record for each key, as shown in the figure below:
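To make the compaction semantics concrete, here is a minimal, self-contained sketch of the effect of compacting by key; it models only the outcome, not the cleaner's actual segment-copying implementation:

object CompactionSketch extends App {
  // Records appended to a partition in offset order: (key, value).
  val appended = Vector(
    "k1" -> "v1",
    "k2" -> "v2",
    "k1" -> "v3",  // newer value for k1
    "k2" -> "v4"   // newer value for k2
  )

  // After compaction only the latest value per key remains
  // (offset order is preserved for the surviving records).
  val latestPerKey = appended.toMap        // later duplicates win
  val compacted = appended.filter { case (k, v) => latestPerKey(k) == v }

  println(compacted)  // Vector((k1,v3), (k2,v4))
}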