Installing, Configuring, and Testing RocketMQ on CentOS 7

Date: 2023-11-22 12:29:14

Environment

CentOS 7

RocketMQ 3.2.6

Install location: /usr/local/alibaba-rocketmq

Public IP: 182.254.145.66

Private IP: 10.105.23.114

Installation

wget https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz

tar -zxvf alibaba-rocketmq-3.2.6.tar.gz

cd alibaba-rocketmq/bin

Startup

nohup sh mqnamesrv -n 10.105.23.114:9876 & 

nohup sh mqbroker -n 10.105.23.114:9876 &
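
Before moving on to the client, it can help to confirm that both processes are actually listening. The following is a minimal sketch, not part of RocketMQ: the class name PortCheck and the 3-second timeout are assumptions made for illustration. Port 9876 is the name server port used above, and 10911 is the broker port that appears in the broker's log output.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unroutable: treat all as "not reachable"
            return false;
        }
    }

    public static void main(String[] args) {
        // Host defaults to the public IP from this article; pass another as the first argument
        String host = args.length > 0 ? args[0] : "182.254.145.66";
        System.out.println("name server 9876 reachable: " + isReachable(host, 9876, 3000));
        System.out.println("broker 10911 reachable: " + isReachable(host, 10911, 3000));
    }
}
```

Run it from the machine that will host the Java client, since that is the network path the producer and consumer will use.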

Java test

Set up the project with Maven

			<!-- http://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
			<dependency>
				<groupId>com.alibaba.rocketmq</groupId>
				<artifactId>rocketmq-client</artifactId>
				<version>3.2.3</version>
			</dependency>

package rocketmq;

import java.nio.charset.StandardCharsets;
import java.util.Date;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        // Connect through the name server's public address
        producer.setNamesrvAddr("182.254.145.66:9876");
        producer.setInstanceName("rmq-instance");
        producer.start();
        try {
            for (int i = 0; i < 3; i++) {
                Message msg = new Message("TopicA-test", // topic
                        "TagA", // tag
                        (new Date() + "Hello RocketMQ ,QuickStart" + i)
                                .getBytes(StandardCharsets.UTF_8) // body
                );
                SendResult sendResult = producer.send(msg);
                // Print the result so each send can be checked
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the producer's resources even if a send fails
            producer.shutdown();
        }
    }
}

package rocketmq;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

        consumer.setNamesrvAddr("182.254.145.66:9876");
        consumer.setInstanceName("rmq-instance");
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
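                    // Decode the body with an explicit charset instead of the platform default
                    System.out.println(new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));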
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

Running the consumer produced this error:

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <10.105.23.114:10911> failed

And nohup.out contained:

The broker[localhost, 10.105.23.114:10911] boot success. and name server is 182.254.145.65:9876

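Sigh. It looks like a public vs. private IP problem again: the broker registered itself under its private address 10.105.23.114, which a client outside that network cannot reach.

I ran into a similar problem when installing Tair; see "Installing, Configuring, and Testing Tair on CentOS 7".

After a lot of searching, I finally found the fix in the official user guide.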
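
To make the diagnosis concrete, here is a small sketch that pulls the address out of the error message and checks whether it falls in a private range. The class and method names (BrokerAddressCheck, extractIp, isPrivate) are hypothetical and only for illustration; the check itself relies on the standard InetAddress.isSiteLocalAddress() method.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: classifies the broker address found in a connect error.
public class BrokerAddressCheck {

    // Matches the "<ip:port>" part of the error message
    private static final Pattern ADDRESS =
            Pattern.compile("<(\\d{1,3}(?:\\.\\d{1,3}){3}):(\\d+)>");

    /** Extracts the IPv4 address between angle brackets, or null if none is found. */
    public static String extractIp(String message) {
        Matcher m = ADDRESS.matcher(message);
        return m.find() ? m.group(1) : null;
    }

    /** True for addresses in 10.0.0.0/8, 172.16.0.0/12, or 192.168.0.0/16. */
    public static boolean isPrivate(String ipv4) {
        try {
            // For an IP literal, getByName only parses; it does no DNS lookup
            return InetAddress.getByName(ipv4).isSiteLocalAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not an IP address: " + ipv4, e);
        }
    }

    public static void main(String[] args) {
        String error = "connect to <10.105.23.114:10911> failed";
        String ip = extractIp(error);
        System.out.println(ip + " private: " + isPrivate(ip)); // 10.105.23.114 private: true
        System.out.println("182.254.145.66 private: " + isPrivate("182.254.145.66")); // false
    }
}
```

A private address in the error message means the client learned an address from the name server that it cannot route to from outside.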


My modified broker.p:

namesrvAddr=127.0.0.1:9876
brokerIP1=182.254.145.66
brokerName=localhost
brokerClusterName=DefaultCluster
brokerId=0
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
storePathRootDir=/root/store
storePathCommitLog=/root/store/commitlog
flushIntervalCommitLog=1000
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=72
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
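
The setting that matters here is brokerIP1, which is set to the public IP. As a rough sanity check, the sketch below loads such a configuration with java.util.Properties and confirms which address the broker is configured to advertise. The class and method names (BrokerConfigCheck, parse, advertises) are hypothetical, and the two-line sample only mirrors the first entries of the configuration above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: inspects a broker configuration in .properties format.
public class BrokerConfigCheck {

    /** Parses key=value text into a Properties object. */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** True if brokerIP1 is set to exactly the expected address. */
    public static boolean advertises(Properties props, String expectedIp) {
        return expectedIp.equals(props.getProperty("brokerIP1", "").trim());
    }

    public static void main(String[] args) {
        Properties props = parse("namesrvAddr=127.0.0.1:9876\nbrokerIP1=182.254.145.66\n");
        System.out.println("namesrvAddr = " + props.getProperty("namesrvAddr"));
        System.out.println("advertises public IP: " + advertises(props, "182.254.145.66"));
    }
}
```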

OK, it works!

What does this show? That first-hand sources matter.

References

http://www.jialeens.com/archives/681.html

http://www.cnblogs.com/xiaodf/p/5075167.html