1、列出集群中的topic
bin/kafka-topics.sh --zookeeper spark1:2181,spark2:2181,spark3:2181 --list
2、创建topic
replication-factor为副本因子数量, partitions为分区数量
bin/kafka-topics.sh --zookeeper spark1:,spark2:,spark3: --create --topic weblogs --replication-factor --partitions
注意:kafka-logs目录会产生topic的消息文件,为什么要生产此消息文件呢?就是当一台机器挂了后,其他机器会基于本机的此topic信息继续对外提供服务。
3、创建一个生产者
bin/kafka-console-producer.sh --broker-list spark1:,spark2:,spark3: --topic weblogs
4、创建一个消费者
from-beginning表示从开始位置开始消费
bin/kafka-console-consumer.sh --zookeeper spark1:,spark2:,spark3: --topic weblogs --from-beginning
5、删除topic
1、命令删除
Kafka 删除topic的命令是:
bin/kafka-topics.sh --delete --zookeeper spark1:2181,spark2:2181,spark3:2181 --topic weblogs
如果kafaka启动时加载的配置文件中server.properties没有配置"delete.topic.enable=true",那么此时的删除并不是真正的删除,而是把topic标记为删除:marked for deletion
被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/topic名称,如果你删除了此处的topic,那么marked for deletion 标记消失
zookeeper的config中也有有关topic的配置信息
ls /config/topics/topic名称
2、手动删除
1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
2、
(1)登录zookeeper客户端的命令:
zookeeper/bin/zkCli.sh
(2)找到topic所在的目录:
ls /brokers/topics
(3)找到要删除的topic,执行如下命令即可,此时topic被彻底删除:
rmr /brokers/topics/topic名称