最近学习到ActiveMQ,之前也没有用过相关或者类似的工具,因此特地写个文章进行相关的学习记录。
相关参考博文:https://www.cnblogs.com/cyfonly/p/6380860.html、https://blog.csdn.net/qq_26641781/article/details/80408987、https://blog.csdn.net/qinweili751/article/details/80620104
1.安装ActiveMQ
(1)进入官网http://activemq.apache.org/,选择最新的版本下载
(2)再选择对应的系统环境(我这里选择的是windows版本)
(3)下载完成后将其解压(我这里将它存放在D盘根目录下),目录结构如下
(4)进入bin/win64/目录,启动activemq.bat文件(注意:MQ与jdk版本必须要匹配。我这里下载的MQ是5.15版本,对应的jdk最低要求是1.8)。
(5)启动完成后,输入浏览器地址http://localhost:8161/admin,会弹出用户名/密码输入框
我们的账号密码是存放在ActiveMQ根目录的conf/jetty-realm.properties文件中。
打开可以看到最下面已经有两个创建好了的用户了。如果我们需要添加自己的用户,或是修改它们的角色,都可以按照上面所写的格式"用户名:密码 [,角色]"来进行配置(角色被定义在~/conf/jetty.xml中)。
这里我们使用默认帐号,admin/admin
(6)至此,我们的ActiveMQ已经安装完成。
2.使用ActiveMQ
- ActiveMQ的使用一般分为以下几个步骤:
- connectionFactory:创建连接工厂;
- connection:从连接工厂中得到连接;
- session:从连接中获得一个会话;
- destination:从会话中获取一个destination。可以是Queue(P2P)或Topic(Pub/Sub)
- Producer:根据session和destination创建服务生产者。
- Message:根据session创建消息。
- send:消息生产者将message发送给MQ
- Consumer:根据session和destination创建服务消费者。
- receive:接收MQ中的消息。可以是同步接收,也可以是创建监听器异步接收。
- 关闭资源。
由于我这里是Springboot的项目,因此有部分步骤已经在自动配置中处理好了。
(1)引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.4</version> </dependency>
(2)添加相关配置
#默认端口是61616,而不是我们访问网站的8161端口
spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.user=admin spring.activemq.password=admin spring.activemq.pool.enabled=false
(3)添加配置类(PS:如果不配置该类,默认只会使用P2P,即设置Queue为destination。如果要使用Topic,则必须要配置下面的类)。
@Configuration public class JmsConfig { @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(false); //Queue是P2P,因此Pub/Sub设置为false。默认是false。 factory.setConnectionFactory(connectionFactory); return factory; } @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(true); //Topic是Pub/Sub,需要显示声明。 factory.setConnectionFactory(connectionFactory); return factory; } }
(4)编写服务消费者
这里为了能区别P2P和Pub/Sub,创建了两个服务消费者。
消费者1
@Service public class ConsumerService { @JmsListener(destination = "springboot.queue.test", containerFactory = "jmsListenerContainerQueue") public void receiveQueue(String msg) { System.out.println(LocalDateTime.now().toString() + " consumer接收到Queue消息:" + msg); } @JmsListener(destination = "springboot.topic.test", containerFactory = "jmsListenerContainerTopic") public void receiveTopic(String msg) { System.out.println(LocalDateTime.now().toString() + " consumer接收到Topic消息:" + msg); } }
消费者2
@Service public class Consumer2Service { @JmsListener(destination = "springboot.queue.test", containerFactory = "jmsListenerContainerQueue") public void receiveQueue(String msg) { System.out.println(LocalDateTime.now().toString() + " consumer2接收Queue消息:" + msg); } @JmsListener(destination = "springboot.topic.test", containerFactory = "jmsListenerContainerTopic") public void receiveTopic(String msg) { System.out.println(LocalDateTime.now().toString() + " consumer2接收到Topic消息:" + msg); } }
(5)编写服务生产者
@Service public class ProducerService { @Autowired private JmsTemplate jmsTemplate; public void sendMessage(Destination destination, String msg) { System.out.println(LocalDateTime.now().toString() + " productor发送消息:" + msg); jmsTemplate.convertAndSend(destination, msg); } }
(6)测试类
先测试P2P下的消息:
@Test public void testMQQueue() { Destination destination = new ActiveMQQueue("springboot.queue.test"); for (int i = 0; i < 3; i++) { producerService.sendMessage(destination, "hellow world " + i); } }
输出结果
2019-05-22T17:29:39.324 productor发送消息:hellow world 0
2019-05-22T17:29:39.373 consumer2接收Queue消息:hellow world 0
2019-05-22T17:29:39.379 productor发送消息:hellow world 1
2019-05-22T17:29:39.385 productor发送消息:hellow world 2
2019-05-22T17:29:39.388 consumer接收到Queue消息:hellow world 1
2019-05-22T17:29:39.391 consumer2接收Queue消息:hellow world 2
可以看到生产者每发出一个消息,都只会有一个消费者对消息进行处理。并且这里采用的是轮询的方式,即这次是消费者1接收了消息,下次就是消费者2接收,再下次又是消费者1。以此类推。
然后我们再测试下Pub/Sub的消息:
@Test public void testMQTopic() { Destination destination = new ActiveMQTopic("springboot.topic.test"); for (int i = 0; i < 2; i++) { producerService.sendMessage(destination, "hellow world " + i); } }
输出结果:
2019-05-22T17:35:58.535 productor发送消息:hellow world 0
2019-05-22T17:35:58.576 productor发送消息:hellow world 1
2019-05-22T17:35:58.581 consumer接收到Topic消息:hellow world 0
2019-05-22T17:35:58.582 consumer接收到Topic消息:hellow world 1
2019-05-22T17:35:58.582 consumer2接收到Topic消息:hellow world 0
2019-05-22T17:35:58.584 consumer2接收到Topic消息:hellow world 1
这里可以看到,每一个消息被发出来后,会被所有的服务消费者接收并处理。