前言
最近项目里有个需求,要消费kafka里的数据。之前也手动写过代码去消费kafka数据。但是转念一想。既然spring提供了消费kafka的方法。就没必要再去重复造*。于是尝试使用spring的API。
项目技术背景,使用springMVC,XML配置和注解相互使用。kafka的配置都是使用XML方式。
整合过程
1. 引入spring-kafka的依赖包
1
2
3
4
5
|
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version> 2.2 . 0 .RELEASE</version>
</dependency>
|
2. 在spring的xml文件里增加配置项,也可以单独创建一个spring-context-XX.xml文件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
<!-- consumer configuration 该配置项可以根据自己业务的实际需求做增加或删除-->
<bean id= "consumerProperties" class = "java.util.HashMap" >
<constructor-arg>
<map>
<entry key= "bootstrap.servers" value= "${kafka.bootstrap.servers}" />
<entry key= "group.id" value= "group" />
<entry key= "enable.auto.commit" value= "true" />
<entry key= "auto.commit.interval.ms" value= "3000" />
<entry key= "session.timeout.ms" value= "10000" />
<entry key= "key.deserializer"
value= "org.apache.kafka.common.serialization.StringDeserializer" />
<entry key= "value.deserializer"
value= "org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- create factory 该类是spring jar包里提供,就这么配置-->
<bean id= "consumerFactory" class = "org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
<constructor-arg>
<ref bean= "consumerProperties" />
</constructor-arg>
</bean>
<!-- 自定义的消费类,需要实现spring的接口 -->
<bean id= "payPalConsumer"
class = "com.chao.service.consumer.PayPalConsumer" />
<!-- 该类也是jar包里提供的,注入的监听类是自己定义的,topic名称是配置文件引入的-->
<bean id= "containerProperties" class = "org.springframework.kafka.listener.ContainerProperties" >
<constructor-arg name= "topics" value= "${kafka.paypal.topic.name}" />
<property name= "messageListener" ref= "payPalConsumer" />
</bean>
<!-- 改类也是jar里提供的,把这个containerProperties和consumerfactory 注入 -->
<bean id= "messageListenerContainer" class = "org.springframework.kafka.listener.KafkaMessageListenerContainer"
init-method= "doStart" >
<constructor-arg ref= "consumerFactory" />
<constructor-arg ref= "containerProperties" />
</bean>
|
2. 自定义消费者类,消费者类依然可以使用注解。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
/**
* get msg from kafka
*/
@Component
public class PayPalConsumer implements MessageListener<String, String> {
private static Logger logger = LoggerFactory.getLogger(PayPalConsumer. class );
@Autowired
private XXService XXService;
@Override
public void onMessage(ConsumerRecord<String, String> authorizeRecord) {
String value = authorizeRecord.value();
if (StringUtils.isEmpty(value)){
logger.warn( "receive message from kafka is null" );
return ;
}
logger.info( "receive message from kafka is {}" ,value);
}
}
|
使用这个步骤配置,一次性过。非常顺利。
到此这篇关于spring 整合kafka监听消费的配置过程的文章就介绍到这了,更多相关spring 整合kafka内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://www.cnblogs.com/snail-learn-code/p/14480267.html