目录
在生产环境中,手工签收的方式比较合适,因为某个消息在消费端没有成功处理的情况下,可以不给ActiveMQ消息中间件发送针对这个消息的确认签收。同时,记录相关信息到日志文件或数据库中,以便后续做相应处理。在默认情况下,消息在ActiveMQ消息中间件中是不会过期的,可以根据实际的项目需要去设置消息的过期时间,单位毫秒。
消息优先级总共十个,即0-9。其中,0-4是普通消息,5-9是加急消息,默认优先级为4。加急消息理论上优先于普通消息被消费。消息的优先级并不能确保消息发送和消费的先后顺序,如果项目需要的话,可以在ActiveMQ中间件与消费者之间添加一个排队系统,来保证消费者顺序消费消息。
1、ActiveMQ的消息持久化机制
ActiveMQ的持久化机制包含JDBC,KahaDB、LevelDB
在activemq.xml中查默认的broker持久化机制是kahaDB,现改为JDBC持久化:
<persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" />
</persistenceAdapter>
createTablesOnStartup默认值是true,即每次ActiveMQ启动的时候都重新创建数据表,一般是首次启动设置为true,之后设置为false。
同时,在broker标签外设置bean:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3406/activemq?relaxAutoCommit=true"/>
<property name="username" value="mysql"/>
<property name="password" value="password"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
将mysql-connector-java-5.1.34.jar包放置到activemq的lib目录下(如果提示数据库连接还有问题,就把这三个jar包也放到lib目录):
重新启动ActiveMQ,注意用命令netstat -ano |findstr 61616查看下端口是否被占用,一般都要taskkill /f /pid (pid) 杀死进程。登录ActiveMQ的默认控制台http://127.0.0.1:8161/admin,若ActiveMQ正常启动和运行,则表示其JDBC持久化机制设置没有问题;否则查看data目录下的log文件,排查出错原因。
因为使用了JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。activemq_msgs用于存储消息,Queue和Topic都存储在这个表中。
2、执行Producer
package com.cb;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception{
//1.创建ConnectionFactory对象
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
"cb",
"123456",
"tcp://localhost:61616");
//2.创建一个Connection并开启
Connection connection=connectionFactory.createConnection();
connection.start();
//3.创建Session会话,用来接收消息,通过参数可以设置:是否启用事务、消息签收模式
//参数设置生产者使用事务、客户端(消费者)签收方式
Session session=connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
//4.创建Destination对象。在点对点模式中,该对象被称为Queue;在发布订阅模式中,该对象被称为Topic
Destination destination=session.createQueue("queue1");
//5.创建消息的生产者
MessageProducer messageProducer=session.createProducer(null);
//6.设置生产者的消息持久化与非持久化特性
//messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//7.选择需要的JMS消息格式,创建并发送消息,此处选择的是TextMessage字符串对象
TextMessage textMessage=session.createTextMessage();
textMessage.setText("生产者"+"activemq消息测试");
//messageProducer.send(textMessage);
//第3个参数:是否持久化;第4个参数:优先级(0~4普通 5~9加急);第5个参数:消息在ActiveMQ中间件中存放的有效期
messageProducer.send(destination, textMessage,DeliveryMode.PERSISTENT, 4, 1000*60*10);
//使用事务,必须有commit操作
session.commit();
//8.释放Connection
if(null!=connection){
connection.close();
}
}
}
3、执行Consumer
package com.cb;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
public static void main(String[] args) throws Exception{
//1.创建ConnectionFactory对象
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
"cb",
"123456",
"tcp://localhost:61616");
//2.创建一个Connection并开启
Connection connection=connectionFactory.createConnection();
connection.start();
//3.创建Session会话,用来接收消息,通过参数可以设置:是否启用事务、消息签收模式
Session session=connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
//4.创建Destination对象。在点对点模式中,该对象被称为Queue;在发布订阅模式中,该对象被称为Topic
Destination destination=session.createQueue("queue1");
//5.创建消息的消费者
MessageConsumer messageConsumer=session.createConsumer(destination);
//6.消费者从消息中间件的Queue获取消息
while(true){
TextMessage textMessage=(TextMessage) messageConsumer.receive();
if(null==textMessage){
break;
}
System.out.println("消费者接收到的内容:"+textMessage.getText());
//手动确认签收
textMessage.acknowledge();
}
//7.释放Connection
if(null!=connection){
connection.close();
}
}
}
因为消息已经被消费掉,再次查看mysql数据库中的activemq_msgs表,发现消息已经不存在了。
4、ActiveMQ消息的有效期
在上述的Producer类中,将消息的有效期设置为10分钟,若这条消息发送到了ActiveMQ消息中间件但一直未被消费,直到10分钟的时间到,消息则过期。
现在执行Producer,但是不执行Consumer。查看ActiveMQ的管控台:
10分钟之后: