# 4.3 Kafka Source Code Analysis: The Topic Creation Flow
# 4.3.1 Topic Creation
A topic can be created in two ways: automatically or manually. When auto.create.topics.enable=true is set in server.properties, Kafka automatically creates a topic with the default configuration whenever it finds that the topic does not exist. Automatic topic creation is triggered in two situations:
- a Producer writes messages to a nonexistent Topic
- a Consumer reads messages from a nonexistent Topic
# 4.3.2 Manual Creation
When auto.create.topics.enable=false, the topic must be created manually, otherwise sending messages to it will fail. A topic is created manually as follows:

```shell
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic kafka_test
```

- --replication-factor: number of replicas
- --partitions: number of partitions
- --topic: name of the topic
# 4.3.3 The Topic Command Entry Point
Looking at the script file kafka-topics.sh:

```shell
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
```
It ultimately invokes the TopicCommand class. The main method first checks that the arguments are not empty and that exactly one of create, list, alter, describe and delete is present, validates the arguments, and opens a ZooKeeper connection; if the arguments include create, topic creation begins, and the other actions are handled analogously.
```scala
def main(args: Array[String]): Unit = {
  val opts = new TopicCommandOptions(args)
  // Check the argument length
  if (args.length == 0)
    CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")
  // Exactly one of create, list, alter, describe, delete is allowed
  // should have exactly one action
  val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
  if (actions != 1)
    CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
  // Validate the arguments
  opts.checkArgs()
  // Initialize the ZooKeeper connection
  val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
                        30000,
                        30000,
                        JaasUtils.isZkSecurityEnabled())
  var exitCode = 0
  try {
    if (opts.options.has(opts.createOpt))
      // Create a topic
      createTopic(zkUtils, opts)
    else if (opts.options.has(opts.alterOpt))
      // Alter a topic
      alterTopic(zkUtils, opts)
    else if (opts.options.has(opts.listOpt))
      // List all topics: bin/kafka-topics.sh --list --zookeeper localhost:2181
      listTopics(zkUtils, opts)
    else if (opts.options.has(opts.describeOpt))
      // Describe a topic: bin/kafka-topics.sh --describe --zookeeper localhost:2181
      describeTopic(zkUtils, opts)
    else if (opts.options.has(opts.deleteOpt))
      // Delete a topic
      deleteTopic(zkUtils, opts)
  } catch {
    case e: Throwable =>
      println("Error while executing topic command : " + e.getMessage)
      error(Utils.stackTrace(e))
      exitCode = 1
  } finally {
    zkUtils.close()
    Exit.exit(exitCode)
  }
}
```
# 4.3.4 Creating the Topic
Let us now walk through how createTopic executes:
```scala
def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  // Get the topic name
  val topic = opts.options.valueOf(opts.topicOpt)
  val configs = parseTopicConfigsToBeAdded(opts)
  val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
  if (Topic.hasCollisionChars(topic))
    println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
  try {
    // If the client specified the replica assignment of the topic's partitions,
    // persist all of the topic's metadata directly into ZooKeeper:
    // the topic's properties go under /config/topics/{topic},
    // the topic's PartitionAssignment goes under /brokers/topics/{topic}
    if (opts.options.has(opts.replicaAssignmentOpt)) {
      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
    } else {
      // Otherwise the topic's PartitionAssignment must be generated automatically
      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
      // Number of partitions
      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
      // Replication factor
      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
      // Since version 0.10.x, Kafka supports specifying rack information for brokers; if rack
      // info is specified, replica assignment spreads each partition's replicas across racks
      // as far as possible. Rack info is configured via the broker.rack parameter in the
      // broker configuration file config/server.properties
      val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled else RackAwareMode.Enforced
      AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
    }
    println("Created topic \"%s\".".format(topic))
  } catch {
    case e: TopicExistsException => if (!ifNotExists) throw e
  }
}
```
If the client specified the replica assignment of the topic's partitions, all of the topic's metadata is persisted directly into ZooKeeper: the topic's properties are written under /config/topics/{topic}, and the topic's PartitionAssignment under /brokers/topics/{topic}.
Otherwise, the topic's partition assignment is generated automatically from the number of partitions, the replication factor, and whether rack awareness is enabled.
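As an aside, the --replica-assignment branch takes a manual plan from the command line: partitions are separated by commas and each partition's replica brokerIds by colons. The following Python sketch (an illustrative re-implementation, not Kafka code; the function name is ours) shows the shape of the format that TopicCommand.parseReplicaAssignment accepts:

```python
def parse_replica_assignment(spec):
    # "2:0:1,0:1:2" means partition 0 -> brokers [2, 0, 1] and
    # partition 1 -> brokers [0, 1, 2]; partition ids are implied by position
    assignment = {}
    for partition_id, replicas in enumerate(spec.split(",")):
        broker_ids = [int(b) for b in replicas.split(":")]
        # A partition must not list the same broker twice
        if len(set(broker_ids)) != len(broker_ids):
            raise ValueError(f"Partition {partition_id} has duplicate brokers: {replicas}")
        assignment[partition_id] = broker_ids
    return assignment

print(parse_replica_assignment("2:0:1,0:1:2"))  # {0: [2, 0, 1], 1: [0, 1, 2]}
```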
Next, let us look at the AdminUtils.createTopic method:
def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
// 获取集群中每个broker的brokerId和机架信息信息的列表,为下面的
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
// 根据是否禁用指定机架策略来生成分配策略
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions,replicationFactor)
// 在zookeeper中创建或更新主题分区分配路径
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils,topic, replicaAssignment, topicConfig)
}
- Next, the AdminUtils.assignReplicasToBrokers method:
```scala
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    // The number of partitions must not be less than or equal to 0
    throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
  if (replicationFactor <= 0)
    // The replication factor must not be less than or equal to 0
    throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
  if (replicationFactor > brokerMetadatas.size)
    // The replication factor must not exceed the number of brokers
    throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
  if (brokerMetadatas.forall(_.rack.isEmpty))
    // The case where no rack information was specified
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
      startPartitionId)
  else {
    // The case with rack information is a little more involved
    if (brokerMetadatas.exists(_.rack.isEmpty))
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
      startPartitionId)
  }
}
```
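The three guard clauses can be restated as a minimal Python sketch (our own helper name, illustrating only the validation rules above):

```python
def validate_assignment_args(n_partitions, replication_factor, n_brokers):
    # Mirrors the guard clauses at the top of AdminUtils.assignReplicasToBrokers
    if n_partitions <= 0:
        raise ValueError("Number of partitions must be larger than 0.")
    if replication_factor <= 0:
        raise ValueError("Replication factor must be larger than 0.")
    if replication_factor > n_brokers:
        raise ValueError(
            f"Replication factor: {replication_factor} larger than available brokers: {n_brokers}.")

validate_assignment_args(10, 1, 3)    # fine: 10 partitions, 1 replica, 3 brokers
# validate_assignment_args(10, 4, 3)  # would raise: replication factor > broker count
```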
- Rack-unaware assignment
```scala
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
```
The method iterates over each partition and, from brokerArray (the list of brokerIds), selects replicationFactor brokerIds to assign to that partition.

It first creates a mutable Map to hold the result to be returned, i.e. the mapping from each partition to its assigned replicas. Since fixedStartIndex is -1, startIndex is a random number used to compute the brokerId where assignment starts; likewise, since startPartitionId is -1, currentPartitionId is 0, so by default topic creation always assigns partitions in round-robin order starting from partition 0.

nextReplicaShift denotes the shift of the next replica assignment relative to the previous one. That sounds a little convoluted on paper, so consider an example: suppose the cluster has 3 broker nodes (the brokerArray in the code), and a topic is created with 3 replicas and 6 partitions. Assignment starts from the partition with partitionId 0. Suppose the random draws (via rand.nextInt(brokerArray.length)) give nextReplicaShift = 2 and startIndex = 2. Then for partitionId 0 the position of the first replica (an index into brokerArray) is firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length = (0+2)%3 = 2, and the position of the second replica is replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length) = replicaIndex(2, 2, 0, 3) = (2+(1+(2+0)%(3-1)))%3 = 0. Continuing, the next replica's position is replicaIndex(2, 2, 1, 3) = (2+(1+(2+1)%(3-1)))%3 = 1. So the replica positions for partitionId 0 are [2,0,1]. If brokerArray happens to be numbered from 0 with no gaps, i.e. brokerArray is [0,1,2], then the replica assignment for partitionId 0 is exactly [2,0,1]. If the brokerIds do not start at zero or are not consecutive (perhaps some brokers in the cluster went offline earlier), say brokerArray is [2,5,8], then the assignment for partitionId 0 would be [8,2,5]. To keep the discussion simple, assume brokerArray is [0,1,2].

Now compute the next partition, partitionId 1, the same way. nextReplicaShift is still 2, since the condition for incrementing it has not been met. This partition's firstReplicaIndex = (1+2)%3 = 0. The second replica's position is replicaIndex(0, 2, 0, 3) = (0+(1+(2+0)%(3-1)))%3 = 1, and the third replica's position is replicaIndex(0, 2, 1, 3) = 2, so the final assignment for partitionId 1 is [0,1,2].
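The rack-unaware algorithm above can be sketched in Python (an illustrative re-implementation of the Scala logic, not Kafka code). Passing fixed_start_index=2 pins both startIndex and nextReplicaShift to 2, which reproduces the worked example:

```python
import random

def replica_index(first_replica_index, second_replica_shift, j, n_brokers):
    # Mirrors the private AdminUtils.replicaIndex helper: shift the j-th extra
    # replica so consecutive replicas land on different brokers
    shift = 1 + (second_replica_shift + j) % (n_brokers - 1)
    return (first_replica_index + shift) % n_brokers

def assign_replicas_rack_unaware(n_partitions, replication_factor, broker_list,
                                 fixed_start_index=-1, start_partition_id=-1):
    ret = {}
    brokers = list(broker_list)
    start_index = fixed_start_index if fixed_start_index >= 0 else random.randrange(len(brokers))
    current_partition_id = max(0, start_partition_id)
    next_replica_shift = fixed_start_index if fixed_start_index >= 0 else random.randrange(len(brokers))
    for _ in range(n_partitions):
        # After one full round over the brokers, bump the shift
        if current_partition_id > 0 and current_partition_id % len(brokers) == 0:
            next_replica_shift += 1
        first = (current_partition_id + start_index) % len(brokers)
        replicas = [brokers[first]]
        for j in range(replication_factor - 1):
            replicas.append(brokers[replica_index(first, next_replica_shift, j, len(brokers))])
        ret[current_partition_id] = replicas
        current_partition_id += 1
    return ret

assignment = assign_replicas_rack_unaware(6, 3, [0, 1, 2], fixed_start_index=2)
print(assignment)
# {0: [2, 0, 1], 1: [0, 1, 2], 2: [1, 2, 0], 3: [2, 1, 0], 4: [0, 2, 1], 5: [1, 0, 2]}
```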
- Rack-aware assignment
```scala
private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                             replicationFactor: Int,
                                             brokerMetadatas: Seq[BrokerMetadata],
                                             fixedStartIndex: Int,
                                             startPartitionId: Int): Map[Int, Seq[Int]] = {
  val brokerRackMap = brokerMetadatas.collect {
    case BrokerMetadata(id, Some(rack)) => id -> rack
  }.toMap
  val numRacks = brokerRackMap.values.toSet.size
  val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
  val numBrokers = arrangedBrokerList.size
  val ret = mutable.Map[Int, Seq[Int]]()
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
    val leader = arrangedBrokerList(firstReplicaIndex)
    val replicaBuffer = mutable.ArrayBuffer(leader)
    val racksWithReplicas = mutable.Set(brokerRackMap(leader))
    val brokersWithReplicas = mutable.Set(leader)
    var k = 0
    for (_ <- 0 until replicationFactor - 1) {
      var done = false
      while (!done) {
        val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
        val rack = brokerRackMap(broker)
        // Skip this broker if
        // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
        //    that do not have any replica, or
        // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
        if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
            && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
          replicaBuffer += broker
          racksWithReplicas += rack
          brokersWithReplicas += broker
          done = true
        }
        k += 1
      }
    }
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
```
assignReplicasToBrokersRackUnaware runs on the premise that no broker has rack information configured, while assignReplicasToBrokersRackAware runs on the premise that every broker has rack information. If some brokers have rack information and others do not, an AdminOperationException is thrown; to still create the topic in that situation, pass "--disable-rack-aware".
The first step obtains brokerRackMap, the mapping from brokerId to rack. Then getRackAlternatedBrokerList() processes brokerRackMap further into a list of brokerIds. For example, suppose there are 3 racks rack1, rack2 and rack3, and 9 brokers, mapped as follows:
rack1: 0, 1, 2
rack2: 3, 4, 5
rack3: 6, 7, 8
After getRackAlternatedBrokerList() this becomes the list [0, 3, 6, 1, 4, 7, 2, 5, 8], evidently produced by round-robining over the brokers of each rack. From then on you can simply treat this list as the list of brokerIds, corresponding to brokerArray in assignReplicasToBrokersRackUnaware(), except that it encodes simple rack-placement information. The subsequent steps resemble the rack-unaware algorithm: the same startIndex, currentPartitionId and nextReplicaShift concepts apply, and the loop assigns replicas for each partition. Apart from the first replica, the remaining replicas are likewise obtained via the replicaIndex method; but unlike assignReplicasToBrokersRackUnaware(), the chosen broker is not simply appended to the current partition's replica list. It must first pass a filter: a broker that satisfies either of the following conditions cannot be added to the current partition's replica list:
1. A broker in the same rack as this broker already holds a replica of the partition, while one or more other racks hold no replica of it yet; this corresponds to the check (!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks) in the code.
2. This broker itself already holds a replica of the partition, while one or more other brokers hold no replica of it yet; this corresponds to the check (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers) in the code.
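The interleaving that getRackAlternatedBrokerList performs can be sketched in Python (an illustrative approximation of the Scala helper, not Kafka code; it assumes brokers are visited in ascending brokerId order, which matches the example above):

```python
from collections import defaultdict

def rack_alternated_broker_list(broker_rack_map):
    # Group brokers by rack (ascending brokerId order within each rack),
    # then take one broker from each rack in turn until all are emitted
    brokers_by_rack = defaultdict(list)
    for broker in sorted(broker_rack_map):
        brokers_by_rack[broker_rack_map[broker]].append(broker)
    result, iterators = [], [iter(lst) for lst in brokers_by_rack.values()]
    while iterators:
        remaining = []
        for it in iterators:
            broker = next(it, None)
            if broker is not None:
                result.append(broker)
                remaining.append(it)
        iterators = remaining  # drop racks that ran out of brokers
    return result

racks = {b: "rack1" for b in (0, 1, 2)}
racks.update({b: "rack2" for b in (3, 4, 5)})
racks.update({b: "rack3" for b in (6, 7, 8)})
print(rack_alternated_broker_list(racks))  # [0, 3, 6, 1, 4, 7, 2, 5, 8]
```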
- Whether or not rack information is used, the calling method AdminUtils.assignReplicasToBrokers() ultimately obtains a replica assignment of type Map[Int, Seq[Int]], which ends up as the data of the ZooKeeper node /brokers/topics/{topic-name}. That completes the walkthrough of topic creation in Kafka. Some readers may wonder: this entire discussion (including the previous section) has been about how replicas are assigned, and what we end up with is merely an assignment plan; nowhere are the replicas actually created. That observation is entirely correct. For topics created through the kafka-topics.sh script that Kafka provides, the script only produces a replica assignment plan and creates the corresponding nodes in ZooKeeper. The Kafka broker service registers a watch on the /brokers/topics/ path; when a new node is created there, the broker is notified and then creates the corresponding replicas according to the node's data, i.e. the topic's partition replica assignment plan.
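To make that final step concrete, the assignment map is serialized as JSON into the /brokers/topics/{topic} node. A sketch of the version-1 payload shape (the assignment values here are hypothetical):

```python
import json

# A hypothetical assignment as returned by AdminUtils.assignReplicasToBrokers
assignment = {0: [2, 0, 1], 1: [0, 1, 2], 2: [1, 2, 0]}

# Data written to /brokers/topics/{topic}: partition ids become string keys,
# each mapping to its ordered replica brokerIds (the first is the preferred leader)
znode_data = json.dumps(
    {"version": 1, "partitions": {str(p): r for p, r in assignment.items()}},
    sort_keys=True)
print(znode_data)
# {"partitions": {"0": [2, 0, 1], "1": [0, 1, 2], "2": [1, 2, 0]}, "version": 1}
```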