Activemq使用教程
解压activmq进入bin\win64 启动activemq.bat
启动成功
浏览器访问http://127.0.0.1:8161
创建maven工程
在pom.xml中添加依赖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.8</version> </dependency> </dependencies>
创建创建者和发布者
Producer 代码
package com.td.active; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class producer { public static void main(String[] args) throws JMSException { //1、创建工厂连接对象,需要指定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(queue); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-queue"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); } }
consumer 代码
public class consumer { public static void main(String[] args) throws JMSException, IOException { //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(queue); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); } }
开启生产者生产消息
如果出现以下路径错误
生产者启动成功生成一条消息在浏览器中可以看到
启动消费者
ActiveMQ整合spring及项目中运用
activeMQ与spring看一整合到一起使用,除了添加ActiveMQ相关的jar包外,还需要添加spring的jar包:
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </dependency>
然后编写applicationContext-activemq.xml文件,
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 --> <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <constructor-arg name="brokerURL" value="tcp://127.0.0.1:61616"/> </bean> <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="tagertConnectionFactory"/> </bean> <!-- 配置生产者 --> <!-- Spring使用JMS工具类,可以用来发送和接收消息 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这里是配置的spring用来管理connectionfactory的connectionfactory --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- 配置destination --> <!-- 队列目的地 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="spring-queue"/> </bean> <!-- 话题目的地 --> <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="item-add-topic"/> </bean> </beans>
在使用的类中注入模板来使用
@Autowired private JmsTemplate jmsTemplate; @Resource(name="itemAddTopic") private Destination destination;
发送消息的示例
发送消息列表
public void addUser(){ //第一个参数目的地 可以是队列的名称spring-queue 也可以是ip //第二个参数是发送消息的对象 jmsTemplate.send("spring-queue", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage("要发送的消息"); } }); }
发送主题
try { Topic topic = jmsTemplate.getConnectionFactory().createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createTopic("item-add-topic"); jmsTemplate.send(topic, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage("要发送的消息"); } }); } catch (JMSException e) { e.printStackTrace(); }
消费者项目
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 --> <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <constructor-arg name="brokerURL" value="tcp://127.0.0.1:61616"/> </bean> <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="tagertConnectionFactory"/> </bean> <!-- 配置destination --> <!-- 队列目的地 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="spring-queue"/> </bean> <!-- 话题目的地 --> <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="item-add-topic"/> </bean> <!-- 配置监听器 --> <bean id="myListener" class="com.td.active.MyListener"/> <bean id="itemAddListener" class="com.td.active.ItemAddListener"/> <!-- 配置系统监听器 消息列表 --> <!-- <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="myListener"/> </bean> --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="itemAddTopic"/> <property name="messageListener" ref="itemAddListener"/> </bean> </beans>
通过配置监听器实现接收消息
列表监听器
public class MyListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage testMessage = (TextMessage) message; String text = testMessage.getText(); System.out.println("接收到消息 = " + text); } catch (JMSException e) { e.printStackTrace(); } } }
主题监听器
public class ItemAddListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage testMessage = (TextMessage) message; String text = testMessage.getText(); System.out.println("接收到消息 = " + text); } catch (JMSException e) { e.printStackTrace(); } } }