JMS服务器ActiveMQ持久化消息到MySQL数据库中配置及测试
1、下载AMQ:http://activemq.apache.org/download.html,最新版本是5.5.0;
2、解压apache-activemq-5.5.0-bin.zip文件到文件系统(比如D:\ActiveMQ-5.5.0);
3、执行bin/activemq.bat脚本即可启动AMQ:
- INFO | ActiveMQ 5.5.0 JMS Message Broker (localhost) is starting
- ......
- INFO | Listening for connections at: tcp://SHI-AP33382A:61616
当看到上面的日志输出时,表示AMQ已经启动了;
4、默认情况下,AMQ使用conf/activemq.xml作为配置文件,我们可修改它,然后以 bin/activemq.bat xbean:./conf/my.xml启动AMQ;
三、持久化消息(MySQL)
1、建立MySQL数据库:要使用MySQL存储消息,必须告诉AMQ数据源:
- /**
- * 创建数据库
- */
- CREATE DATABASE misc DEFAULT CHARSET=UTF8;
- /**
- * 创建用户和授权
- */
- GRANT ALL PRIVILEGES ON misc.* TO 'misc_root'@'%' IDENTIFIED BY 'misc_root_pwd';
- GRANT ALL PRIVILEGES ON misc.* TO 'misc_root'@'localhost' IDENTIFIED BY 'misc_root_pwd';
通过上面的SQL脚本,我们建立了名为misc的数据库,并且把所有权限都赋予了misc_root的用户;
由于AMQ需要在本数据库中建立数据表,因此用户的权限必须具有建表权限,ActiveMQ是内置的建库脚本程序,所以,只需要正确的配置数据源即可;
因为接下来我们修改AMQ的默认配置文件,所以先备份conf/activemq.xml文件;
2、添加MySQL数据源:默认情况下,AMQ使用KahaDB存储(我对KahaDB不了解),注释到KahaDB的配置方式,改为MySQL的:
- <!--
- <persistenceAdapter>
- <kahaDB directory="${activemq.base}/data/kahadb"/>
- </persistenceAdapter>
- -->
- <persistenceAdapter>
- <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#MySQL-DS"/>
- </persistenceAdapter>
该配置表示,我们将要使用一个叫做“MySQL-DS”的JDBC数据源;
3、配置MySQL数据源:在</broker>节点后面,增加MySQL数据源配置:
- <!-- MySQL DataSource -->
- <bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
- <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
- <property name="url" value="jdbc:mysql://127.0.0.1:3308/misc?useUnicode=true&characterEncoding=UTF-8"/>
- <property name="username" value="misc_root"/>
- <property name="password" value="misc_root_pwd"/>
- <property name="poolPreparedStatements" value="true"/>
- <property name="maxActive" value="200"/>
- </bean>
其实这就是一个Spring的Bean的配置,注意id与上面的保持一致;
拷贝mysql 数据库驱动mysql-connector-java-5.1.13-bin.jar和DBCP jar commons-dbcp-1.2.2.jar到activeMQ的classpath路径下。。。
四、查看MySQL数据表
重新启动AMQ,启动完成之后,我们发现,misc数据库多了3张数据表:
- mysql> SHOW tables;
- +----------------+
- | Tables_in_misc |
- +----------------+
- | activemq_acks |
- | activemq_lock |
- | activemq_msgs |
- +----------------+
启动的log 信息如下
C:\Documents and Settings\johnson yang\Desktop\apache-activemq-5.5.0-bin\apache-activemq-5.5.0\bin>activemq.bat
Java Runtime: Sun Microsystems Inc. 1.6.0_16 C:\Java\jdk1.6.0_16\jre
Heap sizes: current=5056k free=4635k max=520256k
JVM args: -Dcom.sun.management.jmxremote -Xmx512M -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -
Dactivemq.classpath=C:\Documents and Settings\johnson yang\Desktop\apache-activemq-5.5.0-bin\apache-activemq-5.5.0\bin\../conf;C:\Documents and Settings\joh
nson yang\Desktop\apache-activemq-5.5.0-bin\apache-activemq-5.5.0\bin\../conf; -Dactivemq.home=C:\Documents and Settings\johnson yang\Desktop\apache-activem
q-5.5.0-bin\apache-activemq-5.5.0\bin\.. -Dactivemq.base=C:\Documents and Settings\johnson yang\Desktop\apache-activemq-5.5.0-bin\apache-activemq-5.5.0\bin\
..
ACTIVEMQ_HOME: C:\Documents and Settings\johnson yang\Desktop\apache-activemq-5.5.0-bin\apache-activemq-5.5.0\bin\..
ACTIVEMQ_BASE: C:\Documents and Settings\johnson yang\Desktop\apache-activemq-5.5.0-bin\apache-activemq-5.5.0\bin\..
Loading message broker from: xbean:activemq.xml
INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@11ff436: startup date [Tue Aug 02 15:18:45 CST 2011]; root of context hierarchy
WARN | destroyApplicationContextOnStop parameter is deprecated, please use shutdown hooks instead
INFO | PListStore:C:\Documents and Settings\johnson yang\Desktop\apache-activemq-5.5.0-bin\apache-activemq-5.5.0\bin\..\data\localhost\tmp_storage started
INFO | UsingPersistence Adapter: JDBCPersistenceAdapter(org.apache.commons.dbcp.BasicDataSource@10a5c21)
INFO | Database adapter driver override recognized for : [mysql-ab_jdbc_driver] - adapter: classorg.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter
INFO | Database lock driver override not found for : [mysql-ab_jdbc_driver]. Will use default implementation.
INFO | Attempting to acquire the exclusive lock to become the Master broker
INFO | Becoming the master on dataSource: org.apache.commons.dbcp.BasicDataSource@10a5c21
INFO | ActiveMQ 5.5.0 JMS Message Broker (localhost) is starting
INFO | For help or more information please see: http://activemq.apache.org/
INFO | Listening for connections at: tcp://sha-lri-pc-085:61618
INFO | Connector openwire Started
INFO | ActiveMQ JMS Message Broker (localhost, ID:sha-lri-pc-085-1402-1312269539231-0:1) started
INFO | jetty-7.1.6.v20100715
INFO | ActiveMQ WebConsole initialized.
INFO | Initializing Spring FrameworkServlet 'dispatcher'
INFO | ActiveMQ Console at http://0.0.0.0:8161/admin
INFO | Initializing Spring root WebApplicationContext
INFO | OSGi environment not detected.
INFO | Apache Camel 2.7.0 (CamelContext: camel) is starting
INFO | JMX enabled. Using ManagedManagementStrategy.
INFO | Found 5 packages with 16 @Converter classes to load
INFO | Loaded 152 type converters in 0.719 seconds
INFO | Connector vm://localhost Started
INFO | Route: route1 started and consuming from: Endpoint[activemq://example.A]
INFO | Total 1 routes, of which 1 is started.
INFO | Apache Camel 2.7.0 (CamelContext: camel) started in 2.422 seconds
INFO | Camel Console at http://0.0.0.0:8161/camel
INFO | ActiveMQ Web Demos at http://0.0.0.0:8161/demo
INFO | RESTful file access application at http://0.0.0.0:8161/fileserver
INFO | Started SelectChannelConnector@0.0.0.0:8161
数据表activemq_msgs即为持久化消息表;
五、持久化消息
系统启动完毕之后,消息表中内容为空:
- mysql> SELECT * FROM activemq_msgs;
- Empty set
1、发送消息:打开http://127.0.0.1:8161/demo/页面,找到“Send a message”链接,打开页面(http://127.0.0.1:8161/demo/send.html),填写完表格后,点击“Send”按键,即AMQ投递了一个消息;
2、查看消息:发送之后,我们可以看到数据表中多了一条消息:
- mysql> SELECT * FROM activemq_msgs;
- +----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
- | ID | CONTAINER | MSGID_PROD | MSGID_SEQ | EXPIRATION | MSG | PRIORITY |
- +----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
- | 1 | queue://FOO.BAR | ID:SHI-AP33382A-1486-1309840138441-2:2:1:1 | 1 | 0 | | 5 (优先级是五,就是优先级是高的。。实际生产中要重新设置)|
- +----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
3、取得消息:找到“Receive a message”链接,打开页面(http://127.0.0.1:8161/demo/message/FOO/BAR?readTimeout=10000&type=queue),发现该页面不是一个标准HTML页面,查看其源代码,其内容是不是就是刚才的消息内容?
4、查看消息:消息消费之后,我们可以看到数据表没有消息了:
- mysql> SELECT * FROM activemq_msgs;
- Empty set
5、我们可以生产多条消息,然后一条一条的消费,发现消息表中的消息一条一条的减少;
6、在发送消息页面,“Destination Type”如果选择“Topic”的话,则消息表中并没有数据,原因在于“Queue”为ptp模式消息,“Topic”为发布/订阅模式消息,当没有订阅者时,消息直接丢掉了。