This article shares sample code for integrating Kafka with Spring Boot; it is meant as a share for readers and as a note for myself.
System environment
The Kafka service runs on a remote server:
- Ubuntu 16.04 LTS
- kafka_2.12-0.11.0.0.tgz
- zookeeper-3.5.2-alpha.tar.gz
Integration steps
1. Create a Spring Boot project and add the related dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.laravelshao.springboot</groupId>
    <artifactId>spring-boot-integration-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-integration-kafka</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-json</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
2. Add the configuration; a YAML file is used here:
spring:
  kafka:
    bootstrap-servers: x.x.x.x:9092
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: test
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: com.laravelshao.springboot.kafka
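These properties are all that is needed: Spring Boot's Kafka auto-configuration builds the KafkaTemplate and the listener container factory from them, so no extra @Configuration class is required. For reference only, a roughly equivalent manual producer setup would look like the sketch below (the class name and bean layout are illustrative and not part of this project; the broker address is the same placeholder as in application.yml):

package com.laravelshao.springboot.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Illustrative only: mirrors what the spring.kafka.* properties above configure automatically
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x:9092"); // broker address from application.yml
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}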
3. Create the message object:
package com.laravelshao.springboot.kafka;

public class Message {

    private Integer id;
    private String msg;

    public Message() {
    }

    public Message(Integer id, String msg) {
        this.id = id;
        this.msg = msg;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", msg='" + msg + '\'' +
                '}';
    }
}
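Since JsonSerializer and JsonDeserializer are Jackson-based, a Message travels across the topic as a small JSON document built from its getters. A quick standalone way to check the payload shape (a sketch, not part of the project; spring-boot-starter-json already puts Jackson on the classpath):

package com.laravelshao.springboot.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;

public class MessageJsonDemo {
    public static void main(String[] args) throws Exception {
        // Prints {"id":1,"msg":"hello"} - the same shape JsonSerializer writes to the topic
        System.out.println(new ObjectMapper().writeValueAsString(new Message(1, "hello")));
    }
}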
4. Create the producer:
package com.laravelshao.springboot.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Producer {

    private static Logger log = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(String topic, Message message) {
        kafkaTemplate.send(topic, message);
        log.info("producer->topic:{}, message:{}", topic, message);
    }
}
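Note that KafkaTemplate.send() is asynchronous: the method above returns before the broker has acknowledged the record. If you want to log the outcome of each send, the returned ListenableFuture<SendResult> can be given a callback. A sketch of a method that could be added to the Producer above (it assumes the template field is typed as KafkaTemplate<String, Message>):

// Additional imports needed: org.springframework.kafka.support.SendResult,
// org.springframework.util.concurrent.ListenableFuture
public void sendWithCallback(String topic, Message message) {
    ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(topic, message);
    future.addCallback(
            result -> log.info("producer->sent topic:{}, partition:{}, offset:{}",
                    topic, result.getRecordMetadata().partition(), result.getRecordMetadata().offset()),
            ex -> log.error("producer->send failed, topic:{}", topic, ex));
}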
5. Create the consumer, using the @KafkaListener annotation to listen on the topic:
package com.laravelshao.springboot.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Consumer {

    private static Logger log = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "test_topic")
    public void receive(ConsumerRecord<String, Message> consumerRecord) {
        log.info("consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());
    }
}
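Because the value deserializer already turns each record into a Message, the listener does not have to accept the raw ConsumerRecord; spring-kafka can inject the payload directly. A sketch of an alternative listener method (shown instead of, not in addition to, the one above):

@KafkaListener(topics = "test_topic")
public void receive(Message message) {
    log.info("consumer->message:{}", message);
}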
6. Send-and-consume test:
package com.laravelshao.springboot;

import com.laravelshao.springboot.kafka.Message;
import com.laravelshao.springboot.kafka.Producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class IntegrationKafkaApplication {

    public static void main(String[] args) throws InterruptedException {
        ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);
        Producer producer = context.getBean(Producer.class);
        for (int i = 1; i < 10; i++) {
            producer.send("test_topic", new Message(i, "test topic message " + i));
            Thread.sleep(2000);
        }
    }
}
In the log you can then see each message being sent and then consumed, one after another.
Exceptions
Deserialization exception (the custom message class is not in the packages Kafka trusts):
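The test sends to test_topic without creating it first, so it relies on the broker's auto.create.topics.enable setting (true by default in Kafka 0.11). If topic auto-creation is disabled on your broker, create the topic up front, for example with the AdminClient that ships with Kafka 0.11+. This is only a sketch, not part of the article's project; the broker address is the placeholder from application.yml:

package com.laravelshao.springboot;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTestTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // one partition, replication factor 1 - adjust for a real cluster
            admin.createTopics(Collections.singletonList(new NewTopic("test_topic", 1, (short) 1))).all().get();
        }
    }
}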
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
Solution: add the package containing the message class to Kafka's trusted packages:
spring:
  kafka:
    consumer:
      properties:
        spring:
          json:
            trusted:
              packages: com.laravelshao.springboot.kafka
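The error message also mentions the blunt alternative of trusting every package with *; that works, but it removes the safety check, so naming the package as above is preferable. If you ever build the consumer factory in code instead of through application.yml, the same property key can be put straight into the consumer configuration. A rough, illustrative sketch (not part of the article's project):

// Imports assumed: org.apache.kafka.clients.consumer.ConsumerConfig,
// org.apache.kafka.common.serialization.StringDeserializer,
// org.springframework.kafka.core.ConsumerFactory,
// org.springframework.kafka.core.DefaultKafkaConsumerFactory,
// org.springframework.kafka.support.serializer.JsonDeserializer
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// the same key the YAML sets; "*" would trust every package (use with care)
props.put("spring.json.trusted.packages", "com.laravelshao.springboot.kafka");
ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(props);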
That is all for this article. I hope it helps with your learning, and please continue to support 服务器之家.
Original link: https://my.oschina.net/LaravelShao/blog/1788005