Kafka Learning (6) - Integrating Kafka with Spring Boot - Manual Consumer Commits

Date: 2024-04-05 11:40:29

Contents

1. Scenario

2. Code

2.1. Producer and consumer configuration

2.2. Producer sends 100 test messages

2.3. Consumer fetches the 100 test messages - auto commit

2.4. Consumer fetches the 100 test messages - manual commit

2.4.1. Manual commit line commented out

2.4.2. Manual commit line enabled


1. Scenario

In the previous post, the Kafka consumer committed offsets automatically.
But auto-commit doesn't fit many scenarios: the offset is committed right after the consumer pulls the data, regardless of whether that data was actually processed, which makes it easy to lose records, especially when transactional control is needed.
In many cases we want to pull the data from Kafka, process it first (for example, write it into MySQL), and only then commit, which is why we need to commit the Kafka offset manually.
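As a minimal sketch of that process-then-commit pattern (the DataSource and the "messages" table here are hypothetical stand-ins for your own persistence layer, not part of the original post):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;

import javax.sql.DataSource;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ProcessThenCommit {

	/**
	 * Poll a batch, write every record to MySQL, and only then commit the offsets.
	 * The consumer, the DataSource, and the "messages" table are assumptions;
	 * wire them up with your own configuration.
	 */
	static void consumeLoop(KafkaConsumer<String, String> consumer, DataSource dataSource)
			throws SQLException {
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
			for (ConsumerRecord<String, String> record : records) {
				try (Connection conn = dataSource.getConnection();
						PreparedStatement ps = conn.prepareStatement(
								"INSERT INTO messages (msg_value) VALUES (?)")) {
					ps.setString(1, record.value());
					ps.executeUpdate();
				}
			}
			// Offsets advance only after the whole batch is safely in MySQL.
			consumer.commitSync();
		}
	}
}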

2. Code

2.1. Producer and consumer configuration


import java.time.Duration;

public class MQDict {
	public static final String MQ_ADDRESS_COLLECTION = "192.168.10.10:9092";		// Kafka broker address
	public static final String CONSUMER_TOPIC = "topic-test";						// topic the consumer subscribes to
	public static final String PRODUCER_TOPIC = "topic-test";						// topic the producer sends to
	public static final String CONSUMER_GROUP_ID = "group-test";					// consumer group id (can be configured separately)
	public static final String CONSUMER_ENABLE_AUTO_COMMIT = "true";				// whether the consumer auto-commits offsets
	public static final String CONSUMER_AUTO_COMMIT_INTERVAL_MS = "1000";			// auto-commit interval
	public static final String CONSUMER_SESSION_TIMEOUT_MS = "30000";				// session timeout
	public static final int CONSUMER_MAX_POLL_RECORDS = 10;							// max records per poll
	public static final Duration CONSUMER_POLL_TIME_OUT = Duration.ofMillis(3000);	// poll timeout
}

2.2. Producer sends 100 test messages


import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

public class Producer {
	static Logger log = Logger.getLogger(Producer.class);

	private static KafkaProducer<String, String> producer = null;

	/*
	 * Initialize the producer
	 */
	static {
		Properties configs = initConfig();
		producer = new KafkaProducer<String, String>(configs);
	}

	/*
	 * Initialize configuration
	 */
	private static Properties initConfig() {
		Properties props = new Properties();
		props.put("bootstrap.servers", MQDict.MQ_ADDRESS_COLLECTION);
		props.put("acks", "1");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("key.serializer", StringSerializer.class.getName());
		props.put("value.serializer", StringSerializer.class.getName());
		return props;
	}

	/**
	 * Send 100 messages in a batch
	 */
	public static void sendMessages() {
		// message record
		ProducerRecord<String, String> record = null;
		for (int i = 0; i < 100; i++) {
			record = new ProducerRecord<String, String>(MQDict.PRODUCER_TOPIC, "value" + i);
			// send the message
			producer.send(record, new Callback() {
				@Override
				public void onCompletion(RecordMetadata recordMetadata, Exception e) {
					if (null != e) {
						log.info("send error" + e.getMessage());
					} else {
						System.out.println(String.format("发送信息---offset:%s,partition:%s", recordMetadata.offset(),
								recordMetadata.partition()));
					}
				}
			});
		}
		producer.close();
	}

}
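The original post doesn't show how sendMessages() is invoked; a minimal entry point (an assumption on my part) would be:

public class ProducerMain {
	public static void main(String[] args) {
		Producer.sendMessages(); // sends 100 records and closes the producer
	}
}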

Result:

(screenshot of the producer's console output)

 

2.3. Consumer fetches the 100 test messages - auto commit

 public static final String CONSUMER_ENABLE_AUTO_COMMIT = "true";       //是否自动提交(消费者)


import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class Consumer {
	static Logger log = Logger.getLogger(Consumer.class);

	private static KafkaConsumer<String, String> consumer;

	/**
	 * Initialize the consumer
	 */
	static {
		Properties configs = initConfig();
		consumer = new KafkaConsumer<String, String>(configs);
		consumer.subscribe(Arrays.asList(MQDict.CONSUMER_TOPIC));
	}

	/**
	 * Initialize configuration
	 */
	private static Properties initConfig() {
		Properties props = new Properties();
		props.put("bootstrap.servers", MQDict.MQ_ADDRESS_COLLECTION);
		props.put("group.id", MQDict.CONSUMER_GROUP_ID);
		props.put("enable.auto.commit", MQDict.CONSUMER_ENABLE_AUTO_COMMIT);
		props.put("auto.commit.interval.ms", MQDict.CONSUMER_AUTO_COMMIT_INTERVAL_MS);
		props.put("session.timeout.ms", MQDict.CONSUMER_SESSION_TIMEOUT_MS);
		props.put("max.poll.records", MQDict.CONSUMER_MAX_POLL_RECORDS);
		props.put("auto.offset.reset", "earliest");
		props.put("key.deserializer", StringDeserializer.class.getName());
		props.put("value.deserializer", StringDeserializer.class.getName());
		return props;
	}

	public static void receivebatch() {
		Logger.getLogger("org").setLevel(Level.ERROR);

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
			records.forEach((ConsumerRecord<String, String> record) -> {
				log.info("received message: key === " + record.key() + " value === " + record.value()
						+ " topic === " + record.topic());
			});
		}
	}

}
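Likewise, a minimal entry point for the consumer (again an assumption, not shown in the original) would be:

public class ConsumerMain {
	public static void main(String[] args) {
		Consumer.receivebatch(); // loops forever, printing each received record
	}
}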


2.4. Consumer fetches the 100 test messages - manual commit

public static final String CONSUMER_ENABLE_AUTO_COMMIT = "false";       //是否自动提交(消费者)

2.4.1. Manual commit line commented out

	public static void receivebatch_shoudong() {
		Logger.getLogger("org").setLevel(Level.ERROR);

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
			records.forEach((ConsumerRecord<String, String> record) -> {
				log.info("received message: key === " + record.key() + " value === " + record.value()
						+ " topic === " + record.topic());
			});
			// consumer.commitSync(); // manual commit, deliberately left commented out
		}
	}

Result:

(screenshot of the consumer's console output)

All 100 published messages are fetched. When the consumer is started again, the result is:

(screenshot of the consumer's console output on the second run)

Because the offsets were never committed, the records that were already consumed are consumed all over again.

 

2.4.2. Manual commit line enabled

	public static void receivebatch_shoudong_50() {
		Logger.getLogger("org").setLevel(Level.ERROR);

		int i = 1;
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
			records.forEach((ConsumerRecord<String, String> record) -> {
				log.info("received message: key === " + record.key() + " value === " + record.value()
						+ " topic === " + record.topic());
			});
			i++;
			// each poll fetches at most CONSUMER_MAX_POLL_RECORDS = 10 records,
			// so 5 polls consume 50 messages before we commit and close
			if (i > 5) {
				consumer.commitSync();
				consumer.close();
				break;
			}
		}
	}

Result:

(screenshot of the consumer's console output)

Running it again:

(screenshot of the consumer's console output on the second run)

Since the offsets of the first 50 messages were committed this time, the second run resumes from message 50 instead of starting over.
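One closing note: consumer.commitSync() with no arguments commits the offsets of everything returned by the last poll. If a crash mid-batch must not replay already-processed records, the overload that takes explicit offsets can commit after every record instead. A hedged sketch of that variant, dropped in place of the forEach body inside the poll loop above:

import java.util.Collections;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Inside the poll loop: commit each record's offset as soon as it is processed.
// Note the +1: the committed offset is the position of the next record to read.
for (ConsumerRecord<String, String> record : records) {
	log.info("received message: value === " + record.value());
	consumer.commitSync(Collections.singletonMap(
			new TopicPartition(record.topic(), record.partition()),
			new OffsetAndMetadata(record.offset() + 1)));
}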