【RabbitMQ系列】 Spring MVC整合RabbitMQ

时间:2021-05-17 20:29:07

一、Linux下安装RabbitMQ

1、安装erlang环境

# Download and unpack the Erlang/OTP 18.2.1 source release.
wget http://erlang.org/download/otp_src_18.2.1.tar.gz
tar xvfz otp_src_18.2.1.tar.gz
cd otp_src_18.2.1
# Configure, build, then install (the documented configure / make / make install sequence).
./configure
make
make install

2、安装RabbitMQ

# Download the generic-unix RabbitMQ server tarball (replace x.x.x with the desired version).
wget http://www.rabbitmq.com/releases/rabbitmq-server/vx.x.x/rabbitmq-server-generic-unix-x.x.x.tar.xz
# Install the xz compression tool.
yum install xz
# Decompress the .xz archive, then unpack the resulting tarball.
xz -d rabbitmq-server-generic-unix-x.x.x.tar.xz
tar -xvf rabbitmq-server-generic-unix-x.x.x.tar
# Copy the server under /usr/local/ (adjust the location to your own conventions).
cp -r rabbitmq_server-x.x.x /usr/local/rabbitmq
# Put the RabbitMQ sbin directory on PATH: add the export line below to /etc/profile, then reload it.
vi /etc/profile
export PATH=/usr/local/rabbitmq/sbin:$PATH
source /etc/profile
# Enable the management plugin and start the broker.
rabbitmq-plugins enable rabbitmq_management # enable the management console
rabbitmq-server -detached # run rabbitmq in the background
# Open the ports used from other hosts: 15672 for the management console,
# and 5672, the port the Spring connection factory in this tutorial connects to.
iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
iptables -I INPUT -p tcp --dport 5672 -j ACCEPT

3、添加用户和权限

# Add a user (user name "admin", password "admin").
rabbitmqctl add_user admin admin
# Grant the user configure/write/read permissions (".*" for each) on the "/" virtual host.
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# Tag the user as an administrator.
rabbitmqctl set_user_tags admin administrator

 

二、Spring MVC整合RabbitMQ

1、添加pom.xml依赖jar包

<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.5.RELEASE</version>
</dependency>

2、添加配置applicationContext.xml

<!-- RabbitMQ configuration: start -->
<!-- Connection factory for the broker at 192.168.181.201, port 5672, logging in as admin/admin.
     The host is supplied twice: as the constructor argument and as the "host" property. -->
<bean id="connectionFactoryMq" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="192.168.181.201"/>
<property name="username" value="admin"/>
<property name="password" value="admin"/>
<property name="host" value="192.168.181.201"/>
<property name="port" value="5672"/>
</bean>
<!-- RabbitAdmin built on the connection factory above. -->
<bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
<constructor-arg ref="connectionFactoryMq"/>
</bean>
<!-- RabbitTemplate, the message template class, built on the same connection factory. -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactoryMq"/>
</bean>
<!-- Message converter: SimpleMessageConverter (referenced by the listener adapter below). -->
<bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter">
</bean>
<!-- Queue named testQueue, described by the original author as persistent.
     Arguments 1 to 3 are presumably durable, exclusive and autoDelete; verify against the
     Spring AMQP version in use.
     NOTE(review): if index 3 is autoDelete, it is true here; confirm that is intended. -->
<bean id="queue" class="org.springframework.amqp.core.Queue">
<constructor-arg index="0" value="testQueue"></constructor-arg>
<constructor-arg index="1" value="true"></constructor-arg>
<constructor-arg index="2" value="false"></constructor-arg>
<constructor-arg index="3" value="true"></constructor-arg>
</bean>
<!-- Topic exchange named testExchange, described by the original author as persistent.
     Arguments 1 and 2 are presumably durable and autoDelete; verify. -->
<bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange">
<constructor-arg index="0" value="testExchange"></constructor-arg>
<constructor-arg index="1" value="true"></constructor-arg>
<constructor-arg index="2" value="false"></constructor-arg>
</bean>
<!-- Empty map passed to the binding below as its arguments. -->
<util:map id="arguments"> </util:map>
<!-- Binding between the exchange and the queue. The arguments are presumably destination,
     destination type, exchange, routing key and arguments map; verify. -->
<bean id="binding" class="org.springframework.amqp.core.Binding">
<constructor-arg index="0" value="testQueue"></constructor-arg>
<constructor-arg index="1" value="QUEUE"></constructor-arg>
<constructor-arg index="2" value="testExchange"></constructor-arg>
<constructor-arg index="3" value="testQueue"></constructor-arg>
<constructor-arg index="4" value="#{arguments}"></constructor-arg>
</bean>
<!-- Handler class that receives messages, wrapped in a MessageListenerAdapter whose default
     listener method is rmqProducerMessage and whose converter is the bean defined above. -->
<bean id="rqmConsumer" class="com.slp.mq.RmqConsumer"></bean> <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="rqmConsumer" />
<property name="defaultListenerMethod" value="rmqProducerMessage"></property>
<property name="messageConverter" ref="serializerMessageConverter"></property>
</bean>
<!-- SimpleMessageListenerContainer: the container that listens on the queue;
     the "queues" property can take several queues. -->
<bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="queues" ref="queue"></property>
<property name="connectionFactory" ref="connectionFactoryMq"></property>
<property name="messageListener" ref="messageListenerAdapter"></property>
</bean>
<!-- Producer bean; it is looked up by id in the test class. -->
<bean id="rmqProducer" class="com.slp.mq.RmqProducer"></bean>
<!-- RabbitMQ configuration: end -->

3、消息实体类

package com.slp.mq;

import java.io.*;

/**
* @author sanglp
* @create 2018-02-06 14:00
* @desc rabbit消息类
**/
public class RabbitMessage implements Serializable {

    /** Runtime types of {@link #params}; filled in only by the four-argument constructor. */
    private Class<?>[] paramTypes;

    /** Exchange the message is addressed to. */
    private String exchange;

    /** Arguments carried by the message. */
    private Object[] params;

    /** Routing key. */
    private String routekey;

    /** Creates an empty message; populate it through the setters. */
    public RabbitMessage() {
    }

    /**
     * Creates a message without recording parameter types.
     *
     * @param exchange exchange the message is addressed to
     * @param routekey routing key
     * @param params   arguments carried by the message
     */
    public RabbitMessage(String exchange, String routekey, Object... params) {
        this.exchange = exchange;
        this.params = params;
        this.routekey = routekey;
    }

    /**
     * Creates a message and records the runtime type of every argument.
     *
     * @param exchange   exchange the message is addressed to
     * @param routeKey   routing key
     * @param methodName accepted for API compatibility; this class does not store it
     * @param params     arguments carried by the message; the array and its elements may be null
     */
    public RabbitMessage(String exchange, String routeKey, String methodName, Object... params) {
        this.params = params;
        this.exchange = exchange;
        this.routekey = routeKey;
        this.paramTypes = typesOf(params);
    }

    /**
     * Maps each value to its runtime class.
     *
     * <p>A null array yields an empty result and a null element is recorded as
     * {@code Object.class}, so building a message never fails on a missing argument.
     */
    private static Class<?>[] typesOf(Object[] values) {
        if (values == null) {
            return new Class<?>[0];
        }
        Class<?>[] types = new Class<?>[values.length];
        for (int i = 0; i < values.length; i++) {
            types[i] = (values[i] == null) ? Object.class : values[i].getClass();
        }
        return types;
    }

    /**
     * Serializes this message with Java object serialization.
     *
     * @return the serialized bytes, or an empty array when serialization fails
     *         (for example because one of the params is not serializable)
     */
    public byte[] getSerialBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources closes (and thereby flushes) the object stream before the buffer is read.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(this);
        } catch (IOException e) {
            // Best-effort contract kept from the original: report the failure and return no bytes.
            e.printStackTrace();
            return new byte[0];
        }
        return buffer.toByteArray();
    }

    public Class<?>[] getParamTypes() {
        return paramTypes;
    }

    public void setParamTypes(Class<?>[] paramTypes) {
        this.paramTypes = paramTypes;
    }

    public String getExchange() {
        return exchange;
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }

    public String getRoutekey() {
        return routekey;
    }

    public void setRoutekey(String routekey) {
        this.routekey = routekey;
    }
}

4、生产者

package com.slp.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import javax.annotation.Resource;

/**
* @author sanglp
* @create 2018-02-06 14:19
* @desc 生产者
**/
public class RmqProducer {

    /** Template used for publishing; the field is annotated with {@code @Resource} for injection. */
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a message to the exchange and routing key it carries.
     *
     * <p>Prints the connection host and port and the message before sending, and a
     * completion line afterwards.
     *
     * @param msg message to publish; its exchange and routing key select the destination
     */
    public void sendMessage(RabbitMessage msg) {
        printBrokerAddress();
        System.out.println("msg" + msg);

        final String targetExchange = msg.getExchange();
        final String routingKey = msg.getRoutekey();
        rabbitTemplate.convertAndSend(targetExchange, routingKey, msg);

        System.out.println("发送完成");
    }

    /** Prints the host and then the port of the template's connection factory. */
    private void printBrokerAddress() {
        System.out.println(rabbitTemplate.getConnectionFactory().getHost());
        System.out.println(rabbitTemplate.getConnectionFactory().getPort());
    }
}

  

5、消费者

package com.slp.mq;

/**
* @author sanglp
* @create 2018-02-06 14:23
* @desc 消费者
**/
public class RmqConsumer {

    /**
     * Prints the exchange, routing key and params of a received message.
     *
     * <p>This is the method named as {@code defaultListenerMethod} in applicationContext.xml.
     *
     * @param object the received payload; it is cast to {@link RabbitMessage}
     * @throws ClassCastException if {@code object} is not a {@link RabbitMessage}
     */
    public void rmqProducerMessage(Object object) {
        System.out.println("消费前");
        RabbitMessage rabbitMessage = (RabbitMessage) object;
        System.out.println(rabbitMessage.getExchange());
        System.out.println(rabbitMessage.getRoutekey());
        // Arrays.toString prints the elements; calling toString() on the array itself only
        // prints its type and identity hash. It also tolerates a null params array.
        System.out.println(java.util.Arrays.toString(rabbitMessage.getParams()));
    }
}

  

6、测试类

package com.slp;

import com.slp.mq.RabbitMessage;
import com.slp.mq.RmqConsumer;
import com.slp.mq.RmqProducer;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext; import java.util.HashMap;
import java.util.Map; /**
* @author sanglp
* @create 2018-02-06 14:36
* @desc mq测试类
**/
public class MqTest { private RmqProducer rmqProducer ;
private RmqConsumer rqmConsumer ;
@Before
public void setUp() throws Exception {
//ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("D:/web-back/web-back/myweb/web/WEB-INF/applicationContext.xml");
//context.start(); String path="web/WEB-INF/applicationContext.xml";
ApplicationContext context = new FileSystemXmlApplicationContext(path);
rmqProducer = (RmqProducer) context.getBean("rmqProducer");
rqmConsumer = (RmqConsumer)context.getBean("rqmConsumer");
}
@Test
public void test(){
String exchange = "testExchange";
String routeKey ="testQueue";
String methodName = "test";
//参数
for (int i=0;i<10;i++){
Map<String,Object> param=new HashMap<String, Object>();
param.put("data","hello"); RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);
//发送消息
rmqProducer.sendMessage(msg);
} // rqmConsumer.rmqProducerMessage(msg); }
}

 

运行结果:

没有开启消费者之前:

 【RabbitMQ系列】 Spring mvc整合RabbitMQ