跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
          • 4.2.1 启动kafka
          • 4.2.2 查看Kafka.Kafka源码
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31
目录

源码剖析-Broker启动流程

# 4.2 Kafka源码剖析之Broker启动流程

# 4.2.1 启动kafka

命令如下:

 kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties 
1

kafka-server-start.sh内容如下:

if [ $# -lt 1 ]; then
	echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
	exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
	export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
	export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
-daemon)
	EXTRA_ARGS="-daemon "$EXTRA_ARGS
	shift
	;;
*) ;;

esac

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 4.2.2 查看Kafka.Kafka源码

def main(args: Array[String]): Unit = {
   try {
   	// 读取启动配置
   	val serverProps = getPropsFromArgs(args)
   	// 封装KafkaServer
   	val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
   	// register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c)
   	registerLoggingSignalHandler()
   	// attach shutdown handler to catch terminating signals as well as normal termination
   	// 增加回调监听
   	Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
   		override def run(): Unit = kafkaServerStartable.shutdown()
   	}
   	)
   	// 启动服务
   	kafkaServerStartable.startup()
   	// 等待
   	kafkaServerStartable.awaitShutdown()
   }
   catch {
   	case e: Throwable =>
   	fatal(e)
   	Exit.exit(1)
   }
   Exit.exit(0)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

上面的 kafkaServerStartabl 封装了 KafkaServer ,最终执行 startup 的是KafkaServer

class KafkaServerStartable(val serverConfig: KafkaConfig, reporters:Seq[KafkaMetricsReporter]) extends Logging {
   private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)
   def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
   // 启动
   def startup() {
   	try server.startup()
   	catch {
   		case _: Throwable =>
   		// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
   		fatal("Exiting Kafka.")
   		Exit.exit(1)
   	}
   }
   // 关闭
   def shutdown() {
   	try server.shutdown()
   	catch {
   		case _: Throwable =>
   		fatal("Halting Kafka.")
   		Exit.halt(1)
   	}
   }
   def setServerState(newState: Byte) {
   	server.brokerState.newState(newState)
   }
   def awaitShutdown(): Unit = server.awaitShutdown()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

下面来看一下 KafkaServe r的 startup 方法,启动了很多东西,后面都会用到,代码中也加入了注释

def startup() {
   try {
   	info("starting")
   	// 是否关闭
   	if (isShuttingDown.get)
   	throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
   	// 是否已启动完成
   	if (startupComplete.get)
   	return
   	// 开始启动,并设置已启动变量
   	val canStartup = isStartingUp.compareAndSet(false, true)
   	if (canStartup) {
   		// 设置broker为启动状态
   		brokerState.newState(Starting)
   		/* start scheduler */
   		// 启动定时器
   		kafkaScheduler.startup()
   		/* setup zookeeper */
   		// 初始化zookeeper配置
   		zkUtils = initZk()
   		/* Get or create cluster_id */
   		// 在zookeeper上生成集群Id
   		_clusterId = getOrGenerateClusterId(zkUtils)
   		info(s"Cluster ID = $clusterId")
   		/* generate brokerId */
   		// 从配置文件获取brokerId
   		val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
   		config.brokerId = brokerId
   		// 日志上下文
   		logContext = new LogContext(s"[KafkaServer id=${config.brokerId}]")
   		this.logIdent = logContext.logPrefix
   		/* create and configure metrics */
   		// 通过配置文件中的MetricsReporter的实现类来创建实例
   		val reporters =config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp,classOf[MetricsReporter],
   		Map[String, AnyRef](KafkaConfig.BrokerIdProp ->(config.brokerId.toString)).asJava)
   		// 默认监控会增加jmx
   		reporters.add(new JmxReporter(jmxPrefix))
   		val metricConfig = KafkaServer.metricConfig(config)
   		// 创建metric对象
   		metrics = new Metrics(metricConfig, reporters, time, true)
   		/* register broker metrics */
   		_brokerTopicStats = new BrokerTopicStats
   		// 初始化配额管理服务,对于每个producer或者consumer,可以对他们produce或者consum的速度上限作出限制
   		quotaManagers = QuotaFactory.instantiate(config, metrics, time,threadNamePrefix.getOrElse(""))
   		// 增加监听器
   		notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
   		logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
   		// 创建日志管理组件,创建时会检查log目录下是否有.kafka_cleanshutdown文件,如果没有的话,broker进入RecoveringFrom UncleanShutdown 状态
   		/* start log manager */
   		logManager = LogManager(config, initialOfflineDirs, zkUtils,brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
   		logManager.startup()
   		// 创建元数据管理组件
   		metadataCache = new MetadataCache(config.brokerId)
   		// 创建凭证ᨀ供者组件
   		credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
   		// Create and start the socket server acceptor threads so that the bound port is known.
   		// Delay starting processors until the end of the initialization sequence to ensure
   		// that credentials have been loaded before processing authentications.
   		// 创建一个sockerServer组件,并启动。该组件启动后,就会开始接收请求
   		socketServer = new SocketServer(config, metrics, time,credentialProvider)
   		socketServer.startup(startupProcessors = false)
   		// 创建一个副本管理组件,并启动该组件
   		/* start replica manager */
   		replicaManager = createReplicaManager(isShuttingDown)
   		replicaManager.startup()
   		// 创建kafka控制器,并启动。该控制器启动后broker会尝试去zk创建节点竞争成为controller
   		/* start kafka controller */
   		kafkaController = new KafkaController(config, zkUtils, time,metrics, threadNamePrefix)
   		kafkaController.startup()
   		// 创建一个集群管理组件
   		adminManager = new AdminManager(config, metrics, metadataCache,zkUtils)
   		// 创建群组协调器,并且启动
   		/* start group coordinator */
   		// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
   		groupCoordinator = GroupCoordinator(config, zkUtils,replicaManager, Time.SYSTEM)
   		groupCoordinator.startup()
   		// 启动事务协调器,带有单独的后台线程调度程序,用于事务到期和日志加载
   		/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
   		// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
   		transactionCoordinator = TransactionCoordinator(config,replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix ="transaction-log-manager-"), zkUtils, metrics, metadataCache, Time.SYSTEM)
   		transactionCoordinator.startup()
   		// 构造授权器
   		/* Get the authorizer and initialize it if one is specified.*/
   		authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map {
   			authorizerClassName =>
   			val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
   			authZ.configure(config.originals())
   			authZ
   		}
   		// 构造api组件,针对各个接口会处理不同的业务
   		/* start processing requests */
   		apis = new KafkaApis(socketServer.requestChannel, replicaManager,adminManager, groupCoordinator, transactionCoordinator,
   		kafkaController, zkUtils, config.brokerId, config, metadataCache,metrics, authorizer, quotaManagers,
   		brokerTopicStats, clusterId, time)
   		// 请求处理池
   		requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,socketServer.requestChannel, apis, time,
   		config.numIoThreads)
   		Mx4jLoader.maybeLoad()
   		// 动态配置处理器的相关配置
   		/* start dynamic config manager */
   		dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic-> new TopicConfigHandler(logManager, config, quotaManagers),
   		ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
   		ConfigType.User -> new UserConfigHandler(quotaManagers,credentialProvider),
   		ConfigType.Broker -> new BrokerConfigHandler(config,quotaManagers))
   		// 初始化动态配置管理器,并启动
   		// Create the config manager. start listening to notifications
   		dynamicConfigManager = new DynamicConfigManager(zkUtils,dynamicConfigHandlers)
   		dynamicConfigManager.startup()
   		// 通知监听者
   		/* tell everyone we are alive */
   		val listeners = config.advertisedListeners.map {
   			endpoint =>
   			if (endpoint.port == 0)
   			endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) else
   			endpoint
   		}
   		// kafka健康检查组件
   		kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners,zkUtils, config.rack,
   		config.interBrokerProtocolVersion)
   		kafkaHealthcheck.startup()
   		// 记录一下恢复点
   		// Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
   		checkpointBrokerId(config.brokerId)
   		// 修改broker状态
   		socketServer.startProcessors()
   		brokerState.newState(RunningAsBroker)
   		shutdownLatch = new CountDownLatch(1)
   		startupComplete.set(true)
   		isStartingUp.set(false)
   		AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString,metrics)
   		info("started")
   	}
   }
   catch {
   	case e: Throwable =>
   	fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
   	isStartingUp.set(false)
   	shutdown()
   	throw e
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
上次更新: 2025/04/03, 11:07:08
Kafka源码剖析
源码剖析-Topic创建流程

← Kafka源码剖析 源码剖析-Topic创建流程→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式