情景分析:
1.在项目中,添加商品的业务逻辑中,需要添加一个同步索引库的业务逻辑。
如果把代码直接写在本项目中,那么就会导致业务逻辑耦合度高,业务拆分不明确
2.我们把业务逻辑在***search项目中实现,调用服务在***manager实现,将业务逻辑分开实现,但是这样又会导致服务之间的耦合度变高,服务的启动有先后顺序,如果调用服务先启动,执行服务后启动,那么调用服务将会失败。
3.这个时候我们需要一个消息通信机制,这里ActiveMQ应运而生,它是一个消息队列,也是一个消息中间件。
我之前一直做的是Android开发,其实这种现象不管在什么平台上应该都是经常遇到的问题,两个相对独立的模块之前需要进行通信,都有一定的方式,比如说Android中用到的EventBus,它的作用和ActiveMQ的作用一样,通过注册的方式监听通知,收到特定的通知后执行相应的处理逻辑,相信大家对这点并不陌生,那么下面我们一起来学一学ActiveMQ的相关知识,同时我也会通过一个简单的demo来给大家一个最直观的展示。
一、什么是ActiveMQ?
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:
1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支持通过JDBC和journal提供高速的消息持久化
7. 从设计上保证了高性能的集群,客户端-服务器,点对点
8. 支持Ajax
9. 支持与Axis的整合
10. 可以很容易得调用内嵌JMS provider,进行测试
二、ActiveMQ的消息形式
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· StreamMessage – Java原始值的数据流
· MapMessage–一套名称-值对
· TextMessage–一个字符串对象
· ObjectMessage–一个序列化的 Java对象
· BytesMessage–一个字节的数据流
三、ActiveMQ的安装和启动
进入http://activemq.apache.org/下载ActiveMQ
安装步骤
1.把ActiveMQ的压缩包上传到Linux服务器上
2.解压缩
3.进入到apache-activemq-5.12.0/bin目录下使用activemq命令
启动:
./activemq start关闭
./activemq stop查看状态
./activemq status
注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2
可以看到我们的activemq已经启动了,那么我们通过浏览器访问一下ActiveMQ的控制台。
进入到http://192.168.25.88:8161/admin/,用户名和密码都是admin,注意,这里的端口号使用的是8161默认端口号。
看到这个页面,说明我们的ActiveMQ正常工作了,系不系很开心啊~
但素,如果你这个时候点击导航栏的第二个按钮Queues选项时,出现了503错误,如下图所示
那么恭喜你,这个问题很正常的啦,幸好这里有应对方法,大家点击链接跳转到 《ActiveMQ点击Queues选项报503错误的解决方法》 ,就可以解决这个问题了~记得修改完了一定要重启ActiveMQ,否则不会更新滴~
四、ActiveMQ的使用方法
4.1 新建一个工程activemq-demo
首先来看一下工程框架
4.2 代码分析
下面我们分析一下工程正常运行起来的配置代码。
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.activemq</groupId>
<artifactId>activeMQ-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<dependencies>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<configuration>
<path>/</path>
<port>8080</port>
</configuration>
</plugin>
</plugins>
</build>
</project>
我们添加了activemq的依赖包:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
4.3 创建Queue的生产者Producer
这里我在test文件夹下新建了一个文件TestActiveMQ.java,用来编写测试用例,这里直接贴上第一段代码,具体讲解大家可以看注释,写的很清楚。
package com.activemq.test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
/**
* ActiveMQ测试类
* @author zhangyan
*
*/
public class TestActiveMQ {
//queue
//producer
@Test
public void testQueueProducer() throws Exception{
//创建一个连接工厂对象ConnectionFactory对象,需要指定activeMQ的ip及端口
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.88:61616");
//使用连接工厂创建连接
Connection connection = connectionFactory.createConnection();
//开启连接,调用start方法
connection.start();
//使用connection对象创建一个session对象
//第一个参数表示是否开启事务,这是mq的事务,一般不使用事务,保证数据的最终一致,可以使用消息队列实现
//如果第一个参数为true,那么第二个参数自动忽略,人如果不开启事务,第二个参数为消息的应答模式,一般自动应答就可以
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用session对象创建一个Destination对象,一种是topic,另一种是queue,现在使用queue
//参数表示消息队列的名称
Queue queue = session.createQueue("test-queue");
//使用session对象创建一个producer对象
MessageProducer producer = session.createProducer(queue);
//创建一个TextMessage对象
/*TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("hello activemq");*/
TextMessage textMessage = session.createTextMessage("hello activemq");
//发送消息
producer.send(textMessage);
//关闭资源
producer.close();
session.close();
connection.close();
}
}
这里Queue queue = session.createQueue(“test-queue”);表示创建了一个名称为“test-queue”的消息。
右键执行上面的测试方法,运行成功的话就是已经在消息队列中生成了一个点对点的消息并且已经发出去了,那么我们来看看ActiveMQ的管理中心
可以看到管理中心确实有一个test-queue消息,数量为1,说明我们已经成功的了发送了一条消息,点击test-queue继续看看
进入到详情后我们可以看到我们具体的消息内容。
4.4 创建Queue的消费者Consumer
既然我们之前已经正确的发出了一条消息,那么我们现在需要创建一个消费者来消费掉对应的那条消息,因为队列中的消息如果发出后只要有一个消费者接收的话,那么这条消息就消费掉了,后来创建的consumer并不能收到之前发出的消息,这里我们继续在测试类中添加代码。
//queue
//consumer
@Test
public void testQueueConsumer() throws Exception{
//创建一个连接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.88:61616");
//使用连接工厂对象创建一个连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用连接对象创建一个session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用session创建一个Destination,destination应该和消息的发送者一致
Queue queue = session.createQueue("test-queue");
//使用session创建一个消费者对象Consumer
MessageConsumer consumer = session.createConsumer(queue);
//向Consumer对象中设置一个MessageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
//取消息的内容
if (msg instanceof TextMessage) {
TextMessage textMessage = (TextMessage) msg;
//打印消息内容
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//系统等待接收消息
/*while(true) {
Thread.sleep(100);
}*/
System.in.read();
//关闭连接
consumer.close();
session.close();
connection.close();
}
邮件执行这段代码,观察console打印的内容:
看到了吧,这里我们的消费者将之前的生产者发出的消息给接收到了,然后我们来看看ActiveMQ管理中心有什么变化
这里很明显,队列中的消息个数变成了0,消费者的个数由0变成了1,消费掉的消息个数也从0变成了1。
4.5 创建Topic的生产者producer
前面我们已经实现了queue消息队列的形式,下面我们要实现的是topic发布/订阅模式的写法了。在代码中继续添加以下代码:
//topic
//producer
@Test
public void testTopicProducer() throws Exception {
// 创建一个连接工厂对象ConnectionFactory对象,需要指定activeMQ的ip及端口
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.88:61616");
// 使用连接工厂创建连接
Connection connection = connectionFactory.createConnection();
// 开启连接,调用start方法
connection.start();
// 使用connection对象创建一个session对象
// 第一个参数表示是否开启事务,这是mq的事务,一般不使用事务,保证数据的最终一致,可以使用消息队列实现
// 如果第一个参数为true,那么第二个参数自动忽略,人如果不开启事务,第二个参数为消息的应答模式,一般自动应答就可以
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 使用session对象创建一个Destination对象,一种是topic,另一种是queue,现在使用queue
// 参数表示消息队列的名称
Topic topic = session.createTopic("test-topic");
// 使用session对象创建一个producer对象
MessageProducer producer = session.createProducer(topic);
// 创建一个TextMessage对象
/*
* TextMessage textMessage = new ActiveMQTextMessage();
* textMessage.setText("hello activemq topic");
*/
TextMessage textMessage = session.createTextMessage("hello activemqtopic2");
// 发送消息
producer.send(textMessage);
// 关闭资源
producer.close();
session.close();
connection.close();
}
4.6 创建Topic的消费者Consumer
//topic
//consumer
@Test
public void testTopicConsumer() throws Exception{
//创建一个连接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.88:61616");
//使用连接工厂对象创建一个连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用连接对象创建一个session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用session创建一个Destination,destination应该和消息的发送者一致
Topic topic = session.createTopic("test-topic");
//使用session创建一个消费者对象Consumer
MessageConsumer consumer = session.createConsumer(topic);
//向Consumer对象中设置一个MessageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
//取消息的内容
if (msg instanceof TextMessage) {
TextMessage textMessage = (TextMessage) msg;
//打印消息内容
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//系统等待接收消息
/*while(true) {
Thread.sleep(100);
}*/
System.out.println("topic消费者1");
System.in.read();
//关闭连接
consumer.close();
session.close();
connection.close();
}
这里我们先启动下面的一段测试代码,表示我们已经订阅了这个topic,当生产者发出一条topic消息时,就能被所有订阅的消费者接收到。
右键执行第二段代码,观察控制台。然后执行第一段代码,再观察控制台。
可以看到我们订阅的消息已经成功接收到了。
4.7 通过ActiveMQ管理中心发送消息
这里我们进入管理中心后,点击顶部的Topics,会进入到topic列表,这里找到我们自己设置的test-topic这个名称的topic,然后点击进入到这个详情页面,这个页面就是可以通过直接在输入框中输入内容然后点击发送按钮,这样已经订阅了这个topic的消费者就能收到发布的消息,我们这里简单的写了一句话,下面看看项目的控制台输出情况:
可以看到在之前的一条消息下面,接收了我们刚才发出的最新的消息,到此,我们的ActiveMQ基本的用法讲完了,第二部分我们会讲解如何集成到spring中使用,感谢大家~