跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
          • 4.3.1 Topic创建
          • 4.3.2 手动创建
          • 4.3.3 查看Topic入口
          • 4.3.4 创建Topic
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31
目录

源码剖析-Topic创建流程

# 4.3 Kafka源码剖析之Topic创建流程

# 4.3.1 Topic创建

有两种创建方式:自动创建、手动创建。在server.properties中配置auto.create.topics.enable=true 时,kafka在发现该topic不存在的时候会按照默认配置自动创建topic,触发自动创建topic有以下两种情况:

  1. Producer向某个不存在的Topic写入消息
  2. Consumer从某个不存在的Topic读取消息

# 4.3.2 手动创建

当 auto.create.topics.enable=false 时,需要手动创建topic,否则消息会发送失败。手动创建topic的方式如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic kafka_test
1

--replication-factor: 副本数目

--partitions: 分区数据

--topic: topic名字

# 4.3.3 查看Topic入口

查看脚本文件 kafka-topics.sh

 exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
1

最终还是调用的 TopicCommand 类:首先判断参数是否为空,并且create、list、alter、descibe、delete只允许存在一个,进行参数验证,创建 zookeeper 链接,如果参数中包含 create 则开始创建topic,其他情况类似。

def main(args: Array[String]): Unit = {
   val opts = new TopicCommandOptions(args)
   // 判断参数长度
   if(args.length == 0)
   CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete,describe, or change a topic.")
   // create、list、alter、descibe、delete只允许存在一个
   // should have exactly one action
   val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt,opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
   if(actions != 1)
   CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
   // 参数验证
   opts.checkArgs()
   // 初始化zookeeper链接
   val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
   30000,
   30000,
   JaasUtils.isZkSecurityEnabled())
   var exitCode = 0
   try {
   	if(opts.options.has(opts.createOpt))
   	// 创建topic
   	createTopic(zkUtils, opts) else if(opts.options.has(opts.alterOpt))
   	// 修改topic
   	alterTopic(zkUtils, opts) else if(opts.options.has(opts.listOpt))
   	// 列出所有的topic,bin/kafka-topics.sh --list --zookeeper localhost:2181
   	listTopics(zkUtils, opts) else if(opts.options.has(opts.describeOpt))
   	// 查看topic᧿述,bin/kafka-topics.sh --describe --zookeeper localhost:2181
   	describeTopic(zkUtils, opts) else if(opts.options.has(opts.deleteOpt))
   	// 删除topic
   	deleteTopic(zkUtils, opts)
   }
   catch {
   	case e: Throwable =>
   	println("Error while executing topic command : " + e.getMessage)
   	error(Utils.stackTrace(e))
   	exitCode = 1
   }
   finally {
   	zkUtils.close()
   	Exit.exit(exitCode)
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 4.3.4 创建Topic

下面我们主要来看一下 createTopic 的执行过程:

def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
   // 获取topic名称
   val topic = opts.options.valueOf(opts.topicOpt)
   val configs = parseTopicConfigsToBeAdded(opts)
   val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
   if (Topic.hasCollisionChars(topic))
   println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
   try {
   	//如果客户端指定了topic的partition的replicas分配情况,则直接把所有topic的元数据信息持久化写入到zk,
   	// topic的properties写入到/config/topics/{topic}目录,
   	// topic的PartitionAssignment写入到/brokers/topics/{topic}目录
   	if (opts.options.has(opts.replicaAssignmentOpt)) {
   		val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
   		AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils,topic, assignment, configs, update = false)
   	} else {
   		// 否则需要自动生成topic的PartitionAssignment
   		CommandLineUtils.checkRequiredArgs(opts.parser, opts.options,opts.partitionsOpt, opts.replicationFactorOpt)
   		// 分区
   		val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
   		// 副本集
   		val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
   		// 从0.10.x版本开始,kafka可以支持指定broker的机架信息,如果指定了机架信息则在副本分配时会尽可能地让分区的副本分不到不同的机架上。
   		// 指定机架信息是通过kafka的配置文件config/server.properties中的broker.rack参数来配置的
   		val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled else RackAwareMode.Enforced
   		AdminUtils.createTopic(zkUtils, topic, partitions, replicas,configs, rackAwareMode)
   	}
   	println("Created topic \"%s\".".format(topic))
   }
   catch {
   	case e: TopicExistsException => if (!ifNotExists) throw e
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
  1. 如果客户端指定了topic的partition的replicas分配情况,则直接把所有topic的元数据信息持久化写入到zk,topic的properties写入到/config/topics/{topic}目录, topic的PartitionAssignment写入到/brokers/topics/{topic}目录

  2. 根据分区数量、副本集、是否指定机架来自动生成topic的分区数据

  3. 下面继续来看 AdminUtils.createTopic 方法

def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
   // 获取集群中每个broker的brokerId和机架信息信息的列表,为下面的
   val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
   // 根据是否禁用指定机架策略来生成分配策略
   val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions,replicationFactor)
   // 在zookeeper中创建或更新主题分区分配路径
   AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils,topic, replicaAssignment, topicConfig)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
  1. 下面继续来看 AdminUtils.assignReplicasToBrokers 方法
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int,Seq[Int]] = {
   if (nPartitions <= 0)
   // 分区个数partitions不能小于等于0
   throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
   if (replicationFactor <= 0)
   // 副本个数replicationFactor不能小于等于0
   throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
   if (replicationFactor > brokerMetadatas.size)
   // 副本个数replicationFactor不能大于broker的节点个数
   throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers:
${brokerMetadatas.size}.")
   if (brokerMetadatas.forall(_.rack.isEmpty))
   // 没有指定机架信息的情况
   assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
   startPartitionId) else {
   	// 针对指定机架信息的情况,更加复杂一点
   	if (brokerMetadatas.exists(_.rack.isEmpty))
   	throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
   	assignReplicasToBrokersRackAware(nPartitions,replicationFactor, brokerMetadatas, fixedStartIndex,
   	startPartitionId)
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  • 未指定机架策略
private def assignReplicasToBrokersRackUnaware(nPartitions:Int,replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
   val ret = mutable.Map[Int, Seq[Int]]()
   val brokerArray = brokerList.toArray
   val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
   var currentPartitionId = math.max(0, startPartitionId)
   var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
   for (_ <- 0 until nPartitions) {
   	if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
   	nextReplicaShift += 1
   	val firstReplicaIndex = (currentPartitionId +startIndex) % brokerArray.length
   	val replicaBuffer =mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
   	for (j <- 0 until replicationFactor - 1)
   	replicaBuffer +=brokerArray(replicaIndex(firstReplicaIndex,nextReplicaShift, j, brokerArray.length))
   	ret.put(currentPartitionId, replicaBuffer)
   	currentPartitionId += 1
   }
   ret
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

遍历每个分区partition然后从brokerArray(brokerId的列表)中选取replicationFactor个brokerId分配给这个partition.

创建一个可变的Map用来存放本方法将要返回的结果,即分区partition和分配副本的映射关系。由于fixedStartIndex为-1,所以startIndex是一个随机数,用来计算一个起始分配的brokerId,同时由于startPartitionId为-1,所以currentPartitionId的值为0,可见默认创建topic时总是从编号为0的分区依次轮询进行分配。

nextReplicaShift表示下一次副本分配相对于前一次分配的位移量,这个字面上理解有点绕,不如举个例子:假设集群中有3个broker节点,即代码中的brokerArray,创建某topic有3个副本和6个分区,那么首先从partitionId(partition的编号)为0的分区开始进行分配,假设第一次计算(由rand.nextInt(brokerArray.length)随机)到nextReplicaShift为1,第一次随机到的startIndex为2,那么partitionId为0的第一个副本的位置(这里指的是brokerArray的数组下标)firstReplicaIndex =(currentPartitionId + startIndex) % brokerArray.length = (0+2)%3 = 2,第二个副本的位置为replicaIndex(firstReplicaIndex, nextReplicaShift, j,brokerArray.length) = replicaIndex(2, nextReplicaShift+1,0, 3)=?。

继续计算replicaIndex(2, nextReplicaShift+1,0, 3) = replicaIndex(2, 2,0, 3)= (2+(1+(2+0)%(3-1)))%3=0。继续计算下一个副本的位置replicaIndex(2, 2,1, 3) = (2+(1+(2+1)%(3-1)))%3=1。所以partitionId为0的副本分配位置列表为[2,0,1],如果brokerArray正好是从0开始编号,也正好是顺序不间断的,即brokerArray为[0,1,2]的话,那么当前partitionId为0的副本分配策略为[2,0,1]。如果brokerId不是从零开始,也不是顺序的(有可能之前集群的其中broker几个下线了),最终的brokerArray为[2,5,8],那么partitionId为0的分区的副本分配策略为[8,2,5]。为了便于说明问题,可以简单的假设brokerArray就是[0,1,2]。

同样计算下一个分区,即partitionId为1的副本分配策略。此时nextReplicaShift还是为2,没有满足自增的条件。这个分区的firstReplicaIndex = (1+2)%3=0。第二个副本的位置replicaIndex(0,2,0,3) = (0+(1+(2+0)%(3-1)))%3 = 1,第三个副本的位置replicaIndex(0,2,1,3) = 2,最终partitionId为2的分区分配策略为[0,1,2]

  • 指定机架策略
private def assignReplicasToBrokersRackAware(nPartitions:
Int,
replicationFactor: Int,
brokerMetadatas: Seq[BrokerMetadata],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
   val brokerRackMap = brokerMetadatas.collect {
   	case BrokerMetadata(id, Some(rack)) =>
   	id -> rack
   }
   .toMap
   val numRacks = brokerRackMap.values.toSet.size
   val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
   val numBrokers = arrangedBrokerList.size
   val ret = mutable.Map[Int, Seq[Int]]()
   val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
   var currentPartitionId = math.max(0, startPartitionId)
   var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
   for (_ <- 0 until nPartitions) {
   	if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
   	nextReplicaShift += 1
   	val firstReplicaIndex = (currentPartitionId +startIndex) % arrangedBrokerList.size
   	val leader = arrangedBrokerList(firstReplicaIndex)
   	val replicaBuffer = mutable.ArrayBuffer(leader)
   	val racksWithReplicas =mutable.Set(brokerRackMap(leader))
   	val brokersWithReplicas = mutable.Set(leader)
   	var k = 0
   	for (_ <- 0 until replicationFactor - 1) {
   		var done = false
   		while (!done) {
   			val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex,nextReplicaShift * numRacks, k, arrangedBrokerList.size))
   			val rack = brokerRackMap(broker)
   			// Skip this broker if
   			// 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
   			// that do not have any replica, or
   			// 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
   			if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
   			&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
   				replicaBuffer += broker
   				racksWithReplicas += rack
   				brokersWithReplicas += broker
   				done = true
   			}
   			k += 1
   		}
   	}
   	ret.put(currentPartitionId, replicaBuffer)
   	currentPartitionId += 1
   }
   ret
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
  • assignReplicasToBrokersRackUnaware的执行前提是所有的broker都没有配置机架信息,而assignReplicasToBrokersRackAware的执行前提是所有的broker都配置了机架信息,如果出现部分broker配置了机架信息而另一部分没有配置的话,则会抛出AdminOperationException的异常,如果还想要顺利创建topic的话,此时需加上“--disable-rack-aware”

  • 第一步获得brokerId和rack信息的映射关系列表brokerRackMap ,之后调用getRackAlternatedBrokerList()方法对brokerRackMap做进一步的处理生成一个brokerId的列表。举例:假设目前有3个机架rack1、rack2和rack3,以及9个broker,分别对应关系如下:

rack1: 0, 1, 2
rack2: 3, 4, 5
rack3: 6, 7, 8
1
2
3

那么经过getRackAlternatedBrokerList()方法处理过后就变成了[0, 3, 6, 1,4, 7, 2, 5, 8]这样一个列表,显而易见的这是轮询各个机架上的broker而产生的,之后你可以简单的将这个列表看成是brokerId的列表,对应assignReplicasToBrokersRackUnaware()方法中的brokerArray,但是其中包含了简单的机架分配信息。之后的步骤也和未指定机架信息的算法类似,同样包含startIndex、currentPartiionId, nextReplicaShift的概念,循环为每一个分区分配副本。分配副本时处理第一个副本之外,其余的也调用replicaIndex方法来获得一个broker,但是这里和assignReplicasToBrokersRackUnaware()不同的是,这里不是简单的将这个broker添加到当前分区的副本列表之中,还要经过一层的筛选,满足以下任意一个条件的broker不能被添加到当前分区的副本列表之中:

1. 如果此broker所在的机架中已经存在一个broker拥有该分区的副本,并且还有其他的机架中没有任何一个broker拥有该分区的副本。对应代码中的(!racksWithReplicas.contains(rack) ||
racksWithReplicas.size == numRacks)

2. 如果此broker中已经拥有该分区的副本,并且还有其他broker中没有该分区的副本。对应代码中的(!brokersWithReplicas.contains(broker) ||brokersWithReplicas.size == numBrokers))
1
2
3
4
  1. 无论是带机架信息的策略还是不带机架信息的策略,上层调用方法AdminUtils.assignReplicasToBrokers()最后都是获得一个[Int, Seq[Int]]类型的副本分配列表,其最后作为kafka zookeeper节点/brokers/topics/{topic-name}节点数据。至此kafka的topic创建就讲解完了,有些同学会感到很疑问,全文通篇(包括上一篇)都是在讲述如何分配副本,最后得到的也不过是个分配的方案,并没有真正创建这些副本的环节,其实这个观点没有任何问题,对于通过kafka提供的kafka-topics.sh脚本创建topic的方法来说,它只是提供一个副本的分配方案,并在kafka zookeeper中创建相应的节点而已。kafka broker的服务会注册监听/brokers/topics/目录下是否有节点变化,如果有新节点创建就会监听到,然后根据其节点中的数据(即topic的分区副本分配方案)来创建对应的副本。
上次更新: 2025/04/03, 11:07:08
源码剖析-Broker启动流程
源码剖析-Producer生产者流程

← 源码剖析-Broker启动流程 源码剖析-Producer生产者流程→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式