
时间:2023-03-10 04:38:51


当我们在shell中执行topic删除命令的时候` kafka-topics --delete --topic xxxx --zookeeper xxx`,会显示,xxxx已经被标记为删除。然后过了很久你再查看topic列表,发现那个topic依然被标记删除,显然删除没有真正执行。下面就深入了解,kafka删除topic的流程。


delete.topic.enable,配置默认是false,意思是 是否允许kafka集群删除topic,只有为true的情况,kafka才会删除那些已经被标记为删除的topic。否则topic将不会被删除,仅仅被标记,所谓标记,也就是在zk上记录那些delete的topic。注意修改完后需要重启集群。


1. 删除zookeeper上topic的数据



2. 删除该topic所有partition和replica的数据



通过shell命令可以找到操作topic的类TopicCommand,在删除topic这块逻辑中,只做了3件事情,1.判断该topic是否存在;2.判断topic是否是kafka内部topic(不允许被删除); 3.在zk上创建一个节点(/admin/delete_toppics/xxx)来记录删除的topic。下面是详细代码

def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topics = getTopics(zkUtils, opts)
val ifExists = opts.options.has(opts.ifExistsOpt)
// topic不存在 直接抛出异常
if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
topics.foreach { topic =>
try {
// topic是kafka自己的,比如__offset之类的topic,那么不允许删除
if (Topic.isInternal(topic)) {
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
} else {
// 在zk上创建一个节点,标记该topic是需要被删除的
println("Topic %s is marked for deletion.".format(topic))
println("Note: This will have no impact if delete.topic.enable is not set to true.")
} catch {
case _: ZkNodeExistsException =>
println("Topic %s is already marked for deletion.".format(topic))
case e: AdminOperationException =>
throw e
case _: Throwable =>
throw new AdminOperationException("Error while deleting topic %s".format(topic))




 override def process(): Unit = {
if (!isActive) return
debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
if (nonExistentTopics.nonEmpty) {
warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
topicsToBeDeleted --= nonExistentTopics
if (config.deleteTopicEnable) {
if (topicsToBeDeleted.nonEmpty) {
info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
// mark topic ineligible for deletion if other state changes are in progress
topicsToBeDeleted.foreach { topic =>
val partitionReassignmentInProgress =
if (partitionReassignmentInProgress)
// add topic to deletion list
} else {
// If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
for (topic <- topicsToBeDeleted) {
info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")


* Invoked with the list of topics to be deleted
* It invokes onPartitionDeletion for all partitions of a topic.
* The updateMetadataRequest is also going to set the leader for the topics being deleted to
* {@link LeaderAndIsr#LeaderDuringDelete}. This lets each broker know that this topic is being deleted and can be
* removed from their caches.
private def onTopicDeletion(topics: Set[String]) {
info("Topic deletion callback for %s".format(topics.mkString(",")))
// send update metadata so that brokers stop serving data for topics to be deleted
val partitions = topics.flatMap(controllerContext.partitionsForTopic)
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
topics.foreach { topic =>
} /**
* Invoked by onTopicDeletion with the list of partitions for topics to be deleted
* It does the following -
* 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) for partitions that are being
* deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException
* 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas
* and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state,
* it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
* 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
* will delete all persistent data from all replicas of the respective partitions
private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
} /**
* nvoked by onPartitionDeletion. It is the 2nd step of topic deletion, the first being sending
* UpdateMetadata requests to all brokers to start rejecting requests for deleted topics. As part of starting deletion,
* the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
* is never retried. A topic is removed from the in progress list when
* 1. Either the topic is successfully deleted OR
* 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state
* If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
* As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
* the replicas a StopReplicaRequest (delete=true)
* This method does the following things -
* 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
* for deletion if some replicas are dead since it won't complete successfully anyway
* 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
*@param replicasForTopicsToBeDeleted
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
// move dead replicas directly to failed state
replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
// send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) =>
eventManager.put(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build)
if (deadReplicasForTopic.nonEmpty) {
debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))



  private def completeDeleteTopic(topic: String) {
// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
val zkUtils = controllerContext.zkUtils
zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))




Kafka 源码