源码剖析-LogManager
# 4.9 Kafka源码剖析之LogManager
- kafka日志管理子系统的入口。日志管理器负责日志的创建、抽取、和清理。
- 所有的读写操作都代理给具体的Log实例。
- 日志管理器在一个或多个目录维护日志。新的日志创建到拥有最少log的目录中。
- 分区不移动。
- 通过一个后台线程通过定期截断多余的日志段来处理日志保留。
启动Kafka服务器的脚本:
main方法中创建KafkaServerStartable对象:
该类中包含KakfaServer对象,startup方法调用的是KafkaServer的startup方法:
KafkaServer的startup方法中,启动了LogManager:
/**
* @param logDirs 主题分区目录的File对象
* @param initialOfflineDirs
* @param topicConfigs 主题配置
* @param defaultConfig 主题的默认配置
* @param cleanerConfig 日志清理器配置
* @param ioThreads IO线程数
* @param flushCheckMs
* @param flushRecoveryOffsetCheckpointMs
* @param flushStartOffsetCheckpointMs
* @param retentionCheckMs 检查日志保留的时间
* @param maxPidExpirationMs
* @param scheduler
* @param brokerState
* @param brokerTopicStats
* @param logDirFailureChannel
* @param time 时间
*/
LogManager的startup方法:
/**
* 启动后台线程们用于将日志刷盘以及日志的清理
*/
def startup() {
if (scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
// 用于清除日志片段的调度任务,没有压缩,周期性执行
scheduler.schedule("kafka-log-retention",
cleanupLogs _,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
// 用于日志片段刷盘的调度任务,周期性执行
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs _,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
// 用于将当前broker上各个分区的恢复点写到文本文件的调度任务,周期性执行
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointLogRecoveryOffsets _,
delay = InitialTaskDelayMs,
period = flushRecoveryOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
// 用于将当前broker上各个分区起始偏移量写到文本文件的调度任务,周期性执行
scheduler.schedule("kafka-log-start-offset-checkpoint",
checkpointLogStartOffsets _,
delay = InitialTaskDelayMs,
period = flushStartOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-delete-logs",
deleteLogs _,
delay = InitialTaskDelayMs,
period = defaultConfig.fileDeleteDelayMs,
TimeUnit.MILLISECONDS)
}
// 如果配置了日志的清理,则启动清理任务
if (cleanerConfig.enableCleaner)
cleaner.startup()
}
# 4.9.1 清除日志片段
cleanupLogs的具体实现:
deleteOldSegments()的实现:
首先找到所有可以删除的日志片段
然后执行删除
该方法执行日志片段的异步删除。步骤如下:
- 将日志片段的信息从map集合移除,之后再也不读了
- 在日志片段的索引和log文件名称后追加.deleted,加标记而已
- 调度异步删除操作,执行.deleted文件的真正删除。
异步删除允许在读取文件的同时执行删除,而不需要进行同步,避免了在读取一个文件的同时物理删除引起的冲突。
该方法不需要将IOException转换为KafkaStorageException,因为该方法要么在所有日志加载之前调用,要么在使用中由调用者处理IOException。
根据日志片段大小进行删除:
shouldDelete是一个函数,作为deleteOldSegments删除日志片段的判断条件。
根据偏移量删除日志片段:
对于当前日志片段是否需要删除,要看它的下一个日志片段的baseOffset是否小于等于日志对外暴露给消费者的日志偏移量,如果小,消费者不用读取,当前日志片段就可以删除。
# 4.9.2 日志片段刷盘
在LogManager的startup中,启动了刷盘的线程:
调用flushDirtyLogs方法进行日志刷盘处理。
Kafka推荐让操作系统后台进行刷盘,使用副本保证数据高可用,这样效率更高。
因此此种方式不推荐。
执行刷盘的方法:
/**
* 日志片段刷盘到offset-1的偏移量位置。
*
* @param offset 从上一个恢复点开始刷盘到该偏移量-1的位置。offset偏移量的不刷盘。
* offset是新的恢复点值。
*/
def flush(offset: Long) : Unit = {
maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
// 如果偏移量小于等于该日志的恢复点,则不需要刷盘
if (offset <= this.recoveryPoint)
return
debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
s"unflushed: $unflushedMessages")
// 遍历需要刷盘的日志片段
for (segment <- logSegments(this.recoveryPoint, offset))
// 执行刷盘
segment.flush()
lock synchronized {
// 检查MMAP是否关闭
checkIfMemoryMappedBufferClosed()
// 如果偏移量大于恢复点
if (offset > this.recoveryPoint) {
// 设置新的恢复点,表示到达这个偏移量位置的消息都已经刷盘了
this.recoveryPoint = offset
// 设置当前时间为刷盘的时间
lastflushedTime.set(time.milliseconds)
}
}
}
}
# 4.9.3 将当前broker上各个分区的恢复点写到文本文件
方法实现:
方法实现:
# 4.9.4 将当前broker上各个分区起始偏移量写到文本文件
方法实现:
写文本文件:
# 4.9.5 删除日志片段
对标记为删除的日志执行删除的动作:
# 4.9.6 clearner
如果配置了日志清理,则启动清理任务:
cleaners是多个CleanerThread集合:
最终执行清理的是,压缩:
上次更新: 2023/08/12, 20:54:07