1. Scenario
The previous post on the Kafka consumer used automatic offset committing.
Auto-commit is a poor fit for many scenarios, because the offset can be committed shortly after the records are fetched, before they have actually been processed; if processing then fails, those records are effectively lost. This matters especially when transactional control is required.
In many cases we want to fetch records from Kafka, process them first (for example, write them into MySQL), and only then commit. That means committing the Kafka offset manually.
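The pattern this post builds toward looks roughly like the sketch below. It assumes the consumer and log objects set up in section 2.3; saveToMysql is a hypothetical placeholder for your own persistence logic, not part of the example code that follows.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
try {
    for (ConsumerRecord<String, String> record : records) {
        saveToMysql(record.value()); // process first, e.g. write to MySQL (hypothetical helper)
    }
    consumer.commitSync(); // commit only after processing succeeded
} catch (Exception e) {
    // no commit on failure: the uncommitted records are redelivered on restart
    log.error("processing failed, offsets not committed", e);
}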
2. Code
2.1 Producer and consumer configuration
import java.time.Duration;

public class MQDict {
    public static final String MQ_ADDRESS_COLLECTION = "192.168.10.10:9092";       // Kafka broker address
    public static final String CONSUMER_TOPIC = "topic-test";                      // topic the consumer subscribes to
    public static final String PRODUCER_TOPIC = "topic-test";                      // topic the producer writes to
    public static final String CONSUMER_GROUP_ID = "group-test";                   // consumer group id (can be configured separately)
    public static final String CONSUMER_ENABLE_AUTO_COMMIT = "true";               // whether the consumer auto-commits offsets
    public static final String CONSUMER_AUTO_COMMIT_INTERVAL_MS = "1000";          // auto-commit interval
    public static final String CONSUMER_SESSION_TIMEOUT_MS = "30000";              // session timeout
    public static final int CONSUMER_MAX_POLL_RECORDS = 10;                        // max records per poll
    public static final Duration CONSUMER_POLL_TIME_OUT = Duration.ofMillis(3000); // poll timeout
}
2.2 Producer: send 100 test messages
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

public class Producer {

    static Logger log = Logger.getLogger(Producer.class);

    private static KafkaProducer<String, String> producer = null;

    /*
     * Initialize the producer
     */
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /*
     * Initialize the configuration
     */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", MQDict.MQ_ADDRESS_COLLECTION);
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return props;
    }

    /**
     * Send 100 test messages
     */
    public static void sendMessages() {
        // message record
        ProducerRecord<String, String> record = null;
        for (int i = 0; i < 100; i++) {
            record = new ProducerRecord<String, String>(MQDict.PRODUCER_TOPIC, "value" + i);
            // send asynchronously; the callback fires once the broker responds
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        log.error("send error: " + e.getMessage());
                    } else {
                        System.out.println(String.format("sent---offset:%s,partition:%s",
                                recordMetadata.offset(), recordMetadata.partition()));
                    }
                }
            });
        }
        // close() flushes any records still buffered, so nothing is lost here
        producer.close();
    }

    public static void main(String[] args) {
        sendMessages();
    }
}
Result: the callback prints the offset and partition of each of the 100 records.
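A side note on the producer: send() is asynchronous. If each record must be confirmed before the next one goes out, a common variant is to block on the returned Future. This is a sketch, not part of the original example:

// requires: import java.util.concurrent.ExecutionException;
try {
    RecordMetadata metadata = producer.send(record).get(); // block until the broker acknowledges
    log.info("sent synchronously, offset=" + metadata.offset()
            + ", partition=" + metadata.partition());
} catch (InterruptedException | ExecutionException e) {
    log.error("synchronous send failed", e);
}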
2.3 Consumer: receive the 100 test messages (auto-commit)
For this test, auto-commit stays enabled in MQDict:
public static final String CONSUMER_ENABLE_AUTO_COMMIT = "true"; // auto-commit enabled
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class Consumer {

    static Logger log = Logger.getLogger(Consumer.class);

    private static KafkaConsumer<String, String> consumer;

    /**
     * Initialize the consumer and subscribe to the topic
     */
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList(MQDict.CONSUMER_TOPIC));
    }

    /**
     * Initialize the configuration
     */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", MQDict.MQ_ADDRESS_COLLECTION);
        props.put("group.id", MQDict.CONSUMER_GROUP_ID);
        props.put("enable.auto.commit", MQDict.CONSUMER_ENABLE_AUTO_COMMIT);
        props.put("auto.commit.interval.ms", MQDict.CONSUMER_AUTO_COMMIT_INTERVAL_MS);
        props.put("session.timeout.ms", MQDict.CONSUMER_SESSION_TIMEOUT_MS);
        props.put("max.poll.records", MQDict.CONSUMER_MAX_POLL_RECORDS);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    public static void receivebatch() {
        // quiet the Kafka client's own logging so only our messages show
        Logger.getLogger("org").setLevel(Level.ERROR);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
            records.forEach((ConsumerRecord<String, String> record) -> {
                log.info("Received message: key=" + record.key() + ", value=" + record.value()
                        + ", topic=" + record.topic());
            });
        }
    }

    public static void main(String[] args) {
        receivebatch();
    }
}
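A side note on initConfig(): the bare string keys work, but the client also ships constants that avoid typos in property names. A sketch of some of the same settings using them:

// requires: import org.apache.kafka.clients.consumer.ConsumerConfig;
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MQDict.MQ_ADDRESS_COLLECTION);
props.put(ConsumerConfig.GROUP_ID_CONFIG, MQDict.CONSUMER_GROUP_ID);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, MQDict.CONSUMER_ENABLE_AUTO_COMMIT);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MQDict.CONSUMER_MAX_POLL_RECORDS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");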
2.4 Consumer: receive the 100 test messages (manual commit)
Switch the flag in MQDict to disable auto-commit:
public static final String CONSUMER_ENABLE_AUTO_COMMIT = "false"; // auto-commit disabled
2.4.1 With the manual commit left commented out
public static void receivebatch_shoudong() {
    Logger.getLogger("org").setLevel(Level.ERROR);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
        records.forEach((ConsumerRecord<String, String> record) -> {
            log.info("Received message: key=" + record.key() + ", value=" + record.value()
                    + ", topic=" + record.topic());
        });
        // consumer.commitSync(); // manual commit, deliberately left commented out here
    }
}
Execution result:
All 100 published messages are fetched. When the consumer is restarted, the result is:
Because no offsets were committed, the records that were already consumed are consumed all over again.
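You can confirm that nothing was committed by asking the broker for the group's committed offset. A sketch, using the classic single-partition committed() variant and assuming partition 0:

// requires: import org.apache.kafka.common.TopicPartition;
//           import org.apache.kafka.clients.consumer.OffsetAndMetadata;
TopicPartition tp = new TopicPartition(MQDict.CONSUMER_TOPIC, 0); // assumes partition 0
OffsetAndMetadata committed = consumer.committed(tp); // null if nothing was ever committed
log.info("committed offset: " + (committed == null ? "none" : committed.offset()));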
2.4.2 With the manual commit enabled
public static void receivebatch_shoudong_50() {
    Logger.getLogger("org").setLevel(Level.ERROR);
    int i = 1;
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
        records.forEach((ConsumerRecord<String, String> record) -> {
            log.info("Received message: key=" + record.key() + ", value=" + record.value()
                    + ", topic=" + record.topic());
        });
        i++;
        // CONSUMER_MAX_POLL_RECORDS = 10, so five polls fetch roughly 50 of the 100 records
        if (i > 5) {
            consumer.commitSync(); // commit the offsets consumed so far
            consumer.close();
            break;
        }
    }
}
Execution result: the first 50 records are consumed and their offsets committed.
Run it again: the consumer resumes from the committed offset, so the remaining 50 records (value50 through value99) come out.
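One more variant worth knowing: a plain commitSync() commits everything returned by the last poll. If you need to commit up to a specific record instead, commitSync(Map) takes explicit positions. A sketch, where record stands for the last ConsumerRecord you finished processing; the +1 is required because the committed offset is the next offset to read:

// requires: import java.util.Collections;
//           import org.apache.kafka.common.TopicPartition;
//           import org.apache.kafka.clients.consumer.OffsetAndMetadata;
consumer.commitSync(Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1))); // next offset to read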