ActiveMQ学习--002--Topic消息例子程序

时间:2024-08-23 18:33:50

一、非持久的Topic消息示例

注意 此种方式消费者只能接收到 消费者启动之后,发送者发送的消息。

发送者

package com.lhy.mq.helloworld;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class NoPersistenceTopicSender { public static void main(String[] args) throws Exception { //第一步:建立ConnectionFactory工厂对象。需要填入用户名、密码、连接地址,均使用默认即可,默认端口为"tcp://localhost:61616"
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"lhy","123456",
//ActiveMQConnectionFactory.DEFAULT_USER,
//ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://127.0.0.1:61616"); Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("NB-NB"); //队列名称 MessageProducer producer = session.createProducer(null);// // 第六步:可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)
//producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("我是消息内容 -333- "+i);
producer.send(destination, message); System.err.println("生产者发送消息:"+message.getText());
}
session.commit(); if(connection != null){
connection.close();
}
} }

接收者

package com.lhy.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class NoPersitenceTopicReceiver { public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"lhy","123456",
"tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("NB-NB"); MessageConsumer consumer = session.createConsumer(destination); Message message = consumer.receive();
while(message != null){
TextMessage textMsg = (TextMessage)message;
System.err.println("消费消息:"+textMsg.getText());
//接收下一个消息
message = consumer.receive(1000L);
} //提交一下事务,否则不确认消息,消息不会出队列
session.commit();
session.close();
connection.close();
}
}

二、持久订阅例子程序

发送者

package com.lhy.mq.helloworld;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class PersistenceTopicSender { public static void main(String[] args) throws Exception { //第一步:建立ConnectionFactory工厂对象。需要填入用户名、密码、连接地址,均使用默认即可,默认端口为"tcp://localhost:61616"
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"lhy","123456",
"tcp://127.0.0.1:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("Persistence-Topic"); //队列名称
MessageProducer producer = session.createProducer(null);// //默认为持久订阅,注意这个一定在start之前设置
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start(); for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("我是消息内容 -666- "+i);
producer.send(destination, message); System.err.println("生产者发送-topic-消息:"+message.getText());
}
session.commit(); if(connection != null){
connection.close();
}
} }

消费者,可以有多个消费者

1, 消费者需要在Connection上设置消费者id,来识别消费者

2,需要创建TopicSubscriber 来订阅

3,设置好之后再start  这个Connection

4,一定要先运行一次消费者,来向ActiveMQ注册这个消费者,然后再运行发送消息,这样无论消费者是否在线,都会接收到消息。否则只能接收到注册之后的消息。

package com.lhy.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消费者需要先运行一次,向producer注册一下
* @author dell
*
*/
public class PersitenceTopicReceiver { public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"lhy","123456",
"tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
//设置消费者的id,向发送者先注册一下,producer就知道谁在订阅
connection.setClientID("client2"); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("Persistence-Topic");
TopicSubscriber consumer = session.createDurableSubscriber(destination, "T1");//创建一个持久订阅
//最后start
connection.start(); Message message = consumer.receive();
while(message != null){
TextMessage textMsg = (TextMessage)message;
System.err.println("消费消息:"+textMsg.getText());
//接收下一个消息
message = consumer.receive(1000L);
} //提交一下事务,否则不确认消息,消息不会出队列
session.commit();
session.close();
connection.close();
}
}

分别修改消费者的clientID为 client1、client2运行,相当于2个消费者。

管控台:2个消费者,

ActiveMQ学习--002--Topic消息例子程序

ActiveMQ学习--002--Topic消息例子程序

ActiveMQ学习--002--Topic消息例子程序