2.7日学习打卡----初学RabbitMQ(二)

时间:2024-02-29 12:59:50

2.7日学习打卡

目录:

  • 2.7日学习打卡
    • 一. RabbitMQ 简单模式![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/42009c68e078440797c3183ffda6955d.png)
      • 生产者代码实现
      • 消费者代码实现
    • 二. RabbitMQ 工作队列模式
      • 生产者代码实现
      • 消费者代码实现
    • 三. RabbitMQ 发布订阅模式
      • 生产者代码实现
      • 消费者代码实现
    • 四. RabbitMQ 路由模式
      • 生产者代码实现
      • 消费者代码实现
    • 五. RabbitMQ 通配符模式
      • 生产者代码实现
      • 消费者代码实现

在这里插入图片描述
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则
——JMS,用于操作消息中间件。JMS即Java消息服务
(JavaMessage Service)应用程序接口,是一个Java平台中关于面
向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多
MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没
有实现JMS规范,但是开源社区有JMS的实现包。

创建项目

# 开启管控台插件
rabbitmq-plugins enable
rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

创建普通maven项目,添加RabbitMQ依赖:

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqpclient</artifactId>
        <version>5.14.0</version>
    </dependency>
</dependencies>

一. RabbitMQ 简单模式在这里插入图片描述

P:生产者,也就是要发送消息的程序

C:消费者:消息的接收者,会一直等待消息到来

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

特点:

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机

生产者代码实现

步骤:

  1. 创建连接工厂ConnectionFactory
  2. 设置工厂的参数
  3. 创建连接 Connection
  4. 创建管道 Channel
  5. 简单模式中没有交换机exchange,所以不用创建(RabbitMQ会使用默认的交换机!)
  6. 创建队列 queue
  7. 设置发送内容,使用channal.basicPublish()发送
  8. 释放资源

代码实现

package com.jjy.mq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//生产者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //使用自己的服务器ip地址
        connectionFactory.setHost("192.168.66.100");
        //rabbitmq的默认端口5672
        connectionFactory.setPort(5672);
        //用户名
        connectionFactory.setUsername("jjy");
        //密码
        connectionFactory.setPassword("jjy");
        //虚拟机
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列,如果队列已存在,则使用该队列
        /**
        //     * 参数1:队列名
        //     * 参数2:是否持久化,true表示MQ重启后队列还在。
        //     * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
        //     * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
        //     * 参数5:其他额外参数
        //     */
        channel.queueDeclare("simple_queue",false,false,false,null);
        //5.发送消息
        String mesg="hello rabbitmq";
        /**
         * 参数1:交换机名,""表示默认交换机
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
        channel.basicPublish("","simple_queue",null,mesg.getBytes());

        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
        System.out.println("发送成功");
    }
}

消费者代码实现

在这里插入图片描述

步骤:

1.创建连接工厂ConnectionFactory
2.设置工厂参数
3.创建连接
4.创建信道
前四步代码基本是一致的,需要注意的是生产者与消费者的Channel是不同Connection中的!不是同一个对象.
5. 最简单的模型没有交换机exchange,所以此处RabbitMQ会使用默认的交换机
6. 接收消息,有一个回调方法 channel.basicConsume()

代码实现

package com.jjy.mq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.监听队列
        /**
         * 参数1:监听的队列名
         * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
         * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
         */
        channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("接受消息,消息为:"+message);
            }
        });
        //
    }
}

二. RabbitMQ 工作队列模式

在这里插入图片描述
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该
模式也使用direct交换机,应用于处理消息较多的情况。特点如
下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

生产者代码实现

代码实现

package com.jjy.mq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        // 2.创建连接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        // 4.创建队列,持久化队列
        channel.queueDeclare("work_queue",true,false,false,null);
        // 5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中
        for(int i=0;i<100;i++){

            channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("你好,这是今天的第"+i+"条消息").getBytes());
        }
        // 6.关闭资源
        channel.close();
        connection.close();
    }
}

消费者代码实现

在这里插入图片描述

消费者1:

package com.jjy.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        // 4.监听队列,处理消息
        channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者1消费消息,消息为:" + message);
            }
        });

    }
}

消费者2

package com.jjy.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer2 {

        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.66.100");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("jjy");
            connectionFactory.setPassword("jjy");
            connectionFactory.setVirtualHost("/");
            //2.创建连接
            Connection connection = connectionFactory.newConnection();
            //3.建立信道
            Channel channel = connection.createChannel();
            // 4.监听队列,处理消息
            channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("消费者2消费消息,消息为:" + message);
                }
            });

        }


}


消费者3

package com.jjy.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer3 {

        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.66.100");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("jjy");
            connectionFactory.setPassword("jjy");
            connectionFactory.setVirtualHost("/");
            //2.创建连接
            Connection connection = connectionFactory.newConnection();
            //3.建立信道
            Channel channel = connection.createChannel();
            // 4.监听队列,处理消息
            channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("消费者3消费消息,消息为:" + message);
                }
            });

        }
    }


三. RabbitMQ 发布订阅模式

在这里插入图片描述
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机

C:消费者,消息的接收者,会一直等待消息到来

Queue:消息队列,接收消息、缓存消息

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电
商网站的同一条促销信息需要短信发送、邮件发送、站内信发送
等。此时可以使用发布订阅模式(Publish/Subscribe)
特点:

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

Exchange:交换机(X)一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic(常用):通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失
在这里插入图片描述

生产者代码实现

与之前的步骤相比,多了创建交换机和绑定交换机与队列的操作

代码实现

package com.jjy.mq.publish;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class produce {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机
        
        /*
        exchangeDeclare(String exchange,                  -- 交换机的名称
                        String type,                      -- 交换机的类型,4种
                                                             枚举(direct,fanout,topic,headers)
                        boolean durable,                  -- 持久化
                        boolean autoDelete,               -- 自动删除
                        boolean internal,                 -- 内部使用,基本是false
                        Map<String, Object> arguments)    -- 参数
         *
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
        channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
        //5.创建队列
        //短信队列
        channel.queueDeclare("SEND_MAIL",true,false,false,null);
        //消息队列
        channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
        //站内信息
        channel.queueDeclare