跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31

源码剖析-消息存储机制

# 4.6 Kafka源码剖析之消息存储机制

log.dirs/<topic-name>-<partitionno>/{.index,.timeindex,.log}
1

首先查看Kafka如何处理生产的消息:

Kafka_Page276_001

调用副本管理器,将记录追加到分区的副本中。

Kafka_Page276_002

将数据追加到本地的Log日志中:

Kafka_Page276_003

追加消息的实现:

Kafka_Page276_004

遍历需要追加的每个主题分区的消息:

Kafka_Page277_001

调用partition的方法将记录追加到该分区的leader分区中:

Kafka_Page277_002

如果在本地找到了该分区的leader:

Kafka_Page277_003

执行下述逻辑将消息追加到leader分区:

// 获取该分区的log
val log = leaderReplica.log.get
// 获取最小ISR副本数
val minIsr = log.config.minInSyncReplicas
// 计算同步副本的个数
val inSyncSize = inSyncReplicas.size
// 如果同步副本的个数小于要求的最小副本数,并且acks设置的是-1,则不追加消息
if (inSyncSize < minIsr && requiredAcks == -1) {
	throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
	.format(topicPartition, inSyncSize, minIsr))
}
// 追加消息到leader
val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch,isFromClient)
// 尝试锁定follower获取消息的请求,因为此时leader正在更新LEO。
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
// 如果ISR只有一个元素的话,需要HW+1
(info, maybeIncrementLeaderHW(leaderReplica))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)的实现:

Kafka_Page278_001

Kafka_Page278_002

具体代码实现:

/**
* 验证如下信息:
* 每条消息与其CRC是否匹配
* 每条消息的字节数是否匹配
* 传入记录批的序列号与现有状态以及彼此之间是否一致。
* 同时计算如下值:
* 消息批中第一个偏移量
* 消息批中最后一个偏移量
* 消息个数
* 正确字节的个数
* 偏移量是否单调递增
* 是否使用了压缩编码解码器(如果使用了压缩编解码器,则给出最后一个)
*/
val appendInfo = analyzeAndValidateRecords(records, isFromClient =isFromClient)
// 如果没有消息需要追加或该消息集合与上一个消息集合重复,则返回
if (appendInfo.shallowCount == 0)
return appendInfo
// 在向磁盘日志追加之前剔除不正确的字节或剔除不完整的消息
var validRecords = trimInvalidBytes(records, appendInfo)
// 消息集合剩余的正确部分,插入到日志中
lock synchronized {
   // 检查日志的MMap是否关闭了,如果关闭无法进行写操作,抛异常
   checkIfMemoryMappedBufferClosed()
   if (assignOffsets) {
   	// 如果需要给消息添加偏移量
   	val offset = new LongRef(nextOffsetMetadata.messageOffset)
   	appendInfo.firstOffset = offset.value
   	val now = time.milliseconds
   	val validateAndOffsetAssignResult = try {
   		// 校验消息和赋值给消息的偏移量是否正确无误
   		LogValidator.validateMessagesAndAssignOffsets(validRecords,
   		offset,
   		time,
   		now,
   		appendInfo.sourceCodec,
   		appendInfo.targetCodec,
   		config.compact,
   		config.messageFormatVersion.messageFormatVersion.value,
   		config.messageTimestampType,
   		config.messageTimestampDifferenceMaxMs,
   		leaderEpoch,
   		isFromClient)
   	}
   	catch {
   		case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
   	}
   	// 正确的消息集合,此时处于内存中
   	validRecords = validateAndOffsetAssignResult.validatedRecords
   	// 要追加消息的最大时间戳
   	appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
   	// 要追加的消息的最大时间戳对应的偏移量
   	appendInfo.offsetOfMaxTimestamp =validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
   	// 最后一个偏移量是偏移量的值-1
   	appendInfo.lastOffset = offset.value - 1
   	appendInfo.recordsProcessingStats =validateAndOffsetAssignResult.recordsProcessingStats
   	if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
   	// 如果消息时间戳的类型是日志追加的时间,则需要赋值当前系统时间
   	appendInfo.logAppendTime = now
   	// 需要重新验证消息的大小,以防消息发生改变,如重新压缩或者转换了消息格式
   	if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
   		for (batch <- validRecords.batches.asScala) {
   			// 如果消息集合的字节数大于配置的消息最大字节数,抛异常
   			if (batch.sizeInBytes > config.maxMessageSize) {
   				// we record the original message set size instead of the trimmed size
   				// to be consistent with pre-compression bytesRejectedRate recording
   				brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
   				brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
   				throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
   				.format(batch.sizeInBytes, config.maxMessageSize))
   			}
   		}
   	}
   } else {
   	// 如果不需要分配消息偏移量,则使用给定的消息偏移量
   	if (!appendInfo.offsetsMonotonic)
   	// 如果偏移量不是单调递增的,抛异常
   	throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " + records.records.asScala.map(_.offset))
   	// 如果消息批的第一个偏移量小于分区leader日志中下一条记录的偏移量,抛异常。
   	if (appendInfo.firstOffset < nextOffsetMetadata.messageOffset) {
   		// we may still be able to recover if the log is empty
   		// one example: fetching from log start offset on the leader which is not batch aligned,
   		// which may happen as a result of AdminClient#deleteRecords()
   		// appendInfo.firstOffset maybe either first offset or last offset of the first batch.
   		// get the actual first offset, which may require decompressing the data
   		val firstOffset = records.batches.asScala.head.baseOffset()
   		throw new UnexpectedAppendOffsetException(
   		s"Unexpected offset in append to $topicPartition. First offset or last offset of the first batch " +
   		s"${appendInfo.firstOffset} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
   		s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
   		s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
   		firstOffset, appendInfo.lastOffset)
   	}
   }
   // 使用leader给消息赋值的epoch值更新缓存的epoch值。
   validRecords.batches.asScala.foreach {
   	batch =>
   	if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
   	// 需要在epoch中记录leader的epoch值和消息集合的起始偏移量
   	leaderEpochCache.assign(batch.partitionLeaderEpoch,batch.baseOffset)
   }
   // 检查消息批的字节大小是否大于日志分段的最大值,如果是,则抛异常
   if (validRecords.sizeInBytes > config.segmentSize) {
   	throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
   	.format(validRecords.sizeInBytes, config.segmentSize))
   }
   // 消息批的消息都正确,偏移量也都赋值了,时间戳也更新了
   // 此时需要验证生产者的幂等性/事务状态,并收集一些元数据
   val (updatedProducers, completedTxns, maybeDuplicate) =analyzeAndValidateProducerState(validRecords, isFromClient)
   // 如果是重复的消息批,则直接返回被重复的消息批的appendInfo
   maybeDuplicate.foreach {
   	duplicate =>
   	appendInfo.firstOffset = duplicate.firstOffset
   	appendInfo.lastOffset = duplicate.lastOffset
   	appendInfo.logAppendTime = duplicate.timestamp
   	appendInfo.logStartOffset = logStartOffset
   	return appendInfo
   }
   // 如果当前日志分段写满了,则滚动日志分段
   val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
   maxTimestampInMessages = appendInfo.maxTimestamp,
   maxOffsetInMessages = appendInfo.lastOffset)
   val logOffsetMetadata = LogOffsetMetadata(
   messageOffset = appendInfo.firstOffset,
   segmentBaseOffset = segment.baseOffset,
   relativePositionInSegment = segment.size)
   // 日志片段中追加消息
   segment.append(firstOffset = appendInfo.firstOffset,
   largestOffset = appendInfo.lastOffset,
   largestTimestamp = appendInfo.maxTimestamp,
   shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
   records = validRecords)
   // 更新生产者状态
   for ((producerId, producerAppendInfo) <- updatedProducers) {
   	producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
   	producerStateManager.update(producerAppendInfo)
   }
   // update the transaction index with the true last stable offset. The last offset visible
   // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
   for (completedTxn <- completedTxns) {
   	val lastStableOffset = producerStateManager.completeTxn(completedTxn)
   	segment.updateTxnIndex(completedTxn, lastStableOffset)
   }
   // always update the last producer id map offset so that the snapshot reflects the current offset
   // even if there isn't any idempotent data being written
   producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
   // leader的LEO+1
   updateLogEndOffset(appendInfo.lastOffset + 1)
   // update the first unstable offset (which is used to compute LSO)
   updateFirstUnstableOffset()
   trace(s"Appended message set to log with last offset ${appendInfo.lastOffset} " +
   s"first offset: ${appendInfo.firstOffset}, " +
   s"next offset: ${nextOffsetMetadata.messageOffset}, " +
   s"and messages: $validRecords")
   // 如果未刷盘的消息个数大于配置的消息个数,刷盘
   if (unflushedMessages >= config.flushInterval)
   // 刷盘
   flush()
   appendInfo
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
上次更新: 2025/04/03, 11:07:08
源码剖析-Consumer消费者流程
源码剖析-SocketServer

← 源码剖析-Consumer消费者流程 源码剖析-SocketServer→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式