系统
启动 Kafka
-daemon
参数可以让 Kafka 在后台运行。
复制
指定 JMX 端口启动
JMX 的全称为 Java Management Extensions。顾名思义,是管理 Java 的一种扩展,通过 JMX 可以方便我们监控 Kafka 的内存,线程,CPU 的使用情况,以及生产和消费消息的指标。
复制
停止 Kafka
复制
Topic
创建 Topic
复制
列出所有 Topic
复制
查看指定 Topic
复制
删除指定 Topic
复制
扩展 Topic 的 Partition 数量
artition 数量只能扩大不能缩小。
复制
扩展 topic 每个 partition 的副本数量
replication factor 可以扩大也可以缩小,最多不能超过 broker 数量。先创建一个文件名为 increace-factor.json,这里要扩展的是 mysql-audit-log 这个 topic 的 partition 到 15 个:0,1,2 为 broker id。
复制
复制
查看 Topic 数据大小
复制
消费者组 Consumer Group
列出所有的 Consumer Group
复制
查看指定 Consumer Group 详情
- GROUP:消费者 group
- TOPIC:话题 id
- PARTITION:分区 id
- CURRENT-OFFSET:当前已消费的条数
- LOG-END-OFFSET:总条数
- LAG:未消费的条数
- CONSUMER-ID:消费者 id
- HOST:消费者 ip 地址
- CLIENT-ID:客户端 id
复制
删除指定 Consumer Group
复制
消息
生产消息
普通生产消息
复制
生产消息指定 Key
key.separator=,
指定以逗号作为 key 和 value 的分隔符。
复制
消费消息
从头开始消费
从头开始消费是可以消费到之前的消息的,通过 --from-beginning
指定:
复制
从尾部开始消费
--offset latest
指定从尾部开始消费,另外还需要指定 partition,可以指定多个:
复制
消费指定条数的消息
--max-messages
指定取的个数:
复制
指定消费组进行消费
--consumer-property group.id=<消费者组名>
执行消费者组进行消费:
复制
查看消息具体内容
复制
查看 Topic 中当前消息总数
Kafka 自带的命令没有直接提供这样的功能,要使用 Kafka 提供的工具类 GetOffsetShell 来计算给定 Topic 每个分区当前最早位移和最新位移,差值就是每个分区的当前的消息总数,将该 Topic 所有分区的消息总数累加就能得到该 Topic 总的消息数。
首先查询 Topic 中每个分区 offset 的最小值(起始位置),使用 --time -2
参数。一个分区的起始位置并不是每时每刻都为 0 ,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加。
复制
然后使用--time -1
参数查询 Topic 各个分区的 offset 的最大值。
复制
对于本例来说,test-topic 中当前总的消息数为 (5500000 - 0) + (5500000 - 0),等于 1100 万条。如果只是要获取 Topic 中总的消息数(包括已经从 Kafka 删除的消息),那么只需要将 Topic 中每个 Partition 的 Offset 累加即可。
Offset
重置消费者 Offset
复制
性能测试
生产者性能测试
- --num-records 10000000: 向指定主题发送了 1 千万条消息。
- --record-size 1024: 每条消息的大小为 1024KB。
- --throughput -1: 不限制吞吐量。
- --producer-props: 指定生产者参数。
- acks=-1: 这要求 ISR 列表里跟 leader 保持同步的那些 follower 都要把消息同步过去,才能认为这条消息是写入成功。
- linger.ms=2000: batch.size 和 linger.ms 是对 kafka producer 性能影响比较大的两个参数。batch.size 是 producer 批量发送的基本单位,默认是 16384Bytes,即 16kB;lingger.ms 是 sender 线程在检查 batch 是否 ready 时候,判断有没有过期的参数,默认大小是 0ms。
- compression.type=lz4: 使用 lz4 压缩算法。
复制
我们应该关心延时的概率分布情况,仅仅知道一个平均值是没有意义的。这就是这里计算分位数的原因。通常我们关注到 99th 分位就可以了。比如在上面的输出中,99th 值是 262 ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 262 ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。
消费者性能测试
复制
虽然输出格式有所差别,但该脚本也会打印出消费者的吞吐量数据。比如本例中的 629.9997MB/s。有点令人遗憾的是,它没有计算不同分位数下的分布情况。因此,在实际使用过程中,这个脚本的使用率要比生产者性能测试脚本的使用率低。
修改动态参数
查看支持的动态参数
如果你想要知道动态 Broker 参数都有哪些,一种方式是在 Kafka 官网中查看 Broker 端参数列表,另一种方式是直接运行无参数的 kafka-configs 脚本,该脚本的说明文档会告诉你当前动态 Broker 参数都有哪些。
复制
修改 Broker 动态参数
修改动态参数无需重启 Broker,动态 Broker 参数的使用场景非常广泛,通常包括但不限于以下几种:
- 动态调整 Broker 端各种线程池大小,实时应对突发流量。
- 动态调整 Broker 端连接信息或安全配置信息。
- 动态更新 SSL Keystore 有效期。
- 动态调整 Broker 端 Compact 操作性能。
- 实时变更 JMX 指标收集器 (JMX Metrics Reporter)。
Kafka Broker Config 的参数有以下 3 种类型:
- read-only:被标记为 read-only 的参数和原来的参数行为一样,只有重启 Broker,才能令修改生效。
- per-broker:被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效。
- cluster-wide:被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效,也就是说,对所有 Broker 都生效。你也可以为具体的 Broker 修改 cluster-wide 参数。
在集群层面设置全局值,即设置 cluster-wide 范围值,将 unclean.leader.election.enable
参数在集群层面设置为 true。
复制
如果要设置 cluster-wide 范围的动态参数,需要显式指定 entity-default。现在,我们使用下面的命令来查看一下刚才的配置是否成功。
复制
在 Zookeeper 上查看 /config/brokers/ 节点可以查看 cluster-wide 的动态参数设置。
复制
设置 per-broker 范围参数。我们还是以 unclean.leader.election.enable
参数为例,我现在为 ID 为 1 的 Broker 设置一个不同的值。命令如下:
复制
我们使用下列命令查看 Broker ID 为 1 的节点动态参数,可以看到 DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false
,表示我们刚才对 per-broker 参数的调整生效了。
复制
在 Zookeeper 上查看 /config/brokers/1 节点可以查看 Broker ID 为 1 的节点的动态参数设置。
复制
删除 cluster-wide 范围动态参数。
复制
删除 per-broker 范围参数。
复制
修改 Topic 动态参数
设置 Topic test-topic 的 retention.ms
为 10000。
复制
查看设置的 Topic 动态参数。
复制
在 Zookeeper 上可以查看 /config/topics/ 来查看 Topic 动态参数。
复制
删除 Topic 动态参数。
复制
Kafka 集群一键启动/停止脚本
环境变量设置:
复制
一键启动/停止脚本,查看状态需要安装 jps 工具。
复制
参考资料
- Kafka 动态配置了解下?(https://time.geekbang.org/column/article/113504)
- 常见工具脚本大汇总(https://time.geekbang.org/column/article/116111)
- Kafka 并不难学!