源码剖析-Broker启动流程
# 4.2 Kafka源码剖析之Broker启动流程
# 4.2.1 启动kafka
命令如下:
kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
1
kafka-server-start.sh内容如下:
# Launcher for a Kafka broker: validates arguments, fills in default logging
# and heap options, then hands over to kafka-run-class.sh with kafka.Kafka as
# the main class.

# At least one argument (the server.properties path) is required.
if [ $# -lt 1 ]; then
    echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
    exit 1
fi

# Directory containing this script. "$0" is quoted so that an install path
# containing whitespace is not split into several words.
base_dir=$(dirname "$0")

# Default the log4j configuration only when the caller has not provided one.
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

# Default the JVM heap settings only when the caller has not provided any.
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

# ${VAR-default}: the default applies only when EXTRA_ARGS is unset; an
# explicitly empty EXTRA_ARGS is respected.
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

# An optional leading -daemon flag is moved into EXTRA_ARGS and removed from
# the positional arguments.
COMMAND=$1
case $COMMAND in
    -daemon)
        EXTRA_ARGS="-daemon "$EXTRA_ARGS
        shift
        ;;
    *) ;;
esac

# $EXTRA_ARGS is intentionally left unquoted: it holds several options that
# must be word-split. The script path and "$@" are quoted to survive whitespace.
exec "$base_dir/kafka-run-class.sh" $EXTRA_ARGS kafka.Kafka "$@"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 4.2.2 查看Kafka.Kafka源码
/**
 * Broker entry point.
 *
 * Builds a KafkaServerStartable from the properties derived from `args`,
 * registers the logging signal handler and a JVM shutdown hook, starts the
 * server and then blocks until it has shut down.
 *
 * Any Throwable raised along the way is logged via `fatal` and the process
 * exits with status 1; otherwise it exits with status 0.
 *
 * @param args command-line arguments, passed to `getPropsFromArgs`
 */
def main(args: Array[String]): Unit = {
  try {
    // Read the startup configuration and wrap a KafkaServer around it.
    val startable = KafkaServerStartable.fromProps(getPropsFromArgs(args))

    // register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c)
    registerLoggingSignalHandler()

    // attach shutdown handler to catch terminating signals as well as normal termination
    val shutdownHook = new Thread("kafka-shutdown-hook") {
      override def run(): Unit = startable.shutdown()
    }
    Runtime.getRuntime().addShutdownHook(shutdownHook)

    // Start the server, then block until it has shut down.
    startable.startup()
    startable.awaitShutdown()
  } catch {
    case e: Throwable =>
      fatal(e)
      Exit.exit(1)
  }
  Exit.exit(0)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
上面的 kafkaServerStartable 封装了 KafkaServer,最终执行 startup 的是 KafkaServer:
/**
 * Thin wrapper around a [[KafkaServer]] that turns startup/shutdown failures
 * into process termination.
 *
 * @param serverConfig broker configuration handed to the wrapped KafkaServer
 * @param reporters    metrics reporters passed to the wrapped KafkaServer as
 *                     `kafkaMetricsReporters`
 */
class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
  private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)

  /** Convenience constructor that uses no additional metrics reporters. */
  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)

  /**
   * Starts the wrapped server. If startup throws, a fatal message is logged
   * and the process exits with status 1.
   */
  def startup(): Unit = {
    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        // (the failure itself is already logged by KafkaServer.startup() before it rethrows)
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }

  /**
   * Shuts the wrapped server down. If shutdown throws, the failure is logged
   * together with its cause and the process is halted with status 1.
   */
  def shutdown(): Unit = {
    try server.shutdown()
    catch {
      case e: Throwable =>
        // Include the cause: nothing else on this path records why shutdown failed.
        fatal("Halting Kafka.", e)
        Exit.halt(1)
    }
  }

  /**
   * Forces the wrapped server's broker state to `newState`.
   *
   * @param newState raw state value forwarded to `brokerState.newState`
   */
  def setServerState(newState: Byte): Unit = {
    server.brokerState.newState(newState)
  }

  /** Blocks until the wrapped server has shut down. */
  def awaitShutdown(): Unit = server.awaitShutdown()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
下面来看一下 KafkaServer 的 startup 方法,启动了很多东西,后面都会用到,代码中也加入了注释:
/**
 * Starts the broker by bringing up its components in a fixed order: scheduler,
 * ZooKeeper utilities, cluster id and broker id, metrics, quota managers, log
 * manager, metadata cache, credential provider, socket server (acceptors only),
 * replica manager, controller, admin manager, group and transaction
 * coordinators, authorizer, request handling (KafkaApis and handler pool),
 * dynamic config manager and health check. Finally the socket server
 * processors are started and the broker state becomes RunningAsBroker.
 *
 * Returns immediately if startup has already completed; does nothing if
 * another caller has already set isStartingUp.
 *
 * Throws IllegalStateException if the server is still shutting down. Any
 * Throwable raised during startup is logged, shutdown() is invoked and the
 * Throwable is rethrown.
 */
def startup() {
try {
info("starting")
// Refuse to start while a shutdown is still in progress
if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
// Nothing to do if startup has already completed
if (startupComplete.get)
return
// Claim the startup: only the caller that flips isStartingUp from false to true proceeds
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
// Mark the broker state as Starting
brokerState.newState(Starting)
/* start scheduler */
// Start the scheduler
kafkaScheduler.startup()
/* setup zookeeper */
// Initialize the ZooKeeper utilities
zkUtils = initZk()
/* Get or create cluster_id */
// Get the cluster id, generating one if necessary (uses zkUtils)
_clusterId = getOrGenerateClusterId(zkUtils)
info(s"Cluster ID = $clusterId")
/* generate brokerId */
// Resolve the broker id together with the initially offline log dirs, then store the id in the config
val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
config.brokerId = brokerId
// Logging context: log prefix carrying the broker id
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}]")
this.logIdent = logContext.logPrefix
/* create and configure metrics */
// Instantiate the MetricsReporter implementations configured under KafkaConfig.MetricReporterClassesProp
val reporters =config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp,classOf[MetricsReporter],
Map[String, AnyRef](KafkaConfig.BrokerIdProp ->(config.brokerId.toString)).asJava)
// A JmxReporter is always added on top of the configured reporters
reporters.add(new JmxReporter(jmxPrefix))
val metricConfig = KafkaServer.metricConfig(config)
// Create the Metrics object
metrics = new Metrics(metricConfig, reporters, time, true)
/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
// Initialize the quota managers. Original author's note: they cap the produce/consume rate of each producer or consumer (not visible here)
quotaManagers = QuotaFactory.instantiate(config, metrics, time,threadNamePrefix.getOrElse(""))
// Notify cluster listeners among the Kafka metrics reporters and the metrics reporters
notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
// Create and start the log manager. Original author's note: on creation it checks the log dirs for a .kafka_cleanshutdown file and, if absent, the broker enters the RecoveringFromUncleanShutdown state -- not visible in this method; verify against LogManager
/* start log manager */
logManager = LogManager(config, initialOfflineDirs, zkUtils,brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
// Create the metadata cache
metadataCache = new MetadataCache(config.brokerId)
// Create the credential provider
credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
// Create the SocketServer and start it with startupProcessors = false; the processors are started at the end via startProcessors()
socketServer = new SocketServer(config, metrics, time,credentialProvider)
socketServer.startup(startupProcessors = false)
// Create and start the replica manager
/* start replica manager */
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
// Create and start the Kafka controller. Original author's note: once started, the broker tries to create a node in ZooKeeper to compete for the controller role (not visible here)
/* start kafka controller */
kafkaController = new KafkaController(config, zkUtils, time,metrics, threadNamePrefix)
kafkaController.startup()
// Create the admin manager
adminManager = new AdminManager(config, metrics, metadataCache,zkUtils)
// Create and start the group coordinator
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
groupCoordinator = GroupCoordinator(config, zkUtils,replicaManager, Time.SYSTEM)
groupCoordinator.startup()
// Create and start the transaction coordinator, giving it its own single-threaded KafkaScheduler
/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
transactionCoordinator = TransactionCoordinator(config,replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix ="transaction-log-manager-"), zkUtils, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup()
// Build the authorizer, only when a non-empty authorizer class name is configured
/* Get the authorizer and initialize it if one is specified.*/
authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map {
authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}
// Build the KafkaApis component on top of the socket server's request channel
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager,adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkUtils, config.brokerId, config, metadataCache,metrics, authorizer, quotaManagers,
brokerTopicStats, clusterId, time)
// Request handler pool, sized by config.numIoThreads
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,socketServer.requestChannel, apis, time,
config.numIoThreads)
Mx4jLoader.maybeLoad()
// Dynamic config handlers, one per config type (Topic, Client, User, Broker)
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic-> new TopicConfigHandler(logManager, config, quotaManagers),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers,credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config,quotaManagers))
// Create and start the dynamic config manager
// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkUtils,dynamicConfigHandlers)
dynamicConfigManager.startup()
// Resolve the advertised listeners: an endpoint with port 0 gets the port the socket server actually bound
/* tell everyone we are alive */
val listeners = config.advertisedListeners.map {
endpoint =>
if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) else
endpoint
}
// Create and start the Kafka health-check component
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners,zkUtils, config.rack,
config.interBrokerProtocolVersion)
kafkaHealthcheck.startup()
// Checkpoint the broker id
// Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
checkpointBrokerId(config.brokerId)
// Start the socket server processors (deferred earlier), then mark the broker as running and startup as complete
socketServer.startProcessors()
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString,metrics)
info("started")
}
}
catch {
// On any failure: log it, clear the starting flag, shut down, and rethrow to the caller
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
上次更新: 2025/04/03, 11:07:08