源码剖析-KafkaHealthcheck
# 4.14 Kafka源码剖析之KafkaHealthcheck
健康检查的初始化和启动:
在启动KafkaServer的startup方法中,实例化并启动了健康检查:
健康检查的startup方法的执行逻辑:
注册状态监听器的具体实现:
subscribeStateChanges(listener)具体实现:
调用zookeeper客户端的方法,该方法将监听器对象添加到_stateListener这个Set集合中:
zookeeper客户端的回调方法:
新建会话事件触发监听器:
如果发生了zk重连,则需要重新在zk中注册当前borker。
会话建立异常,触发监听器:
无法建立到zk的连接:
状态改变,触发执行监听器方法:
只要状态发生改变,就标记当前事件的发生。用于监控。
其中register方法具体逻辑:
解决端点的主机名端口号,然后调用zkUtil的方法将当前broker的信息注册到zookeeper中:
registerBrokerInZk的具体逻辑:
/**
* 如果Kafka的apiVersion不低于0.10.0.X,则使用json v4格式(包含多个端点和机架)注册broker。
* 否则使用json v2格式注册。
*
* json v4格式包含了默认的端点以兼容老客户端。
*
* @param id broker ID
* @param host broker host name
* @param port broker port
* @param advertisedEndpoints broker对外ᨀ供服务的端点
* @param jmxPort jmx port
* @param rack broker所在机架
* @param apiVersion Kafka version the broker is running as
*/
def registerBrokerInZk(id: Int,
host: String,
port: Int,
advertisedEndpoints: Seq[EndPoint],
jmxPort: Int,
rack: Option[String],
apiVersion: ApiVersion) {
// /brokers/ids/<broker.id>
val brokerIdPath = BrokerIdsPath + "/" + id
// see method documentation for reason why we do this
val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
val json = Broker.toJson(version, id, host, port, advertisedEndpoints,jmxPort, rack)
// 将broker信息注册到指定的路径。该znode的值就是json字符串
// 默认znode节点是:/broker
registerBrokerInZk(brokerIdPath, json)
info("Registered broker %d at path %s with addresses: %s".format(id,brokerIdPath, advertisedEndpoints.mkString(",")))
}
在zk中注册broker的具体实现:
主要是在zk的/brokers/[0...N] 路径上建立该Broker的信息,并且该节点是ZK中的Ephemeral Node,当此Broker离线的时候,zk上对应的节点也就消失了,那么其它Broker可以及时发现该Broker的异常。
class KafkaHealthcheck(private val brokerId: Int,
private val advertisedHost: String,
private val advertisedPort: Int,
private val zkSessionTimeoutMs: Int,
private val zkClient: ZkClient) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
val sessionExpireListener = new SessionExpireListener
def startup() {
zkClient.subscribeStateChanges(sessionExpireListener)
register()
}
def shutdown() {
zkClient.unsubscribeStateChanges(sessionExpireListener)
ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
}
def register() {
val advertisedHostName =
if(advertisedHost == null || advertisedHost.trim.isEmpty)
InetAddress.getLocalHost.getCanonicalHostName else
advertisedHost
val jmxPort = System.getProperty("com.sun.management.jmxremote.port","-1").toInt
//在/brokers/ids/路径下存储broker的基本消息,例如端口号,ip地址,时间戳等,以上内容均在Ephemeral Node上,只要该broker和zk失去链接,则zk对应目录的内容被清空
ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName,advertisedPort, zkSessionTimeoutMs, jmxPort)
}
//该SessionExpireListener的作用就是重建broker的节点,防止短暂的和zk失去链接之后,该broker对应的节点也全部丢失了
class SessionExpireListener() extends IZkStateListener {
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
// do nothing, since zkclient will do reconnect for us.
}
def handleNewSession() {
info("re-registering broker info in ZK for broker " + brokerId)
register()
info("done re-registering broker")
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
}
}
}
上次更新: 2023/08/12, 20:54:07