跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31

源码剖析-OffsetManager

# 4.11 Kafka源码剖析之OffsetManager

消费者如何提交偏移量?

  • 自动提交

  • 手动提交

    • 同步提交
    • 异步提交

客户端提交偏移量,交给KafkaApis的handle方法,handle方法使用模式匹配,调用handleOffsetCommitRequest方法进行处理:

Kafka_Page316_001

handleOffsetCommitRequest的实现:

Kafka_Page316_002

如果apiVersion的值是0,则交给zookeeper保存偏移量信息:

Kafka_Page317_001

否则调用组协调器负责处理偏移量提交请求:

Kafka_Page317_002

handleCommitOffsets的实现:

首先根据groupId查找消费组元数据。

如果没有找到消费组元数据,则要么该消费组不依赖Kafka进行消费组管理,允许提交;要么提交的偏移量信息是消费组再平衡之前的偏移量,旧请求,拒绝。

正常情况就是最后的分支:

找到了消费组元数据,调用doCommitOffsets处理。偏移量提交的请求。

Kafka_Page318_001

doCommitOffsets的实现:

该方法判断消费组的状态:

  1. 如果是Dead,则响应错误信息。
  2. 如果消费组还在等待消费者同步,则响应错误信息
  3. 如果消费组中没有这个消费者,则响应错误信息
  4. 如果请求中的纪元数字和消费组当前纪元数字不符,则响应错误信息
  5. 如果仅使用Kafka存储偏移量,而不需要管理,则直接保存偏移量
  6. 正常情况下,找到了消费组,消费组中有这个消费者,同时消费组工作正常,则保存偏移量信息

Kafka_Page319_001

storeOffsets方法的实现:

Kafka_Page319_002

需要先计算当前消费组的偏移量需要提交到 __consumer_offsets 主题的哪个分区中。

Kafka_Page319_003

将消息追加到 __consumer_offsets 主题的指定分区中:

Kafka_Page320_001

其中计算 __consumer_offsets 分区的实现:

Kafka_Page320_002

上图中的函数,计算方式如下:

获取消费组ID的散列值,取绝对值,然后将此绝对值对 __consumer_offsets 主题分区个数取模得到。

appendForGroup方法的实现:

调用副本管理器的方法将消息追加到 __consumer_offsets 主题的指定分区日志中。

Kafka_Page320_003

如果偏移量消息追加成功,则调用callback响应客户端:

Kafka_Page320_004

缓存偏移量信息:

Kafka_Page321_001

具体实现:

Kafka_Page321_002

Kafka_Page321_003

responseCallback最终是KafkaApis中的308行(有可能不是,因为我加注释了,差不多这么多行):

该函数将消费者提交的偏移量追加到日志中并添加到消费组缓存中之后,返回结果给消费者客户端。

Kafka_Page321_004

消费者提交偏移量:KafkaApis,KafkaApis->GroupCoordinator的方法-> GroupMetadata

不仅需要将消费组的偏移量提交到日志中,还需要在内存维护该偏移量信息。

其实对于消费者,获取结果后,也需要在消费者客户端解析该响应,将消费者的偏移量缓存到消费者客户端:

消费者客户端消费消息的方法:KafkaConsumer.poll(1_000);

调用poll方法拉取消息:

该方法调用pollOnce进行消息的拉取:

Kafka_Page322_001

pollOnce方法会调用coordinator的poll方法周期性地提交偏移量:

Kafka_Page322_002

其中poll方法的实现:

Kafka_Page323_001

poll方法中,最后会判断是否需要自动提交偏移量:

Kafka_Page323_002

Kafka_Page323_003

Kafka_Page324_001

Kafka_Page324_002

Kafka_Page324_003

invokeCompletedOffsetCommitCallbacks方法用于轮询偏移量提交后broker端的响应信息:

Kafka_Page324_004

Kafka_Page325_001

Kafka_Page325_002

onCommitCompleted的实现:

Kafka_Page325_003

lastCommittedOffsets为:

Kafka_Page326_001

KafkaConsumer -> Broker -> KafkaApis -handle-> GroupCoordinator ->GroupMetadataManager -> GroupMetadata -> ReplicaManager -> log-> KafkaConsumer ->lastCommittedOffsets集合。

在Kafka 1.0.2之前的版本中有一个OffsetManager负责偏移量的处理。

OffsetManager主要提供对offset的保存和读取,kafka管理topic的偏移量有2种方式:

  1. zookeeper,即把偏移量提交至zk上;
  2. kafka,即把偏移量提交至kafka内部,主要由offsets.storage参数决定。1.0.2版本中默认是kafka。也就是说如果配置offsets.storage= kafka,则kafka会把这种offsetcommit请求转变为一种Producer,保存至topic为 __consumer_offsets 的log里面。
class OffsetManager(val config: OffsetManagerConfig,
replicaManager: ReplicaManager,
zkClient: ZkClient,
scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
   //通过offsetsCache提供对GroupTopicPartition的查询
   private val offsetsCache = new Pool[GroupTopicPartition,OffsetAndMetadata]
   //把过时的偏移量刷入磁盘,因为这些偏移量长时间没有被更新,意味着消费者可能不再消费了,也就不需要了,因此刷入到磁盘
   scheduler.schedule(name = "offsets-cache-compactor",
   fun = compact,
   period = config.offsetsRetentionCheckIntervalMs,
   unit = TimeUnit.MILLISECONDS)
1
2
3
4
5
6
7
8
9
10
11

主要完成2件事情:

  1. 提供对topic偏移量的查询
  2. 将偏移量消息刷入__consumer_offsets主题的log中。
上次更新: 2025/04/03, 11:07:08
源码剖析-ReplicaManager
源码剖析-KafkaApis

← 源码剖析-ReplicaManager 源码剖析-KafkaApis→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式