JMS与activeMQ,消息中间件入门

时间:2022-03-10 03:56:59
什么是中间件
中间件是介于操作系统和应用程序之间的技术类组件
中间件的作用
提取通用的部分,屏蔽了底层的通讯,交互,连接等复杂又通用化的功能,以产品的形式提供出来,系统在交互时,直接采用中间件进行连接和交互即可,避免了大量的代码开发和人工成本。

什么是消息中间件
关注于数据的发送和接收,可以利用高效可靠的异步消息传递机制集成分布式系统的组件为什么使用消息中间件解耦、削峰、异步、顺序保证、横向扩展等有时候一条消息需要发送给多个应用、服务或给它们调用,如果使用接口调用的,会导致接口混乱,不易于维护和扩展,如下图使用了消息中间件,带来的好处一览无遗。
JMS与activeMQ,消息中间件入门
什么是JMS
JMS(java message service)是一个java平台中关于面向消息中间件的API,用于在两个应用程序之间或分布式系统中发送消息和异步通信
什么是AMQP
AMQP(advanced message queuing protocal)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端和消息中间件可传递消息,跨语言和平台。
JMS与activeMQ,消息中间件入门
搭建activeMQ,我个人把activeMQ搭建在centos上,下载相关压缩包,解压后就可以使用了,非常方便客户端需要activemq-core.jar,最好使用maven,比较方便点,不用自己去管理它的依赖关系

常用的消息中间件有activeMQ、rabbitMQ、kafka和rocketMQ,下面我使用activeMQ作为入门,写2个demo。
使用队列模式
package activemq_test.product_consumer_design;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* writer: holien
* Time: 2017-09-11 15:19
* Intent: 生产者类(队列模式,消费者或生产者谁先启动无所谓)
*/
public class Product {
public static void main(String[] args) throws JMSException {
// 使用tcp协议连接到linux上的activeMQ服务器,端口号为61616
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://10.101.123.124:61616");
// connect和session都是javax.jms包下的
Connection connection = factory.createConnection();
connection.start();
// 第一个参数为是否使用事务
// 第二个参数为该session发送的消息给消费者使用后,消费者不用手动向activeMQ确定消息已经收到
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列并产生一个目的地
Destination destination = session.createQueue("test-queue");
// 创建指向test-queue目的地的消息生产者
MessageProducer producer = session.createProducer(destination);
// 异步发送100条消息
for (int i = 0; i < 100; i++) {
TextMessage message = session.createTextMessage("test" + i);
producer.send(message);
System.out.println("生产:" + message.getText());
}
// 关闭资源
producer.close();
session.close();
connection.close();
}
}
package activemq_test.product_consumer_design;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * writer: holien * Time: 2017-09-11 15:19 * Intent: 消费者类(队列模式,消费者或生产者谁先启动无所谓) */public class Consumer {    public static void main(String[] args) throws JMSException {        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://10.101.123.124:61616");        // connect和session都是javax.jms包下的        Connection connection = factory.createConnection();        connection.start();        // 第一个参数为是否使用事务        // 第二个参数为该session发送的消息给消费者使用后,消费者不用手动向activeMQ确定消息已经收到        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列并产生一个目的地        Destination destination = session.createQueue("test-queue");        // 创建指向test-queue目的地的消息生产者        MessageConsumer consumer = session.createConsumer(destination);        //接收1000毫秒内到达的消息,如果没有收到此方法将阻塞等待直到指定超时时间//        Message message = consumer.receive(1000);        // 设置消息监听器,当有多个消费者时自动负载均衡        // 此方法是异步的,若要消费全部消息,必须确保在关闭资源前        consumer.setMessageListener((message) -> {            TextMessage textMessage = (TextMessage)message;            try {                System.out.println("消费:" + textMessage.getText());            } catch (JMSException e) {                e.printStackTrace();            }        });        // 关闭资源        consumer.close();        session.close();        connection.close();    }}
使用主题模式(pub/sub)
package activemq_test.subscriber_design;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * writer: holien * Time: 2017-09-11 15:19 * Intent: 发布者类(主题模式,订阅者必须先于发布者启动,否则接收不到消息) */public class Publisher {    public static void main(String[] args) throws JMSException {        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://10.101.123.124:61616");        // connect和session都是javax.jms包下的        Connection connection = factory.createConnection();        connection.start();        // 第一个参数为是否使用事务,第二个参数为        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列并产生一个目的地        Destination destination = session.createTopic("test-topic");        // 创建指向test-queue目的地的消息生产者        MessageProducer publisher = session.createProducer(destination);        // 异步发布100条消息        for (int i = 0; i < 100; i++) {            TextMessage message = session.createTextMessage("test" + i);            publisher.send(message);            System.out.println("发布:" + message.getText());        }        // 关闭资源        publisher.close();        session.close();        connection.close();    }}
package activemq_test.subscriber_design;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * writer: holien * Time: 2017-09-11 15:19 * Intent: 订阅者类(主题模式,订阅者必须先于发布者启动,否则接收不到消息) */public class Subscriber1 {    public static void main(String[] args) throws JMSException {        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://10.101.123.124:61616");        // connect和session都是javax.jms包下的        Connection connection = factory.createConnection();        connection.start();        // 第一个参数为是否使用事务,第二个参数为        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建主题并产生一个目的地        Destination destination = session.createTopic("test-topic");        // 创建指向test-queue目的地的消息生产者        MessageConsumer subscriber = session.createConsumer(destination);        //接收1000毫秒内到达的消息,如果没有收到此方法将阻塞等待直到指定超时时间//        Message message = consumer.receive(1000);        // 设置消息监听器,只要订阅了就可以接收消息,类似广播        // 此方法是异步的,若要接收全部消息,必须确保在关闭资源前        subscriber.setMessageListener((message) -> {            TextMessage textMessage = (TextMessage)message;            try {                System.out.println("订阅2:" + textMessage.getText());            } catch (JMSException e) {                e.printStackTrace();            }        });        // 关闭资源//        subscriber.close();//        session.close();//        connection.close();    }}
主题模式和队列模式的区别:对于主题模式发布的消息,所有订阅者都能接收,而队列模式产生的消息只会发送给某个消费者。

activeMQ启动后,我们可以在浏览器访问ip:8161,这是一个监控页面,可以观察到queue和topic的消息收发情况以及目前的连接有哪些,如下图

JMS与activeMQ,消息中间件入门


activeMQ与spring整合

除了spring常规的jar,还需要加入spring-jms.jar,这个jar提供了很多类可以与activeMQ进行整合application-activemq.xml这个配置文件如下,注释也比较全,其中要注意,我们不能直接使用activeMQ的factoryBean,得来spring-jms提供的factoryBean来包装,另外,jmsTemplate这个模板帮我们封装了session的创建和销毁,我们只需要调用它的send方法,指定destination和message就行,类似Hibernate提供的template。而我们自定义的消息监听器会放置在监听器容器中,监听器类一被spring加载就可以对指定队列/主题发送的消息进行监听了。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<!-- 开启注解,如果同时需要扫描包,则使用component-scan代替<context:annotation-config/> -->
<!--<context:component-scan base-package=""/>-->
<context:annotation-config/>

<!-- jms提供的连接池,承接作用 -->
<bean id="jmsConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
</bean>

<!-- activeMQ的连接工厂对象 -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://10.101.123.124:61616"/>
</bean>

<!-- 指向activeMQ服务器端名为spring-queue的队列,按需可配置多个 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="spring-queue"/>
</bean>

<!-- 指向activeMQ服务器端名为spring-topic的队列,按需可配置多个 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="spring-topic"/>
</bean>

<!-- 模板,自动管理session,线程安全 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<!-- 不建议把发送目的地写死,在service方法里指定 -->
<!--<property name="defaultDestination" ref="queueDestination"/>-->
</bean>

<!-- 自定义的生产者业务类,用来向队列发送消息 -->
<bean id="producerService" class="activemq_spring.ProducerService"/>

<!-- 自定义的发布者业务类,用来向主题发送订阅消息 -->
<bean id="publisherService" class="activemq_spring.PublisherService"/>

<!-- jms消息监听容器,这个类一加载就对队列进行消息监听 -->
<bean id="jmsMessageListenerContainerForQueue" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="destination" ref="queueDestination"/>
<property name="messageListener" ref="consumerMessageListenerForQueue"/>
</bean>

<!-- jms消息监听容器,这个类一加载就对主题(订阅)进行消息监听 -->
<bean id="jmsMessageListenerContainerForTopic" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="destination" ref="topicDestination"/>
<property name="messageListener" ref="consumerMessageListenerForTopic"/>
</bean>

<!-- 自定义的队列消息监听类,包含消息处理逻辑 -->
<bean id="consumerMessageListenerForQueue" class="activemq_spring.ConsumerMessageListenerForQueue"/>

<!-- 自定义的主题(订阅)消息监听类,包含消息处理逻辑 -->
<bean id="consumerMessageListenerForTopic" class="activemq_spring.ConsumerMessageListenerForTopic"/>

</beans>
package activemq_spring;import javax.jms.*;/** * writer: holien * Time: 2017-09-13 12:42 * Intent: 自定义的消息监听器类,包含消息处理逻辑 */public class ConsumerMessageListenerForQueue implements MessageListener {    @Override    public void onMessage(Message message) {        TextMessage textMessage = (TextMessage)message;        try {            System.out.println("receive from queue:" + textMessage.getText());        } catch (JMSException e) {            e.printStackTrace();        }    }}
package activemq_spring;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * writer: holien * Time: 2017-09-13 12:42 * Intent: 自定义的主题(订阅)消息监听器类,包含消息处理逻辑 */public class ConsumerMessageListenerForTopic implements MessageListener {    @Override    public void onMessage(Message message) {        TextMessage textMessage = (TextMessage)message;        try {            System.out.println("receive from topic:" + textMessage.getText());        } catch (JMSException e) {            e.printStackTrace();        }    }}
package activemq_spring;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import javax.annotation.Resource;import javax.jms.Destination;/** * writer: holien * Time: 2017-09-12 23:23 * Intent: 向activeMQ队列发送消息的生产者业务类 */public class ProducerService {    @Resource(name = "jmsTemplate")    private JmsTemplate jmsTemplate;    @Resource(name = "queueDestination")    private Destination queueDestination;    public void sendMessage(String message) {        // 指定目的地,在匿名内部类MessageCreator里使用session创建消息并返回给jmsTemplate去发送        jmsTemplate.send(queueDestination, (session) -> {            return session.createTextMessage(message);        });    }}
package activemq_spring;import org.springframework.jms.core.JmsTemplate;import javax.annotation.Resource;import javax.jms.Destination;/** * writer: holien * Time: 2017-09-12 23:23 * Intent: 向activeMQ主题发送消息的发布者业务类 */public class PublisherService {    @Resource(name = "jmsTemplate")    private JmsTemplate jmsTemplate;    @Resource(name = "topicDestination")    private Destination topicDestination;    public void sendMessage(String message) {        // 指定目的地,在匿名内部类MessageCreator里使用session创建消息并返回给jmsTemplate去发送        jmsTemplate.send(topicDestination, (session) -> {            return session.createTextMessage(message);        });    }}
package activemq_spring;import org.springframework.context.support.ClassPathXmlApplicationContext;/** * writer: holien * Time: 2017-09-11 15:19 * Intent: 启动类 */public class AppStart {    public static void main(String[] args) {        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application-activemq.xml");        ProducerService producerService = context.getBean(ProducerService.class);        PublisherService publisherService = context.getBean(PublisherService.class);        // 向队列发送消息        // 监听器类也配置在application-activemq.xml中,所以会即刻接收消息        for (int i = 0; i < 20; i++) {            producerService.sendMessage("queueMessage" + i);        }        System.out.println("-------------------------------------");        // 向主题发送消息        for (int i = 0; i < 20; i++) {            publisherService.sendMessage("topicMessage" + i);        }        context.close();    }}

学习一种消息队列需要投入很多时间,很多特性也没有涉足,还需加强,java体系真的很庞大,各种组件,redis的作者正在开发disque,一种分布式内存消息队列,学无止境啊...