Kafka消费者参数踩坑记

时间:2025-04-05 08:25:43
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${-servers:ip1:9092,ip2:9092,ip3:9092}") private String connectionPath; @Value("${-id:consumer-group-1}") private String groupId; @Value("${:30000}") private String zookeeperSessionTimeOut; @Value("${:200}") private String zookeeperSyncTime; @Value("${-commit-interval:5000}") private String autoCommitIntervalTime; @Value("${:10}") private int timeSection; // @Value("${-offset-reset:latest}") @Value("${-offset-reset:earliest}") private String autoOffSetCommit; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); (consumerFactory()); (3); ().setPollTimeout(3000); (true); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<String, Object>(); // 配置要连接的zookeeper地址与端口 ("", connectionPath); // 配置zookeeper的组id ("", groupId); // zookeeper session 失效时间 ("", zookeeperSessionTimeOut); // // 配置zookeeper连接超时间隔 ("", zookeeperSyncTime); // 自动提交位移变量时间间隔 ("",autoCommitIntervalTime); // 设置拉取时间间隔 ("",300000); ("", timeSection);// 每一批数量 ("", autoOffSetCommit); ("", true); ("", ""); ("", ""); return propsMap; } }