A brief note on how to manually initialize a Kafka consumer and producer.
1. Consumer configuration
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, xxx);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, xxx);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, xxx);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, xxx);
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, xxx);
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, xxx);
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, xxx);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, xxx);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, xxx);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, xxx);
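properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, xxx); // Confluent Avro deserializer setting; only needed when values are Avro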
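As a rough illustration of what the xxx placeholders above might look like, here is a minimal sketch that uses only java.util.Properties and the plain string keys that the ConsumerConfig constants resolve to. The class name ConsumerProps, the helper heartbeatFitsSession, and every value (broker address, group name, timeouts, String deserializers) are assumptions for the example, not settings taken from a real deployment. The helper reflects the usual guidance that heartbeat.interval.ms should be no more than about one third of session.timeout.ms.

```java
import java.util.Properties;

class ConsumerProps {
    // Builds consumer settings with plain string keys; all values are illustrative assumptions.
    static Properties build(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("group.id", groupId);
        p.put("client.id", groupId + "-client");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("max.poll.interval.ms", "300000");
        p.put("session.timeout.ms", "30000");
        p.put("heartbeat.interval.ms", "10000");
        p.put("enable.auto.commit", "false");
        p.put("auto.offset.reset", "earliest");
        return p;
    }

    // Returns true when the heartbeat interval is at most one third of the session timeout.
    static boolean heartbeatFitsSession(Properties p) {
        long heartbeat = Long.parseLong(p.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(p.getProperty("session.timeout.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties p = build("localhost:9092", "demo-group");
        System.out.println("group.id = " + p.getProperty("group.id"));
        System.out.println("heartbeat fits session timeout: " + heartbeatFitsSession(p));
    }
}
```

The values are kept as strings because Kafka parses string values for numeric and boolean settings, so a Properties object built this way can be passed to the KafkaConsumer constructor unchanged.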
2. Create the KafkaConsumer instance
consumer = new KafkaConsumer<>(properties); // pass in the configuration built above
3. Subscribe to topics
consumer.subscribe(xxxTopics);
1. Producer configuration
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, xxx);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, xxx);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, xxx);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, xxx);
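For comparison, the producer settings above could be filled in along the following lines. This is only a sketch with plain string keys (the values behind the ProducerConfig constants); the class name ProducerProps, the helper missingKeys, the broker address, and the choice of String serializers are all assumptions for illustration. The helper checks for the three settings that have no default value, which is a common cause of a failure when the producer is constructed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class ProducerProps {
    // Settings without a default value: the producer cannot be constructed without them.
    static final String[] REQUIRED = {"bootstrap.servers", "key.serializer", "value.serializer"};

    // Builds producer settings with plain string keys; all values are illustrative assumptions.
    static Properties build(String bootstrapServers, String clientId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("client.id", clientId);
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    // Lists the required keys that are absent from the given properties.
    static List<String> missingKeys(Properties p) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (p.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties p = build("localhost:9092", "demo-producer");
        System.out.println("missing keys: " + missingKeys(p));
    }
}
```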
2. Create the producer instance
producer = new KafkaProducer<>(properties); // pass in the configuration built above
3. Send records to Kafka
producer.send(new ProducerRecord<>(WhichTopic, Key, Value));