Kafka: dynamically listening to topics and configuring the polling interval

Date: 2025-04-04 09:08:55

This uses the spring-kafka dependency.

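Use topicPattern to subscribe by regular expression. A topicPattern subscription already re-checks the topic list periodically and assigns any newly created topic that matches to one of the consumers. The check interval comes from the consumer property metadata.max.age.ms, which defaults to 300000 ms (five minutes).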

    // Subscribes to every topic whose name matches the regular expression.
    @KafkaListener(topicPattern = "topicTest.*", containerFactory = "testKafkaListenerFactory")
    public void p1EquipmentTeamSyncEventListener(String command) {
        System.out.println("-------------------------------------Consumed by listener: " + command);
    }

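The "." in topicTest.* is required; without it the target topics are not matched. In a regular expression, topicTest* means "topicTes" followed by zero or more "t", so a topic such as topicTest1 would not match.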
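The difference between the two patterns can be checked with plain java.util.regex, without a broker. The sketch below assumes the consumer tests the whole topic name against the pattern; the class name TopicPatternDemo and the sample topic names are made up for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical demo class; the topic names below are invented for illustration.
class TopicPatternDemo {

    // Returns true when the whole topic name matches the regular expression.
    static boolean matches(String regex, String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }

    public static void main(String[] args) {
        List<String> topics = List.of("topicTest", "topicTest1", "topicTest-orders", "otherTopic");
        for (String topic : topics) {
            // Print the result of both patterns side by side for each topic name.
            System.out.println(topic
                    + "  topicTest.* -> " + matches("topicTest.*", topic)
                    + "  topicTest* -> " + matches("topicTest*", topic));
        }
    }
}
```

With the dot, any suffix after topicTest is accepted; without it, only topicTes followed by a run of "t" characters is.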

Configuring the polling interval

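    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    // Goes inside a @Configuration class; bootstrapServers is a field of that class.
    @Bean("testKafkaListenerFactory")
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);
        // Refresh topic metadata every 30 seconds so new matching topics are picked up sooner.
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 30000);
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // The deserializers are already set in props, so no instances need to be passed here.
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        return factory;
    }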

props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 30000); // the value is in milliseconds, so this re-checks the topic list every 30 seconds
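Because the unit is easy to get wrong, one option is to derive the value from a java.time.Duration. This is only a sketch: the class MetadataRefreshProps and its method are hypothetical helpers, and the key is written as the literal string "metadata.max.age.ms" (the value behind ConsumerConfig.METADATA_MAX_AGE_CONFIG) so that the example runs without the Kafka client on the classpath.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of spring-kafka or the Kafka client.
class MetadataRefreshProps {

    // String value behind ConsumerConfig.METADATA_MAX_AGE_CONFIG.
    static final String METADATA_MAX_AGE_MS = "metadata.max.age.ms";

    // Builds a consumer property map with the metadata refresh interval in milliseconds.
    static Map<String, Object> withRefreshInterval(Duration interval) {
        Map<String, Object> props = new HashMap<>();
        props.put(METADATA_MAX_AGE_MS, interval.toMillis());
        return props;
    }

    public static void main(String[] args) {
        // 30 seconds is stored as 30000 milliseconds.
        System.out.println(withRefreshInterval(Duration.ofSeconds(30)));
    }
}
```

The returned entries could be merged into the props map of the listener factory with putAll before the consumer factory is created.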