【初始RabbitMQ】了解和安装RabbitMQ

时间:2024-02-18 07:48:40

RabbitMQ的概念

RabbitMQ是一个消息中间件:他可以接受并转发消息。例如你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据

四大核心概念

生产者:产生数据发送消息的程序

交换机:交换机是RabbitMQ中的一个重要的部件,一方面它接受来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切的知道如何处理它接受到的消息,是将这些消息推送到特定队列还是推送到多个队列,或者是把消息丢弃,这个得有交换机类型决定

队列:队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但是它们只能存储在的队列中。队列仅受主机内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者:消费者大多数是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在一个机器上。同一个程序既可以是生产者也可以是消费者

RabbitMQ核心部分

各种名词介绍

Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似 于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销是非常大的,效率是很低效的。Channel是在connect内部建立的逻辑连接,如果应用成型鼓支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ的安装

官网地址:rabbitmq.com/download.html

百度网盘大家自取:

链接:https://pan.baidu.com/s/1U7rdXf2yk9PRGxOJxhcY8A?pwd=d0jd 
提取码:d0jd

文件上传

以CentOS系统举例:将以上的软件安装使用命令scp

scp D:\Java学习课件\MQ\软件\rabbitmq-server-3.8.8-1.el7.noarch.rpm root@118.31.6.100:/root

安装文件

rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

常用命令(按照以下顺序)

添加开机启动 RabbitMQ 服务

chkconfig rabbitmq-server on

启动服务

/sbin/service rabbitmq-server start

查看服务状态

/sbin/service rabbitmq-server status

 停止服务(选择执行)

/sbin/service rabbitmq-server stop

开启 web 管理插件

rabbitmq-plugins enable rabbitmq_management

用默认账号密码(guest)访问地址 http://真实IP:15672/出现权限问题使用云服务器的一定要把云服务器的防火墙的打开

添加一个新的用户

创建账号

rabbitmqctl add_user admin 123

设置用户角色

rabbitmqctl set_user_tags admin administrator

设置用户权限

set permissions-p <vhostpath><user><conf> <write><read>

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限

当前用户和角色

rabbitmqctl list_users

利用 admin 用户登录

重置命令

关闭应用的命令

rabbitmqctl stop_app

清除的命令

rabbitmqctl reset

重新启动命令

rabbitmqctl start_app 

Hello World

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.rabbitmq</groupId>
    <artifactId>rabbitmq-hello</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!--指定 jdk 编译版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.6</version> <!-- 使用最新版本 -->
        </dependency>

    </dependencies>

</project>

生产者代码:

package com.rabbitmq.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发消息
 */

public class Produce {
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP连接rabbitmq
        factory.setHost("118.31.6.132");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /*
         *生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
         * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
         * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数 延迟消息等
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息
        String message = "hello world";
        /**
         * 发送一个消息
         * 1.发送到那个交换机
         * 2.路由的KEY值是哪个 本次是队列的名称
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕!");
    }
}

消费者代码:

package com.rabbitmq.one;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 */
public class Consume {
    //队列名称
    public static final String QUEUE_NAME = "hello";
    //接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("118.31.6.132");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明 接受消息
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };
        //取消消息的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被中断");
        };
         /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        channel.basicConsume(QUEUE_NAME,true, deliverCallback,cancelCallback);
    }
}

运行结果: