Kafka Source Code Analysis, Part 15: How a Client Creates a Topic

There are two ways to get a topic created: automatically or manually. When the broker is configured with auto.create.topics.enable=true, Kafka automatically creates a topic with the default settings the first time it sees a request for a topic that does not exist. When auto.create.topics.enable=false, the client must create the topic in advance before it can send data to it. The command to create a topic is:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor [replication factor] --partitions [number of partitions] --topic [topic name]
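For the auto-creation path, the broker-side settings that control the defaults would look roughly like the sketch below. The property names are standard Kafka broker configs; the values shown are only illustrative:

# server.properties (illustrative values)
# create missing topics on first use
auto.create.topics.enable=true
# defaults applied to auto-created topics
num.partitions=1
default.replication.factor=1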

Looking at the kafka-topics.sh script:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@

what ultimately runs is the TopicCommand class:

object TopicCommand {

  def main(args: Array[String]): Unit = {
    val opts = new TopicCommandOptions(args)
    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")
    // should have exactly one action
    val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
    if (actions != 1)
      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
    opts.checkArgs()
    val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
    try {
      if (opts.options.has(opts.createOpt)) // a --create request takes this branch; the other actions are analogous
        createTopic(zkClient, opts)
      else if (opts.options.has(opts.alterOpt))
        alterTopic(zkClient, opts)
      else if (opts.options.has(opts.listOpt))
        listTopics(zkClient, opts)
      else if (opts.options.has(opts.describeOpt))
        describeTopic(zkClient, opts)
      else if (opts.options.has(opts.deleteOpt))
        deleteTopic(zkClient, opts)
    } catch {
      case e: Throwable =>
        println("Error while executing topic command " + e.getMessage)
        println(Utils.stackTrace(e))
    } finally {
      zkClient.close()
    }
  }
  ……
}

Following the call chain into createTopic:

// Create the topic
def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
  val topic = opts.options.valueOf(opts.topicOpt)
  val configs = parseTopicConfigsToBeAdded(opts)
  if (opts.options.has(opts.replicaAssignmentOpt)) {
    // If the client specified the replica assignment for each partition explicitly, write the topic
    // metadata straight into ZooKeeper: the topic's properties go under /config/topics/[topic],
    // and its PartitionAssignment goes under /brokers/topics/[topic]
    val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
  } else {
    // Otherwise the topic's PartitionAssignment has to be generated automatically
    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
    val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
    val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
    AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
  }
  println("Created topic \"%s\".".format(topic))
}
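The --replica-assignment value is a comma-separated list of partitions, where each partition's replicas are colon-separated broker ids and the partition id is implied by position. A minimal sketch of that parsing (independent of the real parseReplicaAssignment, just to make the format concrete):

// Hypothetical standalone sketch: parse "0:1:2,1:2:0" into Map(0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 0))
def parseAssignmentSketch(spec: String): Map[Int, Seq[Int]] = {
  spec.split(",").zipWithIndex.map { case (replicas, partitionId) =>
    partitionId -> replicas.split(":").map(_.trim.toInt).toSeq
  }.toMap
}

So --replica-assignment 0:1:2,1:2:0 places partition 0 on brokers 0, 1 and 2 (broker 0 as the preferred leader) and partition 1 on brokers 1, 2 and 0.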

The automatic generation of the topic's PartitionAssignment works as follows:

def createTopic(zkClient: ZkClient,
                topic: String,
                partitions: Int,
                replicationFactor: Int,
                topicConfig: Properties = new Properties) {
  // Fetch the live brokers sorted by broker id
  val brokerList = ZkUtils.getSortedBrokerList(zkClient)
  // Generate the replica distribution for every partition
  val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
  // Write the assignment into ZooKeeper
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
}

def assignReplicasToBrokers(brokerList: Seq[Int],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1)
    : Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    throw new AdminOperationException("number of partitions must be larger than 0")
  if (replicationFactor <= 0)
    throw new AdminOperationException("replication factor must be larger than 0")
  if (replicationFactor > brokerList.size)
    throw new AdminOperationException("replication factor: " + replicationFactor +
      " larger than available brokers: " + brokerList.size)
  val ret = new mutable.HashMap[Int, List[Int]]()
  // Decide where in the broker list the first replica of the first partition lands
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
  // Decide which partition id to start assigning from
  var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
  // Derive the initial replica shift from fixedStartIndex, or pick it at random
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
  for (i <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
      nextReplicaShift += 1 // once the partition count wraps past the broker count, bump the shift
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
    // First decide where the current partition's first replica goes
    var replicaList = List(brokerList(firstReplicaIndex))
    // The partition's remaining replicas follow the first replica's position
    for (j <- 0 until replicationFactor - 1)
      replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
    ret.put(currentPartitionId, replicaList.reverse) // record the assignment
    currentPartitionId = currentPartitionId + 1      // move on to the next partition
  }
  ret.toMap
}
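The replicaIndex helper called in the loop above is not shown in the snippet; in the AdminUtils of this era it is a small modular-arithmetic function along these lines:

// Places the j-th follower replica a shifted offset after the first replica's broker;
// the "1 +" and the modulo over (nBrokers - 1) keep it from landing back on the first replica
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}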

The key points are really two:

First: starting from a random position, the first replica of every partition is assigned to brokers in round-robin fashion.

Second: a partition's remaining replicas are placed on the brokers that follow its first replica.

Suppose we use the following command to create a topic with 10 partitions and a replication factor of 3 on a 5-broker Kafka cluster:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 10 --topic test

One possible replica layout is the following:

broker-0  broker-1  broker-2  broker-3  broker-4
p0        p1        p2        p3        p4       (1st replica)
p5        p6        p7        p8        p9       (1st replica)
p4        p0        p1        p2        p3       (2nd replica)
p8        p9        p5        p6        p7       (2nd replica)
p3        p4        p0        p1        p2       (3rd replica)
p7        p8        p9        p5        p6       (3rd replica)
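As a sanity check, this particular layout corresponds to the non-random case startIndex = 0 and initial nextReplicaShift = 0. A small self-contained sketch of the same round-robin logic, outside the Kafka codebase, reproduces it:

// Standalone sketch of the assignment logic above, with startIndex and the initial shift
// fixed at 0 instead of being chosen at random
object ReplicaAssignmentSketch {
  def replicaIndex(firstReplicaIndex: Int, shift: Int, j: Int, nBrokers: Int): Int = {
    val offset = 1 + (shift + j) % (nBrokers - 1)
    (firstReplicaIndex + offset) % nBrokers
  }

  def assign(brokers: Seq[Int], nPartitions: Int, replicationFactor: Int): Map[Int, Seq[Int]] = {
    val startIndex = 0       // the real code picks this at random
    var nextReplicaShift = 0 // likewise random in the real code
    (0 until nPartitions).map { partition =>
      if (partition > 0 && partition % brokers.size == 0) nextReplicaShift += 1
      val firstReplicaIndex = (partition + startIndex) % brokers.size
      val followers = (0 until replicationFactor - 1).map(j =>
        brokers(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokers.size)))
      partition -> (brokers(firstReplicaIndex) +: followers)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    assign(Seq(0, 1, 2, 3, 4), nPartitions = 10, replicationFactor = 3).toSeq.sortBy(_._1).foreach {
      case (p, replicas) => println(s"p$p -> brokers ${replicas.mkString(", ")}")
    }
  }
}

Running it prints p0 -> brokers 0, 1, 2, then p1 -> brokers 1, 2, 3, ..., p5 -> brokers 0, 2, 3, and so on, matching the table above.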