Spring Boot 对 Kafka 的 client 做了很好的集成,使用非常方便。本文实现了一个在 Spring Boot 中操作 Kafka 的 demo。
1.pom配置
只需要在dependencies中增加 spring-kafka的配置即可。完整效果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
<!-- Fragment of pom.xml: parent, properties and dependencies for the demo. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
</parent>
<properties>
    <java.version>1.8</java.version>
    <spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <!-- spring-kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>${spring-kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <version>${spring-kafka.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- Lombok: the demo classes use @Slf4j; the version is managed by the Spring Boot parent. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>
|
2.生产者
参数配置类,其参数写在yml文件中,通过@Value注入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package com.dhb.kafka.producer;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * Spring configuration for the producer side of the demo.
 *
 * <p>Declares the producer properties, the {@link ProducerFactory}, the
 * {@link KafkaTemplate} built on it, and the {@link Sender} bean.
 */
@Configuration
public class SenderConfig {

    /** Broker address list, read from the {@code kafka.bootstrap-servers} property. */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Builds the Kafka producer properties.
     *
     * @return a mutable map with the bootstrap servers, String key/value
     *         serializers and the acknowledgement mode
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // acks=0: the producer does not wait for any broker acknowledgement.
        // NOTE(review): fine for a demo, but records can be lost without notice;
        // consider "1" or "all" for real workloads.
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        return props;
    }

    /**
     * @return a producer factory for String keys and values, configured from
     *         {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * @return the template used to publish String messages
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

    /**
     * @return the demo's message sender
     */
    @Bean
    public Sender sender() {
        return new Sender();
    }
}
|
消息发送类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
package com.dhb.kafka.producer;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;
@slf4j
public class sender {
@autowired
private kafkatemplate<string,string> kafkatemplate;
public void send(string topic,string payload) {
log.info( "sending payload='{}' to topic='{}'" ,payload,topic);
this .kafkatemplate.send(topic,payload);
}
}
|
3.消费者
参数配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package com.dhb.kafka.consumer;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

/**
 * Spring configuration for the consumer side of the demo.
 *
 * <p>Declares the {@link ConsumerFactory}, the listener container factory
 * built on it, and the {@link Receiver} bean.
 */
@Configuration
@EnableKafka
public class ReceiverConfig {

    /** Broker address list, read from the {@code kafka.bootstrap-servers} property. */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Builds the Kafka consumer properties.
     *
     * @return a mutable map with the bootstrap servers, String key/value
     *         deserializers and the consumer group id
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // NOTE(review): "helloword" looks like a typo for "helloworld"; it is kept
        // as-is because changing the group id changes the consumer group.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloword");
        return props;
    }

    /**
     * @return a consumer factory for String keys and values, configured from
     *         {@link #consumerConfigs()}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * @return the listener container factory, wired to {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * @return the demo's message receiver
     */
    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
|
消息接收类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package com.dhb.kafka.consumer;

import java.util.concurrent.CountDownLatch;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * Listens on the topic named by the {@code kafka.topic.helloworld} property
 * and logs every payload it receives.
 */
@Slf4j
public class Receiver {

    /** Counted down once per received message; starts at 1. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * @return the latch that is counted down when a message is received
     */
    public CountDownLatch getLatch() {
        return latch;
    }

    /**
     * Handles one message: logs it and counts the latch down.
     *
     * @param payload the message body
     */
    @KafkaListener(topics = "${kafka.topic.helloworld}")
    public void receive(String payload) {
        log.info("received payload='{}'", payload);
        latch.countDown();
    }
}
|
3.web测试类
定义了一个基于http的web测试接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
package com.dhb.kafka.web;

import java.io.IOException;
import java.io.PrintWriter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.dhb.kafka.producer.Sender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP entry point for the demo: forwards the posted {@code data} parameter
 * to Kafka through {@link Sender}.
 */
@RestController
@Slf4j
public class KafkaProducer {

    @Autowired
    Sender sender;

    /**
     * Target topic, read from the same {@code kafka.topic.helloworld} property
     * the listener subscribes to, so producer and consumer cannot drift apart.
     */
    @Value("${kafka.topic.helloworld}")
    private String topic;

    /**
     * Handles {@code POST /sender.action}: sends {@code data} to the configured
     * topic and writes {@code success} to the response.
     *
     * @param request  the current HTTP request (unused)
     * @param response the HTTP response the result is written to
     * @param data     the message body taken from the request
     * @throws IOException if the response writer cannot be obtained
     */
    @RequestMapping(value = "/sender.action", method = RequestMethod.POST)
    public void exec(HttpServletRequest request, HttpServletResponse response, String data) throws IOException {
        this.sender.send(this.topic, data);
        response.setCharacterEncoding("UTF-8");
        // The body is the bare word "success", not JSON, so label it as plain text.
        response.setContentType("text/plain");
        // try-with-resources flushes and closes the writer exactly once.
        try (PrintWriter writer = response.getWriter()) {
            writer.write("success");
        }
    }
}
|
4.启动类及配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
package com.dhb.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point of the Kafka demo.
 */
@SpringBootApplication
public class KafkaApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
|
application.yml
1
2
3
4
|
# Kafka settings read by the demo's configuration classes and listener.
kafka:
  bootstrap-servers: 192.168.162.239:9092
  topic:
    helloworld: testtopic
|
程序结构:
包结构
5.读写测试
通过执行KafkaApplication的main方法启动程序。然后打开Postman进行测试:
运行后返回success
生产者日志:
消费者日志:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.jianshu.com/p/3dcb64e49ac5?utm_source=tuicool&utm_medium=referral