源码剖析-OffsetManager
# 4.11 Kafka源码剖析之OffsetManager
消费者如何提交偏移量?
自动提交
手动提交
- 同步提交
- 异步提交
客户端提交偏移量,交给KafkaApis的handle方法,handle方法使用模式匹配,调用handleOffsetCommitRequest方法进行处理:
handleOffsetCommitRequest的实现:
如果apiVersion的值是0,则交给zookeeper保存偏移量信息:
否则调用组协调器负责处理偏移量提交请求:
handleCommitOffsets的实现:
首先根据groupId查找消费组元数据。
如果没有找到消费组元数据,则要么该消费组不依赖Kafka进行消费组管理,允许提交;要么提交的偏移量信息是消费组再平衡之前的偏移量,旧请求,拒绝。
正常情况就是最后的分支:
找到了消费组元数据,调用doCommitOffsets处理。偏移量提交的请求。
doCommitOffsets的实现:
该方法判断消费组的状态:
- 如果是Dead,则响应错误信息。
- 如果消费组还在等待消费者同步,则响应错误信息
- 如果消费组中没有这个消费者,则响应错误信息
- 如果请求中的纪元数字和消费组当前纪元数字不符,则响应错误信息
- 如果仅使用Kafka存储偏移量,而不需要管理,则直接保存偏移量
- 正常情况下,找到了消费组,消费组中有这个消费者,同时消费组工作正常,则保存偏移量信息
storeOffsets方法的实现:
需要先计算当前消费组的偏移量需要提交到 __consumer_offsets 主题的哪个分区中。
将消息追加到 __consumer_offsets 主题的指定分区中:
其中计算 __consumer_offsets 分区的实现:
上图中的函数,计算方式如下:
获取消费组ID的散列值,取绝对值,然后将此绝对值对 __consumer_offsets 主题分区个数取模得到。
appendForGroup方法的实现:
调用副本管理器的方法将消息追加到 __consumer_offsets 主题的指定分区日志中。
如果偏移量消息追加成功,则调用callback响应客户端:
缓存偏移量信息:
具体实现:
responseCallback最终是KafkaApis中的308行(有可能不是,因为我加注释了,差不多这么多行):
该函数将消费者提交的偏移量追加到日志中并添加到消费组缓存中之后,返回结果给消费者客户端。
消费者提交偏移量:KafkaApis,KafkaApis->GroupCoordinator的方法-> GroupMetadata
不仅需要将消费组的偏移量提交到日志中,还需要在内存维护该偏移量信息。
其实对于消费者,获取结果后,也需要在消费者客户端解析该响应,将消费者的偏移量缓存到消费者客户端:
消费者客户端消费消息的方法:KafkaConsumer.poll(1_000);
调用poll方法拉取消息:
该方法调用pollOnce进行消息的拉取:
pollOnce方法会调用coordinator的poll方法周期性地提交偏移量:
其中poll方法的实现:
poll方法中,最后会判断是否需要自动提交偏移量:
invokeCompletedOffsetCommitCallbacks方法用于轮询偏移量提交后broker端的响应信息:
onCommitCompleted的实现:
lastCommittedOffsets为:
KafkaConsumer -> Broker -> KafkaApis -handle-> GroupCoordinator ->GroupMetadataManager -> GroupMetadata -> ReplicaManager -> log-> KafkaConsumer ->lastCommittedOffsets集合。
在Kafka 1.0.2之前的版本中有一个OffsetManager负责偏移量的处理。
OffsetManager主要提供对offset的保存和读取,kafka管理topic的偏移量有2种方式:
- zookeeper,即把偏移量提交至zk上;
- kafka,即把偏移量提交至kafka内部,主要由offsets.storage参数决定。1.0.2版本中默认是kafka。也就是说如果配置offsets.storage= kafka,则kafka会把这种offsetcommit请求转变为一种Producer,保存至topic为 __consumer_offsets 的log里面。
class OffsetManager(val config: OffsetManagerConfig,
replicaManager: ReplicaManager,
zkClient: ZkClient,
scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
//通过offsetsCache提供对GroupTopicPartition的查询
private val offsetsCache = new Pool[GroupTopicPartition,OffsetAndMetadata]
//把过时的偏移量刷入磁盘,因为这些偏移量长时间没有被更新,意味着消费者可能不再消费了,也就不需要了,因此刷入到磁盘
scheduler.schedule(name = "offsets-cache-compactor",
fun = compact,
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
主要完成2件事情:
- 提供对topic偏移量的查询
- 将偏移量消息刷入
__consumer_offsets
主题的log中。