Controlling the consumer offset in Kafka

Time: 2022-04-02 08:17:05

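Use KafkaConsumer.seek() to control the offset. Note that seek() only moves the consumer's in-memory fetch position. If auto-commit is disabled (manual commit), you also need to call commitAsync() (or commitSync()) afterwards; otherwise the new offset is never committed and is lost on restart or rebalance.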
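
To make the difference between the two offsets concrete, here is a minimal toy model in plain Java with no Kafka dependency. It tracks one partition's in-memory position (what `seek()` moves) and its committed offset (what a restarted consumer resumes from). The class `ToyCursor` and all of its methods are hypothetical stand-ins for illustration, not part of the Kafka client API.

```java
// Toy model of a single partition; NOT the Kafka client API.
// position  = next offset the running consumer will fetch (what seek() changes)
// committed = offset stored for the consumer group (what a restart resumes from)
final class ToyCursor {
    private long position;
    private long committed;

    ToyCursor(long committedOffset) {
        this.committed = committedOffset;
        this.position = committedOffset;
    }

    // Mirrors the effect of KafkaConsumer.seek(): moves only the in-memory position.
    void seek(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be >= 0");
        }
        position = offset;
    }

    // Mirrors the effect of a successful commit: persists the current position.
    void commit() {
        committed = position;
    }

    // Simulates a restart or rebalance: the position falls back to the committed offset.
    void restart() {
        position = committed;
    }

    long position() {
        return position;
    }

    long committed() {
        return committed;
    }
}
```

In this model, seeking from 42 to 43 and then restarting without a commit puts the position back at 42; calling `commit()` before the restart keeps it at 43.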

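Use cases:
1. Processing a record throws an exception: seek to offset + 1 to skip it.
2. Re-consuming earlier data (for example, a write to HBase failed and the message has to be written again).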
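
For the second case, one possible way to decide where to seek is to take, for each partition, the smallest offset whose write failed, and rewind to it so that this record and everything after it is polled again. The sketch below is plain Java with hypothetical names (`Failed`, `RewindPlanner.rewindTargets`). It assumes the caller keeps track of which (partition, offset) pairs failed, and that the downstream write is idempotent, because records after the failed one that had already succeeded are replayed too.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the Kafka client API.
final class RewindPlanner {
    // A (partition, offset) pair for a record whose downstream write failed.
    static final class Failed {
        final int partition;
        final long offset;

        Failed(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Returns, per partition, the earliest failed offset: the position to seek back to.
    static Map<Integer, Long> rewindTargets(List<Failed> failures) {
        Map<Integer, Long> targets = new TreeMap<>();
        for (Failed f : failures) {
            // Keep the minimum offset seen so far for this partition.
            targets.merge(f.partition, f.offset, Long::min);
        }
        return targets;
    }
}
```

Each entry of the result would then be applied with `consumer.seek(new TopicPartition(topic, partition), offset)`, followed by a commit when manual commit is in use; `topic` and `consumer` are assumed to come from the surrounding code.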


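```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;

try {
    // Note: poll(long) is deprecated since Kafka 2.0; poll(Duration) is the replacement.
    ConsumerRecords<String, String> consumerRecords = consumer.poll(pollMillions);

    // The record key is null, so only the value is used.
    for (ConsumerRecord<String, String> record : consumerRecords) {
        sourceVistor.accept(sourceAdaptor.adapt(record.value()));

        // Increment the received-message counter.
        receiveCount.addAndGet(1);
    }

    // Commit only when the poll actually returned records.
    if (!consumerRecords.isEmpty()) {
        consumer.commitAsync();
    }
} catch (RecordTooLargeException e2) {
    // Skip the oversized record and carry on.
    LOGGER.warn("kafkasource poll error: RecordTooLargeException, ignore and increment offset!", e2);
    offsetIncrementIfTooLarge(e2.recordTooLargePartitions());
}
...
private void offsetIncrementIfTooLarge(Map<TopicPartition, Long> topicPartitions) {
    // Each entry maps a partition to the offset of its oversized record.
    for (Map.Entry<TopicPartition, Long> entry : topicPartitions.entrySet()) {
        // Move past the oversized record.
        consumer.seek(entry.getKey(), entry.getValue() + 1);
    }
    // Commit once after all seeks so the skipped positions are persisted.
    consumer.commitAsync();
}
```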