使用kafka-topic.sh工具可以执行大部分操作 创建/修改/删除/查看集群里的主题。要使用全部功能,需要通过--zookeeper参数提供zookeerper连接字符串
创建主题:
创建主题需要3个参数: 主题名字 复制系数 分区数量
格式: kafka-topic.sh --zookeeper <zookeeper connect> --create --topic <string> --replication-factor <integer> --partitions <integer>
如果不需要基于机架信息的分配策略,使用参数--disable-rack-aware
增加主题分区的数量至16:
kafka-topic.sh --zookeeper <zookeeper connect> --alter --topic my-topic --partition 16
减少主题分区数量: 会导致消息乱序,只能删除分区数量,重新创建
删除主题:
配置参数 delete.topic.enable=true
kafka-topic.sh --zookeeper <zookeeper connect> --delete --topic my-topic
列出集群所有主题
kafka-topic.sh --zookeeper <zookeeper connect> --list
列出主题详细信息
列出集群所有主题详细信息
kafka-topic.sh --zookeeper <zookeeper connect> -describe
找出所有包含覆盖配置的主题 --topic-with-overrides
列出所有包含不同步副本的分区 --under-replicated-partitions
kafka-topic.sh --zookeeper <zookeeper connect> --describe --under-replicated-partitions
列出所有没有首领的分区 --unavailable-partitions
列出新版本的消费者群组
Kafka-consumer-groups.sh --new-consumer --bootstrap-server <kafka集群主机:port/kafka-cluster> --list
获取旧版本消费者群组testgroup详细信息
kafka-consumer-group.sh --zookeeper <zookeeper connect> --describe --group testgroup
删除消费者群组
kafka-consumer-groups.sh --zookeeper <zookeeper connect> --delete --group testgroup
删除消费者群组testgroup中my-topic 主题的偏移量
kafka-consumer-groups.sh --zookeeper <zookeeper connect> --delete --group testgroup --topic my-topic
导出群组testgroup的偏移量到offsets文件
kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect <zookeeper connect> --group testgroup --output-file offsets
导入偏移量:
先关闭消费者
kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect <zookeeper connect> --inpiut-file offsets
更改主题配置的命令格式:
kafka-configs.sh --zookeeper <zookeeper connect> --alter --entity-type topics --entity-name <topic name > -add-config <key>=<value>[,<key>=<value>...]
将主题my-topic 消息保留时间设置为1小时
kafka-confihs.sh --zookeeper <zookeeper connect> --alter --entity-type topic --entity-name my-topic -add-config retention.ms=3600000
更改客户端配置命令格式:
kafka-configs.sh --zookeeper <zookeeper connect> --alter --entity-type clients --entity-name <client ID> -add-config <key>=<value>....
列出主题my-topic 所有被覆盖的配置:
kafka-configs.sh --zookeeper <zookeeper connect> --describe --entity-type topics --entity-name my-topic
删除主题my-topic的retention.ms覆盖配置
kafka-config.sh --zookeeper <zookeeper connect> --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
在一个包含1主题和8个分区集群里启动首选的副本选举
kafka-preferred-replica-election.sh --zookeeper <zookeeper connect>
通过partitions.json 文件里指定分区清单来启动副本的选举
kafka-prefered-replica-election.sh --zookeeper <zookeeper connect> --path-to-json-file partitions.json
修改分区副本:
为topic.json文件里的主题生成迁移步骤,以便将这些主题迁移至broker0 和 broker1上
kafka-reassign-partitions.sh --zookeeper <zookeeper connect> --generate --topics-to-move-json-file topics.json --broker-list 0,1
使用reassign.json 来执行建议的分区分配方案:
kafka-reassign-partitions.sh --zookeeper <zookeeper connect> --execute --reassignment-json-file reassign.json
验证reassign.json文件里指定的分区重分配情况:
kafka-reassign-partitions.sh --zookeeper <zookeeper connect> --verify --reassignment-json-file reassign.json
解码日志片段000052368601.log ,显示消息的概要信息
kafka-run-class.sh kafka.tools.DumpLogSegments --files 000052368601.log
解码日志片段000001.log,显示消息内容
kafka-run-class.sh kafka.tools.DumpLogSegments --files 000001.log --print-data-log
验证日志片段00001.log索引文件的正确性
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00001.index,000001.log --index-sanity-check
// --verify-index-only 将会检查索引的匹配度
对broker1和broker2上以my-开头的主题副本进行验证
kafka-replica-verification.sh --broker-list kafka1.com:9092,kafka2.com:9092 --topic-white-list 'my-*'
使用旧版消费者读取单个主题
kafka-console-consumer.sh --zookeeper <zookeeper connect> --topic my-topic
向主题my-topic 生成2个消息
kafka-console-producer.sh --broker-list kafka1.com:9092,kafka2.com:9092 --topic my-topic