// 创建配置对象
Map<String,Object> configMap = new HashMap<>();
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");
// 对生产的数据的K,V 进行反序列化的操作
configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
configMap.put(ConsumerConfig.GROUP_ID_CONFIG,"com.kafka.test");
// 创建消费者对象
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(configMap);
//订阅主题
kafkaConsumer.subscribe(Collections.singleton("test"));
// 从Kafka中获取数据
// 消费者从Kafka中拉取数据
ConsumerRecords<String, String> datas = kafkaConsumer.poll(1000);
datas.forEach(data ->{
System.out.println(data);
});
// 关闭消费者对象
kafkaConsumer.close();
点击下方名片,关注『编程青衫客』
随时随地获取最新好文章!