源码剖析-KafkaController
# 4.13 Kafka源码剖析之KafkaController
当前broker被选为新的controller的时候,执行如下操作:
注册controller epoch事件监听
controller epoch +1
初始化controller上下文对象,该上下文对象缓存当前所有主题、活跃broker以及所有分区leader的信息
启动controller channel manager
启动副本状态机
启动分区状态机
如果注册为controller的过程中发生了异常,重新注册当前broker为controller,如此则触发新一轮controller选举,以保证永远有一个活跃的controller。
启动Kafka服务器的脚本:
main方法中创建KafkaServerStartable对象:
该类中包含KakfaServer对象,startup方法调用的是KafkaServer的startup方法:
KafkaServer中的startup方法调用了kafkaController的startup方法:
KafkaController的startup方法中,将Startup样例类设置到eventManager中,然后调用eventManager的start方法:
上图中的eventManager.put(Startup)方法实现:
上图中的方法将Startup样例类放到queue中。
queue的实现:
Startup样例类:
其中的process方法执行controller的选举:
上图中1的代码表示当session超时的时候的处理逻辑,也就是controller到zk连接超时重连,触发该逻辑:
方法的实现:
当Controller到zk的连接过期重连的时候,调用方法:
样例类:Reelect
上图中2的代码,表示当controller发生变化的时候的处理逻辑:
方法的实现:
当controller发生变化的时候的处理逻辑(subscribeDataChanges):
调用:
上图中3处的代码表示执行controller的选举:
KafkaController的startup方法中,调用eventManager的start方法:
实现:
thread是ControllerEventThread对象:
ShutdownableThread的实现:
其中的run方法:
只要系统正常运行,就会不断调用doWork方法:
样例类ControllerChange中:
/**
* This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
* required to clean up internal controller data structures
*/
def onControllerResignation() {
debug("Resigning")
// 取消注册ISR变化通知监听器
deregisterIsrChangeNotificationListener()
// 取消注册分区重新分配监听器
deregisterPartitionReassignmentListener()
// 取消注册带偏向的副本leader选举监听器
deregisterPreferredReplicaElectionListener()
// 取消注册log.dirs事件通知监听器
deregisterLogDirEventNotificationListener()
// 重置主题删除管理器
topicDeletionManager.reset()
// 关闭Kafka的leader再平衡调度器
kafkaScheduler.shutdown()
offlinePartitionCount = 0
preferredReplicaImbalanceCount = 0
globalTopicCount = 0
globalPartitionCount = 0
// 取消注册分区再平衡ISR变化监听器
deregisterPartitionReassignmentIsrChangeListeners()
// 关闭分区状态机
partitionStateMachine.shutdown()
// 取消注册主题变化监听器
deregisterTopicChangeListener()
// 取消注册一堆分区修改监听器
partitionModificationsListeners.keys.foreach(deregisterPartitionModificati onsListener)
// 取消注册主题删除监听器
deregisterTopicDeletionListener()
// 关闭副本状态机
replicaStateMachine.shutdown()
// 取消注册broker变化监听器
deregisterBrokerChangeListener()
// 重置controller上下文
resetControllerContext()
// 日志:controller辞职不干了
info("Resigned")
}