Spring Boot对kafka提供了自动配置(auto configuration)。使用用Spring Boot只需要做很少的配置即可集成对kafka的访问。
pom配置
1、继承spring-boot-starter-parent
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/>
</parent>
对于一个spring boot项目来说,最好继承于spring-boot-starter-parent。它会帮我们统一管理spring用到的相关的包的版本。
2、添加spring-boot-start和spring-kafka的依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
这里不需要设置依赖包的版本,spring-boot-starter-parent已经帮我们添加了版本的管理。
application.properties配置
在application.properties里提供了很多kafka的配置,配置项是以spring.kafka.为前缀,所有的配置项可以在org.springframework.boot.autoconfigure.kafka.KafkaProperties找到。
如配置:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
发送信息
spring-kafka提供了KafkaTemplate的一个接口,可以用它来发送消息。spring boot会自动构建KafkaTemplate对象,使用kafkaTemplate只需要使用@Autowired注入。
示例
@Component
public class MessageSender{
@Autowired
private final KafkaTemplate template;
public void send() {
this.template.send("myTopic", "message1");
this.template.send("myTopic", "message2");
this.template.send("myTopic", "message3");
}
}
接收消息
在接收消息端,只需要在指定Bean的方法上添加注解@KafkaListener,我们就可以监听Kafka的消息。
@Component
public class MessageReceiver {
@KafkaListener(topics = "myTopic")
public void processMessage(String content) {
// ...
}
}
Spring boot会自动创建KafkaListenerContainerFactory。如果要定义KafkaListen,可以在application.properties设置,相关配置项如下:
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single # Listener type.
可以看到,基于Spring boot可以很简单就完成了kafka的集成。