看总结去吧
安装
准备:
jdk1.8
maven 3.3.9以上,我这里用的是3.3.9
登录linux服务器
下载rocketMq安装包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.3.0/rocketmq-all-4.3.0-source-release.zip
unzip rocketmq-all-4.3.0-source-release.zip
安装maven
wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
配置maven环境变量
MAVEN_HOME=/usr/local/maven
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
生效配置文件
source /etc/profile
检查maven是否成功
^C[root@supervisordruid1 ~]# mvn -v
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-11T00:41:47+08:00)
Maven home: /root/maven/apache-maven-3.3.9
Java version: 1.7.0_51, vendor: Oracle Corporation
Java home: /usr/local/jdk/jdk1.7.0_51/jre
Default locale: en_US, platform encoding: ANSI_X3.4-1968
OS name: "linux", version: "2.6.32-754.3.5.el6.x86_64", arch: "amd64", family: "unix"
通过maven编译部署
cd rocketmq-all-4.3.0-source-release
mvn -Prelease-all -DskipTests clean install -U
可能报错
[root@supervisordruid1 rocketmq-all-4.3.0]# mvn -Prelease-all -DskipTests clean install -U
-bash: bin/mvn: 没有那个文件或目录
将jdk改为1.8
直到
[INFO] rocketmq-namesrv 4.3.0 ............................. SUCCESS [ 1.534 s]
[INFO] rocketmq-logappender 4.3.0 ......................... SUCCESS [ 1.547 s]
[INFO] rocketmq-openmessaging 4.3.0 ....................... SUCCESS [ 1.398 s]
[INFO] rocketmq-example 4.3.0 ............................. SUCCESS [ 1.524 s]
[INFO] rocketmq-test 4.3.0 ................................ SUCCESS [ 2.478 s]
[INFO] rocketmq-distribution 4.3.0 ........................ SUCCESS [ 8.387 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:13 min
[INFO] Finished at: 2019-03-19T03:03:36+08:00
[INFO] Final Memory: 65M/420M
[INFO] ------------------------------------------------------------------------
进入distribution/target/apache-rocketmq目录
cd distribution/target/apache-rocketmq
启动的时候先启动 namesrv,然后启动 broker
启动 NameServer:
先进入 RocketMQ 安装目录下的 distribution/target/apache-rocketmq 目录
# 启动命令,并且常驻内存
$ nohup sh bin/mqnamesrv & 或者 sh mqnamesrv
# 查看启动日志能看到:The Name Server boot success字样则成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
nohup sh mqnamesrv & :属于后台启动
sh mqnamesrv :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
启动 Broker
# 启动命令,并且常驻内存:注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问
> nohup sh bin/mqbroker -n localhost:9876 &
# 查看启动日志
> tail -f ~/logs/rocketmqlogs/broker.log
nohup sh bin/mqbroker -n localhost:9876 & :属于后台启动
sh bin/mqbroker -n localhost:9876 :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
我启动的时候报错:
[root@supervisordruid2 apache-rocketmq]# sh bin/mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/rocketMq/rocketmq-all-4.3.0/distribution/target/apache-rocketmq/hs_err_pid7473.log
说明是内存不够,不能为虚拟机分配足够多的空间
[root@supervisordruid2 bin]# vi runserver.sh
找到如下配置,进行修改
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"(修改前的)
|
|
|
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"(修改后的,如果还是不行就更小。,比如1g)
[root@supervisordruid2 bin]# vi runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"(修改前的)
找到如下配置,进行修改
|
|
|
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"(修改后的,如果还是不行就更小。,比如1g
如果报错
Invalid initial heap size: -Xms25mg
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
说明不能正常创建虚拟机,尝试将参数调大,因为rocketmq需要的可能大一点。
比如我调到了1g,注意后面的xmn新生代的大小必须不能比前面两个大,但是我配置0.5页有问题,最后配置成了 1g 1g 1g
然后再次启动 broker 时就成功了,broker 注册到了 nameserver 上了(localhost:9876
[root@supervisordruid2 apache-rocketmq]# sh bin/mqbroker -n localhost:9876
The broker[supervisordruid2, 172.17.42.1:10911] boot success. serializeType=JSON and name server is localhost:9876
使用
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.leesin</groupId>
<artifactId>RocketMq_demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
</dependencies>
</project>
producer
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws MQClientException {
/**
* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ProducerGroupName需要由应用来保证唯一<br>
* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Producer
*/
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupTest");
producer.setNamesrvAddr("10.0.1.225:9876");
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
/**
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
try {
{
Message msg = new Message("broker-a",// topic
"TagB",// tag
"OrderID002",// key
("Hello MetaQ2").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
}
catch (Exception e) {
e.printStackTrace();
}
/**
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
*/
producer.shutdown();
}
}
consumer
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
/**
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
* 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法<br>
* @throws MQClientException
*/
public static void main(String[] args) throws InterruptedException, MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ConsumerGroupName需要由应用来保证唯一
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testmerchantLeagueConsumerGroup");
consumer.setNamesrvAddr("10.0.1.225:9876");
/**
* 订阅指定topic下tags分别等于TagA或TagB
*/
consumer.subscribe("broker-a", "TagB || TagA");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
*/
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("broker-a")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 执行TagA的消费
String message = new String(msg.getBody());
System.out.println(message);
}
else if (msg.getTags() != null && msg.getTags().equals("TagB")) {
// 执行TagB的消费
String message = new String(msg.getBody());
System.out.println(message);
}
}
//消费者向mq服务器返回消费成功的消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
启动producer报错:org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, broker-a
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, broker-a
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:561)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1058)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1017)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:208)
at Producer.main(Producer.java:35)
查看文章:
https://blog.csdn.net/wangmx1993328/article/details/81588217
可能是上面的jvm参数设置的太小了,重新启动一下rocketmq broker和name
我这里的原因是因为防火墙没开
vi /etc/sysconfig/iptables
添加
-A INPUT -m state --state NEW -m tcp -p tcp --dport 8080 -j ACCEPT
开启防火墙报错:
iptables: Applying firewall rules: iptables-restore v1.4.7: no command specified
Error occurred at line: 26
[root@supervisordruid2 apache-rocketmq]# service iptables restart
iptables: Setting chains to policy ACCEPT: filter nat [ OK ]
iptables: Flushing firewall rules: [ OK ]
iptables: Unloading modules: [ OK ]
iptables: Applying firewall rules: iptables-restore v1.4.7: no command specified
Error occurred at line: 26
Try `iptables-restore -h' or 'iptables-restore --help' for more information.
[FAILED]
因为防火墙的配置文件在修改的过程中多了回车空格导致的。
通过命令查看防火墙状态
[root@supervisordruid2 apache-rocketmq]# /etc/init.d/iptables status
Table: filter
Chain INPUT (policy ACCEPT)
num target prot opt source destination
1 ACCEPT tcp -- 0.0.0.0/0 0.0.0.0/0 tcp dpt:9876
2 ACCEPT tcp -- 0.0.0.0/0 0.0.0.0/0 tcp dpt:9876
3 ACCEPT tcp -- 0.0.0.0/0 0.0.0.0/0 tcp dpt:8889
4 ACCEPT tcp -- 0.0.0.0/0 0.0.0.0/0 tcp dpt:2375
5 ACCEPT tcp -- 0.0.0.0/0 0.0.0.0/0 tcp dpt:9876
之后重新启动rocket produer和consumer
运行还是会报错:rocketmq org.apache.rocketmq.remoting.exception.RemotingConnectException:connect to failed
rocketmq org.apache.rocketmq.remoting.exception.RemotingConnectException:connect to failed
这是由于跨域造成的:
修改服务器中broker的配置,添加服务器IP(公网)即可
vim /home/prod/rocketmq-all-4.3.0/distribution/target/apache-rocketmq/conf/broker.conf
新增一行:
brokerIP1=xx.xx.xx.xx # 你的公网IP,即你的mq安装的哪个服务器的ip,不是你java程序的ip
如何获得自己的公网ip?
curl ifconfig.me
或者
curl cip.cc
然后重启 mqnamesrv,记得先杀死进程
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
然后重启Broker, 记得先杀死进程
注意,重点是: -c conf/broker.conf
> nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success..
至此终于能够正常启动了,producer和consumer正常。
producer 输出:
SendResult [sendStatus=SEND_OK, msgId=0A0005820F8318B4AAC260221C780000, offsetMsgId=0A0001E100002A9F00000000000DBA37, messageQueue=MessageQueue [topic=broker-a, brokerName=broker-a, queueId=0], queueOffset=1]
consumer输出
Consumer Started.
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=189, queueOffset=1, sysFlag=0, bornTimestamp=1552982448251, bornHost=/10.0.5.130:55358, storeTimestamp=1552953392186, storeHost=/10.0.1.225:10911, msgId=0A0001E100002A9F00000000000DBA37, commitLogOffset=899639, bodyCRC=2145937944, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=broker-a, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=OrderID002, CONSUME_START_TIME=1552982688881, UNIQ_KEY=0A0005820F8318B4AAC260221C780000, WAIT=true, TAGS=TagB}, body=12]]]
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=189, queueOffset=0, sysFlag=0, bornTimestamp=1552982440910, bornHost=/10.0.5.130:55334, storeTimestamp=1552953384869, storeHost=/10.0.1.225:10911, msgId=0A0001E100002A9F00000000000DB97A, commitLogOffset=899450, bodyCRC=2145937944, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=broker-a, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=OrderID002, CONSUME_START_TIME=1552982688881, UNIQ_KEY=0A0005820F8118B4AAC26021FFCD0000, WAIT=true, TAGS=TagB}, body=12]]]
Hello MetaQ2
Hello MetaQ2
感谢:https://blog.csdn.net/wangmx1993328/article/details/81536168
报错
报错文章收集:https://blog.csdn.net/wangmx1993328/article/details/81588217#RemotingTooMuchRequestException%3A sendDefaultImpl call timeout
https://blog.csdn.net/lw5885799/article/details/88646051
https://blog.csdn.net/jiangyu1013/article/details/81478754
com.alibaba.rocketmq.client.exception.MQBrokerException:
CODE: 14 DESC: service not available now, maybe disk full, CL: 0.87 CQ: 0.87 INDEX: 0.87, maybe your broker machine memory too small.
机器的磁盘空间不足导致,删除机器上没用的东西,腾出空间就好了。
idea中启动producer
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:634)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1279)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1225)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:283)
at Producer.send(Producer.java:78)
at Producer.run(Producer.java:96)
at java.lang.Thread.run(Thread.java:748)
at Main.main(Main.java:10)
null
说是关了防火墙就可以了,这句话好敷衍对吧,具体的操作看上面的推荐博文,我试了不行,继续往下看
还可能报错:
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
上面这两个错掺和着来,具体操作看上面的博文,我试了不行,继续往下看。
总结:
rocketmq内存设置
vi runserver.sh
vi runbroker.sh
//搜索到这里
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
rocketmq需要的内存比较大,如果设置太大,系统空间不够,如果设置太小,不能成功构建虚拟机,因为我就需要那么大,你给太小怎么办???(name和brocker都需要设置)
Invalid initial heap size: -Xms25mg
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
这个就是设置太小了,rocketmq虚拟机构建不出来
[root@supervisordruid2 apache-rocketmq]# sh bin/mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/rocketMq/rocketmq-all-4.3.0/distribution/target/apache-rocketmq/hs_err_pid7473.log
这个就是设置太大了,我们的机器给不了这么多
所以就需要根据自己的机器设置出机器技能给够运行内存,又不至于设置的太小而起不来,所以具体的还需要自己慢慢试,一点一点试出来。
因为我的机器是公司本地的机器,所以,上面的内存可能不多了
# linux 下 取进程占用 cpu 最高的前10个进程
ps aux|head -1;ps aux|grep -v PID|sort -rn -k +3|head
# linux 下 取进程占用内存(MEM)最高的前10个进程
ps aux|head -1;ps aux|grep -v PID|sort -rn -k +4|head
杀掉没有用的进程,节省空间出来。
上面这个配置好的话,其实已经差不多了,至于上面报错小节中的后两个问题我都迎刃而解。
我最后设置的是:
name:-Xms4g -Xmx4g -Xmn2g
brocker:-Xms4g -Xmx4g -Xmn4g
当然这不是死的,还好我关了本地的kafka,节省了好多内存。
df -h 运行内存查看
free -g 磁盘内存查看
实在不行换台机器。
配置brockerip
上面介绍中有
vim /home/prod/rocketmq-all-4.3.0/distribution/target/apache-rocketmq/conf/broker.conf
新增一行:
brokerIP1=xx.xx.xx.xx # 你的公网IP,即你的mq安装的哪个服务器的ip,不是你java程序的ip
这里,因为我是本地环境,所以直接写成了本机的内网ip,因为仅供内网测试用。
如何获得自己的公网ip?
curl ifconfig.me
或者
curl cip.cc
启动方式
上面的博文中有一篇对启动方式有很大的强制性:
https://blog.csdn.net/lw5885799/article/details/88646051
博文裁剪:
name启动
nohup ./bin/mqnamesrv -n 你的公网IP:9876 &
不能写localhost
broker 启动
nohup sh bin/mqbroker -n 你的公网IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &
首先他这个是后台运行的方式
我的方式
name启动
./mqnamesrv
brocker启动
./mqbroker -n localhost:9876
------------------------------
但是建议写成(仅仅是建议)
sh bin/mqbroker -n localhost:9876 -c conf/broker.conf autoCreateTopicEnable=true
我这里是localhost,因为我的demo就是访问内网部署的rocketMq,如果是公网的话根据我的推测写成公网ip把,因为这里仅供本地测试,所以我写成localhost就成功了。
如果往机器上部署,最好再本地看看报错吗
关于防火墙
如果像我一样只是玩玩demo,别搞端口开放了,直接关了防火墙
service iptables stop
service iptables status
当然在其他的机器上
telnet ip 9876
确保这个端口是可用的。