原文:http://blog.csdn.net/u011637069/article/details/72899915
背景:之前用的kafka客户端版本是0.8,近期升级了kafka客户端的版本,写了新的消费者和生产者的代码,在本地测试没有问题,可以正常消费与生产。但最近的项目中使用了新版的代码,当数据量较大时会出现重复消费的问题。现将问题的排除与解决过程记录下来,避免再次踩坑。
问题发现:由于ConsumerRecord对象可以获取到当前消息的分区与偏移量,故在log日志中将当前消息的分区与偏移量也记录下来了。在监控日志的过程中,发现某一个分区的偏移量会在多个线程中出现,而且偏移量的值还不一样,因此觉得可能是重复消费,当查询程序消费的总记录数和kafka中的消息记录数相差甚多。
解决过程:上网搜索如何解决kafka重复消费的问题,都是在说kafka在session时间内未提交offset,故参考网上思路,将consumer的poll的时间改成100ms,即100ms从kafka上poll一次数据,并且设置props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000");kafka自动提交的时间间隔和session时间。
改完后测试,发现当kafka的数据量较大时,还是会有重复消费问题,然后将poll的数据 条数打印出来发现数据条数较多,而且一段时间后(一般30s)会报一个错误:
大概意思是:由于组已经重新平衡并将分区分配给另一个成员,因此无法完成提交。 这意味着在调用poll()的后续调用之间的时间比配置的session.timeout.ms长,这通常意味着poll循环花费太多时间消息处理。 您可以通过增加会话超时或通过使用max.poll.records减少poll()中返回的批次的最大大小来解决此问题。
于是设置如下参数:
这个数值的大小需要和 session.timeout.ms的时长做评估,即100条数据在session.timeout.ms时间内是否能处理完?
注:
还需要 注意的是,fetch.min.bytes这个参数配置,从kafka拉取的数据的大小,这个参数最好设置一下,不然的话可能出问题。建议设置为:
总结:
一般情况下,kafka重复消费都是由于未正常提交offset,故修改配置,正常提交offset即可解决。上文中提到的主要配置如下所示: