ActiveMQ消息中间件

时间:2023-01-25 18:47:21

1 为什么要使用消息中间件

  说到消息中间件,就不得不说它最具代表性的三个特点,那就是解耦、异步和削峰。

  如果在系统中用户成功登录系统之后,此时会调用短信服务,通知用户登录成功,然后再调用积分系统,给用户的账号增加积分,接着再调用日志系统,把登录的行为保存日志中,如果在调用某一个系统的时候调用失败,那么可能造成整个业务操作回滚,这种情况就是多个系统之间耦合度太高导致的。像前面说的这种情况,无形之中会增加用户登录的耗时时间,因为用户并不想了解登录的时候系统到底调用了哪些服务,而只是单纯的想登录系统,所以,这个时候就需要加上消息中间件,当用户登录的时候,把登录的消息放到消息中间件里,然后消息中间件返回给客户端登录是否成功的结果,接着消息中间件会异步的把消息发送给需要调用的系统,这样就可以节省下来许多等待的时间。

2 JMS规范

  定义:中文全称为Java消息服务,是一个Java平台中面向消息中间件的API,用于在两个系统之间或者是分布式系统中发送或接受消息,实现异步通信。

  相关概念:

    提供者:实现JMS规范的消息中间件服务器

    客户端:发送或接受消息的应用程序

    生产者/发布者:创建并发送消息的客户端

    消费者/订阅者:接受并处理消息的客户端

    消息:应用程序之间传递的数据内容

    消息模式:在客户端之间传递消息的方式,JMS定义了主题(发布/订阅)和队列(点对点)两种模式

      队列模式:① 客户端包括生产者和消费者 ② 队列中的消息只能被一个消费者所消费 ③  消费者可以随时消费队列中的消息

      主题模式: ①客户端包括发布者和订阅者 ② 主题中的消息可以被多个消费者消费 ③ 消费者不能消费订阅之前就发送到主题中的消息

    编码接口:①ConnectionFactory:用于创建连接到消息中间件的连接工厂② Connection:代表应用程序和消息服务器之间的应用链路 ③ Destination:指消息发布和接收的地点,包括队列和主题④Session:表示一个单线程的上下文,用于发送和接收消息

3 队列模式

ActiveMQ消息中间件ActiveMQ消息中间件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huyj.spring-jms</groupId>
    <artifactId>spring-jms</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spring.version>4.2.5.RELEASE</spring.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
            <!--排除activemq-core中引入的spring-context-->
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>
pom.xml
ActiveMQ消息中间件ActiveMQ消息中间件
         //1.创建ConnectionFactory
        ConnectionFactory connetionFactory = new ActiveMQConnectionFactory(url);
        //2.创建connection
        Connection connection = connetionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建会话
        //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建一个目标
        Destination destination = session.createQueue(queueName);
        //6.创建一个生产者
        MessageProducer messageProducer = session.createProducer(destination);
        //设置生产者的模式,有两种可选,默认情况下是支持持久化的
        //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
        //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
        //messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for(int i=0;i<100;i++){
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("test"+i);
            //8.发布消息
            messageProducer.send(textMessage);
            logger.info("队列消息发送:"+textMessage.getText());
        }
        //9.关闭连接
        connection.close();        
生产者代码 
ActiveMQ消息中间件ActiveMQ消息中间件
        //1.创建ConnectionFactory
        ConnectionFactory connetionFactory = new ActiveMQConnectionFactory(url);
        //2.创建connection
        Connection connection = connetionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建一个目标
        Destination destination = session.createQueue(queueName);
        //6.创建一个生产者
        MessageConsumer messageConsumer = session.createConsumer(destination);
        //7.创建一个监听器
        // 消息的监听是一个异步的过程
        messageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try{
                    logger.info("接收消息:"+textMessage.getText());
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        });
        //8.关闭连接
        connection.close();
消费者代码

4 主题模式

  只需要修改创建目标处的代码:session.createQueue改为session.createTopic即可

5 Spring集成JMS连接ActiveMQ

ActiveMQ消息中间件ActiveMQ消息中间件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <!--开启注解的配置 针对类似@Resource这样的注解-->
    <context:annotation-config/>

    <!--ActiveMQ为我们提供的ConnectionFactory-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.112.141:61616"/>
    </bean>
    <!--spring jms为我们提供的连接池-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
    </bean>

    <!--一个队列目的地,点对点的-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue-spring"/>
    </bean>
    <!--一个主题目的地,发布订阅模式-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic-spring"/>
    </bean>

</beans>
公共xml配置
ActiveMQ消息中间件ActiveMQ消息中间件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

<import resource="common.xml"/>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>
<bean class="com.huyj.jms.producer.impl.ProducerServiceImpl"/>
</beans>
生产者xml配置
ActiveMQ消息中间件ActiveMQ消息中间件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!--导入公共配置-->
    <import resource="common.xml"/>
    <!--配置消息监听器-->
    <bean id="consumerMessageListener" class="com.huyj.jms.consumer.ConsumerMessageListener"/>
    <!--配置消息监听容器-->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>
</beans>
消费者xml配置
ActiveMQ消息中间件ActiveMQ消息中间件
public class ProducerService{

    @Autowired
    private JmsTemplate jmsTemplate;  //用于发送和接收消息的模板
    @Resource(name = "queueDestination")
    private Destination destination;

    public void sendMessage(final String message) {
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
        System.out.println("发送消息:"+message);
    }
}
生产者消息发送
ActiveMQ消息中间件ActiveMQ消息中间件
public class AppProducer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        ProducerService service = context.getBean(ProducerService.class);
        for(int i=0;i<100;i++){
            service.sendMessage("test"+i);
        }
        context.close();
    }
}
发送消息主程序
ActiveMQ消息中间件ActiveMQ消息中间件
public class ConsumerMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("接收消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}
消费者接收消息
ActiveMQ消息中间件ActiveMQ消息中间件
public class AppConsumer {
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    }
}
接收消息主程序