消费者消费方式
1、KafkaConsumer.subscribe():为consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给同group下的不同consumer。
2、KafkaConsumer.assign():为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当与指定的group无效(this method does not use the consumer’s group management)。
注意:consumer.assign()是不会被消费者的组管理功能管理的,他相对于是一个临时的,不会改变当前group.id的offset,比如:在使用consumer.subscribe(Arrays.asList(topicName));时offset为20,如果再通过assign方式已经获取了消息后,在下次通过consumer.subscribe(Arrays.asList(topicName));来获取消息时offset还是20,还是会获取20以后的消息。
3、我们还可以配置如下属性auto.offset.reset来,设置消费者从分区的开头或者末尾进行消费数据。当然这也是有条件的。
//一般配置earliest 或者latest 值
props.put("auto.offset.reset", "latest");
我把上述三种情况的消费者不同使用方式下,消费者提交offset的情况进行了归总和说明:
注意:
只要不更改group.id,每次重新消费kafka,都是从上次消费结束的地方继续开始,不论"auto.offset.reset”属性设置的是什么
场景一:Kafka上在实时被灌入数据,但kafka上已经积累了两天的数据,如何从最新的offset开始消费?
(最新指相对于当前系统时间最新)
1.将group.id换成新的名字(相当于加入新的消费组)
2.网上文章写还要设置 properties.setProperty("auto.offset.reset", "latest”)
实验发现即使不设置这个,只要group.id是全新的,就会从最新的的offset开始消费
场景二:kafka在实时在灌入数据,kafka上已经积累了两天的数据,如何从两天前最开始的位置消费?
1.将group.id换成新的名字
2.properties.setProperty("auto.offset.reset", "earliest”)
场景三:不更改group.id,只是添加了properties.setProperty("auto.offset.reset", "earliest”),consumer会从两天前最开始的位置消费吗?
不会,只要不更改消费组,只会从上次消费结束的地方继续消费
场景四:不更改group.id,只是添加了properties.setProperty("auto.offset.reset", "latest”),consumer会从距离现在最近的位置消费吗?
不会,只要不更改消费组,只会从上次消费结束的地方继续消费
应用:
正式打包上线前应该使用新的group.id,以便于从kafka最新的位置开始消费
只要将group.id换成全新的,不论"auto.offset.reset”是否设置,设置成什么,都会从最新的位置开始消费