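Dynamically listening to Kafka topics and configuring the polling interval
This uses the spring-kafka dependency.
Subscribe with topicPattern, which matches topic names against a regular expression. With a pattern subscription, the consumer periodically refreshes the topic list and assigns newly created matching topics to a consumer in the group. (By default the refresh happens once every five minutes.)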
@KafkaListener(topicPattern = "topicTest.*", containerFactory = "testKafkaListenerFactory")
public void p1EquipmentTeamSyncEventListener(String command) {
    // Called for every record from any topic whose name matches the pattern.
    System.out.println("------------------------------------- consumed: " + command);
}
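The "." in topicTest.* is required. The pattern is a regular expression, not a shell-style wildcard, so topicTest* (without the dot) would not match the target topics.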
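To see why the dot matters, here is a minimal sketch using only java.util.regex. It assumes the topic name must match the whole pattern, which is how I understand the consumer applies it; the class name TopicPatternDemo and the topic name topicTest-orders are made up for illustration.

```java
import java.util.regex.Pattern;

final class TopicPatternDemo {

    // Returns true only if the whole topic name matches the regular expression.
    static boolean matchesTopic(String regex, String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }

    public static void main(String[] args) {
        // ".*" means "any character, zero or more times", so any suffix is accepted.
        System.out.println(matchesTopic("topicTest.*", "topicTest-orders"));
        // "t*" only means "the letter t, zero or more times", so the suffix is rejected.
        System.out.println(matchesTopic("topicTest*", "topicTest-orders"));
        // Without the dot, only names such as "topicTes", "topicTest", "topicTestt" match.
        System.out.println(matchesTopic("topicTest*", "topicTest"));
    }
}
```

Without the dot, the trailing * applies to the last letter t rather than acting as a wildcard, so only a handful of names can ever match.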
Configuring the polling interval
@Bean("testKafkaListenerFactory")
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    // props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);
    // Refresh topic metadata every 30 seconds so new matching topics are picked up sooner.
    props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 30000);
    // Both key and value are deserialized as String, so the factory is typed <String, String>.
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer()));
    return factory;
}
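The value of ConsumerConfig.METADATA_MAX_AGE_CONFIG (the metadata.max.age.ms property) is in milliseconds, so 30000 means the topic list is refreshed every 30 seconds.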
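Because the unit is easy to get wrong, one option is to derive the value from a java.time.Duration instead of writing the number by hand. The sketch below uses only the JDK; MetadataRefreshProps and withRefreshInterval are hypothetical names, and the literal key metadata.max.age.ms is the string behind ConsumerConfig.METADATA_MAX_AGE_CONFIG.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

final class MetadataRefreshProps {

    // Literal property key behind ConsumerConfig.METADATA_MAX_AGE_CONFIG.
    static final String METADATA_MAX_AGE_MS = "metadata.max.age.ms";

    // Builds a props map whose metadata refresh interval is expressed in milliseconds.
    static Map<String, Object> withRefreshInterval(Duration interval) {
        Map<String, Object> props = new HashMap<>();
        props.put(METADATA_MAX_AGE_MS, interval.toMillis());
        return props;
    }

    public static void main(String[] args) {
        // 30 seconds, the same interval as the factory configuration in this post.
        Map<String, Object> props = withRefreshInterval(Duration.ofSeconds(30));
        System.out.println(props.get(METADATA_MAX_AGE_MS));
    }
}
```

The resulting entries could be merged into the props map passed to DefaultKafkaConsumerFactory. A shorter interval means new topics are discovered sooner, at the cost of more frequent metadata requests to the brokers.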