Spring与ActiveMQ整合

时间:2021-04-13 15:14:48

在实际的项目中如果使用原生的ActiveMQ API开发会比较麻烦,因为需要创建连接工厂,创建连接等,我们应该使用一个模板来做这些繁琐的事情,Spring帮我们做了!

Spring提供了对JMS的支持,需要添加Spring支持jms的包,如下:

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>

在spring-amq.xml中配置JmsTemplate(这样的配置没啥问题,在实际的项目中就是这样配置的)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd"> <context:component-scan base-package="com.winner.spring"/> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://192.168.0.129:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean> <!--使用缓存可以提升效率-->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="1"/>
</bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean> <!--测试Queue,队列的名字是spring-queue-->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!--<constructor-arg index="0" value="spring-queue"/>-->
<constructor-arg name="name" value="spring-queue"/>
</bean> <!--测试Topic-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean> </beans>

生产者

@Service
public class AMQSenderServiceImpl implements AmqSenderService { private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class); @Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate; //目的地队列的明证,我们要向这个队列发送消息
@Resource(name = "destinationQueue")
private Destination destination; //向特定的队列发送消息
@Override
public void sendMsg(MqParamDto mqParamDto) {
final String msg = JSON.toJSONString(mqParamDto);
try {
logger.info("将要向队列{}发送的消息msg:{}", destination, msg);
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
}); } catch (Exception ex) {
logger.error("向队列{}发送消息失败,消息为:{}", destination, msg);
} }
}

运行结果:

Spring与ActiveMQ整合

Spring与ActiveMQ整合

如果是Topic的话就换一下,下面的spring-topic是主题的名字。

<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic" />
</bean>

其他不用改!

如果想要在Spring中配置消费者的话,就不需要再启动接收的客户端了,这样在测试的时候可以不需要写消费者的代码,因为我们要么是生产者要么是消费者!

可以通过配置一个listener来实现,实际项目中采用的是这种方式

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd"> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://192.168.0.129:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean> <!--使用缓存可以提升效率-->
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="1"/>
</bean> <!--测试Queue,队列的名字是spring-queue-->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue"/>
</bean> <!--测试Topic-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean> <bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="destinationQueue"/>
<property name="messageListener" ref="messageListener"/>
</bean> <!--消息监听器-->
<bean id="messageListener" class="com.winner.spring.MyMessageListener">
</bean> </beans>

监听器:

public class MyMessageListener implements MessageListener {

    @Override
public void onMessage(Message msg) {
if (msg instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) msg;
String message = txtMsg.getText();
//实际项目中拿到String类型的message(通常是JSON字符串)之后,
//会进行反序列化成对象,做进一步的处理
System.out.println("receive txt msg===" + message);
} catch (JMSException e) {
throw new RuntimeException(e);
}
} else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}

不需要写消费者的代码就可以知道消息有没有推送成功

Spring与ActiveMQ整合

ActiveMQ结合Spring开发最佳实践和建议:

1:Camel框架支持大量的企业集成模式,可以大大简化集成组件间的大量服务和复杂的消息流。而Spring框架更注重简单性,仅仅支持基本的最佳实践。
2:Spring消息发送的核心架构是JmsTemplate,隔离了像打开、关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑。但是
JmsTemplate损害了ActiveMQ的PooledConnectionFactory对session和消息producer的缓存机制而带来的性能提升。
3:新的Spring里面,可以设置org.springframework.jms.connection.CachingConnectionFactory的sessionCacheSize,或者干脆使用ActiveMQ的PooledConnectionFactory
4:不建议使用JmsTemplate的receive()调用,因为在JmsTemplate上的所有调用都是同步的,这意味着调用线程需要被阻塞,直到方法返回,这对性能影响很大
5:请使用DefaultMessageListenerContainer,它允许异步接收消息并缓存session和消息consumer,而且还可以根据消息数量动态的增加或缩减监听器的数量