一. JMS规范
1. JMS概念
本质是API,Java平台消息中间件的规范,java应用程序之间进行消息交换。并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
2. JMS对象模型
1)连接工厂:创建一个JMs连接
2)JMS连接:客户端和服务器之间的一个连接。
3)JMS会话:客户和服务器会话的状态,建立在连接之上的
4)JMS目的:消息队列
5)JMS生产者:消息的生成
6)JMS消费者:接收消息
7)Broker:消息中间件的实例(ActiveMq)
3. JMS---点对点模式
队列,一个消息只有一个消费者(即使有多个接受者监听队列),消费者是要向队列应答成功
4. JMS---消息类型
TextMessage | 字符串 |
BytesMessage | 连续字节流 |
MapMessage | 键值对 |
StreamMessage | 流 |
ObjectMessage | 对象 |
5. JMS---主题模式(发布订阅)
发布到Topic的消息会被当前主题所有的订阅者消费
二. JMS的实现---ActiveMQ
1. 安装
a.下载:
http://activemq.apache.org/activemq-580-release.html
b.启动:
c.运行:
初始用户名和密码:admin/admin
2. ActiveMQ java 客户端
a. pom 引用
<!--ActiveMq--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.8.0</version> </dependency>
b. 原生ActiveMQ代码演示
生产者:
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JmsProducer { //默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; //发送的消息数量 private static final int SENDNUM = 10; public static void main(String[] args) { ConnectionFactory connectionFactory;//连接工厂 Connection connection = null;//连接 Session session;//会话 Destination destination;//队列/目的 MessageProducer messageProducer;//消息的生产者 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //createSession(paramA,paramB); //paramA: 是否启用事务 true 是, false 否 //paramB:消息的确认模式 //AUTO_ACKNOWLEDGE 自动签收 //CLIENT_ACKNOWLEDGE 客户端自行调用acknowledge方法签收 //DUPS_OK_ACKNOWLEDGE 不是必须签收,消费可能会重复发送在第二次重新传送消息的时候,消息头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。 destination = session.createQueue("HelloWAM"); messageProducer = session.createProducer(destination); for(int i=0;i<SENDNUM;i++){ String msg = "发送消息"+i+" "+System.currentTimeMillis(); TextMessage message = session.createTextMessage(msg); System.out.println("发送消息:"+msg); messageProducer.send(message); } session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally { if(connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
消费者:
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 动脑学院-Mark老师 * 创建日期:2017/10/17 * 创建时间: 18:01 */ public class JmsConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory;//连接工厂 Connection connection = null;//连接 Session session;//会话 接受或者发送消息的线程 Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息的消费者 //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME, JmsConsumer.PASSWORD, JmsConsumer.BROKEURL); try { //通过连接工厂获取连接 connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建一个连接HelloWorld的消息队列 destination = session.createQueue("HelloWAM"); //创建消息消费者 messageConsumer = session.createConsumer(destination); //读取消息 while(true){ TextMessage textMessage = (TextMessage)messageConsumer.receive(10000); if(textMessage != null){ System.out.println("Accept msg : "+textMessage.getText()); }else{ break; } } } catch (JMSException e) { e.printStackTrace(); } } }
c. 与Spring的集成
增加pom依赖
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.11.RELEASE</version> </dependency>
代码:
1.添加命名空间
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
2.生产者配置
<!-- ActiveMQ 连接工厂 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <property name="sessionCacheSize" value="100"></property> </bean> <!-- Spring JmsTemplate 的消息生产者 start--> <!-- 定义JmsTemplate的Queue类型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- false: 队列模式 true : 发布订阅模式 --> <property name="pubSubDomain" value="false"></property> </bean> <!-- 定义JmsTemplate的Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- 发布订阅模式--> <property name="pubSubDomain" value="true"></property> </bean> <!--Spring JmsTemplate 的消息生产者 end--> <!--接收消费者应答的监听器--> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="tempqueue" ref="getResponse"></jms:listener> </jms:listener-container>
3.生产者代码(点对点)
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.*; import java.awt.peer.SystemTrayPeer; /** * * @author Mark * @description 队列消息生产者,发送消息到队列 * */ @Component("queueSender") public class QueueSender { @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; @Autowired private GetResponse getResponse; public void send(String queueName,final String message){ jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message msg = session.createTextMessage(message); //配置消费者应答相关内容 Destination tempDest = session.createTemporaryQueue(); MessageConsumer responseConsumer = session.createConsumer(tempDest); responseConsumer.setMessageListener(getResponse); msg.setJMSReplyTo(tempDest); //消费者应答的id,发送出的消息和应答消息进行匹配 String uid = System.currentTimeMillis()+""; msg.setJMSCorrelationID(uid); return msg; } }); } }
4.消费者配置
<!-- ActiveMQ 连接工厂 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- Spring Caching连接工厂 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <property name="sessionCacheSize" value="100" /> </bean> <!-- 消息消费者 start--> <!-- 定义Queue监听器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener> <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener> </jms:listener-container> <!-- 定义Topic监听器 --> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener> <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener> </jms:listener-container> <!-- 消息消费者 end --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- 队列模式--> <property name="pubSubDomain" value="false"></property> </bean>
5.消费者代码(点对点)
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import javax.xml.soap.Text; /** * * @author Mark * @description 队列消息监听器 * */ @Component public class QueueReceiver1 implements MessageListener { @Autowired private ReplyTo replyTo; public void onMessage(Message message) { try { String textMsg = ((TextMessage)message).getText(); System.out.println("QueueReceiver1 accept msg : "+textMsg); //do my 业务工作 replyTo.send(textMsg,message); } catch (JMSException e) { e.printStackTrace(); } } }