环境
RocketMQ 3.2.6
安装位置 /usr/local/alibaba-rocketmq
外网ip 182.254.145.66
内网ip 10.105.23.114
安装
wget https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz
cd alibaba-rocketmq/bin
启动
nohup sh mqnamesrv -n 10.105.23.114:9876 &
nohup sh mqbroker -n 10.105.23.114:9876 &
java测试
使用maven构建环境
<!-- http://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.3</version>
</dependency>
// ---------- Producer.java ----------
package rocketmq;

import java.nio.charset.StandardCharsets;
import java.util.Date;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Quick-start producer: sends three test messages to topic {@code TopicA-test}
 * with tag {@code TagA} through the name server at 182.254.145.66:9876.
 */
public class Producer {

    /**
     * Starts the producer, sends the test messages and shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException if the producer fails to start
     * @throws InterruptedException declared for interface compatibility
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("182.254.145.66:9876");
        producer.setInstanceName("rmq-instance");
        producer.start();
        try {
            for (int i = 0; i < 3; i++) {
                // Encode explicitly as UTF-8 so the payload does not depend on
                // the platform default charset of the producing machine.
                byte[] body = (new Date() + "Hello RocketMQ ,QuickStart" + i)
                        .getBytes(StandardCharsets.UTF_8);
                Message msg = new Message(
                        "TopicA-test", // topic
                        "TagA",        // tag
                        body);         // body
                SendResult sendResult = producer.send(msg);
                // Print the broker's answer so a successful send is visible.
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            // Demo code: report the failure and fall through to shutdown.
            e.printStackTrace();
        } finally {
            // Always release the client's resources, even if sending failed.
            producer.shutdown();
        }
    }
}

// ---------- Consumer.java ----------
package rocketmq;

import java.nio.charset.StandardCharsets;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Quick-start push consumer: subscribes to topic {@code TopicA-test} with tag
 * {@code TagA} and prints the body of every received message.
 */
public class Consumer {

    /**
     * Configures the consumer, registers the listener and starts consuming.
     *
     * @param args unused
     * @throws InterruptedException declared for interface compatibility
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
        consumer.setNamesrvAddr("182.254.145.66:9876");
        consumer.setInstanceName("rmq-instance");
        consumer.subscribe("TopicA-test", "TagA");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // Decode with the same charset the producer used to encode.
                    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
运行consumer后发现
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <10.105.23.114:10911> failed
在nohup.out里发现
The broker[localhost, 10.105.23.114:10911] boot success. and name server is 182.254.145.65:9876
哎,看来还是外网内网ip的问题
上次在安装Tair的时候就碰到过类似的问题 详见 Centos7安装Tair及配置测试
最后经过多方搜索,在官方的用户说明里看到下面的方法
经过我修改后的broker.p(启动broker时用 -c 指定该配置文件:nohup sh mqbroker -c broker.p &)
namesrvAddr=127.0.0.1:9876
brokerIP1=182.254.145.66
brokerName=localhost
brokerClusterName=DefaultCluster
brokerId=0
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
storePathRootDir=/root/store
storePathCommitLog=/root/store/commitlog
flushIntervalCommitLog=1000
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=72
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
ok!
这说明什么?说明第一手资料很重要
参考资料
http://www.jialeens.com/archives/681.html
http://www.cnblogs.com/xiaodf/p/5075167.html