ActiveMQ 消息持久化到数据库

时间:2023-02-16 08:11:57

1:前言

     这一段时间给公司开发消息总线,有机会研究ActiveMQ,今天撰文给大家介绍一下它的消息持久化。本文只介绍三种方式,分别是持久化为文件,MySql,Oracle。下面逐一介绍。

A:持久化为文件

     这个你装ActiveMQ时默认就是这种,只要你设置消息为持久化就可以了。涉及到的配置和代码有

<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>

producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);

B:持久化为MySql

     你首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar

     接下来你修改配置文件

<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
</persistenceAdapter>

在配置文件中的broker节点外增加
<bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

从配置中可以看出数据库的名称是activemq,你需要手动在MySql中增加这个库。

然后重新启动消息队列,你会发现多了3张表

1:activemq_acks

2:activemq_lock

3:activemq_msgs

C:持久化为Oracle

    和持久化为MySql一样。这里我说两点

1:在ActiveMQ安装文件夹里的Lib文件夹中增加Oracle的JDBC驱动。驱动文件位于Oracle客户端安装文件中的product\11.1.0\client_1\jdbc\lib文件夹下。

2:

<bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:test"/>
<property name="username" value="qdcommu"/>
<property name="password" value="qdcommu"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

这里的jdbc:oracle:thin:@10.53.132.47:1521:test按照自己实际情况设置一下就可以了,特别注意的是test是SID即服务名称而不是TNS中配置的节点名。各位同学只需要替换IP,端口和这个SID就可以了。

消息消费者的实现代码:
package easyway.activemq.app;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * Consumer side of the "persist messages to a database" example.
 *
 * <p>Receives up to {@link #messagesExpected} messages from the {@code ActiveMQ.QUEUE}
 * queue through a transacted JMS session, committing after every message received.
 *
 * <p>{@link #connectionFactory} is built once, during field initialization, from the value
 * {@code url} holds at that moment. Calling {@link #setUrl(String)} afterwards only updates
 * the field (and therefore what {@link #createBroker()} binds to); it does not reconfigure
 * the already-built factory. The {@code username} and {@code password} fields are only
 * exposed through their accessors and are not passed to the factory by this class.
 *
 * <p>NOTE(review): this class connects to {@code DEFAULT_BROKER_BIND_URL}, whereas
 * {@code MessageProductor} starts its broker on {@code tcp://localhost:61617}. Unless both
 * resolve to the same address the two examples talk to different brokers - confirm.
 *
 * @author longgangbai
 */
public class MessageCustomer {
    // Uses this class (not MessageProductor) so log lines are attributed to the consumer.
    private static final Logger logger = LogManager.getLogger(MessageCustomer.class);

    private String username = ActiveMQConnectionFactory.DEFAULT_USER;
    private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
    private static final String QUEUENAME = "ActiveMQ.QUEUE";

    /** Number of receive attempts made by {@link #receiveMessage()}. */
    protected static final int messagesExpected = 10;

    // Must stay below the declaration of url: the initializer reads it.
    protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            url + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);

    /**
     * Creates a broker service with a single connector bound to the current {@code url}.
     *
     * @return the configured, not yet started, broker
     * @throws Exception if the connector cannot be added
     */
    public BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.addConnector(url);
        return broker;
    }

    /**
     * Creates a broker via {@link #createBroker()} and starts it.
     *
     * @throws Exception if the broker cannot be created or started
     */
    public void init() throws Exception {
        BrokerService brokerService = createBroker();
        brokerService.start();
    }

    /**
     * Opens a connection, receives messages on a transacted session and closes the
     * connection again.
     *
     * @return the number of messages that were received and committed
     * @throws Exception if the connection or session cannot be set up, or if a JMS call
     *         outside the per-message error handling fails
     */
    public int receiveMessage() throws Exception {
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            return receiveMessages(messagesExpected, session);
        } finally {
            // Closing the connection also releases the session created from it.
            connection.close();
        }
    }

    /**
     * Makes {@code messagesExpected} receive attempts on the given session.
     *
     * <p>Each attempt opens a fresh consumer, waits up to two seconds for a message,
     * commits the session when one arrives and closes the consumer. An exception raised
     * while receiving or committing is logged and the session is rolled back; the loop
     * then continues with the next attempt.
     *
     * @param messagesExpected number of receive attempts to make
     * @param session          transacted session to receive on
     * @return the number of messages that were received and committed
     * @throws Exception if the queue or a consumer cannot be created, or if rolling back
     *         or closing the consumer fails
     */
    protected int receiveMessages(int messagesExpected, Session session) throws Exception {
        int messagesReceived = 0;
        // The destination does not change between attempts, so it is looked up once.
        Destination destination = session.createQueue(QUEUENAME);
        for (int i = 0; i < messagesExpected; i++) {
            MessageConsumer consumer = session.createConsumer(destination);
            try {
                logger.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
                Message message = consumer.receive(2000);
                logger.info("Received : " + message);
                if (message != null) {
                    session.commit();
                    messagesReceived++;
                }
            } catch (Exception e) {
                // Pass the throwable as well so the stack trace is not lost.
                logger.debug("Caught exception " + e, e);
                session.rollback();
            } finally {
                consumer.close();
            }
        }
        return messagesReceived;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }
}

 消息生产者的代码:
package easyway.activemq.app;

import java.io.File;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.sql.DataSource;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import easyway.activemq.app.utils.BrokenPersistenceAdapter;
/**
 * Producer side of the "persist messages to a database" example.
 *
 * <p>Starts an embedded broker whose persistence adapter is a
 * {@link BrokenPersistenceAdapter} backed by a MySQL data source, then sends
 * {@link #messagesExpected} persistent text messages to the {@code ActiveMQ.QUEUE} queue.
 *
 * <p>The {@code username}, {@code password} and {@code url} fields are only exposed through
 * their accessors; the broker connector and the connection factory both use the fixed
 * address {@code tcp://localhost:61617}.
 *
 * @author longgangbai
 */
public class MessageProductor {
    private static final Logger logger = LogManager.getLogger(MessageProductor.class);

    /** Address the embedded broker listens on and the connection factory connects to. */
    private static final String BROKER_URL = "tcp://localhost:61617";

    private String username = ActiveMQConnectionFactory.DEFAULT_USER;
    private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
    private static final String queueName = "ActiveMQ.QUEUE";
    private BrokerService brokerService;

    /** Number of messages sent by {@link #sendMessage()}. */
    protected static final int messagesExpected = 10;

    protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            BROKER_URL + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);

    /**
     * Creates a persistent broker that stores messages through the JDBC adapter returned
     * by {@link #createBrokenPersistenceAdapter()}.
     *
     * @return the configured, not yet started, broker
     * @throws Exception if the adapter, data source or connector cannot be set up
     */
    public BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        BrokenPersistenceAdapter jdbc = createBrokenPersistenceAdapter();
        broker.setPersistenceAdapter(jdbc);
        // Data directory is "<working directory>/data/".
        jdbc.setDataDirectory(System.getProperty("user.dir") + File.separator + "data" + File.separator);
        jdbc.setAdapter(new MySqlJDBCAdapter());
        broker.setPersistent(true);
        // Listens on BROKER_URL rather than ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL.
        broker.addConnector(BROKER_URL);
        return broker;
    }

    /**
     * Creates the persistence adapter and wires it to the data source from
     * {@link #createDataSource()}.
     *
     * <p>The adapter's use-database-lock setting is switched off, and existing messages
     * are not purged here (there is no {@code deleteAllMessages()} call).
     *
     * @return the configured adapter
     * @throws Exception if the data source cannot be created
     */
    public BrokenPersistenceAdapter createBrokenPersistenceAdapter() throws Exception {
        BrokenPersistenceAdapter jdbc = new BrokenPersistenceAdapter();
        DataSource datasource = createDataSource();
        jdbc.setDataSource(datasource);
        jdbc.setUseDatabaseLock(false);
        return jdbc;
    }

    /**
     * Creates a DBCP data source for the local MySQL database {@code activemq}.
     *
     * @return the data source
     * @throws Exception if the data source factory rejects the configuration
     */
    public DataSource createDataSource() throws Exception {
        Properties props = new Properties();
        props.put("driverClassName", "com.mysql.jdbc.Driver");
        props.put("url", "jdbc:mysql://localhost:3306/activemq");
        props.put("username", "root");
        props.put("password", "root");
        return BasicDataSourceFactory.createDataSource(props);
    }

    /**
     * Makes sure the broker exists and starts it.
     *
     * @throws Exception if the broker cannot be created or started
     */
    public void init() throws Exception {
        createBrokerService();
        brokerService.start();
    }

    /**
     * Returns the broker, creating it on first use.
     *
     * @return the broker instance held by this object
     * @throws Exception if the broker has to be created and creation fails
     */
    public BrokerService createBrokerService() throws Exception {
        if (brokerService == null) {
            brokerService = createBroker();
        }
        return brokerService;
    }

    /**
     * Sends {@link #messagesExpected} persistent text messages to the queue.
     *
     * @throws JMSException if connecting, creating the session/producer or sending fails
     */
    public void sendMessage() throws JMSException {
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i < messagesExpected; i++) {
                logger.debug("Sending message " + (i + 1) + " of " + messagesExpected);
                producer.send(session.createTextMessage("test message " + (i + 1)));
            }
        } finally {
            // Close even when a send fails so the connection is not leaked.
            connection.close();
        }
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }
}

持久化适配器类
package easyway.activemq.app.utils;


import java.io.IOException;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JDBC persistence adapter whose transaction commit can be made to fail on demand.
 *
 * <p>While the {@code shouldBreak} flag is {@code false} (the initial value) commits are
 * delegated to {@link JDBCPersistenceAdapter}; once it is set to {@code true} every commit
 * throws an {@link IOException} instead.
 */
public class BrokenPersistenceAdapter extends JDBCPersistenceAdapter {

    private final Logger LOG = LoggerFactory.getLogger(BrokenPersistenceAdapter.class);

    private boolean shouldBreak = false;

    /**
     * Switches the forced commit failure on or off.
     *
     * @param shouldBreak {@code true} to make subsequent commits throw
     */
    public void setShouldBreak(boolean shouldBreak) {
        this.shouldBreak = shouldBreak;
    }

    /**
     * Commits through the superclass unless the failure flag is set.
     *
     * @param context connection context handed on to the superclass
     * @throws IOException always when the failure flag is set; otherwise whatever the
     *         superclass commit throws
     */
    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        if (!shouldBreak) {
            LOG.debug("in commitTransaction");
            super.commitTransaction(context);
            return;
        }
        LOG.warn("Throwing exception on purpose");
        throw new IOException("Breaking on purpose");
    }
}

测试代码如下:
package easyway.activemq.app.test;

import easyway.activemq.app.MessageProductor;

/**
 * Manual driver for the producer side: starts the embedded broker and sends the test
 * messages.
 */
public class MessageProductorTest {

    /**
     * Runs the producer once.
     *
     * @param args unused
     * @throws Exception if the broker cannot be started or sending fails
     */
    public static void main(String[] args) throws Exception {
        run(new MessageProductor());
    }

    /** Starts the producer's broker, then sends its messages. */
    private static void run(MessageProductor producer) throws Exception {
        producer.init();
        producer.sendMessage();
        // The broker is not stopped here; stopping it would be
        // producer.createBrokerService().stop().
    }
}

package easyway.activemq.app.test;

import easyway.activemq.app.MessageCustomer;

/**
 * Manual driver for the consumer side: receives the test messages once.
 */
public class MessageCustomerTest {

    /**
     * Runs the consumer once; the number of messages received is not inspected.
     *
     * @param args unused
     * @throws Exception if connecting or receiving fails
     */
    public static void main(String[] args) throws Exception {
        run(new MessageCustomer());
    }

    /** Receives messages with the given consumer without starting a local broker. */
    private static void run(MessageCustomer consumer) throws Exception {
        // consumer.init() is not called. Per the original note it is meant for the case
        // where the two programs run on different servers and the client starts its own
        // broker process - TODO confirm.
        consumer.receiveMessage();
    }
}

备注:运行过程为:首先执行MessageProductorTest,然后执行MessageCustomerTest。

        mysql数据库activemq必须存在。关于消息持久化的表结构如下:

 ActiveMQ  消息持久化到数据库

 

ActiveMQ  消息持久化到数据库