深入浅出 JMS(二) - ActiveMQ 入门指南

时间:2022-04-05 12:06:38

深入浅出 JMS(二) - ActiveMQ 入门指南

上篇博文深入浅出 JMS(一) – JMS 基本概念,我们介绍了消息通信的规范JMS,这篇博文介绍一款开源的 JMS 具体实现—— ActiveMQ。ActiveMQ 是一个易于使用的消息中间件。

一、消息中间件和 ActiveMQ

(1) 消息中间件(MOM:Message Orient middleware)

我们简单的介绍一下消息中间件,对它有一个基本认识就好,消息中间件有很多的用途和优点:

  1. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;
  2. 负责建立网络通信的通道,进行数据的可靠传送;
  3. 保证数据不重发,不丢失;
  4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务。

(2) ActiveMQ

首先简单的介绍一下 MQ,MQ 英文名 MessageQueue,中文名也就是大家用的消息队列,干嘛用的呢,说白了就是一个消息的接受和转发的容器,可用于消息推送。

下面进入我们今天的主题,为大家介绍 ActiveMQ。

Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server.
Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4.

ActiveMQ 是由 Apache 出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

下面我们下载一个版本,玩一玩。

二、运行 ActiveMQ 服务

(1) 下载,解压缩

ActiveMQ 下载网站:http://activemq.apache.org/activemq-5153-release.html

大家现在好之后,将 apache-activemq-5.15.3-bin.zip 解压缩,我们可以看到它的整体目录结构:

深入浅出 JMS(二) - ActiveMQ 入门指南

从它的目录来说,还是很简单的:

  1. bin:存放的是脚本文件
  2. conf:存放的是基本配置文件
  3. data:存放的是日志文件
  4. docs:存放的是说明文档
  5. examples:存放的是简单的实例
  6. lib:存放的是 activemq 所需 jar 包
  7. webapps:用于存放项目的目录

(2) 启动 ActiveMQ

我们了解 activemq 的基本目录,下面我们运行一下 activemq 服务,双击 bin 目录下的 bin/win64/activemq.bat 脚本文件,就可以看下图的效果。

深入浅出 JMS(二) - ActiveMQ 入门指南

从上图我们可以看到 activemq 的存放地址,以及浏览器要访问的地址。

(3) 测试

ActiveMQ 默认使用的 TCP 连接端口是 61616, 通过查看该端口的信息可以测试 ActiveMQ 是否成功启动

netstat -an | find "61616" 

TCP     0.0.0.0:61616     0.0.0.0:0       LISTENING

(4) 监控

ActiveMQ 默认启动时,启动了内置的 jetty 服务器,提供一个用于监控 ActiveMQ 的 admin 应用。地址:http://127.0.0.1:8161/admin/

用户名和密码都是 admin

深入浅出 JMS(二) - ActiveMQ 入门指南

三、ActiveMQ 特性列表

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持 JMS1.1 和 J2EE 1.4 规范 (持久化,XA消息,事务)
  3. 对 Spring 的支持,ActiveMQ 可以很容易内嵌到使用 Spring 的系统里面去,而且也支持 Spring2.0 的特性
  4. 通过了常见 J2EE 服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过 JCA 1.5 resource adaptors 的配置,可以让 ActiveMQ 可以自动的部署到任何兼容 J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过 JDB C和 journal 提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持 Ajax
  9. 支持与 Axis 的整合
  10. 可以很容易得调用内嵌 JMS provider,进行测试

四、什么情况下使用 ActiveMQ

  1. 多个项目之间集成

    • 跨平台
    • 多语言
    • 多项目
  2. 降低系统间模块的耦合度,解耦

    • 软件扩展性
  3. 系统前后端隔离

    • 前后端隔离,屏蔽高安全区

五、ActiveMQ 快速入门

们首先写一个简单的 Hello World 示例,让大家感受下 Activemq,我们需要实现接受者和发送者两部分代码的编写。

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>

(1) 生产者

public class Producer {

    public static void main(String[] args) throws JMSException {
//1. 创建 ConnectionFactory 连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory( // (1)
//ActiveMQConnectionFactory.DEFAULT_USER,
//ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"admin", "password",
"tcp://localhost:61616/"
); //2. 创建 connection,并启动连接
Connection connection = factory.createConnection(); // (2)
connection.start(); //3. Session 是一个发送或接收消息的线程
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // (3) //4. 指定生产消息目标禾消费消息来源的 Destination 对象
Destination dest = session.createQueue("queue1"); // (4) //5. 创建生产者
MessageProducer producer = session.createProducer(dest); // (5) //6. 指定签收模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // (6) //7. 创建消息
for (int i = 0; i < 10; i++) { // (7)
TextMessage message = session.createTextMessage("aaaaaaaaaaaaaaa");
producer.send(message);
} if (connection != null) {
connection.close();
}
}
}
  1. 创建 ConnectionFactory 实例,ActiveMQConnectionFactory 构造函数传入三个参数,分别是用户名,密码,消息地址,tcp 端口可以在 conf/activemq.xml 中配制。

  2. 创建 connection,并启动连接,Connection 默认是关闭的。注意:使用结束后要关闭 connection.close()

  3. 创建 session 会话,一个 connection 可以创建多个 session,Session 是一个发送或接收消息的线程。

    • 参数1:是否开启事务,如果开启事务,则在必须提交 session

      for (int i = 0; i < 10; i++) {  // (7)
      TextMessage message = session.createTextMessage("aaaaaaaaaaaaaaa");
      producer.send(message);
      }
      session.commit();
    • 参数2:签收模式,有 Session.AUTO_ACKNOWLEDGE(自动)、Session.CLIENT_ACKNOWLEDGE(手动 常用)、Session.DUPS_OK_ACKNOWLEDGE(可能重复签收),如若选择 Session.CLIENT_ACKNOWLEDGE,则必须在消费端确认,否则 ActiveMQ 不认为消息已经消费。生产中一般使用 Session.CLIENT_ACKNOWLEDGE 签收,不要相信自动签收方式。

      # 客户端处理
      TextMessage message = (TextMessage) consumer.receive();
      message.acknowledge();
  4. 通过 session 创建 Destination 对象,指定生产消息目标禾消费消息来源的对象。在 PTP 模式中 Destination 被称作 Queue 即队列;在 Pub/sub 模式 Destination 被称作 Topic 即主题,在程序中可以使用多个 Queue 和 Top。

  5. 创建消息的发送和接收对象(生产者和消贵者) MessageProducer/MessageConsumer

  6. 指定签收模式。使用 MessageProducer 的 setDeliveryMode 方法为其设置持久化特性和非持久化特性(DeliveryMode)

  7. 发送消息。使用 JMS 规范的 TextMessage 形式创建数据(通过 session 对象),并且 MessageProducer 的 send 方法发送数据。同理客户端使用 receive 方法进行接收数据。最后不要忘记关闭 Connection 连接。

    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)

    • destination 可以指定将不同的消息发送到不同的 destination,但消费端就必须指定对应的 destination

    • deliveryMode 是否开启持久化,默认为持久性,DeliveryMode.NON_PERSISTENTDeliveryMode.PERSISTENT

    • priority 优先级 0-9,默认为 4,优先级越高越先消费,概率

    • timeToLive ActiveMQ 中消息保留的时间,单位秒,默认永久保存

(2) 消费者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {

    public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616/"
); Connection connection = factory.createConnection();
connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination dest = session.createQueue("queue1"); // (1) MessageConsumer consumer = session.createConsumer(dest); while (true) {
TextMessage message = (TextMessage) consumer.receive();
//message.acknowledge();
if (message == null)
break;
System.out.println(message.getText());
} if (connection != null) {
connection.close();
}
}
}
  1. 消费端与生产者大同小异,注意对应的参数最好设定为一致。必须从对应的 Destination 取出数据,否则无法取到数据。