一、环境
使用Kafka3.0.0
master | slave1 | slave2 | |
---|---|---|---|
ip | 193.168.3.34 | 193.168.3.35 | 193.168.3.36 |
二、maven引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
三、application配置
spring:
kafka:
bootstrap-servers: 192.168.3.34:9092,192.168.3.35:9092,192.168.3.36:9092 # 指定 kafka 的地址
producer: #生产者
retries: 0 #重复次数 ,失败不重发
batch-size: 16384 #每次批量发送消息的数量
buffer-memory: 33554432 #缓存大小达到buffer.memory就发送数据
acks: 1 # 0=生产者将不会等待来自服务器的任何确认 1=leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应 -1 =leader将等待完整的同步副本集以确认记录
key-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 key 的序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 value 的序列化器
consumer:
group-id: nacl #指定消费者组的 group_id
auto-offset-reset: earliest #latest 最新的位置 , earliest最早的位置
auto-commit-interval: 100 #自动提交offset频率 100毫秒
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 key 的反序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 value 的反序列化器
listener:
concurrency: 3 #3个并行监听
四、SpringBoot-生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@CrossOrigin
@RestController
public class ProducerController {
// Kafka 模板用来向 kafka 发送数据
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
@RequestMapping("/kf")
public String data() {
kafkaTemplate.send("first", "hello");
return "ok";
}
}
五、SpringBoot-消费者
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class KafkaConsumer {
// 指定要监听的 topic
@KafkaListener(topics = "first")
public void consumeTopic(String msg) { // 参数: 收到的 value
System.out.println("收到的信息: " + msg);
}
}
六、SpringBoot-主题分区
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaTopic {
@Bean
public NewTopic batchTopic() {
//项目启动时,自动创建topic,指定分区和副本数量
return new NewTopic("first", 3, (short) 1);
}
}