SpringBoot整合RabbitMq(二)

时间:2024-10-16 13:34:20

       本文序列化和添加package参考:https://www.jianshu.com/p/13fd9ff0648d

RabbitMq安装

[root@topcheer ~]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
elasticsearch latest 874179f19603 11 days ago 771 MB
springbootdemo4docker latest cd13bc7f56a0 2 weeks ago 678 MB
docker.io/tomcat latest ee48881b3e82 4 weeks ago 506 MB
docker.io/rabbitmq latest a00bc560660a 4 weeks ago 147 MB
docker.io/centos latest 67fa590cfc1c 7 weeks ago 202 MB
docker.io/redis latest f7302e4ab3a8 8 weeks ago 98.2 MB
docker.io/rabbitmq 3.7.16-management 3f92e6354d11 2 months ago 177 MB
[root@topcheer ~]# docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 3f92e6354d11
ab8a0c8bae576f12ff334b22aae36d5fd87e744062462765628b06b5a65b9005
[root@topcheer ~]# docker ps -l
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
ab8a0c8bae57 3f92e6354d11 "docker-entrypoint..." 27 seconds ago Up 26 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp myrabbitmq
[root@topcheer ~]#
 SpringBoot整合RabbitMq(二)

登录RabbitMQ管理控制台(上面映射的15672端口),账号和密码均为guest,然后创建交换机

SpringBoot整合RabbitMq(二)

SpringBoot整合RabbitMq(二)

将交换机和队列进行绑定

SpringBoot整合RabbitMq(二)

SpringBoot整合RabbitMq(二)

SpringBoot整合RabbitMq(二)

SpringBoot整合RabbitMq(二)

Springboot开发

 <dependencies>
     <!-- Message queue (AMQP / RabbitMQ) starter -->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
     <!-- Web starter -->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <!-- fastjson: 1.2.80 and earlier are affected by known autoType
          deserialization vulnerabilities; 1.2.83 is the fixed 1.x release -->
     <dependency>
         <groupId>com.alibaba</groupId>
         <artifactId>fastjson</artifactId>
         <version>1.2.83</version>
     </dependency>
     <!-- Lombok (compile-time only, hence optional) -->
     <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
         <optional>true</optional>
     </dependency>
     <!-- Test starter -->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
     </dependency>
 </dependencies>

启动类

 /**
 * Application entry point.
 *
 * <p>Notes on the RabbitMQ auto-configuration this application relies on:
 * <ol>
 *   <li>RabbitAutoConfiguration is the auto-configuration class.</li>
 *   <li>It auto-configures the connection factory (ConnectionFactory).</li>
 *   <li>RabbitProperties wraps the RabbitMQ configuration.</li>
 *   <li>RabbitTemplate sends messages to and receives messages from RabbitMQ.</li>
 *   <li>AmqpAdmin is the RabbitMQ administration component; it creates and
 *       deletes Queue, Exchange and Binding.</li>
 *   <li>@EnableRabbit together with @RabbitListener listens to the contents of
 *       message queues.</li>
 * </ol>
 */
@SpringBootApplication
@EnableCaching
@EnableRabbit
@MapperScan("com.topcheer.*.*.dao")
public class Oss6Application {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments handed over to Spring Boot
     */
    public static void main(String[] args) {
        // Same effect as the static SpringApplication.run(Class, String...) shortcut.
        new SpringApplication(Oss6Application.class).run(args);
    }

}

配置文件

 # RabbitMQ connection settings. YAML nesting is significant: host, username and
 # password must be indented under spring -> rabbitmq to be read as spring.rabbitmq.*.
 spring:
   rabbitmq:
     host: 192.168.180.113
     username: guest
     password: guest

Bo类

 /**
 * Book payload object; the examples in this article send it through RabbitMQ.
 *
 * <p>NOTE(review): the {@code @Document} annotation presumably maps this class to
 * an Elasticsearch index/type - confirm against the rest of the project.
 *
 * @author WGR
 * @create 2019/9/3 -- 0:34
 */
@Document(indexName = "topcheer",type = "book" )
@Data
// @Builder is deliberately not used: according to the original author, building
// instances with it causes problems during deserialization.
public class Book implements Serializable {

    /**
     * Explicit serialization version so the serialized form does not depend on a
     * compiler-generated value.
     */
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String name;
    private String author;

    /**
     * Creates a book without an id.
     *
     * @param name   book title
     * @param author book author
     */
    public Book(String name, String author) {
        this.name = name;
        this.author = author;
    }

    /**
     * Creates a fully populated book.
     *
     * @param id     book id
     * @param name   book title
     * @param author book author
     */
    public Book(Integer id, String name, String author) {
        this.id = id;
        this.name = name;
        this.author = author;
    }

    /** No-argument constructor, kept alongside the explicit constructors above. */
    public Book() {
    }
}

MessageConverter

我们先来创建一个转换的实现类,只需要继承抽象类AbstractMessageConverter并实现内部的createMessage、fromMessage两个方法,就可以完成实体类的序列化与反序列化转换,代码如下所示:

 /**
 * Custom message converter that uses FastJson to turn payload objects into JSON
 * message bodies and JSON message bodies back into objects.
 *
 * @author 于起宇 <br/>
 * ===============================
 * Created with Eclipse.
 * Date: 2017/10/26
 * Time: 19:28
 * Jianshu: http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
public class RabbitMqFastJsonConverter
        extends AbstractMessageConverter {
    /**
     * Logger shared by all converter instances.
     */
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqFastJsonConverter.class);
    /**
     * Message type mapper
     */
    private static ClassMapper classMapper = new DefaultClassMapper();
    /**
     * Name of the default character set.
     */
    private static final String DEFAULT_CHART_SET = "UTF-8";

    /**
     * Creates a message whose body is the JSON form of the given object.
     *
     * <p>Sets content type, content encoding and content length on the supplied
     * properties and lets the class mapper store the payload's class in them.
     *
     * @param o                 payload object to serialize
     * @param messageProperties properties of the message being created (mutated)
     * @return message carrying the UTF-8 encoded JSON body
     */
    @Override
    protected Message createMessage(Object o, MessageProperties messageProperties) {
        // Encoding with the Charset overload cannot raise a checked exception,
        // so the body is never null and no try/catch is required.
        byte[] bytes = JSON.toJSONString(o).getBytes(java.nio.charset.StandardCharsets.UTF_8);
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(DEFAULT_CHART_SET);
        messageProperties.setContentLength(bytes.length);
        classMapper.fromClass(o.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    /**
     * Converts a message back into an object.
     *
     * <p>Only messages whose content type contains {@code "json"} are parsed; the
     * target class is resolved by the class mapper from the message properties.
     * For any other message (or when parsing yields {@code null}) the raw body
     * bytes are returned.
     *
     * @param message incoming message
     * @return the parsed object, or the raw {@code byte[]} body as a fallback
     * @throws MessageConversionException if the declared content encoding is not supported
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = DEFAULT_CHART_SET;
                }
                try {
                    Class<?> targetClass = classMapper.toClass(properties);
                    content = convertBytesToObject(message.getBody(), encoding, targetClass);
                } catch (UnsupportedEncodingException e) {
                    throw new MessageConversionException(
                            "Failed to convert Message content", e);
                }
            } else {
                logger.warn("Could not convert incoming message with content-type [{}]", contentType);
            }
        }
        if (content == null) {
            // Fall back to the raw bytes when nothing could be parsed.
            content = message.getBody();
        }
        return content;
    }

    /**
     * Decodes the byte array with the given character set and parses it as JSON
     * into an instance of the given class.
     *
     * @param body     message body bytes
     * @param encoding name of the character set used to decode the body
     * @param clazz    target type
     * @return the parsed instance
     * @throws UnsupportedEncodingException if the named character set is not supported
     */
    private Object convertBytesToObject(byte[] body, String encoding,
                                        Class<?> clazz) throws UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return JSON.parseObject(contentAsString, clazz);
    }
}
在该转换类内我们使用了DefaultClassMapper来作为类的映射,我们可以先来看下该类相关信任package的源码,如下所示: ......
public class DefaultClassMapper implements ClassMapper, InitializingBean {
public static final String DEFAULT_CLASSID_FIELD_NAME = "__TypeId__";
private static final String DEFAULT_HASHTABLE_TYPE_ID = "Hashtable";
// 默认信任的package列表
private static final List<String> TRUSTED_PACKAGES = Arrays.asList("java.util", "java.lang");
private final Set<String> trustedPackages;
private volatile Map<String, Class<?>> idClassMapping;
private volatile Map<Class<?>, String> classIdMapping;
private volatile Class<?> defaultMapClass;
private volatile Class<?> defaultType;

public DefaultClassMapper() {
// 构造函数初始化信任的package为默认的pakcage列表
// 仅支持java.util、java.lang两个package
this.trustedPackages = new LinkedHashSet(TRUSTED_PACKAGES);
this.idClassMapping = new HashMap();
this.classIdMapping = new HashMap();
this.defaultMapClass = LinkedHashMap.class;
this.defaultType = LinkedHashMap.class;
}
......

RabbitMqConfiguration

下面我们需要将该转换器设置到RabbitTemplate和SimpleRabbitListenerContainerFactory内,让RabbitMQ支持自定义的消息转换,如下所示:

 /**
 * RabbitMQ related configuration.
 *
 * @author 于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date: 2018/3/11
 * Time: 5:42 PM
 * Jianshu: http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Configuration
public class RabbitMqConfiguration {

    /**
     * Declares the RabbitTemplate and plugs in the custom FastJson converter.
     *
     * @param connectionFactory connection factory the template is built on
     * @return template using {@link RabbitMqFastJsonConverter}
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(newFastJsonConverter());
        return rabbitTemplate;
    }

    /**
     * Declares the listener container factory and plugs in the custom FastJson
     * converter; the default "requeue rejected" flag is switched off.
     *
     * @param connectionFactory connection factory handed to the container factory
     * @return container factory using {@link RabbitMqFastJsonConverter}
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setDefaultRequeueRejected(false);
        containerFactory.setMessageConverter(newFastJsonConverter());
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

    /** Builds a fresh converter instance for each caller. */
    private static RabbitMqFastJsonConverter newFastJsonConverter() {
        return new RabbitMqFastJsonConverter();
    }

}

重写DefaultClassMapper构造函数

不加这个会报错,显示这个类没有被信任

创建一个名为RabbitMqFastJsonClassMapper的类并且继承DefaultClassMapper,如下所示:

 /**
 * Class mapper used by the FastJson message conversion.
 *
 * @author 于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date: 2018/3/13
 * Time: 10:17 PM
 * Jianshu: http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
public class RabbitMqFastJsonClassMapper extends DefaultClassMapper {

    /**
     * Initializes the mapper so that every package is trusted.
     *
     * <p>NOTE(review): trusting "*" lets the sender of a message choose the class
     * that gets instantiated on the consumer side; consider listing only the
     * application's own packages if messages can come from untrusted producers.
     */
    public RabbitMqFastJsonClassMapper() {
        // The superclass constructor runs implicitly before this call.
        this.setTrustedPackages("*");
    }
}

在上面构造函数内我们设置了信任全部的package,添加了RabbitMqFastJsonClassMapper类后,需要让MessageConverter使用该类作为映射,修改RabbitMqFastJsonConverter部分代码如下所示:

/**
* 消息类型映射对象
*/
private static ClassMapper classMapper = new DefaultClassMapper();
>>> 修改为 >>>
/**
* 消息类型映射对象
*/
private static ClassMapper classMapper = new RabbitMqFastJsonClassMapper();

监听类

 /**
 * Listens on the RabbitMQ queues used in this article and prints what arrives.
 */
@Service
public class BookService {

    /**
     * Handles messages from queue "topcheer.news"; the payload arrives already
     * converted to a {@link Book}.
     *
     * @param book converted message payload
     */
    @RabbitListener(queues = "topcheer.news")
    public void receive(Book book){
        System.out.println("收到消息:"+book);
    }

    /**
     * Handles messages from queue "topcheer" as raw {@link Message} objects and
     * prints the body text followed by the message properties.
     *
     * @param message raw message including body and properties
     */
    @RabbitListener(queues = "topcheer")
    public void receive02(Message message){
        // Decode the body before printing: passing the byte[] itself to println
        // only shows its identity string (e.g. "[B@4da0a5ae"), not the content.
        // The converter in this article writes bodies as UTF-8.
        System.out.println(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        System.out.println(message.getMessageProperties());
    }
}

测试类


 
   /**
    * 1. Unicast (point-to-point): sends a Book to exchange "exchange-direct"
    * with routing key "topcheer".
    */
   @Test
   public void contextLoads() {
       // Alternative 1 - build the Message yourself (body and headers):
       //   rabbitTemplate.send(exchange, routeKey, message);
       // Alternative 2 - pass only the payload object; it is treated as the
       // message body and serialized automatically before being sent:
       //   rabbitTemplate.convertAndSend(exchange, routeKey, object);
       rabbitTemplate.convertAndSend("exchange-direct","topcheer",new Book("红楼梦","曹雪芹"));
   }

// Receives one message from queue "topcheer.news", lets the template convert
// it, and prints the result.
@Test
public void receive(){
    System.out.println(rabbitTemplate.receiveAndConvert("topcheer.news"));
}

/**
 * Broadcast: publishes a Book to exchange "exchange-fanout" with an empty
 * routing key.
 */
@Test
public void sendMsg(){
    final Book payload = new Book("红楼梦1","曹雪芹1");
    rabbitTemplate.convertAndSend("exchange-fanout", "", payload);
}

测试结果如下:

SpringBoot整合RabbitMq(二)

2019-10-11 22:20:50.730  INFO --- [           main] tMqFastJsonConverter : 消息为对象
Book(id=null, name=红楼梦, author=曹雪芹)
2019-10-11 22:39:04.284  INFO --- [ntContainer#1-1] tMqFastJsonConverter : 消息为对象
[B@4da0a5ae
MessageProperties [headers={__TypeId__=com.topcheer.oss.shiro.bo.Book}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange-direct, receivedRoutingKey=topcheer, deliveryTag=4, consumerTag=amq.ctag-mFGYgGzoHK3Utt8uk2Hxdg, consumerQueue=topcheer]