RocktMq安装和简单使用以及报错收集

时间:2022-10-10 02:35:41

看总结去吧

安装

准备:

jdk1.8

maven 3.3.9以上,我这里用的是3.3.9

登录linux服务器

下载rocketMq安装包

wget http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.3.0/rocketmq-all-4.3.0-source-release.zip
unzip rocketmq-all-4.3.0-source-release.zip

安装maven

wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz

配置maven环境变量

MAVEN_HOME=/usr/local/maven
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin

生效配置文件

source /etc/profile

检查maven是否成功

^C[root@supervisordruid1 ~]# mvn -v
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-11T00:41:47+08:00)
Maven home: /root/maven/apache-maven-3.3.9
Java version: 1.7.0_51, vendor: Oracle Corporation
Java home: /usr/local/jdk/jdk1.7.0_51/jre
Default locale: en_US, platform encoding: ANSI_X3.4-1968
OS name: "linux", version: "2.6.32-754.3.5.el6.x86_64", arch: "amd64", family: "unix"

通过maven编译部署

cd rocketmq-all-4.3.0-source-release
mvn -Prelease-all -DskipTests clean install -U

可能报错

[root@supervisordruid1 rocketmq-all-4.3.0]# mvn -Prelease-all -DskipTests clean install -U
-bash: bin/mvn: 没有那个文件或目录

将jdk改为1.8

直到

[INFO] rocketmq-namesrv 4.3.0 ............................. SUCCESS [  1.534 s]
[INFO] rocketmq-logappender 4.3.0 ......................... SUCCESS [  1.547 s]
[INFO] rocketmq-openmessaging 4.3.0 ....................... SUCCESS [  1.398 s]
[INFO] rocketmq-example 4.3.0 ............................. SUCCESS [  1.524 s]
[INFO] rocketmq-test 4.3.0 ................................ SUCCESS [  2.478 s]
[INFO] rocketmq-distribution 4.3.0 ........................ SUCCESS [  8.387 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:13 min
[INFO] Finished at: 2019-03-19T03:03:36+08:00
[INFO] Final Memory: 65M/420M
[INFO] ------------------------------------------------------------------------

进入distribution/target/apache-rocketmq目录

cd distribution/target/apache-rocketmq

启动的时候先启动 namesrv,然后启动 broker

启动 NameServer

先进入 RocketMQ 安装目录下的 distribution/target/apache-rocketmq 目录

# 启动命令,并且常驻内存
$ nohup sh bin/mqnamesrv &  或者  sh mqnamesrv
# 查看启动日志能看到:The Name Server boot success字样则成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log

nohup sh mqnamesrv & :属于后台启动

sh mqnamesrv :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出

启动 Broker

# 启动命令,并且常驻内存:注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问
> nohup sh bin/mqbroker -n localhost:9876 &
 # 查看启动日志
> tail -f ~/logs/rocketmqlogs/broker.log

nohup sh bin/mqbroker -n localhost:9876 & :属于后台启动

sh bin/mqbroker -n localhost:9876 :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出

我启动的时候报错:

[root@supervisordruid2 apache-rocketmq]# sh bin/mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/rocketMq/rocketmq-all-4.3.0/distribution/target/apache-rocketmq/hs_err_pid7473.log

说明是内存不够,不能为虚拟机分配足够多的空间

[root@supervisordruid2 bin]# vi runserver.sh
找到如下配置,进行修改
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"(修改前的)
|
|
|
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"(修改后的,如果还是不行就更小。,比如1g)
[root@supervisordruid2 bin]# vi runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"(修改前的)
找到如下配置,进行修改
|
|
|
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"(修改后的,如果还是不行就更小。,比如1g

如果报错

Invalid initial heap size: -Xms25mg
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

RocktMq安装和简单使用以及报错收集

说明不能正常创建虚拟机,尝试将参数调大,因为rocketmq需要的可能大一点。

比如我调到了1g,注意后面的xmn新生代的大小必须不能比前面两个大,但是我配置0.5页有问题,最后配置成了 1g 1g 1g

然后再次启动 broker 时就成功了,broker 注册到了 nameserver 上了(localhost:9876

[root@supervisordruid2 apache-rocketmq]# sh bin/mqbroker -n localhost:9876
The broker[supervisordruid2, 172.17.42.1:10911] boot success. serializeType=JSON and name server is localhost:9876

使用

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.leesin</groupId>
    <artifactId>RocketMq_demo</artifactId>
    <version>1.0-SNAPSHOT</version>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
</dependencies>
</project>

producer

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
	public static void main(String[] args) throws MQClientException {
		/**
		 * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
		 * 注意:ProducerGroupName需要由应用来保证唯一<br>
		 * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
		 * 因为服务器会回查这个Group下的任意一个Producer
		 */
		DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupTest");

		producer.setNamesrvAddr("10.0.1.225:9876");
		/**
		 * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
		 * 注意:切记不可以在每次发送消息时,都调用start方法
		 */
		producer.start();

		/**
		 * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
		 * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
		 * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
		 * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
		 */
		try {
			{
				Message msg = new Message("broker-a",// topic
						"TagB",// tag
						"OrderID002",// key
						("Hello MetaQ2").getBytes());// body
				SendResult sendResult = producer.send(msg);
				System.out.println(sendResult);
			}
		}
		catch (Exception e) {
			e.printStackTrace();
		}

		/**
		 * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
		 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
		 */
		producer.shutdown();
	}
}

consumer

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {

	/**
	 * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
	 * 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法<br>
	 * @throws MQClientException
	 */
	public static void main(String[] args) throws InterruptedException, MQClientException {
		/**
		 * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
		 * 注意:ConsumerGroupName需要由应用来保证唯一
		 */
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testmerchantLeagueConsumerGroup");
		consumer.setNamesrvAddr("10.0.1.225:9876");

		/**
		 * 订阅指定topic下tags分别等于TagA或TagB
		 */
		consumer.subscribe("broker-a", "TagB || TagA");

		/**
		 * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
		 * 如果非第一次启动,那么按照上次消费的位置继续消费
		 */
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		consumer.registerMessageListener(new MessageListenerConcurrently() {

			/**
			 * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
			 */
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
															ConsumeConcurrentlyContext context) {
				System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
				MessageExt msg = msgs.get(0);
				if (msg.getTopic().equals("broker-a")) {
					// 执行TopicTest1的消费逻辑
					if (msg.getTags() != null && msg.getTags().equals("TagA")) {
						// 执行TagA的消费
						String message = new String(msg.getBody());
						System.out.println(message);
					}
					else if (msg.getTags() != null && msg.getTags().equals("TagB")) {
						// 执行TagB的消费
						String message = new String(msg.getBody());
						System.out.println(message);
					}

				}
				//消费者向mq服务器返回消费成功的消息
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});

		/**
		 * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
		 */
		consumer.start();

		System.out.println("Consumer Started.");
	}
}

启动producer报错:org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, broker-a

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, broker-a
See http://rocketmq.apache.org/docs/faq/ for further details.
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:561)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1058)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1017)
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:208)
	at Producer.main(Producer.java:35)

查看文章:

https://blog.csdn.net/wangmx1993328/article/details/81588217

可能是上面的jvm参数设置的太小了,重新启动一下rocketmq broker和name

我这里的原因是因为防火墙没开

vi /etc/sysconfig/iptables
添加
-A INPUT -m state --state NEW -m tcp -p tcp --dport 8080 -j ACCEPT

开启防火墙报错:

iptables: Applying firewall rules: iptables-restore v1.4.7: no command specified

Error occurred at line: 26

[root@supervisordruid2 apache-rocketmq]# service iptables restart
iptables: Setting chains to policy ACCEPT: filter nat      [  OK  ]
iptables: Flushing firewall rules:                         [  OK  ]
iptables: Unloading modules:                               [  OK  ]
iptables: Applying firewall rules: iptables-restore v1.4.7: no command specified
Error occurred at line: 26
Try `iptables-restore -h' or 'iptables-restore --help' for more information.
                                                           [FAILED]

因为防火墙的配置文件在修改的过程中多了回车空格导致的。

通过命令查看防火墙状态

[root@supervisordruid2 apache-rocketmq]# /etc/init.d/iptables status
Table: filter
Chain INPUT (policy ACCEPT)
num  target     prot opt source               destination
1    ACCEPT     tcp  --  0.0.0.0/0            0.0.0.0/0           tcp dpt:9876
2    ACCEPT     tcp  --  0.0.0.0/0            0.0.0.0/0           tcp dpt:9876
3    ACCEPT     tcp  --  0.0.0.0/0            0.0.0.0/0           tcp dpt:8889
4    ACCEPT     tcp  --  0.0.0.0/0            0.0.0.0/0           tcp dpt:2375
5    ACCEPT     tcp  --  0.0.0.0/0            0.0.0.0/0           tcp dpt:9876

之后重新启动rocket produer和consumer

运行还是会报错:rocketmq org.apache.rocketmq.remoting.exception.RemotingConnectException:connect to failed

rocketmq org.apache.rocketmq.remoting.exception.RemotingConnectException:connect to failed

这是由于跨域造成的:

修改服务器中broker的配置,添加服务器IP(公网)即可

vim /home/prod/rocketmq-all-4.3.0/distribution/target/apache-rocketmq/conf/broker.conf
新增一行:
brokerIP1=xx.xx.xx.xx  # 你的公网IP,即你的mq安装的哪个服务器的ip,不是你java程序的ip

如何获得自己的公网ip?

curl ifconfig.me
或者
curl cip.cc

然后重启 mqnamesrv,记得先杀死进程

  > nohup sh bin/mqnamesrv &
  > tail -f ~/logs/rocketmqlogs/namesrv.log
  The Name Server boot success...

然后重启Broker, 记得先杀死进程

注意,重点是: -c conf/broker.conf

  > nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
  > tail -f ~/logs/rocketmqlogs/broker.log
  The broker[%s, 172.30.30.233:10911] boot success..

至此终于能够正常启动了,producer和consumer正常。

producer 输出:

SendResult [sendStatus=SEND_OK, msgId=0A0005820F8318B4AAC260221C780000, offsetMsgId=0A0001E100002A9F00000000000DBA37, messageQueue=MessageQueue [topic=broker-a, brokerName=broker-a, queueId=0], queueOffset=1]

consumer输出

Consumer Started.
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=189, queueOffset=1, sysFlag=0, bornTimestamp=1552982448251, bornHost=/10.0.5.130:55358, storeTimestamp=1552953392186, storeHost=/10.0.1.225:10911, msgId=0A0001E100002A9F00000000000DBA37, commitLogOffset=899639, bodyCRC=2145937944, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=broker-a, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=OrderID002, CONSUME_START_TIME=1552982688881, UNIQ_KEY=0A0005820F8318B4AAC260221C780000, WAIT=true, TAGS=TagB}, body=12]]]
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=189, queueOffset=0, sysFlag=0, bornTimestamp=1552982440910, bornHost=/10.0.5.130:55334, storeTimestamp=1552953384869, storeHost=/10.0.1.225:10911, msgId=0A0001E100002A9F00000000000DB97A, commitLogOffset=899450, bodyCRC=2145937944, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=broker-a, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=OrderID002, CONSUME_START_TIME=1552982688881, UNIQ_KEY=0A0005820F8118B4AAC26021FFCD0000, WAIT=true, TAGS=TagB}, body=12]]]
Hello MetaQ2
Hello MetaQ2

感谢:https://blog.csdn.net/wangmx1993328/article/details/81536168

报错

报错文章收集:https://blog.csdn.net/wangmx1993328/article/details/81588217#RemotingTooMuchRequestException%3A sendDefaultImpl call timeout

https://blog.csdn.net/lw5885799/article/details/88646051

https://blog.csdn.net/jiangyu1013/article/details/81478754

com.alibaba.rocketmq.client.exception.MQBrokerException:
CODE: 14  DESC: service not available now, maybe disk full, CL:  0.87 CQ:  0.87 INDEX:  0.87, maybe your broker machine memory too small.

机器的磁盘空间不足导致,删除机器上没用的东西,腾出空间就好了。

idea中启动producer

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:634)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1279)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1225)
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:283)
	at Producer.send(Producer.java:78)
	at Producer.run(Producer.java:96)
	at java.lang.Thread.run(Thread.java:748)
	at Main.main(Main.java:10)
null

说是关了防火墙就可以了,这句话好敷衍对吧,具体的操作看上面的推荐博文,我试了不行,继续往下看

还可能报错:

org.apache.rocketmq.client.exception.MQClientException: No route info of this topic

上面这两个错掺和着来,具体操作看上面的博文,我试了不行,继续往下看。

总结:

rocketmq内存设置

vi runserver.sh

vi runbroker.sh

//搜索到这里
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

rocketmq需要的内存比较大,如果设置太大,系统空间不够,如果设置太小,不能成功构建虚拟机,因为我就需要那么大,你给太小怎么办???(name和brocker都需要设置)

Invalid initial heap size: -Xms25mg
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

这个就是设置太小了,rocketmq虚拟机构建不出来

[root@supervisordruid2 apache-rocketmq]# sh bin/mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/rocketMq/rocketmq-all-4.3.0/distribution/target/apache-rocketmq/hs_err_pid7473.log

这个就是设置太大了,我们的机器给不了这么多

所以就需要根据自己的机器设置出机器技能给够运行内存,又不至于设置的太小而起不来,所以具体的还需要自己慢慢试,一点一点试出来。

因为我的机器是公司本地的机器,所以,上面的内存可能不多了

# linux 下 取进程占用 cpu 最高的前10个进程
ps aux|head -1;ps aux|grep -v PID|sort -rn -k +3|head

# linux 下 取进程占用内存(MEM)最高的前10个进程
ps aux|head -1;ps aux|grep -v PID|sort -rn -k +4|head

杀掉没有用的进程,节省空间出来。

上面这个配置好的话,其实已经差不多了,至于上面报错小节中的后两个问题我都迎刃而解。

我最后设置的是:

name:-Xms4g -Xmx4g -Xmn2g

brocker:-Xms4g -Xmx4g -Xmn4g

当然这不是死的,还好我关了本地的kafka,节省了好多内存。

df -h 运行内存查看

free -g 磁盘内存查看

实在不行换台机器。

配置brockerip

上面介绍中有

vim /home/prod/rocketmq-all-4.3.0/distribution/target/apache-rocketmq/conf/broker.conf
新增一行:
brokerIP1=xx.xx.xx.xx  # 你的公网IP,即你的mq安装的哪个服务器的ip,不是你java程序的ip

这里,因为我是本地环境,所以直接写成了本机的内网ip,因为仅供内网测试用。

如何获得自己的公网ip?

curl ifconfig.me
或者
curl cip.cc

启动方式

上面的博文中有一篇对启动方式有很大的强制性:

https://blog.csdn.net/lw5885799/article/details/88646051

博文裁剪:

name启动
nohup ./bin/mqnamesrv -n 你的公网IP:9876 &
不能写localhost 

broker 启动
nohup sh bin/mqbroker -n 你的公网IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &

首先他这个是后台运行的方式

我的方式

name启动
./mqnamesrv

brocker启动
./mqbroker -n localhost:9876
------------------------------
但是建议写成(仅仅是建议)
sh bin/mqbroker -n localhost:9876 -c conf/broker.conf autoCreateTopicEnable=true

我这里是localhost,因为我的demo就是访问内网部署的rocketMq,如果是公网的话根据我的推测写成公网ip把,因为这里仅供本地测试,所以我写成localhost就成功了。

如果往机器上部署,最好再本地看看报错吗

关于防火墙

如果像我一样只是玩玩demo,别搞端口开放了,直接关了防火墙

service iptables stop
service iptables status

当然在其他的机器上

telnet ip 9876

确保这个端口是可用的。