ActiveMQ很好的支持了消息的持久性。
消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送。
消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查制定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。
对此,我做了如下测试:
一、对Queue类型的持久化测试
环境:
1、新建配置文件activemq-persistence.xml
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:///${activemq.base}/conf/credentials.properties</value> </property> </bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data">
<managementContext> <managementContext createConnector="false"/> </managementContext>
<persistenceAdapter> <amqPersistenceAdapter syncOnWrite="false" directory="${activemq.base}/mydata" maxFileLength="2kb"/> </persistenceAdapter> <!-- The transport connectors ActiveMQ will listen to --> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616"/> </transportConnectors>
</broker> |
2、测试类
public class Publisher {
private static String brokerURL = "tcp://localhost:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; private transient MessageProducer producer;
private static int count = 10; private static int total; private static int id = 1000000;
private String jobs[] = new String[]{"suspend", "delete"}; private String username = "publisher"; private String password = "password";
public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(username,password); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); }
public void close() throws JMSException { if (connection != null) { connection.close(); } }
public static void main(String[] args) throws JMSException { Publisher publisher = new Publisher(); while (total < 1000) { for (int i = 0; i < count; i++) { publisher.sendMessage(); } total += count; System.out.println("Published '" + count + "' of '" + total + "' job messages"); try { Thread.sleep(1000); } catch (InterruptedException x) { } } publisher.close();
}
public void sendMessage() throws JMSException { int idx = 0; while (true) { idx = (int)Math.round(jobs.length * Math.random()); if (idx < jobs.length) { break; } } String job = jobs[idx]; Destination destination = session.createQueue("JOBS." + job); Message message = session.createObjectMessage(id++); System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination); producer.send(destination, message); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); }
} |
public class Consumer {
private static String brokerURL = "tcp://localhost:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session;
private String username = "publisher"; private String password = "password";
private String jobs[] = new String[]{"suspend", "delete"};
public Consumer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(username,password); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); }
public void close() throws JMSException { if (connection != null) { connection.close(); } }
public static void main(String[] args) throws JMSException { Consumer consumer = new Consumer(); for (String job : consumer.jobs) { Destination destination = consumer.getSession().createQueue("JOBS." + job); MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination); messageConsumer.setMessageListener(new Listener(job)); } }
public Session getSession() { return session; }
} |
3、测试
测试一:
A、 先运行Publisher类,待运行完毕后,运行Consumer类
B、 在此过程中"${activemq.base}/mydata/journal”目录下没任何新的Data文件生成
C、 再次运行Consumer,消费不到任何信息
测试二:
A、 先运行Publisher类
B、 重启电脑
C、 运行Consumer类,无任何信息被消费
测试三:
A、 把Publisher类中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改为producer.setDeliveryMode(DeliveryMode. PERSISTENT);
B、 先运行Publisher类,待运行完毕后,运行Consumer类
C、 在此过程中"${activemq.base}/mydata/journal”目录下有N多新的Data文件生成
测试四:
A、 把Publisher类中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改为producer.setDeliveryMode(DeliveryMode. PERSISTENT);
B、 运行Publisher类
C、 重新电脑
D、 运行Consumer类,有信心被消费
结论:
通过以上测试,可以发现,在P2P类型中当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中,而当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。而且P2P中消息一旦被Consumer消费就从broker中删除。
二、Pub/Sub
为了确保发布/订阅应用程序能够接收到所有已发布的消息,在发布端可以使用 PERSISTENT 传递模式传输消息以确保消息不回在传输过程中丢失,而在订阅端则可以使用持久订阅来保证能够收到所有已发布的消息。
环境:
对上面的环境作出如下修改:
1、 在Consumer类中加入:connection.setClientID("clientID001");
2、 把
MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);改为
MessageConsumer messageConsumer =
consumer.getSession().createDurableSubscriber((Topic) destination, "mySub");
测试一:
A、 先启动Publisher类
B、 再启动Consumer类
C、 结果无任何记录被订阅
测试二:
A、 先启动Consumer类,让Comsumer在相关主题上进行订阅
B、 停止Consumer类,再启动Publisher类
C、 待Publisher类运行完成后,再启动Consumer类
D、 结果发现相应主题的信息被订阅