Use case: multiple Kafka consumers consume the same topic, with every record guaranteed to be consumed.
Approach: create the topic with multiple partitions (the partition count must be greater than or equal to the number of consumers), and put all the consumers in the same consumer group.
Step 1: create the topic with multiple partitions and verify it, as shown below:
[root@hadoop ~]# kafka-topics.sh --list --zookeeper hadoop:2181
[root@hadoop ~]# kafka-topics.sh --create --zookeeper hadoop:2181 --topic kafkatest --partitions 3 --replication-factor 1
Created topic "kafkatest".
[root@hadoop ~]# kafka-topics.sh --list --zookeeper hadoop:2181
kafkatest
[root@hadoop ~]# kafka-topics.sh --describe --zookeeper hadoop:2181
Topic:kafkatest PartitionCount:3 ReplicationFactor:1 Configs:
Topic: kafkatest Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: kafkatest Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: kafkatest Partition: 2 Leader: 0 Replicas: 0 Isr: 0
The output shows the topic was created successfully with 3 partitions (chosen because the test below uses 2 consumers).
Step 2: add a consumer configuration file, consumer2.properties, to prepare for the test, as shown below:
#consumer group id
group.id=group1
Note: this is the only change; all other settings keep their original values.
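A likely way to create this file, assuming the /usr/local/kafka/config directory that the consumer command later in this article points at, is to copy the stock consumer.properties and append the group id:

```shell
# Copy the stock consumer config and append the group id.
# The /usr/local/kafka/config path matches the --consumer.config path
# used later in this article; adjust it to your installation.
cp /usr/local/kafka/config/consumer.properties /usr/local/kafka/config/consumer2.properties
echo "group.id=group1" >> /usr/local/kafka/config/consumer2.properties
```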
Step 3: start the producer, but do not enter any data yet. Screenshot below:
As shown, the producer has started and the cursor is waiting for input.
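The producer in this step would be the console producer. A sketch of the command, where the broker address hadoop:9092 is an assumption (the default Kafka port on the same host used for ZooKeeper above):

```shell
kafka-console-producer.sh --broker-list hadoop:9092 --topic kafkatest
```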
Step 4: open two new terminal windows and start a consumer in each, simulating two consumers. Screenshot below:
As shown, both consumers have started and are ready to consume the producer's data.
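Both consumer windows run the same command, pointing at the same consumer2.properties so that the two consumers join group1 together (this matches the consumer command that appears in the error log below):

```shell
kafka-console-consumer.sh --topic kafkatest --zookeeper hadoop:2181 \
  --consumer.config /usr/local/kafka/config/consumer2.properties
```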
Step 5: enter data in the producer window and verify that the consumers consume it. Screenshot below:
Step 6: check the consumers' consumption. One consumer consumes normally, while the other reports the following error:
[root@hadoop ~]# kafka-console-consumer.sh --topic kafkatest --zookeeper hadoop:2181 --consumer.config /usr/local/kafka/config/consumer2.properties
[2019-03-16 14:52:57,873] ERROR [group1_hadoop-1552719160845-67df1458], error during syncedRebalance (kafka.consumer.ZookeeperConsumerConnector)
kafka.common.ConsumerRebalanceFailedException: group1_hadoop-1552719160845-67df1458 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:633)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:551)
[2019-03-16 14:53:06,009] ERROR [group1_hadoop-1552719160845-67df1458], error during syncedRebalance (kafka.consumer.ZookeeperConsumerConnector)
kafka.common.ConsumerRebalanceFailedException: group1_hadoop-1552719160845-67df1458 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:633)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:551)
error during syncedRebalance (kafka.consumer.ZookeeperConsumerConnector)
The string "group1_hadoop-1552719160845-67df1458" in the log shows that the consumer configuration file has taken effect.
The key error is: "kafka.common.ConsumerRebalanceFailedException: group1_hadoop-1552719160845-67df1458 can't rebalance after 4 retries"
The official documentation and root-cause analysis follow (screenshot below). In short, the old high-level consumer gives up after rebalance.max.retries attempts; if the total retry window (rebalance.max.retries * rebalance.backoff.ms) is shorter than the ZooKeeper session timeout, a consumer can exhaust its retries before ZooKeeper releases another consumer's stale ephemeral nodes, and the rebalance fails.
Once the cause is found, modify the consumer configuration file consumer2.properties as follows:
#consumer group id
group.id=group1
#consumer timeout
#consumer.timeout.ms=5000
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
rebalance.backoff.ms=2000
rebalance.max.retries=10
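These values satisfy the rule of thumb from the old consumer's documentation that rebalance.max.retries * rebalance.backoff.ms should exceed zookeeper.session.timeout.ms. A quick shell sanity check of the numbers above:

```shell
# Check that the total rebalance retry window (retries * backoff)
# is larger than the ZooKeeper session timeout configured above.
rebalance_max_retries=10
rebalance_backoff_ms=2000
zookeeper_session_timeout_ms=5000
retry_window=$((rebalance_max_retries * rebalance_backoff_ms))
echo "retry window: ${retry_window} ms"
if [ "$retry_window" -gt "$zookeeper_session_timeout_ms" ]; then
  echo "OK: retry window exceeds session timeout"
fi
```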
Restart Kafka, delete and recreate the topic, then restart the producer and the consumers and enter data to verify. Producer screenshot below:
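The delete-and-recreate step can be sketched with the same kafka-topics.sh tool used above; note that topic deletion only takes effect if delete.topic.enable=true in the broker's server.properties, which is an assumption about this cluster's configuration:

```shell
kafka-topics.sh --delete --zookeeper hadoop:2181 --topic kafkatest
kafka-topics.sh --create --zookeeper hadoop:2181 --topic kafkatest --partitions 3 --replication-factor 1
```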
Screenshots of the two consumers below:
As shown, the two consumers jointly consumed the data from the same topic, with no record consumed twice and none missed.