Manually controlling KafkaConsumer offsets in Java

Date: 2024-01-05 09:42:08

This covers the scenario where consumption results must be stored in a relational database, and the consumer should not keep consuming while the database is down.

http://kafka.apache.org

I dug through a lot of source code and am writing it all down here, so I don't have to sift through the sources again next time.

From the Kafka documentation:

If the results of the consumption are being stored in a relational database, storing the offset in the database as well can allow committing both the results and offset in a single transaction. Thus either the transaction will succeed and the offset will be updated based on what was consumed or the result will not be stored and the offset won't be updated.
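
Below is a minimal sketch of what that single transaction can look like with plain JDBC. The table names (consumed_events, kafka_offsets), the column names, and the DataSource wiring are assumptions for illustration, not part of the original post:

import java.sql.Connection;
import java.sql.PreparedStatement;
import javax.sql.DataSource;

public class TransactionalStore {
    private final DataSource dataSource; // assumed to point at the relational database

    public TransactionalStore(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // Store one record's value and its offset in a single database transaction.
    public void save(String topic, int partition, long offset, String value) throws Exception {
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try (PreparedStatement insertResult = conn.prepareStatement(
                         "INSERT INTO consumed_events(payload) VALUES (?)");
                 PreparedStatement updateOffset = conn.prepareStatement(
                         "UPDATE kafka_offsets SET next_offset = ? WHERE topic = ? AND partition_id = ?")) {
                insertResult.setString(1, value);
                insertResult.executeUpdate();
                updateOffset.setLong(1, offset + 1); // the offset of the next record to read
                updateOffset.setString(2, topic);
                updateOffset.setInt(3, partition);
                updateOffset.executeUpdate();
                conn.commit();   // both the result and the offset are stored, or...
            } catch (Exception e) {
                conn.rollback(); // ...neither is, and the record will be re-consumed
                throw e;
            }
        }
    }
}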

Each record comes with its own offset, so to manage the offset yourself you only need to do the following (a sketch of step 3 follows the list):

1. Configure enable.auto.commit=false
2. Use the offset provided with each ConsumerRecord to save your position.
3. On restart restore the position of the consumer using seek(TopicPartition, long).
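
A minimal sketch of step 3, restoring the consumer's position on startup. The broker address, group id, topic/partition, and the loadOffsetFromDb() helper are placeholders introduced here for illustration; in practice the helper would read the next_offset value written by the transaction above:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RestoreOffsetExample {

    // Hypothetical lookup against the offset table kept in the database.
    static long loadOffsetFromDb(TopicPartition tp) {
        return 0L; // placeholder
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "db-backed-consumer");      // placeholder group id
        props.put("enable.auto.commit", "false");         // step 1: disable auto commit
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic/partition
            consumer.assign(Collections.singletonList(tp));        // manual assignment instead of subscribe()
            consumer.seek(tp, loadOffsetFromDb(tp));               // step 3: resume from the stored offset
            // subsequent poll() calls continue exactly where the database says we left off
        }
    }
}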

Here is a source-code walkthrough by someone else that is worth sharing: http://blog.csdn.net/chunlongyu/article/details/52663090

Atomic operation: an operation, or sequence of operations, that cannot be interrupted; like an atom, it cannot be split any further and is already the smallest unit (of course, here there are no "units", only operations).

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// run, handler and log are fields of the enclosing consumer class (not shown here).
public void consume() throws FileNotFoundException, IOException {
    Properties props = new Properties();
    KafkaConsumer<String, String> consumer = null;
    try {
        props.load(new FileInputStream(new File("./config/consumer.properties")));
        props.put("enable.auto.commit", "false"); // set after load so the file cannot re-enable auto commit
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(props.getProperty("topic")));
        while (run) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            log.info("records.count():" + records.count());
            for (TopicPartition partition : records.partitions()) {
                // Open question from the original post: how is the order of this list determined?
                // (Within a single partition, records come back in offset order.)
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                long firstOffset = partitionRecords.get(0).offset();
                boolean succeeded = true; // reset per partition so one failure does not block later commits
                try {
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        if (this.handler != null)
                            this.handler.handle(record.offset(), record.key(), record.value());
                        // TODO insert db
                    }
                } catch (Exception e) {
                    log.info("insert db failure");
                    // Rewind to the first record of this batch so it is re-consumed on the next poll.
                    consumer.seek(partition, firstOffset);
                    succeeded = false;
                }
                if (succeeded) {
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    // Commit lastOffset + 1: the committed offset is always the next record to be read.
                    // Collections.singletonMap() returns an immutable map holding only this one entry.
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (consumer != null)
            consumer.close();
    }
}
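
The handler field used in consume() is not declared in the original snippet. One plausible shape for it, assumed here, is a small callback interface whose handle() performs the database insert, so that a thrown exception reaches the catch block above and triggers the seek back to firstOffset:

// Hypothetical callback interface; not part of the original post.
public interface RecordHandler {
    // Persist one record (e.g. via a transactional store) or throw to force a retry.
    void handle(long offset, String key, String value) throws Exception;
}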