【JMS】JMS之ActiveMQ的使用

时间:2022-02-04 22:43:08

这篇文章主要是简单介绍一下JMS和ActiveMQ,以及使用ActiveMQ来写两个demo。

1. JMS是啥

百度百科的解释:

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS只是接口,不同的提供商或者开源组织对其有不同的实现,ActiveMQ就是其中之一,它支持JMS,是Apache推出的。JMS中有几个对象模型:

连接工厂:ConnectionFactory 
JMS连接:Connection 
JMS会话:Session 
JMS目的:Destination 
JMS生产者:Producer 
JMS消费者:Consumer 
JMS消息两种类型:点对点和发布/订阅。

可以看出JMS实际上和JDBC有点类似,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。本文主要使用ActiveMQ。

2. ActiveMQ

关于ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。 
使用ActiveMQ首先需要去官网下载,我用的是apache-activemq-5.11.1,下载后解压缩,会看到里面有个activemq-all-5.11.1.jar,这个jar是接下来我们写程序的时候需要加进工程里的。 
在使用ActiveMQ之前,首先得先启动,刚才解压后的目录中有个bin目录,里面有win32和win64两个目录,根据自己电脑选择其中一个打开运行activemq.bat启动ActiveMQ。 
启动完成后,在浏览器中输入http://127.0.0.1:8161/admin/来访问ActiveMQ的服务器,用户名和密码是admin/admin。如下:

【JMS】JMS之ActiveMQ的使用

我们等会儿主要是看Queues和Topics这两个选项,因为这就是上面提到的点对点和发布/订阅的查看窗口。 
OK,准备工作做好了,下面开始针对这两中消息类型写两个demo来跑一下。

3. 点对点消息

首先写消息的生产者,也就是发送消息。

package org.shanheyongmu.cn.pointtopoint;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消息的生产者,也就是发送消息
*/
public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
private static final int SENDNUM=10;//定义消息发送数量 public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂,用来生产Connection
Connection connection=null;//连接
Session session;//会话,接收或者发送消息的线程
Destination destination;//消息的目的地
MessageProducer messageProducer;//消息发送者 //实例化 连接工厂
connectionFactory=new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
try{
connection=connectionFactory.createConnection();
connection.start();//启动连接
session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//获取Session
destination=session.createQueue("FirstQueue1");//创建消息队列,名为FirstQueue1
messageProducer=session.createProducer(destination);//创建消息生产者
sendMessage(session,messageProducer);//发送消息
session.commit();//因为上面加了事务Boolean.TRUE表示有事务,所以commit
}catch(JMSException e){
e.printStackTrace();
} finally{
if(connection!=null){
try{
connection.close();
}catch(JMSException e){
e.printStackTrace();
}
}
}
}
/**
* 发送消息
* @param session
* @param messageProducer
* @throws JMSException
*/
private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException {
for(int i=0;i<JMSProducer.SENDNUM;i++){
TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
messageProducer.send(message);
}
} }

可以看出,主要的流程就是上面提到的JMS的一些对象模型,这个和JDBC很类似,接下来再写消息的消费者,也就是接收方。

package org.shanheyongmu.cn.pointtopoint;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址 public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂,用来生产Connection
Connection connection=null;
Session session;//会话,接收或者发送消息的线程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息消费者
//实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
try{
connection=connectionFactory.createConnection();//通过连接工厂获取链接
connection.start();//启动连接
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);//获取session 不需要加事务了
destination=session.createQueue("FirstQueue1");//创建消息队列,名为FirstQueue1
messageConsumer=session.createConsumer(destination);//创建消息消费者
//注册消息监听
messageConsumer.setMessageListener(new Listener());
}catch(Exception e){
e.printStackTrace();
}
} }

从代码中可以看出,接收消息需要一个监听器,这个监听器可以自己来实现,需要实现MessageListener接口,如下:

package org.shanheyongmu.cn.pointtopoint;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage; /**
* 接收消息需要个监听类
* */
public class Listener implements MessageListener { public void onMessage(Message message) {
try{
System.out.println("收到的消息: "+((TextMessage)message).getText());
}catch(JMSException e){
e.printStackTrace();
}
} }

ok,现在就可以运行这两个程序了,首先运行JMSProducer,然后运行JMSConsumer就可以看到控制台的消息输出。然后打开ActiveMQ的服务器,可以看到如下信息: 
【JMS】JMS之ActiveMQ的使用

可以看出,总共产生了20条消息,消费了20条消息,这就是点对点的模式。

4. 订阅/发布消息

下面写一个订阅/发布消息的demo。和上面点对点类似,不过代码有一些细节上的差异,另外订阅方用两个Consumer来模拟一下,发布方用一个Producer来模拟。

发布消息:

package org.shanheyongmu.cn.subscriber;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; /**
* 发送消息
*/
public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
private static final int SENDNUM=10;//定义消息发送数量 public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂,用来生产Connection
Connection connection=null;//会话
Session session;//会话,接收或者发送消息的线程
Destination destination;//消息的目的地
MessageProducer messageProducer;//消息发送者
//实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
try{
connection=connectionFactory.createConnection();//通过连接工厂获取连接
connection.start();//启动连接
session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);//获取session
destination=session.createTopic("FirstTopic");//创建消息队列 名为FirstTopic1
messageProducer=session.createProducer(destination);//创建消息生产者
sendMessage(session,messageProducer);//发送消息
session.commit();//因为上面加了事务Boolean.TRUE表示有事务,所以要commit
}catch(JMSException e){
e.printStackTrace();
}finally{
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
} /**
* 发布消息
* @param session
* @param messageProducer
* @throws JMSException
*/
private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException {
for(int i=0;i<JMSProducer.SENDNUM;i++){
TextMessage message=session.createTextMessage("ActiveMQ 发布的消息"+i);
System.out.println("ActiveMQ发布的消息"+i);
messageProducer.send(message); } }
}

和点对点有个细微的区别,这里是用session.createTopic,注意一下即可。 
订阅消息:

package org.shanheyongmu.cn.subscriber;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.shanheyongmu.cn.pointtopoint.Listener; /**
*订阅消息
*/
public class JMSConsumer {
private static final String USERNAME=ActiveMQConnection.DEFAULT_USER;//默认用户名
private static final String PASSWROD=ActiveMQConnection.DEFAULT_PASSWORD;//默认密码
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址 public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂,用来生产Connection
Connection connection=null;//连接
Session session;//会话,接收或者发送消息的线程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息消费者 //实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(USERNAME,PASSWROD,BROKEURL);
try{
connection=connectionFactory.createConnection();//通过连接工厂获取连接
connection.start();//启动连接
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);//获取session 不需要加事务了
destination=session.createTopic("FirstTopic1");//创建消息队列,名为FirstTopic1
messageConsumer=session.createConsumer(destination);//创建消息消费者
//注册消息监听
messageConsumer.setMessageListener(new Listener());
}catch(JMSException e){
e.printStackTrace();
}
} }

同样地,这里也是使用createTopic,另一个订阅方的代码和这个一样,就不写了,然后运行所有的JMSConsumer,再运行JMSProducer,就会看到控制台会打印出消息。再看一下服务器上的状态: 
【JMS】JMS之ActiveMQ的使用 
我们可以看到,有两个订阅者,发布了10条消息,消费了20条。以上就是ActiveMQ使用的简单demo。

【JMS】JMS之ActiveMQ的使用的更多相关文章

  1. 解决Maven中Missing artifact javax&period;jms&colon;jms&colon;jar&colon;1&period;1&colon;compile

    搭建好项目后报错: Missing artifact javax.jms:jms:jar:1.1:compile  于POM.xml中 解决方案: 一 :在nexus中配置一个代理仓库     地址为 ...

  2. maven 下载jar失败: Missing artifact javax&period;jms&colon;jms&colon;jar&colon;1&period;1

    想从*仓库下载, 却出现404, 原来,而*仓库中都只有pom文件, 而没有jar包. 那就换一个 仓库吧: http://repository.jboss.com/maven2/: 终于找到了你 ...

  3. log4j的1&period;2&period;15版本,在pom&period;xml中的顶层project报错错误: Failure to transfer javax&period;jms&colon;jms&colon;jar&colon;1&period;1 from https&colon;&sol;&sol;maven-repository&period;dev&period;java&period;net&sol;nonav&sol;repository&period;&period;&period;&period;&period;&period;

    在动态网站工程中,添加了Pom依赖,当添加log4j的1.2.15版本依赖时,在pom.xml中的顶层project报错错误: Failure to transfer javax.jms:jms:ja ...

  4. JMS消息中间件系列&lbrack;ActiveMQ&rsqb;&lpar;一&rpar;

    版本5.13.3的特性: 1.Supports a variety of Cross Language Clients and Protocols from Java, C, C++, C#, Rub ...

  5. Spring整合JMS&lpar;一&rpar;——基于ActiveMQ实现

    1.1     JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到 ...

  6. 消息中间件系列一:入门、JMS规范、ActiveMQ使用

    一.入门 1. 消息中间件的定义 没有标准定义,一般认为,采用消息传送机制/消息队列 的中间件技术,进行数据交流,用在分布式系统的集成 2. 为什么要用消息中间件 解决分布式系统之间消息的传递.电商场 ...

  7. JMS学习以及jms的实现activeMq

    1.JMS规范介绍: http://www.cnblogs.com/hapjin/p/5431706.html http://elim.iteye.com/blog/1893038 http://bl ...

  8. JMS学习&lpar;五&rpar;--ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系

    一,消息的持久化和非持久化 ①DeliveryMode 这是传输模式.ActiveMQ支持两种传输模式:持久传输和非持久传输(persistent and non-persistent deliver ...

  9. 实现JMS规范的ActiveMQ

    ActiveMQ是Apache软件基金会的开源产品,支持AMQP协议.MQTT协议(和XMPP协议作用类似).Openwire协议和Stomp协议等多种消息协议.并且ActiveMQ完整支持JMS A ...

  10. Spring整合JMS&lpar;一&rpar;——基于ActiveMQ实现 (转)

    *注:别人那复制来的 1.1     JMS简介 JMS的全称是Java Message Service,即Java消 息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者 ...

随机推荐

  1. Zero Copy 简介

    转自:http://blog.csdn.net/zzz_781111/article/details/7534649 许多web应用都会向用户提供大量的静态内容,这意味着有很多data从硬盘读出之后, ...

  2. php课程---JavaScript改变HTML中的元素

    <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/ ...

  3. HTML&amp&semi;CSS----练习&lpar;运算符&rpar;

    <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/ ...

  4. Asp&period;net forms认证注意事项

    1.N台服务器配置文件的相关配置要一致 <authentication mode="Forms"> <forms timeout="3600" ...

  5. HDU 4386

    http://acm.hdu.edu.cn/showproblem.php?pid=4386 题意:给四条边长,问能否组成四边形,如果能,求最大面积 求最大面积用海伦公式的四边形推广,p=(a+b+c ...

  6. Text selection in div&lpar;contenteditable&rpar; when double click

    背景: 在最近项目中,碰到一个问题:有一个可编辑的div需要双击时可编辑,blur或者回车时将编辑结果保存.你可能注意到双击时,文字会被选中,可编辑区域不会focus到光标位置.考虑到兼容性问题,写了 ...

  7. vim插件

    所需即所获:像 IDE 一样使用 vim https://github.com/yangyangwithgnu/use_vim_as_ide mark.vim http://www.vim.org/s ...

  8. Ani动态光标格式解析

    数据结构: Ani文件中的数据是按区段存放的,区段数据结构如下: 标识符(4字节ASCII),数据长度(一个DWORD),数据 按照此规则来看Ani文件,文件起始12字节可以理解为标准文件头,除数据长 ...

  9. 学会C sharp计算机编程语言 轻松开发财务、统计软件

    就像人们用同一种语言才可以顺畅交流一样,语言是计算机编程的根本,是IT世界交流的工具.运用这些计算机语言,人们可以创造出一个美妙的世界.你点击某个网页或是安装一个应用程序软件,这简简单单动作的背后,就 ...

  10. 【2&period;0】SpringBoot2配置Druid数据源及监控

    什么是Druid? Druid首先是Java语言中最好的数据库连接池,也是阿里巴巴的开源项目.Druid是阿里巴巴开发的号称为监控而生的数据库连接池,在功能.性能.扩展性方面,都超过其他数据库连接池, ...