ActiveMQ

时间:2024-03-02 20:10:41

Queue与Topic比较
(原文此处为 Queue 与 Topic 的对比图,图片缺失。简要说明:Queue 为点对点模型,每条消息只会被一个消费者消费;Topic 为发布/订阅模型,每条消息会被分发给所有订阅者。)

发送消息

/**
 * Minimal JMS producer example: sends ten messages to the {@code TOOL.DEFAULT}
 * queue of the default ActiveMQ broker through a transacted session.
 */
public class Sender{

	/** Sends a fixed greeting via {@link #sendMessage(String)} and prints a completion line. */
	public static void main(String[] args){

		Sender sender = new Sender();

		String msg = "Hello World!";

		sender.sendMessage(msg);
		System.out.println("发送消息结束:" + msg);
	}

	/**
	 * Sends ten messages carrying {@code "<index>:<msg>"} in the string property
	 * {@code "hello"}, pausing three seconds after each send and committing the
	 * session once per message.
	 *
	 * <p>JMS failures and interruption are reported on stderr; the connection is
	 * always closed before returning.
	 *
	 * @param msg text appended to the loop index in every message
	 */
	public void sendMessage(String msg){

		String user = ActiveMQConnection.DEFAULT_USER;
		String password = ActiveMQConnection.DEFAULT_PASSWORD;

		String url = ActiveMQConnection.DEFAULT_BROKER_URL;
		String subject = "TOOL.DEFAULT";

		// 1. Initialise the connection factory.
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
		Connection connection = null;
		try{
			// 2. Create and start the connection.
			connection = connectionFactory.createConnection();
			connection.start();

			// 3. Create a transacted session; the acknowledge mode argument is
			//    ignored for transacted sessions, so name that intent explicitly.
			Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
			// 4. Open the queue.
			Destination destination = session.createQueue(subject);

			// 5. The MessageProducer is responsible for sending messages.
			MessageProducer producer = session.createProducer(destination);
			TextMessage message = session.createTextMessage();

			for(int i = 0;i < 10;i++){
				String tmp = i + ":" + msg;
				message.setStringProperty("hello",tmp);

				// 6. Send the message.
				producer.send(message);
				System.out.println("send:" + tmp);
				Thread.sleep(3000);
				// The message only enters the queue once the transaction is committed.
				session.commit();
			}
		}catch(JMSException e){
			e.printStackTrace();
		}catch(InterruptedException e){
			// Restore the interrupt flag so callers can still observe the interruption.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}finally{
			// 7. Close the connection once, after the loop (closing it inside the
			//    loop made every send after the first one fail).
			if(connection != null){
				try{
					connection.close();
				}catch(JMSException e){
					e.printStackTrace();
				}
			}
		}
	}
}

通过Queue接收消息

/**
 * Minimal JMS consumer example: listens on the {@code TOOL.DEFAULT} queue of the
 * default ActiveMQ broker and prints the {@code "hello"} property of each message.
 */
public class Receiver{

	/**
	 * Registers an asynchronous listener on the queue and then keeps the main
	 * thread blocked so the listener can keep receiving. The connection is closed
	 * when the JVM shuts down, when the main thread is interrupted, or when set-up
	 * fails.
	 */
	public static void main(String[] args){
		String user = ActiveMQConnection.DEFAULT_USER;
		String password = ActiveMQConnection.DEFAULT_PASSWORD;
		String url = ActiveMQConnection.DEFAULT_BROKER_URL;
		String subject = "TOOL.DEFAULT";

		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
		Connection connection = null;
		try{
			connection = connectionFactory.createConnection();

			// Release the connection on JVM shutdown (e.g. Ctrl+C).
			final Connection opened = connection;
			Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){
				public void run(){
					closeQuietly(opened);
				}
			}));

			connection.start();

			// Transacted session; the acknowledge mode argument is ignored when transacted.
			final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
			Destination destination = session.createQueue(subject);

			// The MessageConsumer is responsible for receiving messages.
			MessageConsumer consumer = session.createConsumer(destination);
			consumer.setMessageListener(new MessageListener(){

				public void onMessage(Message msg){
					try{
						// getStringProperty is declared on Message, so no cast is required.
						String hello = msg.getStringProperty("hello");
						System.out.println("收到消息:\t" + hello);
						session.commit();
					}catch(JMSException e){
						e.printStackTrace();
					}
				}
			});

			// Delivery is asynchronous: closing the session/connection right after
			// registering the listener would tear it down before any message arrives,
			// so block here until the thread is interrupted.
			new java.util.concurrent.CountDownLatch(1).await();
		}catch(JMSException e){
			e.printStackTrace();
		}catch(InterruptedException e){
			Thread.currentThread().interrupt();
		}finally{
			closeQuietly(connection);
		}
	}

	/** Closes the connection if it was opened, reporting (not propagating) any JMS failure. */
	private static void closeQuietly(Connection connection){
		if(connection == null){
			return;
		}
		try{
			connection.close();
		}catch(JMSException e){
			e.printStackTrace();
		}
	}
}

通过Topic发布消息

/**
 * Minimal JMS publisher example: publishes one message to the {@code MQ.TOPIC}
 * topic of the default ActiveMQ broker through a transacted session.
 */
public class Publisher{

	/** Publishes a fixed greeting via {@link #sendMessage(String)} and prints a completion line. */
	public static void main(String[] args){
		Publisher pb = new Publisher();
		String msg = "Hello World!~~~~~";
		pb.sendMessage(msg);
		System.out.println("发送消息结束:" + msg);
	}

	/**
	 * Publishes a single message whose string property {@code "hello"} is set to
	 * {@code msg}, then commits the session.
	 *
	 * <p>JMS failures are reported on stderr; the connection is always closed
	 * before returning, including when a JMS call fails.
	 *
	 * @param msg value stored in the {@code "hello"} property of the message
	 */
	public void sendMessage(String msg){

		String user = ActiveMQConnection.DEFAULT_USER;
		String password = ActiveMQConnection.DEFAULT_PASSWORD;

		String url = ActiveMQConnection.DEFAULT_BROKER_URL;
		String subject = "MQ.TOPIC";

		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
		Connection connection = null;
		try{
			connection = connectionFactory.createConnection();
			connection.start();
			// Transacted session; the acknowledge mode argument is ignored when transacted.
			Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

			// Create the topic to publish to; this is the only difference from the queue example.
			Destination destination = session.createTopic(subject);
			MessageProducer producer = session.createProducer(destination);
			TextMessage message = session.createTextMessage();
			message.setStringProperty("hello", msg);

			producer.send(message); // publish the message

			session.commit();
			session.close();
		}catch(JMSException e){
			e.printStackTrace();
		}finally{
			// Close in finally so a failed JMS call does not leak the connection.
			if(connection != null){
				try{
					connection.close();
				}catch(JMSException e){
					e.printStackTrace();
				}
			}
		}
	}
}

通过Topic订阅消息

/**
 * Minimal JMS subscriber example: listens on the {@code MQ.TOPIC} topic of the
 * default ActiveMQ broker and prints the {@code "hello"} property of each message.
 */
public class Subscriber{

	/**
	 * Registers an asynchronous listener on the topic and then keeps the main
	 * thread blocked so the listener can keep receiving. The connection is closed
	 * when the JVM shuts down, when the main thread is interrupted, or when set-up
	 * fails.
	 */
	public static void main(String[] args){
		String user = ActiveMQConnection.DEFAULT_USER;
		String password = ActiveMQConnection.DEFAULT_PASSWORD;
		String url = ActiveMQConnection.DEFAULT_BROKER_URL;
		String subject = "MQ.TOPIC";

		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
		Connection connection = null;
		try{
			connection = connectionFactory.createConnection();

			// Release the connection on JVM shutdown (e.g. Ctrl+C).
			final Connection opened = connection;
			Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){
				public void run(){
					closeQuietly(opened);
				}
			}));

			connection.start();
			// Transacted session; the acknowledge mode argument is ignored when transacted.
			final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
			Topic topic = session.createTopic(subject);
			// The MessageConsumer is responsible for receiving messages.
			MessageConsumer consumer = session.createConsumer(topic);
			consumer.setMessageListener(new MessageListener(){

				public void onMessage(Message msg) {
					try {
						// getStringProperty is declared on Message, so no cast is required.
						String hello = msg.getStringProperty("hello");
						System.out.println("订阅者---SecondSubscriber---收到消息:\t" + hello);
						session.commit();
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			});

			// Delivery is asynchronous: closing the session/connection right after
			// registering the listener would tear it down before any message arrives,
			// so block here until the thread is interrupted.
			new java.util.concurrent.CountDownLatch(1).await();
		}catch(JMSException e){
			e.printStackTrace();
		}catch(InterruptedException e){
			Thread.currentThread().interrupt();
		}finally{
			closeQuietly(connection);
		}
	}

	/** Closes the connection if it was opened, reporting (not propagating) any JMS failure. */
	private static void closeQuietly(Connection connection){
		if(connection == null){
			return;
		}
		try{
			connection.close();
		}catch(JMSException e){
			e.printStackTrace();
		}
	}
}

Spring JMS收发消息

一、依赖

<!-- JUnit, test scope only -->
<dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>4.12</version>
		<scope>test</scope>
	</dependency>
	<!-- ActiveMQ (activemq-all artifact) -->
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-all</artifactId>
		<version>5.11.0</version>
	</dependency>
	<!-- Spring JMS; same version as spring-test below -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-jms</artifactId>
		<version>4.1.4.RELEASE</version>
	</dependency>
	<!-- Spring test support; same version as spring-jms above -->
	<dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-test</artifactId>  
        <version>4.1.4.RELEASE</version>  
    </dependency> 

二、队列(Queue)消息的收发

点对点消息,如果没有消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。
在此模型中,消息不是自动推送给消息消费者的,而是要由消息消费者从队列中请求获得(拉模式)。

①、配置文件

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring bean definitions for the queue example: connection factory, queue
     destination, JmsTemplate, and the producer/consumer service beans. -->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

	<!-- JMS connection factory -->
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="failover:(tcp://localhost:61616)" />
	</bean>
	
	<!-- Message queue (Queue) definition -->
	<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<!-- Name of the message queue -->
		<constructor-arg>
			<value>queue1</value>
		</constructor-arg>
	</bean>
	
	<!-- JMS template (Queue): the Spring-provided JMS helper class used to send and receive messages. -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="defaultDestination" ref="queueDestination" />
		<property name="receiveTimeout" value="10000" />
	</bean>
	
	<!-- Queue message producer -->
	<bean id="producerService" class="com.yoodb.mq.queue.ProducerServiceImpl">
		<property name="jmsTemplate" ref="jmsTemplate"></property>
	</bean>

	<!-- Queue message consumer -->
	<bean id="consumerService" class="com.yoodb.mq.queue.ConsumerServiceImpl">
		<property name="jmsTemplate" ref="jmsTemplate"></property>
	</bean>
</beans>

②、消息生产者

public class ProducerServiceImpl implements ProducerService{
	
	private JmsTemplate jmsTemplate;

	//向指定队列发送消息
	public void sendMessage(Destination destination,final Stringmsg){
		
		System.out.println("向队列" + destination.toString() + "发送了消息___" + msg);
		jmsTemplate.send(destination,new MessageCreator(){
			public Message createMessage(Session session)throws JMSException{
				return session.createTextMessage(msg);
			}
		});
	}

	//向默认队列发送消息
	publc void sendMessage(final String msg){
		String destination =  jmsTemplate.getDefaultDestination().toString();
		System.out.println("向队列" +destination+ "发送了消息___" + msg);

		jmsTemplate.send(new MessageCreator(){
			public Message createMesssage(Session session)throws JMSException{
				return session.createTextMessage(msg);
			}
		});
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate){
		this.jmsTemplate = jmsTemplate;
	}
}

③、消息消费者代码

/**
 * Queue message consumer that synchronously receives text messages through an
 * injected {@link JmsTemplate}.
 */
public class ConsumerServiceImpl implements ConsumerService{

	private JmsTemplate jmsTemplate;

	/**
	 * Receives one message from the given destination and prints its text on stdout.
	 * If the template returns no message, a notice is printed instead.
	 *
	 * @param destination queue to receive from
	 */
	public void receive(Destination destination){
		TextMessage tm = (TextMessage)jmsTemplate.receive(destination);
		if(tm == null){
			// receive() returned nothing (e.g. the receive timeout elapsed); avoid a NullPointerException.
			System.out.println("从队列" + destination.toString() + "接收消息超时,未收到消息");
			return;
		}
		try{
			System.out.println("从队列" + destination.toString() + "收到消息" + tm.getText());
		}catch(JMSException e){
			e.printStackTrace();
		}
	}

	/** Injects the template used for receiving (set by the Spring configuration). */
	public void setJmsTemplate(JmsTemplate jmsTemplate){
		this.jmsTemplate = jmsTemplate;
	}
}

队列消息监听:接收消息时,除了使用上面的消息消费者代码主动接收之外,Spring JMS 还提供了消息监听的模式,对应的配置和代码如下。

<!-- Message queue (Queue) definition; here we listen on a new queue, queue2 -->
<bean id="queueDestination2" class="org.apache.activemq.command.ActiveMQQueue">
	<!-- Name of the message queue -->
	<constructor-arg>
		<value>queue2</value>
	</constructor-arg>
</bean>

<!-- Queue message listener; its code is given below and has only an onMessage method -->
<bean id="queueMessageListener" class="com.yoodb.mq.queue.QueueMessageListener" />

<!-- 消息监听容器(Queue),配置连接工厂,监听的队列是queue2,监听器是上面定义的监听器 -->
<bean id="jmsContainer"
	class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory" />
	<property name="destination" ref="queueDestination2" />
	<property name