Java使用Kafka发送和消费消息的示例

时间:2022-05-26 09:42:59

1. maven依赖包

<!-- Kafka Java client; Maven POM tags are case-sensitive (groupId/artifactId/version) -->
<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.9.0.1</version>
</dependency>

2. 生产者代码

package com.lnho.example.kafka; 
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producer;
import org.apache.kafka.clients.producer.producerrecord;  
import java.util.properties;  
public class kafkaproducerexample {
 public static void main(string[] args) {
  properties props = new properties();
  props.put("bootstrap.servers", "master:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");  
  producer<string, string> producer = new kafkaproducer<>(props);
  for(int i = 0; i < 100; i++)
   producer.send(new producerrecord<>("topic1", integer.tostring(i), integer.tostring(i)));  
  producer.close();
 }
}

3. 消费者代码

package com.lnho.example.kafka;  
import org.apache.kafka.clients.consumer.consumerrecord;
import org.apache.kafka.clients.consumer.consumerrecords;
import org.apache.kafka.clients.consumer.kafkaconsumer;
import java.util.arrays;
import java.util.properties;  
public class kafkaconsumerexample {
 public static void main(string[] args) {
  properties props = new properties();
  props.put("bootstrap.servers", "master:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");
  kafkaconsumer<string, string> consumer = new kafkaconsumer<>(props);
  consumer.subscribe(arrays.aslist("topic1"));
  while (true) {
   consumerrecords<string, string> records = consumer.poll(100);
   for (consumerrecord<string, string> record : records)
    system.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
  }
 }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/u012129558/article/details/80065817