
Apache ActiveMQ是最流行和最强大的开源消息集成模式服务器。
Apache ActiveMQ是速度快,支持多跨语言的客户端和协议,带有易于使用企业集成模式和许多先进的功能在充分支持JMS 1.1和J2EE 1.4。ActiveMQ是Apache下发布Apache 2许可证。
Apache ActiveMQ主要用于模块应用数据交互和分布式应用,支持消息队列,消息发布/订阅,用于异步和服务器交换数据
code案例:
properties 文件配置
# activeMq 地址 端口
jms_url = tcp://127.0.0.1:61616 #队列名称
jms_test_monitor_data_queue = test_queue
spring xml 配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd
">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>classpath:propertiesConfig/test.properties</value>
</property>
</bean> <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<constructor-arg index="0" value="${jms_url}"/>
<property name="useAsyncSend" value="true"/>
</bean> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="20" />
</bean> <!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean> <!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
</bean> <!-- 消息消费者 start--> <!-- 定义Queue监听器 -->
<!--<bean id="testMessageReceiver" class="com.maven.project.web.jmsMessageOper.TestMonitorQueue"/>
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="${jms_test_monitor_data_queue}" ref="testMessageReceiver"/>
</jms:listener-container> --> <!-- 定义Topic监听器 -->
<!--<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="topicReceiver1"/>
<jms:listener destination="test.topic" ref="topicReceiver2"/>
</jms:listener-container> --> <!-- 定义activme 队列监听 -->
<!-- 对应的监听类, com.maven.project.web.jmsMessageOper.TestMonitorQueue -->
<bean id="testMessageReceiver" class="com.maven.project.web.jmsMessageOper.TestMonitorQueue"/>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="${jms_test_monitor_data_queue}"/>
<property name="messageListener" ref="testMessageReceiver"/>
</bean> <!-- 消息消费者 end -->
</beans>
activemq 监听类
package com.maven.project.web.jmsMessageOper; import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage; public class TestMonitorQueue implements MessageListener { public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("============"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
activemq 消息发送类
package com.maven.project.web.jmsMessageOper; import java.io.Serializable;
import java.util.Map; import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.StreamMessage; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Controller; @Controller
public class MessageSender { @Autowired
private JmsTemplate jmsQueueTemplate; public void sendTextMessage(final String queueName, final String txtMessage) {
jmsQueueTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(txtMessage);
}
});
} public void sendObjectMessage(final String queueName, final Object objectMessage) {
jmsQueueTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage((Serializable) objectMessage);
}
});
} public void sendMapMessage(final String queueName, final Map<String, Object> mapMessage) {
jmsQueueTemplate.send(queueName, new MessageCreator() {
@SuppressWarnings("unchecked")
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
for (Map.Entry<String, Object> entry : ((Map<String, Object>) mapMessage).entrySet()) {
mapMessage.setObject(entry.getKey(), entry.getValue());
}
return mapMessage;
}
});
} public void sendByteMessage(final String queueName, final byte[] message) {
jmsQueueTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(message);
return bytesMessage;
}
});
} public void sendStreamMessage(final String queueName, final Object message) {
jmsQueueTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeObject(message);
return streamMessage;
}
});
}
}
消息发送调用
package com.maven.project.web.action; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import com.maven.project.tools.utils.SenderMessageQueueName;
import com.maven.project.web.jmsMessageOper.MessageSender; @Controller
@RequestMapping("/user")
public class UserLoginAction { @Autowired
private MessageSender messageSender; @RequestMapping("/login")
public void login(HttpServletRequest request,HttpServletResponse response){
messageSender.sendTextMessage("test_queue","消息发送内容"); // test_queue 消息队列名称
}
}
maven pom 配置
<!-- activemq相关依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>