kafka代码实现

时间:2022-10-18 16:05:11

Kafka是一个分布式、分区的、多副本的、多订阅者,常见可以用于日志、日常系统解耦、削峰、提速、广播、消息服务等等。


导入jar包

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
</dependency>

kafka消费端

public class KafkaConsume implements Runnable {

private KafkaConsume(){}

public KafkaConsume(String groupId, Collection<String> topics){

Properties props = new Properties();
//连接kafka的服务器地址,多个服务以逗号隔开
props.put("bootstrap.servers", "kafakServers:9092,kafakServers1:9092");
//none:如果没有为消费者找到先前的offset的值,即没有自动维护偏移量,也没有手动维护偏移量,则抛出异常
//earliest:在各分区下有提交的offset时,从offset处开始消费;在各分区下无提交的offset时,从头开始消费
//latest:在各分区下有提交的offset时,从offset处开始消费;在各分区下无提交的offset时,从最新的数据开始消费
props.put("auto.offset.reset", "earliest");
//指定消费
props.put("group.id", groupId);
//是否开启自动提交Offset,默认:true
props.put("enable.auto.commit", "false");
//自动提交Offset的时间间隔,默认:5000ms
props.put("auto.commit.interval.ms", "100");
//该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB,
//KafkaConsumer.poll()方法从每个分区里返回的记录最多不超过max.partition.fetch.bytes指定的字节。
//如果一个主题有20个分区和5个消费者,那么每个消费者需要至少4MB的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,
//因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。
props.put("max.partition.fetch.bytes", "10240");
//一次调用poll()操作时返回的最大记录数,默认值为500
props.put("max.poll.records", 1000);
//该属性值指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s。
//如果消费者没有在session.timeout.ms指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其它消费者
props.put("session.timeout.ms", "30000");
//consumer给broker发送⼼跳的间隔时间
props.put("heartbeat.interval.ms", 10000);
//因为消息发送的时候将key 和 value 进行序列化生成字节数组,因此消费数据的时候需要反序列化为原来的数据
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty ("security.protocol", "SASL_PLAINTEXT");
props.setProperty ("sasl.mechanism", "PLAIN");
//设置kafka登入账号和没密码
String jassc = "org.apache.kafka.common.security.plain.PlainLoginModule required\n"
+ "username = \"kafkaUsername\"\n"
+ "password =\"kafkaPassword\";";
props.setProperty("sasl.jaas.config", jassc);
//创建消费者
consumer = new KafkaConsumer(props);
//订阅的主题,可以多个
consumer.subscribe(topics);//topics

//创建生产者
producer = new KafkaProducer(props);
}

private static KafkaConsumer<String, String> consumer = null;

//从kafka中获取到消息存入队列在程序中再处理
public final static LinkedBlockingQueue linkedQueue = new LinkedBlockingQueue<String>(1000);

public String getLinkedQueue() throws InterruptedException {
return (String) linkedQueue.take();
}

public void run(){
while (true) {
try {

//每次获取消息时长
ConsumerRecords<String, String> records = consumer.poll(1000*10);//毫秒

for (ConsumerRecord<String, String> record : records) {
// System.out.println(++i + "fetched from partition " + record.partition() + ", offset: " + record.offset() + ",key: " + record.key() + ",value:" + record.value() );
JSONObject json = JSONObject.fromObject(record.value());
String news = json.getString("userId")+"|"+json.getString("kankeId")+"|"+json.getString("playTime");
//可以进行业务处理,或直接存队列
linkedQueue.put(news);
}
if(records.count() > 0){
//手动同步提交
consumer.commitSync();
}
}catch (Exception e){
e.printStackTrace();
continue;
}
}
}

public static void main(String[] args) {
new KafkaConsume("mygroup6",Arrays.asList("user_watch_log")).run();
}

public void setProducer(){
producer.send(new ProducerRecord<>("topics", "key", "value"));
producer.close();
}
}