这两天闲着没事玩了下RabbitMQ.
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求..
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
关于基本的介绍网上太多了..我这里就不赘述了.exchange, queue, channel这些基本概念也就说了.
只介绍一点, exchange一般用的有三种, 有兴趣童鞋可以参考此博客-----> 点这里.
开始正题!
在开发之前需要下载rabbitmq, 而在rabbitmq安装之前,童鞋们需要安装erlang, 因为rabbitmq是用erlang写的.
安装完毕之后,我们建立一个maven项目.然后我们开始配置项目.
由于是spring整合,我们需要加入spring的依赖.
<!-- spring版本号 -->
<>3.2.</>
<!-- 添加Spring依赖 -->
<dependency>
<groupId></groupId>
<artifactId>spring-core</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>spring-webmvc</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>spring-context</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>spring-context-support</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>spring-aop</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>spring-aspects</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>spring-tx</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>spring-jdbc</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>spring-web</artifactId>
<version>${}</version>
</dependency>
然后加入rabbitmq和spring的整合依赖
<!--rabbitmq依赖 -->
<dependency>
<groupId></groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.</version>
</dependency>
依赖加好了之后, 我们需要定义消息生产者和消息发送者.
由于exchange有几种,这里我只测试了两种, 通过分别定义两个exchange去绑定direct和topic..
首先, 定义消息生产者, 通过配置将template链接connect-factory并注入到代码中使用.
package ;
import org.;
import org.;
import ;
import ;
import ;
import ;
/**
* Created by wuxing on 2016/9/21.
*/
@Service
public class MessageProducer {
private Logger logger = ();
@Resource(name="amqpTemplate")
private AmqpTemplate amqpTemplate;
@Resource(name="amqpTemplate2")
private AmqpTemplate amqpTemplate2;
public void sendMessage(Object message) throws IOException {
("to send message:{}", message);
("queueTestKey", message);
("queueTestChris", message);
("", message);
}
}
然后我们定义消息消费者, 这里,我定义了三个消费者, 通过监听消息队列, 分别接受各自所匹配的消息.
第一个消费者, 接受direct的消息, 他的exchange为exchangeTest, rout-key为queueTestKey
package ;
import org.;
import org.;
import ;
import ;
/**
* Created by wuxing on 2016/9/21.
*/
public class MessageConsumer implements MessageListener {
private Logger logger = ();
@Override
public void onMessage(Message message) {
("consumer receive message------->:{}", message);
}
}
package ;
import org.;
import org.;
import ;
import ;
/**
* Created by wuxing on 2016/9/21.
*/
public class ChrisConsumer implements MessageListener {
private Logger logger = ();
@Override
public void onMessage(Message message) {
("chris receive message------->:{}", message);
}
}
第三个消费者, 接受topic的消息他的exchange为exchangeTest2, pattern为wuxing.*.. 网上说.*可以匹配一个, .#可以匹配一个或多个..
但是笔者好像两个都试了..都可以匹配一个或多个..不知道什么鬼...
package ;
import org.;
import org.;
import ;
import ;
/**
* Created by wuxing on 2016/9/21.
*/
public class WuxingConsumer implements MessageListener {
private Logger logger = ();
@Override
public void onMessage(Message message) {
("wuxing receive message------->:{}", message);
}
}
然后就是关键的地方了..rabbit整合spring的配置文件.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="/schema/beans"
xmlns:xsi="http:///2001/XMLSchema-instance" xmlns:rabbit="/schema/rabbit"
xsi:schemaLocation="/schema/beans
/schema/beans/spring-beans-3.
/schema/rabbit
/schema/rabbit/spring-rabbit-1.">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory
username="guest" password="guest" host="localhost" port="5672" />
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template connection-factory="connectionFactory"
exchange="exchangeTest"/>
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/>
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息接收者 -->
<bean class=""></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageReceiver"/>
</rabbit:listener-container>
<!--定义queue -->
<rabbit:queue name="queueChris" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/>
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueChris" key="queueTestChris"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息接收者 -->
<bean class=""></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="queueChris" ref="receiverChris"/>
</rabbit:listener-container>
<!-- 分隔线 -->
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory
username="guest" password="guest" host="localhost" port="5672"/>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template connection-factory="connectionFactory2"
exchange="exchangeTest2"/>
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="connectionFactory2"/>
<!--定义queue -->
<rabbit:queue name="queueWuxing" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin2"/>
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:topic-exchange name="exchangeTest2" durable="true" auto-delete="false" declared-by="connectAdmin2">
<rabbit:bindings>
<rabbit:binding queue="queueWuxing" pattern="wuxing.*"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 消息接收者 -->
<bean class=""></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory2" >
<rabbit:listener queues="queueWuxing" ref="recieverWuxing"/>
</rabbit:listener-container>
</beans>
这里,有个问题笔者研究了好久...就是如何定义两个exchange, 一开始一直不成功..直到找到了一篇国外的文章才解决...
定义两个exchange的时候, 需要用到declared-by..
而这个必须要引入下面的这个申明, 才有..
/schema/rabbit
/schema/rabbit/spring-rabbit-1.">
文件中大概的配置解释一下.
connect-factory进行连接rabbitmq服务.
template用于连接factory并指定exchange, 这上面还能直接指定rout-key.
admin相当于一个管理员的角色..可以将exchange和queue进行管理,
queue和topic-exchange分别定义队列和路由器, 这里需要用declared-by指定管理员,从而连接到相应的factory.
listener-container用于消费者的监听(其实,rabbit配置中是可以指定某个类的某个方法的, 但是笔者失败了, 还在试验中...)
这里还有一个问题...需要大家注意..
当一个exchange绑定了一种类型之后, 这个exchange在配置就不能再换成另一种了.会一直报错, received 'direct' but current is 'topic' 类似这种..
笔者这个也是被坑了若干时间去找问题...
然后贴下spring的基本配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="/schema/beans"
xmlns:xsi="http:///2001/XMLSchema-instance" xmlns:context="/schema/context"
xsi:schemaLocation="/schema/beans /schema/beans/spring-beans-3.
/schema/context /schema/context/spring-context-3.">
<import resource="classpath*:" />
<!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans -->
<context:component-scan base-package=", " />
<!-- 激活annotation功能 -->
<context:annotation-config />
<!-- 激活annotation功能 -->
<context:spring-configured />
</beans>
package ;
import ;
import ;
import ;
import org.;
import org.;
import ;
import ;
/**
* Created by wuxing on 2016/9/21.
*/
public class MessageTest {
private Logger logger = ();
private ApplicationContext context = null;
@Before
public void setUp() throws Exception {
context = new ClassPathXmlApplicationContext("");
}
@Test
public void should_send_a_amq_message() throws Exception {
MessageProducer messageProducer = (MessageProducer) ("messageProducer");
int a = 100;
while (a > 0) {
("Hello, I am amq sender num :" + a--);
try {
//暂停一下,好让消息消费者去取消息打印出来
(1000);
} catch (InterruptedException e) {
();
}
}
}
}
然后控制台的结果如下(这里只贴出关键信息, 其他配置的log的省略了)
2016-09-22 16:15:00,330 [main] INFO [] - to send message:Hello, I am amq sender num :100
2016-09-22 16:15:00,348 [main] DEBUG [] - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)
2016-09-22 16:15:00,348 [main] DEBUG [] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3)
2016-09-22 16:15:00,349 [main] DEBUG [] - Publishing message on exchange [exchangeTest], routingKey = [queueTestKey]
2016-09-22 16:15:00,357 [main] DEBUG [] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3)
2016-09-22 16:15:00,358 [main] DEBUG [] - Publishing message on exchange [exchangeTest], routingKey = [queueTestChris]
2016-09-22 16:15:00,368 [main] DEBUG [] - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,2)
2016-09-22 16:15:00,369 [main] DEBUG [] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
2016-09-22 16:15:00,369 [main] DEBUG [] - Publishing message on exchange [exchangeTest2], routingKey = []
2016-09-22 16:15:00,370 [pool-1-thread-6] DEBUG [] - Storing delivery for Consumer: tags=[[-hyW85GZHk-AHLLFJUmNLDQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
2016-09-22 16:15:00,372 [SimpleAsyncTaskExecutor-1] DEBUG [] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestKey, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,373 [SimpleAsyncTaskExecutor-1] INFO [] - consumer receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestKey, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,374 [SimpleAsyncTaskExecutor-1] DEBUG [] - Retrieving delivery for Consumer: tags=[[-hyW85GZHk-AHLLFJUmNLDQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
2016-09-22 16:15:00,379 [pool-2-thread-4] DEBUG [] - Storing delivery for Consumer: tags=[[-T-c1red0T_HHyCFfpXLYIQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
2016-09-22 16:15:00,381 [SimpleAsyncTaskExecutor-1] DEBUG [] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest2, receivedRoutingKey=, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,382 [SimpleAsyncTaskExecutor-1] INFO [] - wuxing receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest2, receivedRoutingKey=, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,383 [SimpleAsyncTaskExecutor-1] DEBUG [] - Retrieving delivery for Consumer: tags=[[-T-c1red0T_HHyCFfpXLYIQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
2016-09-22 16:15:00,396 [pool-1-thread-5] DEBUG [] - Storing delivery for Consumer: tags=[[-h5ERpaWrnqmkNhbfM7S8Ww]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0
2016-09-22 16:15:00,397 [SimpleAsyncTaskExecutor-1] DEBUG [] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestChris, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,398 [SimpleAsyncTaskExecutor-1] INFO [] - chris receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestChris, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,400 [SimpleAsyncTaskExecutor-1] DEBUG [] - Retrieving delivery for Consumer: tags=[[-h5ERpaWrnqmkNhbfM7S8Ww]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0
我们可以看到生产者有发出一个信息, 然后发布在了三个通道上.
1. on exchange [exchangeTest] , routingKey = [queueTestKey]
2. on exchange [exchangeTest] , routingKey = [queueTestChris]
3. on exchange [exchangeTest2] , routingKey = []
然后三个消费者分别收到了他们的消息..至此, 整个test就结束了.
对项目有兴趣的童鞋可以拿项目的源码玩一玩 源码在这里
文章中有什么错误或者不妥的地方, 还望大家指正, 和大家共勉!!!