跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31

源码剖析-DynamicConfigManager

# 4.15 Kafka源码剖析之DynamicConfigManager

工作流程如下:

配置存储于/config/entityType/entityName,如/config/topics/<topic_name>以及/config/clients/<clientId>

默认配置存储与各自的<default>节点中,上述节点中保存的是覆盖默认配置的数据,以properties的格式。

可以使用分级路径同时指定多个实体的名称,如:/config/users/<user>/clients/<clientId>

设置通知路径/config/changes,避免对所有主题进行监控,有事通知。DynamicConfigManager监控该路径。

更新配置的第一步是更新配置的properties。

之后,在/config/changes/下创建一个新的序列znode,类似于/config/changes/config_change_12231,该节点保存了实体类型和实体名称。

序列znode包含的数据形式:{"version" : 1, "entity_type":"topic/client", "entity_name" :"topic_name/client_id"}

这只是一个通知,真正的配置数据存储于/config/entityType/entityName节点

版本2的通知格式:{"version" : 2, "entity_path":"entity_type/entity_name"}

可以使用分级路径指定多个实体:如,users/<user>/clients/<clientId>

该类对所有的broker设置监视器。监视器工作流程如下:

  1. 监视器读取所有的配置更改通知。
  2. 监视器跟踪它应用过的后缀数字最高的配置更新。
  3. 监视器先前处理过的通知,15min之后监视器将其删除。
  4. 对于新的更改,监视器读取新的配置,将新的配置和默认配置整合,然后更新现有的配置。

配置永远从zk配置路径读取,通知仅用于触发该动作。

如果一个broker宕机,错过了一个更新,没问题——当broker重启的时候,加载所有的配置。

注意:如果有两个连续的配置更新,可能只有最后一个会处理(因为在broker读取配置信息的时候,可能两个更新都处理过了)。

此时,broker不需要进行两次配置更新,虽然人畜无害。

DynamicConfigManager重启的时候,重新处理所有的通知。可能有点儿浪费资源,但是它避免了丢失配置更新。

但要避免在启动时出现任何竞争情况, 因为这些情况可能会丢失初始配置加载与注册更改通知之间的更改。

KafkaServer启动的时候,在startup方法中,配置动态配置管理器,并启动动态配置管理器:

Kafka_Page344_001

DynamicConfigManager的startup方法的逻辑:

在动态配置管理器启动的时候,首先执行一遍配置更新。

Kafka_Page344_002

configChangeListener.init()方法的具体实现:

Kafka_Page345_001

上图中68行订阅子节点个数变化监听器,具体实现:

Kafka_Page345_002

上图中标红框的是订阅子节点个数变化监听器,只要子节点个数发生变化,就回调listener。

listener是哪个?NodeChangeListener

Kafka_Page345_003

NodeChangeListener的具体实现:

Kafka_Page346_001

处理通知的实现:

/**
* 处理给定的通知列表中的所有通知
*/
private def processNotifications(notifications: Seq[String]) {
   // 如果通知非空
   if (notifications.nonEmpty) {
   	info(s"Processing notification(s) to $seqNodeRoot")
   	try {
   		val now = time.milliseconds
   		// 遍历通知集合
   		for (notification <- notifications) {
   			// 获取通知的编号
   			val changeId = changeNumber(notification)
   			// 对比最后执行的修改通知编号,如果当前通知编号大于上次执行的,就执行配置更新
   			if (changeId > lastExecutedChange) {
   				// /config/changes/config_change_12121
   				val changeZnode = seqNodeRoot + "/" + notification
   				// 读取该通知节点的内容
   				val data = zkUtils.readDataMaybeNull(changeZnode)._1.orNull
   				if (data != null) {
   					// 如果有需要更改的数据,则执行配置的更新
   					notificationHandler.processNotification(data)
   				} else {
   					logger.warn(s"read null data from $changeZnode when processing notification $notification")
   				}
   				// 修改上次已执行编号为当前节点编号
   				lastExecutedChange = changeId
   			}
   		}
   		// 移除过期的通知
   		purgeObsoleteNotifications(now, notifications)
   	}
   	catch {
   		case e: ZkInterruptedException =>
   		if (!isClosed.get)
   		throw e
   	}
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

上面代码中第22行的实现:

首先,notificationHandler是哪个?

Kafka_Page347_001

该类在哪里实例化?

Kafka_Page347_002

Kafka_Page347_003

即notificationHandler就是ConfigChangedNotificationHandler类。

 notificationHandler.processNotification(data)
1

上面代码的具体实现:

Kafka_Page347_004

Kafka_Page348_001

如果版本1,则:

Kafka_Page348_002

如果版本2,则:

Kafka_Page348_003

具体实现:

def processConfigChanges(topic: String, topicConfig: Properties) {
   // Validate the configurations.
   // 找出需要排除的配置条目
   val configNamesToExclude = excludedConfigs(topic, topicConfig)
   // 过滤出当前指定主题的所有分区日志
   val logs = logManager.logsByTopicPartition.filterKeys(_.topic ==topic).values.toBuffer
   // 如果日志非空
   if (logs.nonEmpty) {
   	// 整合默认配置和zk中覆盖默认的配置,创建新的Log配置信息
   	val props = new Properties()
   	// 添加默认配置
   	props ++= logManager.defaultConfig.originals.asScala
   	// 遍历覆盖默认配置的条目,如果该条目不在要排除的集合中,则直接put到props中
   	// 该操作会覆盖默认相同key的配置
   	topicConfig.asScala.foreach {
   		case (key, value) =>
   		if (!configNamesToExclude.contains(key)) props.put(key, value)
   	}
   	// 实例化新的logConfig
   	val logConfig = LogConfig(props)
   	if ((topicConfig.containsKey(LogConfig.RetentionMsProp)
   	||topicConfig.containsKey(LogConfig.MessageTimestampDifferenceMaxMsProp))
   	&& logConfig.retentionMs <logConfig.messageTimestampDifferenceMaxMs)
   	warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}. It is smaller than " +
   	s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${logConfig.messageTimestampDifferenceMaxMs}. " +
   	s"This may result in frequent log rolling.")
   	// 更新当前主题所有分区日志的配置信息
   	logs.foreach(_.config = logConfig)
   }
   def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager) = {
   	if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0) {
   		val partitions = parseThrottledPartitions(topicConfig,kafkaConfig.brokerId, prop)
   		quotaManager.markThrottled(topic, partitions)
   		logger.debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
   	} else {
   		quotaManager.removeThrottle(topic)
   		logger.debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
   	}
   }
   updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp,quotas.leader)
   updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp,quotas.follower)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

删除过期配置更新通知节点。通过时间对比,过期时间为:15min。

Kafka_Page350_001

Kafka_Page350_002

获取指定实体类型中各个实体的配置信息:

Kafka_Page350_003

Kafka_Page351_001

getEntityConfigRootPath(entityType)的具体实现:

Kafka_Page351_002

其中,主题配置管理器TopicConfigHandler:

def processConfigChanges(topic: String, topicConfig: Properties) {
   // Validate the configurations.
   // 找出需要排除的配置条目
   val configNamesToExclude = excludedConfigs(topic, topicConfig)
   // 过滤出当前指定主题的所有分区日志
   val logs = logManager.logsByTopicPartition.filterKeys(_.topic ==topic).values.toBuffer
   // 如果日志非空
   if (logs.nonEmpty) {
   	// 整合默认配置和zk中覆盖默认的配置,创建新的Log配置信息
   	val props = new Properties()
   	// 添加默认配置
   	props ++= logManager.defaultConfig.originals.asScala
   	// 遍历覆盖默认配置的条目,如果该条目不在要排除的集合中,则直接put到props中
   	// 该操作会覆盖默认相同key的配置
   	topicConfig.asScala.foreach {
   		case (key, value) =>
   		if (!configNamesToExclude.contains(key)) props.put(key, value)
   	}
   	// 实例化新的logConfig
   	val logConfig = LogConfig(props)
   	if ((topicConfig.containsKey(LogConfig.RetentionMsProp) ||topicConfig.containsKey(LogConfig.MessageTimestampDifferenceMaxMsProp))
   	&& logConfig.retentionMs <logConfig.messageTimestampDifferenceMaxMs)
   	warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}. It is smaller than " +
   	s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${logConfig.messageTimestampDifferenceMaxMs}. " +
   	s"This may result in frequent log rolling.")
   	// 更新当前主题所有分区日志的配置信息
   	logs.foreach(_.config = logConfig)
   }
   def updateThrottledList(prop: String, quotaManager:ReplicationQuotaManager) = {
   	if (topicConfig.containsKey(prop) &&topicConfig.getProperty(prop).length > 0) {
   		val partitions = parseThrottledPartitions(topicConfig,kafkaConfig.brokerId, prop)
   		quotaManager.markThrottled(topic, partitions)
   		logger.debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
   	} else {
   		quotaManager.removeThrottle(topic)
   		logger.debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
   	}
   }
   updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)
   updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp,quotas.follower)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
上次更新: 2025/04/03, 11:07:08
源码剖析-KafkaHealthcheck
源码剖析-分区消费模式

← 源码剖析-KafkaHealthcheck 源码剖析-分区消费模式→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式