# 4.2 Kafka Source Code Analysis: Broker Startup Flow
# 4.2.1 Starting Kafka
The startup command is as follows:

```shell
kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
```

The content of kafka-server-start.sh is as follows:

```shell
if [ $# -lt 1 ]; then
  echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
  exit 1
fi

base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
```
# 4.2.2 Examining the Kafka.Kafka Source
```scala
def main(args: Array[String]): Unit = {
  try {
    // read the startup configuration
    val serverProps = getPropsFromArgs(args)
    // wrap it in a KafkaServerStartable
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

    // register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c)
    registerLoggingSignalHandler()

    // attach shutdown handler to catch terminating signals as well as normal termination
    // register a shutdown hook as a callback
    Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
      override def run(): Unit = kafkaServerStartable.shutdown()
    })

    // start the server
    kafkaServerStartable.startup()
    // block until shutdown
    kafkaServerStartable.awaitShutdown()
  }
  catch {
    case e: Throwable =>
      fatal(e)
      Exit.exit(1)
  }
  Exit.exit(0)
}
```
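The main method itself does very little: it registers a shutdown hook and then blocks in awaitShutdown until that hook (or a normal shutdown) fires. As a rough illustration of this hook-plus-latch lifecycle pattern, here is a minimal, self-contained sketch; DemoServer and its members are hypothetical stand-ins, not Kafka classes:

```scala
import java.util.concurrent.CountDownLatch

// Minimal sketch of the "shutdown hook + await" lifecycle used by Kafka.main;
// DemoServer is a hypothetical stand-in, not a Kafka class.
object DemoServer {
  private val shutdownLatch = new CountDownLatch(1)

  def startup(): Unit = println("server started")

  def shutdown(): Unit = {
    println("server shutting down")
    shutdownLatch.countDown()          // releases any thread blocked in awaitShutdown()
  }

  def awaitShutdown(): Unit = shutdownLatch.await()

  def main(args: Array[String]): Unit = {
    // the hook runs on SIGTERM/SIGINT or on normal JVM termination
    Runtime.getRuntime.addShutdownHook(new Thread("demo-shutdown-hook") {
      override def run(): Unit = DemoServer.shutdown()
    })
    startup()
    awaitShutdown()                    // keep the main thread alive until shutdown() is called
  }
}
```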
The kafkaServerStartable above wraps a KafkaServer; it is ultimately the KafkaServer that executes startup:
```scala
class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
  private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)

  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)

  // start the wrapped KafkaServer
  def startup() {
    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }

  // shut the wrapped KafkaServer down
  def shutdown() {
    try server.shutdown()
    catch {
      case _: Throwable =>
        fatal("Halting Kafka.")
        Exit.halt(1)
    }
  }

  def setServerState(newState: Byte) {
    server.brokerState.newState(newState)
  }

  def awaitShutdown(): Unit = server.awaitShutdown()
}
```
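KafkaServerStartable is therefore just a thin facade that delegates every call to KafkaServer. Assuming a server.properties file is available on disk (the path below is an assumption), driving it programmatically would look roughly like the following sketch, which mirrors what Kafka.main does; the property loading here uses plain JDK I/O rather than Kafka's getPropsFromArgs helper:

```scala
import java.io.FileInputStream
import java.util.Properties

import kafka.server.KafkaServerStartable

// Rough sketch of driving KafkaServerStartable directly; path and object name are illustrative.
object EmbeddedBrokerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    val in = new FileInputStream("/opt/kafka_2.12-1.0.2/config/server.properties")
    try props.load(in) finally in.close()

    val startable = KafkaServerStartable.fromProps(props)  // wraps a KafkaServer
    startable.startup()                                    // delegates to KafkaServer.startup()
    startable.awaitShutdown()                              // blocks until the server shuts down
  }
}
```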
Now let's look at the startup method of KafkaServer. It starts many components that will be used later; comments have been added to the code:
```scala
def startup() {
  try {
    info("starting")

    // refuse to start while a shutdown is still in progress
    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    // return immediately if startup has already completed
    if (startupComplete.get)
      return

    // begin startup, atomically marking the server as starting up
    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      // move the broker into the Starting state
      brokerState.newState(Starting)

      /* start scheduler */
      // start the background scheduler
      kafkaScheduler.startup()

      /* setup zookeeper */
      // initialize the ZooKeeper connection
      zkUtils = initZk()

      /* Get or create cluster_id */
      // get or generate the cluster id in ZooKeeper
      _clusterId = getOrGenerateClusterId(zkUtils)
      info(s"Cluster ID = $clusterId")

      /* generate brokerId */
      // obtain the brokerId from the configuration
      val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
      config.brokerId = brokerId

      // logging context
      logContext = new LogContext(s"[KafkaServer id=${config.brokerId}]")
      this.logIdent = logContext.logPrefix

      /* create and configure metrics */
      // instantiate the MetricsReporter implementations listed in the configuration
      val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
        Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
      // a JMX reporter is added by default
      reporters.add(new JmxReporter(jmxPrefix))
      val metricConfig = KafkaServer.metricConfig(config)
      // create the Metrics object
      metrics = new Metrics(metricConfig, reporters, time, true)

      /* register broker metrics */
      _brokerTopicStats = new BrokerTopicStats

      // initialize the quota managers, which can cap the produce or consume rate of each producer or consumer
      quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
      // notify cluster listeners
      notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)

      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

      // create the log manager; on creation it checks whether a .kafka_cleanshutdown file exists in the log
      // directories, and if not, the broker enters the RecoveringFromUncleanShutdown state
      /* start log manager */
      logManager = LogManager(config, initialOfflineDirs, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
      logManager.startup()

      // create the metadata cache
      metadataCache = new MetadataCache(config.brokerId)
      // create the credential provider
      credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)

      // Create and start the socket server acceptor threads so that the bound port is known.
      // Delay starting processors until the end of the initialization sequence to ensure
      // that credentials have been loaded before processing authentications.
      // create and start the SocketServer; once started it begins accepting connections
      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      socketServer.startup(startupProcessors = false)

      // create and start the replica manager
      /* start replica manager */
      replicaManager = createReplicaManager(isShuttingDown)
      replicaManager.startup()

      // create and start the Kafka controller; once started, the broker tries to create the controller
      // node in ZooKeeper to compete for the controller role
      /* start kafka controller */
      kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
      kafkaController.startup()

      // create the admin manager
      adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)

      // create and start the group coordinator
      /* start group coordinator */
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
      groupCoordinator.startup()

      /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkUtils, metrics, metadataCache, Time.SYSTEM)
      transactionCoordinator.startup()

      // build the authorizer
      /* Get the authorizer and initialize it if one is specified. */
      authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
        val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
        authZ.configure(config.originals())
        authZ
      }

      // build the KafkaApis component, which dispatches each API request to its handler
      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
        kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
        brokerTopicStats, clusterId, time)

      // request handler thread pool
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
        config.numIoThreads)

      Mx4jLoader.maybeLoad()

      // handlers for dynamic configuration
      /* start dynamic config manager */
      dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
        ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
        ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
        ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

      // initialize and start the dynamic config manager
      // Create the config manager. start listening to notifications
      dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      /* tell everyone we are alive */
      val listeners = config.advertisedListeners.map { endpoint =>
        if (endpoint.port == 0)
          endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
        else
          endpoint
      }

      // Kafka health-check component
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
        config.interBrokerProtocolVersion)
      kafkaHealthcheck.startup()

      // checkpoint the broker id
      // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
      checkpointBrokerId(config.brokerId)

      // start the processors and move the broker into the RunningAsBroker state
      socketServer.startProcessors()
      brokerState.newState(RunningAsBroker)
      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
      info("started")
    }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}
```
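The guard logic at the top and bottom of startup() (isShuttingDown, startupComplete, isStartingUp, shutdownLatch) is what makes startup idempotent, lets only one thread perform the actual initialization, and lets awaitShutdown block callers until the broker stops. As a simplified, self-contained sketch of just that state handling, with all the component startup elided behind a placeholder, it could look like this (class and method names here are illustrative, not Kafka's):

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Simplified sketch of the startup/shutdown state guards used by KafkaServer;
// the real class starts many components where startComponents() is called here.
class ServerLifecycleSketch {
  private val startupComplete = new AtomicBoolean(false)
  private val isShuttingDown = new AtomicBoolean(false)
  private val isStartingUp = new AtomicBoolean(false)
  @volatile private var shutdownLatch = new CountDownLatch(1)

  private def startComponents(): Unit = ()   // placeholder for scheduler, zk, log manager, ...

  def startup(): Unit = {
    if (isShuttingDown.get)
      throw new IllegalStateException("still shutting down, cannot re-start!")
    if (startupComplete.get)
      return                                           // already running: startup() is a no-op
    if (isStartingUp.compareAndSet(false, true)) {     // only one thread wins the right to start
      try {
        startComponents()
        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
      } finally {
        isStartingUp.set(false)
      }
    }
  }

  def shutdown(): Unit = {
    if (startupComplete.compareAndSet(true, false)) {
      // stop components here, then release anyone blocked in awaitShutdown()
      shutdownLatch.countDown()
    }
  }

  def awaitShutdown(): Unit = shutdownLatch.await()
}
```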