Spring Boot集成kafka

时间:2023-05-12 17:45:23

Spring Boot对kafka提供了自动配置(auto configuration)。使用用Spring Boot只需要做很少的配置即可集成对kafka的访问。

pom配置

1、继承spring-boot-starter-parent

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.5.9.RELEASE</version>
  <relativePath/>
 </parent>

对于一个spring boot项目来说,最好继承于spring-boot-starter-parent。它会帮我们统一管理spring用到的相关的包的版本。

2、添加spring-boot-start和spring-kafka的依赖

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
</dependency>

这里不需要设置依赖包的版本,spring-boot-starter-parent已经帮我们添加了版本的管理。

application.properties配置

在application.properties里提供了很多kafka的配置,配置项是以spring.kafka.为前缀,所有的配置项可以在org.springframework.boot.autoconfigure.kafka.KafkaProperties找到。

如配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

发送信息

spring-kafka提供了KafkaTemplate的一个接口,可以用它来发送消息。spring boot会自动构建KafkaTemplate对象,使用kafkaTemplate只需要使用@Autowired注入。

示例

@Component
public class MessageSender{

    @Autowired
	private final KafkaTemplate template;

    public void send() {
      this.template.send("myTopic", "message1");
      this.template.send("myTopic", "message2");
      this.template.send("myTopic", "message3");
    }

}

接收消息

在接收消息端,只需要在指定Bean的方法上添加注解@KafkaListener,我们就可以监听Kafka的消息。

@Component
public class MessageReceiver {

	@KafkaListener(topics = "myTopic")
	public void processMessage(String content) {
		// ...
	}
}

Spring boot会自动创建KafkaListenerContainerFactory。如果要定义KafkaListen,可以在application.properties设置,相关配置项如下:

spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single # Listener type.

可以看到,基于Spring boot可以很简单就完成了kafka的集成。