消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

时间:2021-05-29 20:43:20

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;

由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

ActiveMQ是用来干什么的?

用来处理消息,也就是处理JMS的。消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有着深入的应用,

队列的主要作用是消除高并发访问高峰,加快网站的响应速度。

在不使用消息队列的情况下,用户的请求数据直接写入数据库,高发的情况下,会对数据库造成巨大的压力,

同时也使得系统响应延迟加剧,但使用队列后,用户的请求发给队列后立即返回。

例如:不能直接给用户提示订单提交成功,京东上提示:“您提交了订单,

请等待系统确认”再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。

由于消息队列的服务处理速度远快于数据库,因此用户的响应延迟可得到有效改善。

ActiveMQ的使用场景?

1、异步调用。

2、一对多通信。

3、做多个系统的集成、同构、异构。

4、作为RPC的替代。

5、多个应用相互解耦。

6、作为事件驱动架构的幕后支撑。

7、为了提高系统的可伸缩性。

ActiveMQ的特点?

支持Java消息服务(JMS) 1.1 版本

Spring Framework

集群 (Clustering)

支持的编程语言包括:CC++C#DelphiErlangAdobe FlashHaskellJavaJavaScriptPerlPHPPikePythonRuby

协议支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP

怎么样安装ActiveMQ?

下载地址:http://www.apache.org/dyn/closer.cgi?path=/activemq/apache-activemq/5.8.0/apache-activemq-5.8.0-bin.zip

解压即可完成ActiveMQ的安装

解压后目录结构如下

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

在bin目录下面如果是如果是32位就选择win32 ,64位就选择win64 然后点击activemq.bat启动

三种启动方式:

(1)普通启动 ./activemq start
(2)启动并指定日志文件 ./activemq start >tmp/smlog
(3)后台启动方式nohup ./activemq start >/tmp/smlog
前两种方式下在命令行窗口关闭时或者ctrl+c时导致进程退出,采用后台启动方式则可以避免这种情况

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

使用点对点的方式实现洗息队列

第一步:导入依赖

 <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>

  

第二步:创建生产者

package com.wish.peertopeer;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*; public class P2pProducer {
public static void main(String[] args) throws JMSException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
// JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字
Destination destination = session.createQueue("my-queue");
// MessageProducer:消息生产者
MessageProducer producer = session.createProducer(destination);
// 设置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 发送一条消息
for (int i = 1; i <= 5; i++) {
sendMsg(session, producer, i);
}
connection.close();
}
/**
* 在指定的会话上,通过指定的消息生产者发出一条消息
*
* @param session
* 消息会话
* @param producer
* 消息生产者
*/
public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
// 创建一条文本消息
TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
// 通过消息生产者发出消息
producer.send(message);
}
}

  

第三步:创建消费者

package com.wish.peertopeer;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class P2pConsumer {
public static void main(String[] args) throws JMSException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
// JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
Destination destination = session.createQueue("my-queue");
// 消费者,消息接收者
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive();
if (null != message) {
System.out.println("收到消息:" + message.getText());
} else
break;
}
session.close();
connection.close();
}
}

  

实现效果

启动生产者

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

浏览http://localhost:8161/admin/queues.jsp查看

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

启动消费者

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

浏览http://localhost:8161/admin/queues.jsp查看

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

使用发布订阅的方式实现消息队列

第一步:也是导入依赖与上面一样

第二步:创建生产者

package com.wish.publishandsubscribe;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class PapProducer { private static String BROKERURL = "tcp://127.0.0.1:61616";
private static String TOPIC = "my-topic"; public static void main(String[] args) throws JMSException {
start();
} static public void start() throws JMSException {
System.out.println("生产者已经启动....");
// 创建ActiveMQConnectionFactory 会话工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
Connection connection = activeMQConnectionFactory.createConnection();
// 启动JMS 连接
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
send(producer, session);
System.out.println("发送成功!");
connection.close();
} static public void send(MessageProducer producer, Session session) throws JMSException {
for (int i = 1; i <= 5; i++) {
System.out.println("我是消息" + i);
TextMessage textMessage = session.createTextMessage("我是消息" + i);
Destination destination = session.createTopic(TOPIC);
producer.send(destination, textMessage);
}
}
}

  

第三步:创建消费者

package com.wish.publishandsubscribe;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*;
public class PapConsumer {
private static String BROKERURL = "tcp://127.0.0.1:61616";
private static String TOPIC = "my-topic"; public static void main(String[] args) throws JMSException {
start();
} static public void start() throws JMSException {
System.out.println("消费点启动...");
// 创建ActiveMQConnectionFactory 会话工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
Connection connection = activeMQConnectionFactory.createConnection();
// 启动JMS 连接
connection.start();
// 不开消息启事物,消息主要发送消费者,则表示消息已经签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个队列
Topic topic = session.createTopic(TOPIC);
MessageConsumer consumer = session.createConsumer(topic);
// consumer.setMessageListener(new MsgListener());
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();
if (textMessage != null) {
System.out.println("接受到消息:" + textMessage.getText());
// textMessage.acknowledge();// 手动签收
// session.commit();
} else {
break;
}
}
connection.close();
}
}

  

实现效果

启动生产者

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

浏览http://localhost:8161/admin/topics.jsp查看

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

启动消费者

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

浏览http://localhost:8161/admin/topics.jsp查看

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ