Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型

时间:2023-03-09 21:59:14
Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型

前言:本文是基于Spring和ActiveMQ的一个示例文章,包括了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,只是做了比较简单的实现,无任何业务方面的东西,作为一个入门教程。

适合对象:希望学习ActiveMQ的朋友,以及利用Spring将ActiveMQ集成到系统中

目录结构

Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型

Maven依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<!-- Jar版本管理 -->
    <properties>
        <springframework>4.0.2.RELEASE</springframework>
        <log4j>1.2.17</log4j>
        <activemq>5.9.0</activemq>
    </properties>
    <dependencies>
        <!-- Spring web mvc -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${springframework}</version>
        </dependency>
        <!-- 提供JMS,Freemarker,Quartz集成服务 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${springframework}</version>
        </dependency>
         
        <!-- 集成JMS -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${springframework}</version>
        </dependency>
         
        <!-- xbean 如<amq:connectionFactory /> -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
        <!-- log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j}</version>
        </dependency>
        <!-- Active MQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>${activemq}</version>
        </dependency>
        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

jar包截图

Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型

web.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
<?xml version="1.0" encoding="UTF-8"?>
<!-- 通过http://java.sun.com/xml/ns/javaee/获取最新的schemaLocation -->
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    version="3.0">
    <display-name>SpringActivemqServer</display-name>
    <!-- WebAppRootKey -->
    <context-param>
        <param-name>webAppRootKey</param-name>
        <param-value>example.SpringActivemqServer</param-value>
    </context-param>
    <!-- Log4J Start -->
    <context-param>
        <param-name>log4jConfigLocation</param-name>
        <param-value>classpath:log4j.properties</param-value>
    </context-param>
    <context-param>
        <param-name>log4jRefreshInterval</param-name>
        <param-value>6000</param-value>
    </context-param>
    <!-- Spring Log4J config -->
    <listener>
        <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
    </listener>
    <!-- Log4J End -->
    <!-- Spring 编码过滤器 start -->
    <filter>
        <filter-name>characterEncoding</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>characterEncoding</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
    <!-- Spring 编码过滤器 End -->
    <!-- Spring Application Context Listener Start -->
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath*:applicationContext.xml,classpath*:ActiveMQ.xml</param-value>
    </context-param>
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
    <!-- Spring Application Context Listener End -->
    <!-- Spring MVC Config Start -->
    <servlet>
        <servlet-name>SpringMVC</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:spring-mvc.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>SpringMVC</servlet-name>
        <!-- Filter all resources -->
        <url-pattern>/</url-pattern>
    </servlet-mapping>
    <!-- Spring MVC Config End -->
</web-app>

applicationContext.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd">
      
     <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
      
     <!-- 配置扫描路径 -->
     <context:component-scan base-package="org.xdemo.example">
       <!-- 只扫描Service,也可以添加Repostory,但是要把Controller排除在外,Controller由spring-mvc.xml去加载 -->
       <!-- <context:include-filter type="annotation" expression="org.springframework.stereotype.Service" /> -->
       <!-- <context:include-filter type="annotation" expression="org.springframework.stereotype.Repository" /> -->
       <!-- <context:include-filter type="annotation" expression="org.springframework.stereotype.Component" /> -->
       <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
     </context:component-scan>
</beans>

spring-mvc.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0" encoding="UTF-8"?>  
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"   
       xmlns:aop="http://www.springframework.org/schema/aop"   
       xmlns:context="http://www.springframework.org/schema/context"  
       xmlns:mvc="http://www.springframework.org/schema/mvc"   
       xmlns:tx="http://www.springframework.org/schema/tx"   
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
       xsi:schemaLocation="http://www.springframework.org/schema/aop   
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd   
        http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd   
        http://www.springframework.org/schema/mvc   
        http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd   
        http://www.springframework.org/schema/tx   
        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">  
   
      <!-- 启用MVC注解 -->
    <mvc:annotation-driven />
     
    <!-- 静态资源文件,不会被Spring MVC拦截 -->
    <mvc:resources location="/resources/" mapping="/resources/**"/>
     
    <!-- 指定Sping组件扫描的基本包路径 -->
    <context:component-scan base-package="org.xdemo.example" >
        <!-- 这里只扫描Controller,不可重复加载Service -->
        <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
    </context:component-scan>
     
      <!-- JSP视图解析器-->
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">  
        <property name="prefix" value="/WEB-INF/views/" />  
        <property name="suffix" value=".jsp" />
        <property name="order" value="1" />
    </bean>
     
     
</beans>

ActiveMQ.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>
     
    <!-- ====Producer side start====-->
     
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>
     
    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
    </bean>
     
    <!-- ====Producer side end====-->
     
    <!-- ====Consumer side start====-->
     
    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueReceiver"/>
        <jms:listener destination="test.queue" ref="queueReceiver2"/>
    </jms:listener-container>
     
    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="topicReceiver"/>
        <jms:listener destination="test.topic" ref="topicReceiver2"/>
    </jms:listener-container>
     
    <!-- ====Consumer side end==== -->
</beans>

队列消息生产者

QueueSender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package org.xdemo.example.SpringActivemq.mq.producer.queue;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午9:40:24
 * @描述 发送消息到队列
 */
@Component
public class QueueSender {
     
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;//通过@Qualifier修饰符来注入对应的bean
     
    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
     
}

主题(Topic)消息生产者TopicSender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package org.xdemo.example.SpringActivemq.mq.producer.topic;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午9:40:32
 * @描述 发送消息到主题
 */
@Component
public class TopicSender {
     
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;
     
    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String topicName,final String message){
        jmsTemplate.send(topicName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

消费者

队列消费者QueueReceiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 
 */
package org.xdemo.example.SpringActivemq.mq.consumer.queue;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午10:11:51
 * @描述 队列消息监听器
 */
@Component
public class QueueReceiver implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver1接收到消息:"+((TextMessage)message).getText());
        catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

QueueReceiver2.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 
 */
package org.xdemo.example.SpringActivemq.mq.consumer.queue;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午10:11:51
 * @描述 队列消息监听器
 */
@Component
public class QueueReceiver2 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver2接收到消息:"+((TextMessage)message).getText());
        catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

主题(Topic)消费者

TopicReceiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 
 */
package org.xdemo.example.SpringActivemq.mq.consumer.topic;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午10:13:47
 * @描述 Topic消息监听器
 */
@Component
public class TopicReceiver implements MessageListener{
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("TopicReceiver1接收到消息:"+((TextMessage)message).getText());
        catch (JMSException e) {
            e.printStackTrace();
        }
    }
     
}

TopicRecever.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 
 */
package org.xdemo.example.SpringActivemq.mq.consumer.topic;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午10:13:47
 * @描述 Topic消息监听器
 */
@Component
public class TopicReceiver2 implements MessageListener{
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("TopicReceiver2接收到消息:"+((TextMessage)message).getText());
        catch (JMSException e) {
            e.printStackTrace();
        }
    }
     
}

控制器Controller代码

ActivemqController.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 
 */
package org.xdemo.example.SpringActivemq.controller;
import javax.annotation.Resource;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.xdemo.example.SpringActivemq.mq.producer.queue.QueueSender;
import org.xdemo.example.SpringActivemq.mq.producer.topic.TopicSender;
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午10:54:11
 * @描述 测试 
 */
@Controller
@RequestMapping("/activemq")
public class ActivemqController {
     
    @Resource QueueSender queueSender;
    @Resource TopicSender topicSender;
     
    /**
     * 发送消息到队列
     * @param message
     * @return String
     */
    @ResponseBody
    @RequestMapping("queueSender")
    public String queueSender(@RequestParam("message")String message){
        String opt="";
        try {
            queueSender.send("test.queue", message);
            opt="suc";
        catch (Exception e) {
            opt=e.getCause().toString();
        }
        return opt;
    }
     
    /**
     * 发送消息到主题
     * @param message
     * @return String
     */
    @ResponseBody
    @RequestMapping("topicSender")
    public String topicSender(@RequestParam("message")String message){
        String opt="";
        try {
            topicSender.send("test.topic", message);
            opt="suc";
        catch (Exception e) {
            opt=e.getCause().toString();
        }
        return opt;
    }
     
}

前端页面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<%
    String path = request.getContextPath();
    String basePath = request.getScheme() + "://"
            + request.getServerName() + ":" + request.getServerPort()
            + path + "/";
%>
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
<base href="<%=basePath%>">
<title>ActiveMQ Demo程序</title>
<meta http-equiv="pragma" content="no-cache">
<meta http-equiv="cache-control" content="no-cache">
<meta http-equiv="expires" content="0">
<script type="text/javascript" src="<%=basePath%>resources/jquery-1.11.0.min.js"></script>
<style type="text/css">
.h1 {
    margin: 0 auto;
}
#producer{
    width: 48%;
    border: 1px solid blue;
    height: 80%;
    align:center;
    margin:0 auto;
}
body{
    text-align :center;
div {
    text-align :center;
}
textarea{
    width:80%;
    height:100px;
    border:1px solid gray;
}
button{
    background-color: rgb(6215666);
    border: none;
    font-weight: bold;
    color: white;
    height:30px;
}
</style>
<script type="text/javascript">
     
    function send(controller){
        if($("#message").val()==""){
            $("#message").css("border","1px solid red");
            return;
        }else{
            $("#message").css("border","1px solid gray");
        }
        $.ajax({
            type: 'post',
            url:'<%=basePath%>activemq/'+controller,
            dataType:'text'
            data:{"message":$("#message").val()},
            success:function(data){
                if(data=="suc"){
                    $("#status").html("<font color=green>发送成功</font>");
                    setTimeout(clear,1000);
                }else{
                    $("#status").html("<font color=red>"+data+"</font>");
                    setTimeout(clear,5000);
                }
            },
            error:function(data){
                $("#status").html("<font color=red>ERROR:"+data["status"]+","+data["statusText"]+"</font>");
                setTimeout(clear,5000);
            }
             
        });
    }
     
    function clear(){
        $("#status").html("");
    }
</script>
</head>
<body>
    <h1>Hello ActiveMQ</h1>
    <div id="producer">
        <h2>Producer</h2>
        <textarea id="message"></textarea>
        <br>
        <button onclick="send('queueSender')">发送的Queue</button>
        <button onclick="send('topicSender')">发送的Topic</button>
        <br>
        <span id="status"></span>
    </div>
</body>
</html>

运行截图:

Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型

Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型

消息发送采用的是Spring提供的JmsTemplate,采用的是异步操作的方式,将消息发布到队列中,消费者可以随时接受这条消息。

从上图可以看出队列模型和PUB/SUB模型的区别,Queue只能由一个消费者接收,其他Queue中的成员无法接受到被已消费的信息,而Topic则可以,只要是订阅了Topic的消费者,全部可以获取到生产者发布的信息。

如果有不懂的在下面问我。

备注:原文地址http://www.xdemo.org/spring-activemq-pub-sub/

代码我分享到了开源中国上,链接地址:

https://git.oschina.net/jiafuwei0407/Hello

通过maven把它build成一个IDE项目,执行以下命令,打开CMD:

$ cd SpringActiveMQ 
$ mvn eclipse:eclipse or mvn idea:idea

我在原作者的基础上增加了本地ActiveMQ的访问,就是不需要启动ActiveMQ服务器,就可以发生消息

Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型

下面这段配置 ,增加了自定义消息的配置以及自定义消息的转换器,代码中都有注释。

Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型