Kafka系列三 java API操作

时间:2022-12-26 15:50:30

使用java API操作kafka

1.pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast</groupId>
<artifactId>KafkaDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
</project>

2.producer和consumer配置文件

  2.1producer.properties

#请求时候需要验证
acks=all
#请求失败时候需要重试
retries=0
#内存缓存区大小
buffer.memory=33554432
#分区类
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
#broker地址
bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092
#指定消息key序列化方式
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#指定消息本身的序列化方式
value.serializer=org.apache.kafka.common.serialization.StringSerializer

  2.2consumer.properties

#每个消费者分配独立的组号
group.id=test
#如果value合法,则自动提交偏移量
enable.auto.commit=true
#设置多久一次更新被消费消息的偏移量
auto.commit.interval.ms=1000
#设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
session.timeout.ms=30000
#指定消息key序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消息本身的序列化方式
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#broker地址
bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092

3.生产者和消费者代码

  3.1 KafkaProducerSimple.java

 package cn.itcast.kafka;

 import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.UUID; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaProducerSimple {
public static void main(String[] args) throws IOException {
Properties properties = new Properties();
InputStream inStream = KafkaProducerSimple.class.getClassLoader().getResourceAsStream("producer.properties"); properties.load(inStream); Producer<String, String> producer = new KafkaProducer<>(properties);
String TOPIC = "orderMq6";
for (int messageNo = 1; messageNo < 10000; messageNo++) {
producer.send(new ProducerRecord<String, String>(TOPIC,messageNo + "", UUID.randomUUID() + "itcast"));
}
}
}

  3.2 KafkaConsumerSimple.java

 package cn.itcast.kafka;

 import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; public class KafkaConsumerSimple { public static void main(String[] args) throws Exception {
Properties properties = new Properties();
InputStream inStream = KafkaConsumerSimple.class.getClassLoader().getResourceAsStream("consumer.properties");
properties.load(inStream);
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("orderMq6"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
if (records.count() > 0) {
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
} }
}
}
}

  以上代码如果执行超时,必须在本地host文件中配置broker的hostname和ip的映射。