跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31

源码剖析-KafkaHealthcheck

# 4.14 Kafka源码剖析之KafkaHealthcheck

健康检查的初始化和启动:

在启动KafkaServer的startup方法中,实例化并启动了健康检查:

Kafka_Page338_001

健康检查的startup方法的执行逻辑:

Kafka_Page339_001

注册状态监听器的具体实现:

subscribeStateChanges(listener)具体实现:

调用zookeeper客户端的方法,该方法将监听器对象添加到_stateListener这个Set集合中:

Kafka_Page339_003

zookeeper客户端的回调方法:

新建会话事件触发监听器:

Kafka_Page339_004

如果发生了zk重连,则需要重新在zk中注册当前borker。

Kafka_Page340_001

会话建立异常,触发监听器:

Kafka_Page340_002

无法建立到zk的连接:

Kafka_Page340_003

状态改变,触发执行监听器方法:

Kafka_Page340_004

只要状态发生改变,就标记当前事件的发生。用于监控。

Kafka_Page340_005

其中register方法具体逻辑:

解决端点的主机名端口号,然后调用zkUtil的方法将当前broker的信息注册到zookeeper中:

Kafka_Page341_001

registerBrokerInZk的具体逻辑:

/**
* 如果Kafka的apiVersion不低于0.10.0.X,则使用json v4格式(包含多个端点和机架)注册broker。
* 否则使用json v2格式注册。
*
* json v4格式包含了默认的端点以兼容老客户端。
*
* @param id broker ID
* @param host broker host name
* @param port broker port
* @param advertisedEndpoints broker对外ᨀ供服务的端点
* @param jmxPort jmx port
* @param rack broker所在机架
* @param apiVersion Kafka version the broker is running as
*/
def registerBrokerInZk(id: Int,
host: String,
port: Int,
advertisedEndpoints: Seq[EndPoint],
jmxPort: Int,
rack: Option[String],
apiVersion: ApiVersion) {
   // /brokers/ids/<broker.id>
   val brokerIdPath = BrokerIdsPath + "/" + id
   // see method documentation for reason why we do this
   val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
   val json = Broker.toJson(version, id, host, port, advertisedEndpoints,jmxPort, rack)
   // 将broker信息注册到指定的路径。该znode的值就是json字符串
   // 默认znode节点是:/broker
   registerBrokerInZk(brokerIdPath, json)
   info("Registered broker %d at path %s with addresses: %s".format(id,brokerIdPath, advertisedEndpoints.mkString(",")))
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

在zk中注册broker的具体实现:

Kafka_Page342_001

主要是在zk的/brokers/[0...N] 路径上建立该Broker的信息,并且该节点是ZK中的Ephemeral Node,当此Broker离线的时候,zk上对应的节点也就消失了,那么其它Broker可以及时发现该Broker的异常。

class KafkaHealthcheck(private val brokerId: Int,
private val advertisedHost: String,
private val advertisedPort: Int,
private val zkSessionTimeoutMs: Int,
private val zkClient: ZkClient) extends Logging {
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
   val sessionExpireListener = new SessionExpireListener
   def startup() {
   	zkClient.subscribeStateChanges(sessionExpireListener)
   	register()
   }
   def shutdown() {
   	zkClient.unsubscribeStateChanges(sessionExpireListener)
   	ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
   }
   def register() {
   	val advertisedHostName =
   	if(advertisedHost == null || advertisedHost.trim.isEmpty)
   	InetAddress.getLocalHost.getCanonicalHostName else
   	advertisedHost
   	val jmxPort = System.getProperty("com.sun.management.jmxremote.port","-1").toInt
   	//在/brokers/ids/路径下存储broker的基本消息,例如端口号,ip地址,时间戳等,以上内容均在Ephemeral Node上,只要该broker和zk失去链接,则zk对应目录的内容被清空
   	ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName,advertisedPort, zkSessionTimeoutMs, jmxPort)
   }
   //该SessionExpireListener的作用就是重建broker的节点,防止短暂的和zk失去链接之后,该broker对应的节点也全部丢失了
   class SessionExpireListener() extends IZkStateListener {
   	@throws(classOf[Exception])
   	def handleStateChanged(state: KeeperState) {
   		// do nothing, since zkclient will do reconnect for us.
   	}
   	def handleNewSession() {
   		info("re-registering broker info in ZK for broker " + brokerId)
   		register()
   		info("done re-registering broker")
   		info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
   	}
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
上次更新: 2025/04/03, 11:07:08
源码剖析-KafkaController
源码剖析-DynamicConfigManager

← 源码剖析-KafkaController 源码剖析-DynamicConfigManager→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式