ActiveMQ主从配置

时间:2023-12-05 09:50:14

这种方式有个问题,activemq1有消息没消费完但是突然宕机,虽然程序会自动连到activemq2。但是activemq1的消息只有等机器恢复后才会被消费。

1.启动:我这里使用的是apache-activemq-5.13.3,是在windows下使用的,发现根据文档说的双击activemq.bat启动不了,那就只好使用命令启动,CMD进入到apache-activemq-5.13.3\bin下,输入activemqbat start。这样就可以启动了。

2.主从配置:第一个activemq解压到apache-activemq-5.13.3,第二个解压到apache-activemq-5.13.3-2

  第一个activemq直接输入命令启动

  第二个需要修改参数:a.打开apache-activemq-5.13.3-2\conf\activemq.xml,修改broker标签里面的brokerName,不要和第一个相同就行

            b.修改activemq.xml中的transportConnectors,删除其他,只留一个openwire就行,修改uri里面的端口号

            c.在transportConnectors上面添加(如果一会儿启动的时候这里报错,请手动敲打下面三行,不要复制)

              <networkConnectors>

                <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>
              </networkConnectors>

ActiveMQ主从配置

            d.修改\conf\jetty.xml文件的115行,端口号随便写一个。(这里是jetty的访问端口)

配置文件修改完成,启动第一个activemq,启动第二个activemq。

接下来是代码中brokerURL需要改成使用failover。这样启动生产者和消费者后,程序就可以在主从直接自动切换(可以尝试轮流关闭主从)。

生产者代码如下:

 import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; public class Sender {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// MessageProducer:消息发送者
MessageProducer producer;
// TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
String brokerURL = "failover://(tcp://localhost:61616,tcp://localhost:61617)";
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, brokerURL);
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("FirstQueue");
// 得到消息生成者
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
while (true) {
sendMessage(session, producer);
session.commit();// commit后消息才会发出去
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
} static int i = 1; public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
i++;
}
}

producer

消费者代码如下:

 import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
String brokerURL = "failover://(tcp://localhost:61616,tcp://localhost:61617)";
// String brokerURL = "tcp://localhost:61616";
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
brokerURL);
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MyListener());
System.out.println("started...");
while(true){
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
class MyListener implements MessageListener{ public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

Receiver

以上代码部分摘自网络

这是配置主从的一个方案,还有一种方案是使用文件系统。