概述
ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。
特性
- 遵循JMS规范:ActiveMQ的各种特性是JMS1.1规范的实现。它们包括同步和异步消息传递,一次和只有一次的消息传递,对于预订者的持久消息等等。依附于JMS规范意味着,不论JMS消息提供者是谁,同样的基本特性都是有效的。(JMS可查看前篇博文(ActiveMQ基础教程(一)JMS概述https://www.jianshu.com/p/639627f88a6e)。
- 连接:ActiveMQ提供各种连接选择,包括HTTP,HTTPS,IP多点传送,SSL,STOMP,TCP,UDP,XMPP等。大量的连接协议支持使之具有更好的灵活性。
- 支持多种语言客户端:ActiveMQ对多种语言提供客户端API,除了Java之外还有C/C++、.NET、Perl、PHP、Ruby、Python等。这使得ActiveMQ能用在Java之外的其它语言中。很多其它语言都可以通过ActiveMQ提供的客户端API使用ActiveMQ的全部特性。当然,ActiveMQ代理器(broker)仍然是运行在java虚拟机上,但是客户端能够使用其它的被支持的语言。
- 可插拔的持久性和安全:ActiveMQ提供多种持久性方案可供选择,也可以完全按自己需求定制验证和授权。例如,ActiveMQ通过KahaDB提供自己的超快速消息持久方案(ultra-fast message persistence),但也支持标准的JDBC方案。ActiveMQ可以通过配置文件提供简单的验证和授权,也提供标准的JAAS登陆模块。
- 简单的管理:ActiveMQ是为开发者设计的。它并不需要专门的管理工具,因为它提供各种易用且强大的管理特性。有很多方法去监控ActiveMQ的各个方面,可以通过JMX使用JConsole或ActiveMQ web console;可以运行ActiveMQ消息报告;可以用命令行脚本;可以通过日志。
- 支持集群:为了利于扩展,多个ActiveMQ broker能够联合工作。这个方式就是network of brokers并且能支持多种拓扑结构。
安装与管理后台
安装
ActiveMQ官网下载地址:http://activemq.apache.org/download.html
ActiveMQ 提供了Windows 和Linux、Unix 等几个版本。具体安装方法请自行查找资料进行安装,博主这边就不多叙述。
管理后台
安装成功启动ActiveMQ服务后,在浏览器输入http://localhost:8161,用户名密码默认都是 admin。下面为登陆成功后的页面:
Queues页面
Queues是队列方式消息,从菜单栏中点击Queues可以进入到Queues页面,页面主要内容包括:
- Name:消息队列的名称。
- Number Of Pending Messages:未被消费的消息数目。
- Number Of Consumers:消费者的数量。
- Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。
- Messages Dequeued:出了队列的消息,可以理解为是被消费掉的消息数量。在Queues里它和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。
Topics页面
Topics是主题方式消息,从菜单栏中点击Topics可以进入到Topics页面,页面主要内容包括:
- Name:主题名称。
- Number Of Pending Messages:未被消费的消息数目。
- Number Of Consumers:消费者的数量。
- Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。
- Messages Dequeued:出了队列的消息,可以理解为是被消费掉的消息数量。在Topics里,因为多消费者从而导致数量会比入队列数高。
Subscribers页面
Subscribers 是查看订阅者的页面,可以查看订阅者的信息等。只在Topics消息类型中这个页面才会有数据。
Connections页面
Connections页面可以查看到所有的连接数。
使用
Queue消息模式
点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。
生产者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i < 10; i++) {
/** * 创建一条文本消息 */
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
System.out.println("发送消息:Activemq 发送消息" + i);
/** * 通过消息生产者发出消息 */
messageProducer.send(message);
}
}
}
运行结果图:
我们可以看的,当运行JmsProducer程序时,在ActiveMQ控制台,可以看到生产者往queue.test的队列中发送了10条消息,因为这时还没有消费者,所以这边的Number Of Pending Messages显示的是10,
Number Of Consumers显示的是0,Messages Enqueued显示的也是10。
消费者
public class JmsConsumer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
while (true) {
/** * 接收数据的时间(等待) 100 ms */
TextMessage textMessage = (TextMessage) messageConsumer.receive(1000 * 100);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
运行结果图:
我们可以看到,但运行JmsConsumer程序时,在运行程序的控制台中我们可以看到消费者消费了刚刚生产者生产的消息。在ActiveMQ控制台,可以看到所以这边的Number Of Pending Messages显示的是0,Number Of Consumers显示的是1,Messages Enqueued显示的是10,Messages Dequeued显示的也是10,即消息被消费。
在前面的消费者例子中,我们这边使用while (true) 死循环来不停接受消息。这样很浪费cpu资源,实际生产中不会这么做。下面,我们采用注册一个监听器的方法,当监听到有消息入队列后,才去接收消息。
public class JmsConsumerMessageListener {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
/** * 第六步:创建监听器 */
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
当生产者一生产消息到队列中时,我们的消费者就马上进行消费,注意程序中我们没有将会话和连接关闭,因为监听器是异步的,如果关闭后就无法接收到消息。
Topic消息模式
订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到。
发布者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String TOPIC_NAME = "topic.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为topic.test的主题 */
destination = session.createTopic(TOPIC_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i < 10; i++) {
/** * 创建一条文本消息 */
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
System.out.println("发送消息:Activemq 发送消息" + i);
/** * 通过消息生产者发出消息 */
messageProducer.send(message);
}
}
}
运行结果图:
订阅者
public class JmsConsumer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String TOPIC_NAME = "topic.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为topic.test的主题 */
destination = session.createTopic(TOPIC_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
我们可以发现,Topic消息模式的代码跟Queue消息模式的代码基本是一样的,除了在创建消息目的地的时候,一个是queue一个是topic;还有一点区别就是,Topic消息模式,订阅者需要先订阅,才能接收到发布者发布的消息。
谈谈Session
在通过Connection创建Session的时候,需要设置2个参数,一个是否支持事务,另一个是签收的模式。
签收就是消费者接受到消息后,需要告诉消息服务器,我收到消息了。当消息服务器收到回执后,本条消息将失效。因此签收将对PTP模式产生很大影响。如果消费者收到消息后,并不签收,那么本条消息继续有效,很可能会被其他消费者消费掉!
签收方式有三种:
- AUTO_ACKNOWLEDGE:表示在消费者receive消息的时候自动的签收。客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
- CLIENT_ACKNOWLEDGE:表示消费者receive消息后必须手动的调用acknowledge()方法进行签收。
- DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
发送消息的数据类型
我们上面演示的全都是字符串的消息类型,但ActiveMQ支持的还有ObjectMessage,StreamMessage,MapMessage,BytesMessage等消息类型。下面我们来看看其他消息类型是如何编写的,以下都是以队列的消息模式进行。
ObjectMessage
传输对象
public class User implements Serializable {
private static final long serialVersionUID = 2504467948968634865L;
private String userName;
private String password;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return "User{" +
"userName='" + userName + '\'' +
", password='" + password + '\'' +
'}';
}
}
生产者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "object.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 设置所有对所有序列化包都信任 */
connectionFactory.setTrustAllPackages(true);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为object.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/** * 创建一条Object消息 */
ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) session.createObjectMessage();
for (int i = 0; i < 10; i++) {
User user = new User();
user.setUserName("hyn" + i);
user.setPassword("qwe" + i);
System.out.println("发送消息:Activemq 发送消息" + user.toString());
/** * 对象需要序列化 */
objectMessage.setObject(user);
/** * 通过消息生产者发出消息 */
messageProducer.send(objectMessage);
}
}
}
运行结果图:
消费者
public class JmsConsumerMessageListener {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "object.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 设置所有对所有序列化包都信任 */
connectionFactory.setTrustAllPackages(true);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为object.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
/** * 第六步:创建监听器 */
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
User user = (User) ((ActiveMQObjectMessage) message).getObject();
System.out.println("收到的消息:" + user.toString());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
从代码中我们可以看的,ObjectMessage跟TextMessage代码差不多,只不过有两个地方需要注意:
- 所传输的对象必须是序列化的,也就是要实现Serializable接口;
- 在创建连接工厂时,需要添加对所有或需要传输的序列化对象所在的包为白名单,这个是从ActiveMQ 5.12.2 开始为了增强这个框架的安全性,ActiveMQ将强制用户配置可序列化的包名;
BytesMessage
首先我们项目的资源目录下新建两个文件,producer.txt 和 consumer.txt,在producer.txt输入如下内容,consumer.txt为空。
生产者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "bytes.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为bytes.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/** * 创建一条Byte消息 */
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(getFileByte(System.getProperty("user.dir")+"/src/main/resources/producer.txt"));
messageProducer.send(bytesMessage);
}
/** * 读取文件 * * @param fileUrl * @return */
public static byte[] getFileByte(String fileUrl) {
byte[] buffer = null;
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
buffer = new byte[fileInputStream.available()];
fileInputStream.read(buffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return buffer;
}
}
运行结果图:
消费者
public class JmsConsumerMessageListener {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "bytes.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为bytes.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
/** * 第六步:创建监听器 */
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
FileOutputStream fileOutputStream = null;
try {
BytesMessage bytesMessage = (BytesMessage) message;
fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
byte[] content = new byte[1024];
int len;
while ((len = bytesMessage.readBytes(content)) != -1) {
fileOutputStream.write(content, 0, len);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
从结果可以看出,consumer.txt的内容结果跟product.txt内容是一致的,即消息接收成功。当然,发送文件的话我们也可以使用StreamMessage,下面我们来看看StreamMessage的使用。
StreamMessage
同样需要在项目中新建producer.txt 和 consumer.txt两个文件;
生产者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "stream.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/** * 创建一条streamMessage消息 */
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeBytes(getFileByte(System.getProperty("user.dir") + "/src/main/resources/producer.txt"));
messageProducer.send(streamMessage);
}
/** * 读取文件 * * @param fileUrl * @return */
public static byte[] getFileByte(String fileUrl) {
byte[] buffer = null;
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
buffer = new byte[fileInputStream.available()];
fileInputStream.read(buffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return buffer;
}
}
运行结果图:
消费者
public class JmsConsumerMessageListener {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "stream.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
/** * 第六步:创建监听器 */
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
FileOutputStream fileOutputStream = null;
try {
StreamMessage streamMessage = (StreamMessage) message;
fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
byte[] content = new byte[1024];
int len;
while ((len = streamMessage.readBytes(content)) != -1) {
fileOutputStream.write(content, 0, len);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
MapMessage
生产者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "map.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为map.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/** * 创建一条mapMessage消息 */
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","hyn");
mapMessage.setInt("age",27);
messageProducer.send(mapMessage);
}
}
运行结果图:
消费者
public class JmsConsumerMessageListener {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "map.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
/** * 第六步:创建监听器 */
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("name:" + mapMessage.getString("name"));
System.out.println("age:" + mapMessage.getInt("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
ActiveMQ的应用
保证消息的成功处理
消息发送成功后,接收端接收到了消息。然后进行处理,但是可能由于某种原因,高并发也好,IO阻塞也好,反正这条消息在接收端处理失败了。而点对点的特性是一条消息,只会被一个接收端给接收,只要接收端A接收成功了,接收端B,就不可能接收到这条消息,如果是一些普通的消息还好,但是如果是一些很重要的消息,比如说用户的支付订单,用户的退款,这些与金钱相关的,是必须保证成功的,那么这个时候要怎么处理呢?
我们可以在创建session的时候使用 CLIENT_ACKNOWLEDGE 模式。创建session的时候是需要指定事务以及消息的处理模式的。我们之前是这样创建session:
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AUTO_ACKNOWLEDGE的消息处理模式是当消息发送给接收端之后,就自动确认成功了,而不管接收端有没有处理成功,而一旦确认成功后,就会把队列里面的消息给清除掉,避免下一个接收端接收到同样的消息。
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
而当我们使用CLIENT_ACKNOWLEDGE的消息处理模式时,如果接收端不确认消息,那么activemq将会把这条消息一直保留,直到有一个接收端确定了消息。那么要怎么确认消息呢?具体代码如下:
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
//确认接收,并成功处理了消息
textMessage.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
避免消息队列的并发
主动接收队列消息
之前的代码里面,实现了一个监听器,监听消息的传递,这样只要每有一个消息,都会即时的传递到程序中。但是,这样的处理,在高并发的时候,因为它是被动接收,并没有考虑到程序的处理能力,可能会压跨系统,那要怎么办呢?
答案就是把被动变为主动,当程序有着处理消息的能力时,主动去接收一条消息进行处理
if(当程序有能力处理){//当程序有能力处理时接收
Message receive = consumer.receive();
//这个可以设置超时时间,超过则不等待消息
recieve.receive(10000);
//其实receive是一个阻塞式方法,一定会拿到值的
if(null != receive){
String text = ((TextMessage)receive).getText();
receive.acknowledge();
System.out.println(text);
}else{
//没有值
}
}
使用多个接收端
ActiveMQ是支持多个接收端的,如果当程序无法处理这么多数据的时候,可以考虑多个线程,或者增加服务器来处理。
消息有效期的管理
这样的场景也是有的,一条消息的有效时间,当发送一条消息的时候,可能希望这条消息在指定的时间被处理,如果超过了指定的时间,那么这条消息就失效了,就不需要进行处理了,那么我们可以使用ActiveMQ的设置有效期来实现。具体设置如下:
producer.setTimeToLive(long l);
过期消息,处理失败的消息如何处理
过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。这个队列是ActiveMQ自动创建的。如果需要查看这些未被处理的消息,可以进入这个队列中查看:
//指定一个目的地,也就是一个队列的位置
destination = session.createQueue("ActiveMQ.DLQ");
这样就可以进入队列中,然后实现接口,或者通过receive()方法,就可以拿到未被处理的消息,从而保证正确的处理。
整理文章主要为了自己日后复习用,文章中可能会引用到别的博主的文章,如涉及到博主的版权问题,请博主联系我。
概述
ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。
特性
- 遵循JMS规范:ActiveMQ的各种特性是JMS1.1规范的实现。它们包括同步和异步消息传递,一次和只有一次的消息传递,对于预订者的持久消息等等。依附于JMS规范意味着,不论JMS消息提供者是谁,同样的基本特性都是有效的。(JMS可查看前篇博文(ActiveMQ基础教程(一)JMS概述https://www.jianshu.com/p/639627f88a6e)。
- 连接:ActiveMQ提供各种连接选择,包括HTTP,HTTPS,IP多点传送,SSL,STOMP,TCP,UDP,XMPP等。大量的连接协议支持使之具有更好的灵活性。
- 支持多种语言客户端:ActiveMQ对多种语言提供客户端API,除了Java之外还有C/C++、.NET、Perl、PHP、Ruby、Python等。这使得ActiveMQ能用在Java之外的其它语言中。很多其它语言都可以通过ActiveMQ提供的客户端API使用ActiveMQ的全部特性。当然,ActiveMQ代理器(broker)仍然是运行在java虚拟机上,但是客户端能够使用其它的被支持的语言。
- 可插拔的持久性和安全:ActiveMQ提供多种持久性方案可供选择,也可以完全按自己需求定制验证和授权。例如,ActiveMQ通过KahaDB提供自己的超快速消息持久方案(ultra-fast message persistence),但也支持标准的JDBC方案。ActiveMQ可以通过配置文件提供简单的验证和授权,也提供标准的JAAS登陆模块。
- 简单的管理:ActiveMQ是为开发者设计的。它并不需要专门的管理工具,因为它提供各种易用且强大的管理特性。有很多方法去监控ActiveMQ的各个方面,可以通过JMX使用JConsole或ActiveMQ web console;可以运行ActiveMQ消息报告;可以用命令行脚本;可以通过日志。
- 支持集群:为了利于扩展,多个ActiveMQ broker能够联合工作。这个方式就是network of brokers并且能支持多种拓扑结构。
安装与管理后台
安装
ActiveMQ官网下载地址:http://activemq.apache.org/download.html
ActiveMQ 提供了Windows 和Linux、Unix 等几个版本。具体安装方法请自行查找资料进行安装,博主这边就不多叙述。
管理后台
安装成功启动ActiveMQ服务后,在浏览器输入http://localhost:8161,用户名密码默认都是 admin。下面为登陆成功后的页面:
Queues页面
Queues是队列方式消息,从菜单栏中点击Queues可以进入到Queues页面,页面主要内容包括:
- Name:消息队列的名称。
- Number Of Pending Messages:未被消费的消息数目。
- Number Of Consumers:消费者的数量。
- Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。
- Messages Dequeued:出了队列的消息,可以理解为是被消费掉的消息数量。在Queues里它和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。
Topics页面
Topics是主题方式消息,从菜单栏中点击Topics可以进入到Topics页面,页面主要内容包括:
- Name:主题名称。
- Number Of Pending Messages:未被消费的消息数目。
- Number Of Consumers:消费者的数量。
- Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。
- Messages Dequeued:出了队列的消息,可以理解为是被消费掉的消息数量。在Topics里,因为多消费者从而导致数量会比入队列数高。
Subscribers页面
Subscribers 是查看订阅者的页面,可以查看订阅者的信息等。只在Topics消息类型中这个页面才会有数据。
Connections页面
Connections页面可以查看到所有的连接数。
使用
Queue消息模式
点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。
生产者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i < 10; i++) {
/** * 创建一条文本消息 */
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
System.out.println("发送消息:Activemq 发送消息" + i);
/** * 通过消息生产者发出消息 */
messageProducer.send(message);
}
}
}
运行结果图:
我们可以看的,当运行JmsProducer程序时,在ActiveMQ控制台,可以看到生产者往queue.test的队列中发送了10条消息,因为这时还没有消费者,所以这边的Number Of Pending Messages显示的是10,
Number Of Consumers显示的是0,Messages Enqueued显示的也是10。
消费者
public class JmsConsumer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
while (true) {
/** * 接收数据的时间(等待) 100 ms */
TextMessage textMessage = (TextMessage) messageConsumer.receive(1000 * 100);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
运行结果图:
我们可以看到,但运行JmsConsumer程序时,在运行程序的控制台中我们可以看到消费者消费了刚刚生产者生产的消息。在ActiveMQ控制台,可以看到所以这边的Number Of Pending Messages显示的是0,Number Of Consumers显示的是1,Messages Enqueued显示的是10,Messages Dequeued显示的也是10,即消息被消费。
在前面的消费者例子中,我们这边使用while (true) 死循环来不停接受消息。这样很浪费cpu资源,实际生产中不会这么做。下面,我们采用注册一个监听器的方法,当监听到有消息入队列后,才去接收消息。
public class JmsConsumerMessageListener {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
/** * 第六步:创建监听器 */
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
当生产者一生产消息到队列中时,我们的消费者就马上进行消费,注意程序中我们没有将会话和连接关闭,因为监听器是异步的,如果关闭后就无法接收到消息。
Topic消息模式
订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到。
发布者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String TOPIC_NAME = "topic.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为topic.test的主题 */
destination = session.createTopic(TOPIC_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i < 10; i++) {
/** * 创建一条文本消息 */
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
System.out.println("发送消息:Activemq 发送消息" + i);
/** * 通过消息生产者发出消息 */
messageProducer.send(message);
}
}
}
运行结果图:
订阅者
public class JmsConsumer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String TOPIC_NAME = "topic.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为topic.test的主题 */
destination = session.createTopic(TOPIC_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
我们可以发现,Topic消息模式的代码跟Queue消息模式的代码基本是一样的,除了在创建消息目的地的时候,一个是queue一个是topic;还有一点区别就是,Topic消息模式,订阅者需要先订阅,才能接收到发布者发布的消息。
谈谈Session
在通过Connection创建Session的时候,需要设置2个参数,一个是否支持事务,另一个是签收的模式。
签收就是消费者接受到消息后,需要告诉消息服务器,我收到消息了。当消息服务器收到回执后,本条消息将失效。因此签收将对PTP模式产生很大影响。如果消费者收到消息后,并不签收,那么本条消息继续有效,很可能会被其他消费者消费掉!
签收方式有三种:
- AUTO_ACKNOWLEDGE:表示在消费者receive消息的时候自动的签收。客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
- CLIENT_ACKNOWLEDGE:表示消费者receive消息后必须手动的调用acknowledge()方法进行签收。
- DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
发送消息的数据类型
我们上面演示的全都是字符串的消息类型,但ActiveMQ支持的还有ObjectMessage,StreamMessage,MapMessage,BytesMessage等消息类型。下面我们来看看其他消息类型是如何编写的,以下都是以队列的消息模式进行。
ObjectMessage
传输对象
public class User implements Serializable {
private static final long serialVersionUID = 2504467948968634865L;
private String userName;
private String password;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return "User{" +
"userName='" + userName + '\'' +
", password='" + password + '\'' +
'}';
}
}
生产者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "object.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 设置所有对所有序列化包都信任 */
connectionFactory.setTrustAllPackages(true);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为object.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/** * 创建一条Object消息 */
ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) session.createObjectMessage();
for (int i = 0; i < 10; i++) {
User user = new User();
user.setUserName("hyn" + i);
user.setPassword("qwe" + i);
System.out.println("发送消息:Activemq 发送消息" + user.toString());
/** * 对象需要序列化 */
objectMessage.setObject(user);
/** * 通过消息生产者发出消息 */
messageProducer.send(objectMessage);
}
}
}
运行结果图:
消费者
public class JmsConsumerMessageListener {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "object.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 设置所有对所有序列化包都信任 */
connectionFactory.setTrustAllPackages(true);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为object.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
/** * 第六步:创建监听器 */
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
User user = (User) ((ActiveMQObjectMessage) message).getObject();
System.out.println("收到的消息:" + user.toString());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
从代码中我们可以看的,ObjectMessage跟TextMessage代码差不多,只不过有两个地方需要注意:
- 所传输的对象必须是序列化的,也就是要实现Serializable接口;
- 在创建连接工厂时,需要添加对所有或需要传输的序列化对象所在的包为白名单,这个是从ActiveMQ 5.12.2 开始为了增强这个框架的安全性,ActiveMQ将强制用户配置可序列化的包名;
BytesMessage
首先我们项目的资源目录下新建两个文件,producer.txt 和 consumer.txt,在producer.txt输入如下内容,consumer.txt为空。
生产者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "bytes.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为bytes.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/** * 创建一条Byte消息 */
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(getFileByte(System.getProperty("user.dir")+"/src/main/resources/producer.txt"));
messageProducer.send(bytesMessage);
}
/** * 读取文件 * * @param fileUrl * @return */
public static byte[] getFileByte(String fileUrl) {
byte[] buffer = null;
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
buffer = new byte[fileInputStream.available()];
fileInputStream.read(buffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return buffer;
}
}
运行结果图:
消费者
public class JmsConsumerMessageListener {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "bytes.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为bytes.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
/** * 第六步:创建监听器 */
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
FileOutputStream fileOutputStream = null;
try {
BytesMessage bytesMessage = (BytesMessage) message;
fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
byte[] content = new byte[1024];
int len;
while ((len = bytesMessage.readBytes(content)) != -1) {
fileOutputStream.write(content, 0, len);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
从结果可以看出,consumer.txt的内容结果跟product.txt内容是一致的,即消息接收成功。当然,发送文件的话我们也可以使用StreamMessage,下面我们来看看StreamMessage的使用。
StreamMessage
同样需要在项目中新建producer.txt 和 consumer.txt两个文件;
生产者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "stream.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/** * 创建一条streamMessage消息 */
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeBytes(getFileByte(System.getProperty("user.dir") + "/src/main/resources/producer.txt"));
messageProducer.send(streamMessage);
}
/** * 读取文件 * * @param fileUrl * @return */
public static byte[] getFileByte(String fileUrl) {
byte[] buffer = null;
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
buffer = new byte[fileInputStream.available()];
fileInputStream.read(buffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return buffer;
}
}
运行结果图:
消费者
public class JmsConsumerMessageListener {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "stream.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
/** * 第六步:创建监听器 */
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
FileOutputStream fileOutputStream = null;
try {
StreamMessage streamMessage = (StreamMessage) message;
fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
byte[] content = new byte[1024];
int len;
while ((len = streamMessage.readBytes(content)) != -1) {
fileOutputStream.write(content, 0, len);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
MapMessage
生产者
public class JmsProducer {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "map.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息生产者 */
MessageProducer messageProducer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为map.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消息生产者 */
messageProducer = session.createProducer(destination);
/** * 第六步:发送消息,这个步骤包括创建消息,然后发送消息 */
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/** * 发送消息 * * @param session * @param messageProducer * @throws JMSException */
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/** * 创建一条mapMessage消息 */
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","hyn");
mapMessage.setInt("age",27);
messageProducer.send(mapMessage);
}
}
运行结果图:
消费者
public class JmsConsumerMessageListener {
/** * 默认连接用户名 */
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** * 默认用户密码 */
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** * 默认连接地址 */
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "map.test";
public static void main(String[] args) {
/** * 第一步:创建连接工厂 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/** * 连接 */
Connection connection = null;
/** * 会话 */
Session session = null;
/** * 消息目的地 */
Destination destination = null;
/** * 消息消费者 */
MessageConsumer messageConsumer = null;
try {
/** * 第二步:创建连接 */
connection = connectionFactory.createConnection();
/** * 启动连接 */
connection.start();
/** * 第三步:创建会话 */
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** * 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列 */
destination = session.createQueue(QUEUE_NAME);
/** * 第五步:创建消费者 */
messageConsumer = session.createConsumer(destination);
/** * 第六步:创建监听器 */
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("name:" + mapMessage.getString("name"));
System.out.println("age:" + mapMessage.getInt("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
ActiveMQ的应用
保证消息的成功处理
消息发送成功后,接收端接收到了消息。然后进行处理,但是可能由于某种原因,高并发也好,IO阻塞也好,反正这条消息在接收端处理失败了。而点对点的特性是一条消息,只会被一个接收端给接收,只要接收端A接收成功了,接收端B,就不可能接收到这条消息,如果是一些普通的消息还好,但是如果是一些很重要的消息,比如说用户的支付订单,用户的退款,这些与金钱相关的,是必须保证成功的,那么这个时候要怎么处理呢?
我们可以在创建session的时候使用 CLIENT_ACKNOWLEDGE 模式。创建session的时候是需要指定事务以及消息的处理模式的。我们之前是这样创建session:
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AUTO_ACKNOWLEDGE的消息处理模式是当消息发送给接收端之后,就自动确认成功了,而不管接收端有没有处理成功,而一旦确认成功后,就会把队列里面的消息给清除掉,避免下一个接收端接收到同样的消息。
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
而当我们使用CLIENT_ACKNOWLEDGE的消息处理模式时,如果接收端不确认消息,那么activemq将会把这条消息一直保留,直到有一个接收端确定了消息。那么要怎么确认消息呢?具体代码如下:
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
//确认接收,并成功处理了消息
textMessage.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
避免消息队列的并发
主动接收队列消息
之前的代码里面,实现了一个监听器,监听消息的传递,这样只要每有一个消息,都会即时的传递到程序中。但是,这样的处理,在高并发的时候,因为它是被动接收,并没有考虑到程序的处理能力,可能会压跨系统,那要怎么办呢?
答案就是把被动变为主动,当程序有着处理消息的能力时,主动去接收一条消息进行处理
if(当程序有能力处理){//当程序有能力处理时接收
Message receive = consumer.receive();
//这个可以设置超时时间,超过则不等待消息
recieve.receive(10000);
//其实receive是一个阻塞式方法,一定会拿到值的
if(null != receive){
String text = ((TextMessage)receive).getText();
receive.acknowledge();
System.out.println(text);
}else{
//没有值
}
}
使用多个接收端
ActiveMQ是支持多个接收端的,如果当程序无法处理这么多数据的时候,可以考虑多个线程,或者增加服务器来处理。
消息有效期的管理
这样的场景也是有的,一条消息的有效时间,当发送一条消息的时候,可能希望这条消息在指定的时间被处理,如果超过了指定的时间,那么这条消息就失效了,就不需要进行处理了,那么我们可以使用ActiveMQ的设置有效期来实现。具体设置如下:
producer.setTimeToLive(long l);
过期消息,处理失败的消息如何处理
过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。这个队列是ActiveMQ自动创建的。如果需要查看这些未被处理的消息,可以进入这个队列中查看:
//指定一个目的地,也就是一个队列的位置
destination = session.createQueue("ActiveMQ.DLQ");
这样就可以进入队列中,然后实现接口,或者通过receive()方法,就可以拿到未被处理的消息,从而保证正确的处理。
整理文章主要为了自己日后复习用,文章中可能会引用到别的博主的文章,如涉及到博主的版权问题,请博主联系我。