RocketMQ入门

时间:2024-01-28 22:10:03

RocketMQ简介

image
image
image

RocketMQ基本概念

image
image
image

RocketMQ安装运行

wusi@wusi-virtual-machine:~/桌面$ sudo wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
wusi@wusi-virtual-machine:~/桌面$ unzip rocketmq-all-4.7.1-bin-release.zip
root@wusi-virtual-machine:/home/wusi/桌面/rocketmq-all-4.7.1-bin-release# mkdir log
root@wusi-virtual-machine:/home/wusi/桌面/rocketmq-all-4.7.1-bin-release# nohup bin/mqnamesrv > log/mqname.log  2>&1  &
root@wusi-virtual-machine:/home/wusi/桌面/rocketmq-all-4.7.1-bin-release# nohup bin/mqbroker -n 172.16.20.246:9876 -c conf/broker.conf autoCreateTopicEnable=true >   log/borker.log 2>&1 &

2>&1 的意思就是将标准错误重定向到标准输出。
还需要修改runserver.sh,不要设置的太大
image
修改conf目录下的broker.conf
image
修改runbroker.sh,不要设置的太大
image

RocketMQ架构方案及角色详解

image
image
image

API的使用

生产者启动后,发送消息时会报以下错:
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest

原因:
使用RocketMQ进行发消息时,必须要指定topic,对于topic的设置有一个开关autoCreateTopicEnable,一般在开发测试环境中会使用默认设置autoCreateTopicEnable = true,
但是这样就会导致topic的设置不容易规范管理,没有统一的审核等等,所以在正式环境中会在Broker启动时设置参数autoCreateTopicEnable = false。

但是,目前的版本中,autoCreateTopicEnable设置为true也不会生效

解决方法:
手动通过命令或管理界面创建主题

/usr/rocketmq/bin/mqadmin updateTopic -n '192.168.100.242:9876' -c DefaultCluster -t TopicTest

消费者代码

package com.study.rocketmq.a151_simple;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * 普通消息消费者
 */
public class Consumer {

    public static final String NAME_SERVER_ADDR = "192.168.100.242:9876";

    public static void main(String[] args) throws MQClientException {
        // 1. 创建消费者(Push)对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST");

        // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
        consumer.setNamesrvAddr(NAME_SERVER_ADDR);
        consumer.setMaxReconsumeTimes(-1);// 消费重试次数 -1代表16次
        // 3. 订阅对应的主题和Tag
        consumer.subscribe("TopicTest", "*");

        // 4. 注册消息接收到Broker消息后的处理接口
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    MessageExt messageExt = list.get(0);
                    System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });

        // 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错)
        consumer.start();

        System.out.println("已启动消费者");
    }
}

同步生产者代码

package com.study.rocketmq.a151_simple;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

/**
 * 发送同步消息
 * 可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。
 */
public class SyncProducer {
    public static final String NAME_SERVER_ADDR = "192.168.100.242:9876";
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 1. 创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");

        // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
        producer.setNamesrvAddr(NAME_SERVER_ADDR);

        // 3. 启动生产者
        producer.start();

        // 4. 生产者发送消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult result = producer.send(message);

            System.out.printf("发送结果:%s%n", result);
        }

        // 5. 停止生产者
        producer.shutdown();
    }
}

image
image
异步生产者

package com.study.rocketmq.a151_simple;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;

/**
 * 异步消息
 * 一般用来对方法调用响应时间有较严格要求的情况下,异步调用,立即返回
 * 不同于同步的唯一在于: send方法调用的时候多携带一个回调接口参数,用来异步处理消息发送结果
 */
public class AsyncProducer {
    public static final String NAME_SERVER_ADDR = "192.168.100.242:9876";
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        // 1:创建生产者对象,并指定组名
        DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");

        // 2:指定NameServer地址
        producer.setNamesrvAddr(NAME_SERVER_ADDR);

        // 3:启动生产者
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0); // 设置异步发送失败重试次数,默认为2

        int count = 10;
        CountDownLatch cd = new CountDownLatch(count);
        // 4:循环发送消息
        for (int i = 0; i < count; i++) {
            final int index = i;

            // ID110:业务数据的ID,比如用户ID、订单编号等等
            Message msg = new Message("TopicTest", "TagB", "ID110", ("Hello World " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送异步消息
            producer.send(msg, new SendCallback() {
                /**
                 * 发送成功的回调函数
                 * 但会结果有多种状态,在SendStatus枚举中定义
                 * @param sendResult
                 */
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK MSG_ID:%s %n", index, sendResult.getMsgId());
                    cd.countDown();
                }

                /**
                 * 发送失败的回调函数
                 * @param e
                 */
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                    cd.countDown();
                }
            });
        }

        // 确保消息都发送出去了
        cd.await();
        // 5:关闭生产者
        producer.shutdown();
    }
}

image
image
单向模式

package com.study.rocketmq.a151_simple;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

/**
 * 单向模式
 * 一般用来对可靠性有一定要求的消息发送,例如日志系统
 * 不同于同步的唯一之处在于:调用的是sendOneway方法,且方法不返回任何值,即调用者不需要关心成功或失败
 */
public class OnewayProducer {
    public static final String NAME_SERVER_ADDR = "192.168.100.242:9876";

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        // 1:创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");

        // 2:指定NameServer地址
        producer.setNamesrvAddr(NAME_SERVER_ADDR);

        // 3:启动生产者
        producer.start();

        // 4:发送消息
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest", "TagC", ("Hello OneWay :" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.sendOneway(msg);
        }
        System.out.println("消息已发送");

        producer.shutdown();
    }
}

image
image

都是连接NAME_SERVER然后选择相应的集群

image

消息的高可用

从内存将消息写入磁盘,然后将消息同步到从服务器中,数据有同步写入和异步写入俩种方式,master之间没有通信

可以理解为同步就是消息立刻写入
异步就是消息累计一段时间后写入

image
双写、同步复制
image
双主双从,四台机器属于同一个集群
image