跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
          • 4.10.1 副本管理器的启动和ISR的收缩和扩展
          • 4.10.2 follower副本如何与leader同步消息
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31
目录

源码剖析-ReplicaManager

# 4.10 Kafka源码剖析之ReplicaManager

# 4.10.1 副本管理器的启动和ISR的收缩和扩展

在启动KafkaServer的时候,运行KafkaServer的startup方法。在该方法中实例化ReplicaManager,并调用ReplicaManager的startup方法:

Kafka_Page307_001

ReplicaManager的startup方法:

Kafka_Page307_002

处理ISR收缩的情况:

Kafka_Page307_003

def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
   val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
   	leaderReplicaIfLocal match {
   		case Some(leaderReplica) =>
   		// 获取ISR中的不同步副本
   		val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica,replicaMaxLagTimeMs)
   		// 如果该集合不为空,则需要收缩ISR
   		if(outOfSyncReplicas.nonEmpty) {
   			// 从ISR中除去非同步副本
   			val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
   			assert(newInSyncReplicas.nonEmpty)
   			info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
   			newInSyncReplicas.map(_.brokerId).mkString(",")))
   			// 在缓存和zk中更新ISR
   			updateIsr(newInSyncReplicas)
   			// 标记ISR收缩事件
   			replicaManager.isrShrinkRate.mark()
   			// 由于ISR可能发生变化,变为1,如果ISR发生变化,需要增加HW
   			maybeIncrementLeaderHW(leaderReplica)
   		} else {
   			false
   		}
   		// 如果leader不在当前broker,则什么都不做
   		case None => false
   	}
   }
   // 当更新HW之后,尝试完成因ISR收缩而阻塞的操作。
   if (leaderHWIncremented)
   tryCompleteDelayedRequests()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

对于在ISR集合中的副本,检查有没有需要从ISR中移除的:

两种情况需要从ISR中移除:

  1. 卡主的Follower:如果副本的LEO经过maxLagMs毫秒还没有更新,则Follower卡主了,需要从ISR移除

  2. 慢Follower:如果副本从maxLagMs毫秒之前到现在还没有读到leader的LEO,则Follower落后,需要从ISR移除。

Kafka_Page308_001

处理ISR变动事件广播:

同时startup方法中周期性地调用maybePropagateIsrChanges()方法:

该函数周期性运行,检查ISR是否需要扩展。两种情况发生ISR的广播:

  1. 有尚未广播的ISR变动
  2. 最近5s没有发生ISR变动,或者上次ISR广播已经过去60s了。

该方法保证在ISR偶尔发生变动时,几秒之内即可将ISR变动广播出去。

避免了当发生大量ISR变更时压垮controller和其他broker。

Kafka_Page309_001

处理日志目录异常的失败:

Kafka_Page309_002

# 4.10.2 follower副本如何与leader同步消息

副本管理器类:

Kafka_Page309_003

副本管理器类在实例化的时候创建ReplicaFetcherManager对象,该对象是负责从leader拉取消息与leader保持同步的线程管理器:

Kafka_Page310_001

方法的具体实现:

创建负责从leader拉取消息与leader保持同步的线程管理器:

Kafka_Page310_002

Kafka_Page310_003

副本拉取管理器中实现了createFetcherThread方法,该方法返回ReplicaFetcherThread对象:

Kafka_Page310_004

ReplicaFetcherThread线程负责从Leader副本拉取消息进行同步。

Kafka_Page311_001

AbstractFetcherManager中的addFetcherForPartitions方法中的嵌套方法addAndStartFetcherThread创建并启动拉取线程:

而其中用到的createFetcherThread方法便是在AbstractFetcherManager的实现类ReplicaFetcherManager中实现的。

Kafka_Page311_002

抽象类AbstractFetcherThread从同一个远程broker上为当前broker上的多个分区follower副本拉取消息。

即,在远程同一个broker上有多个leader副本的follower副本在当前broker上。

Kafka_Page311_003

ReplicaFetcherThread的start方法实际上就是AbstractFetcherThread中的start方法。

在AbstractFetcherThread中没有start方法,在其父类ShutdownableThread也没有start方法:

Kafka_Page312_001

但是ShutdownableThread继承自Thread,Thread中有start方法,并且start方法要调用run方法,在ShutdownableThread中有run方法:

Kafka_Page312_002

该run方法重复调用doWork方法进行数据的拉取。

doWork方法是抽象方法,没有实现。其实现在ShutdownableThread的实现类AbstractFetcherThread中:

Kafka_Page313_001

上图中的doWork方法会反复调用,上图中的方法创建拉取请求对象,然后调用processFetchRequest方法进行请求的发送和结果的处理。

Kafka_Page313_002

Kafka_Page313_003

fetch方法的实现在AbstractFetcherThread的子类ReplicaFetcherThread中:

Kafka_Page313_004

sendRequest方法在ReplicaFetcherBlockingSend中:

Kafka_Page314_001

通过NetworkClientUtils发送请求,并等待请求的响应:

Kafka_Page314_002

KafkaApis对Fetch的处理:

Kafka_Page314_003

Kafka_Page314_004

该方法中,Leader从本地日志读取数据,返回:

Kafka_Page315_001

总结:

当KafkaServer启动的时候,会实例化副本管理器

Kafka_Page315_002

副本管理器实例化的时候会实例化副本拉取器管理器:

Kafka_Page315_003

副本管理器中有实现createFetcherThread方法,创建副本拉取器对象

Kafka_Page315_004

拉取线程启动起来之后不断地从leader副本所在的broker拉取消息,以便Follower与leader保持消息的同步。

上次更新: 2025/04/03, 11:07:08
源码剖析-LogManager
源码剖析-OffsetManager

← 源码剖析-LogManager 源码剖析-OffsetManager→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式