Kafka消费者参数踩坑记
/**
 * Spring configuration class (annotated with {@code @Configuration} and {@code @EnableKafka})
 * that declares three beans: a Kafka listener container factory, a consumer factory, and the
 * map of consumer settings the consumer factory is built from.
 *
 * <p>NOTE(review): this listing looks damaged. Every {@code @Value} placeholder name starts with
 * {@code -} or is empty, several statements have no receiver or method name (for example
 * {@code (consumerFactory());}), and every entry in {@link #consumerConfigs()} uses the empty
 * string as its key. Dotted identifiers appear to have been stripped; the class will not compile
 * as shown. Restore the missing names from the original source before relying on it.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
// Connection string; the placeholder default is "ip1:9092,ip2:9092,ip3:9092".
// NOTE(review): placeholder name is truncated to "-servers" — confirm the real property key.
@Value("${-servers:ip1:9092,ip2:9092,ip3:9092}")
private String connectionPath;
// Consumer group id; the placeholder default is "consumer-group-1".
// NOTE(review): placeholder name is truncated to "-id" — confirm the real property key.
@Value("${-id:consumer-group-1}")
private String groupId;
// Session timeout, held as a String; the placeholder default is "30000".
// NOTE(review): the placeholder name is empty, and the unit is not stated here — confirm both.
@Value("${:30000}")
private String zookeeperSessionTimeOut;
// Sync time, held as a String; the placeholder default is "200".
// NOTE(review): the placeholder name is empty, and the unit is not stated here — confirm both.
@Value("${:200}")
private String zookeeperSyncTime;
// Auto-commit interval, held as a String; the placeholder default is "5000".
// NOTE(review): placeholder name is truncated to "-commit-interval" — confirm the real key.
@Value("${-commit-interval:5000}")
private String autoCommitIntervalTime;
// Batch size passed into the consumer settings below; the placeholder default is 10.
// NOTE(review): the placeholder name is empty — confirm the real property key.
@Value("${:10}")
private int timeSection;
// Offset-reset policy; the active default is "earliest".
// The commented-out line below is the alternative with "latest" as the default.
// @Value("${-offset-reset:latest}")
@Value("${-offset-reset:earliest}")
private String autoOffSetCommit;
/**
 * Creates a {@code ConcurrentKafkaListenerContainerFactory<String, String>} and returns it.
 *
 * <p>NOTE(review): the four statements between construction and return have lost their call
 * targets. Presumably they were configuration calls on {@code factory} taking, in order, the
 * consumer factory, the value 3, a poll timeout of 3000, and the flag {@code true} — confirm
 * against the original source.
 *
 * @return the listener container factory
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// NOTE(review): receiver and method name missing; the argument is the consumerFactory() bean.
(consumerFactory());
// NOTE(review): receiver and method name missing; the argument is the literal 3.
(3);
// NOTE(review): the object that setPollTimeout is invoked on is missing.
().setPollTimeout(3000);
// NOTE(review): receiver and method name missing; the argument is the literal true.
(true);
return factory;
}
/**
 * Builds a {@code DefaultKafkaConsumerFactory} from the map returned by
 * {@link #consumerConfigs()}.
 *
 * @return the consumer factory
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/**
 * Creates the {@code HashMap} of consumer settings and returns it.
 *
 * <p>NOTE(review): every line between construction and return is a bare {@code (key, value)}
 * pair with no call target and an empty-string key, so as shown no statement visibly writes to
 * {@code propsMap}. Presumably each was a {@code propsMap.put(...)} with a consumer property
 * name as the key — restore the keys from the original source.
 *
 * @return the consumer settings map
 */
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<String, Object>();
// Address and port of the ZooKeeper to connect to (translated from the original comment).
// NOTE(review): the default for connectionPath is a list of host:9092 entries — confirm
// whether this is really a ZooKeeper address or a broker list.
("", connectionPath);
// Group id (the original comment calls it the ZooKeeper group id).
("", groupId);
// ZooKeeper session expiry time (translated from the original comment).
("", zookeeperSessionTimeOut); //
// ZooKeeper connection timeout interval (translated from the original comment).
("", zookeeperSyncTime);
// Interval between automatic offset commits.
("",autoCommitIntervalTime);
// Poll interval; the value is the literal 300000.
("",300000);
("", timeSection);// number of records per batch
// Offset-reset policy taken from autoOffSetCommit.
("", autoOffSetCommit);
// NOTE(review): key missing; the value is the literal true.
("", true);
// NOTE(review): both key and value are empty strings in the next two entries — the original
// contents cannot be determined from this listing.
("", "");
("", "");
return propsMap;
}
}