Rabbit简单模式理解

时间:2024-04-04 15:17:11

简单模式

我们以最普通的方式去理解,并没有整合Springboot的那种

这是最简单的模式,一个生产者,一个消费者,一个队列

在这里插入图片描述

测试

1、 导包,没整合,不需要编写配置

2、需要生产者消费者

  • 导包
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
  • Producer
public class Producer {
    public static void main(String[] args) {
        //ip port
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //创建连接工程
        connectionFactory.setHost("47.120.50.213");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        //创建连接connection
        Connection connection = null;
        Channel channel = null;
        try {

            connection = connectionFactory.newConnection("producer");
            //通过连接获取通道Channel
            channel = connection.createChannel();
            //通过创建交换机,声明队列,绑定关系,路由key,发送接收消息
            String queueName = "queue";
            /**
             * 队列的名称
             * 是否要持久化
             * 排他性,是否独占独立
             * 是否自动删除,在最后一个消费者消费完后
             * 携带附属参数
             */
            channel.queueDeclare(queueName,false,false,false,null);
            String message = "hello world";
            //发送消息到消息队列
            channel.basicPublish("",queueName,null,message.getBytes());
            System.out.println("消息发送成功");
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }finally {
            //关闭连接
            if(channel != null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (TimeoutException e) {
                    throw new RuntimeException(e);
                }
            }

            if(connection != null && connection.isOpen()){

                try {
                    connection.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }

            }
        }
    }
}
  • Consumer
public class Consumer {

    public static void main(String[] args) {
        //ip port
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //创建连接工程
        connectionFactory.setHost("47.120.50.213");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        //创建连接connection
        Connection connection = null;
        Channel channel = null;
        try {

            connection = connectionFactory.newConnection("producer");
            //通过连接获取通道Channel
            channel = connection.createChannel();
            //第一个是消息队列的名字
            channel.basicConsume("queue", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery message) throws IOException {
                    System.out.println("收到的消息的是"+new String(message.getBody(),"UTF-8"));
                }
            },new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("接收消息失败");
                }
            }
            );


            System.out.println("开始接收消息");
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }finally {
            //关闭连接
            if(channel != null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (TimeoutException e) {
                    throw new RuntimeException(e);
                }
            }

            if(connection != null && connection.isOpen()){

                try {
                    connection.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }

            }
        }
    }
}

总结

代码流程

在这里插入图片描述

  • 上述消息没有设置为持久化

  • 没持久化,消息创建了依旧存在,除非服务器重启,就会删除

  • 持久化,服务器重启后都不会删除

  • 发送消息

    • channel.queueDeclare(queueName,false,false,false,null);
      String message = "hello world";
      //发送消息到消息队列
      channel.basicPublish("",queueName,null,message.getBytes());
      
  • 接收消息

    • channel.basicConsume("queue", true, new DeliverCallback() {
          @Override
          public void handle(String s, Delivery message) throws IOException {
              System.out.println("收到的消息的是"+new String(message.getBody(),"UTF-8"));
          }
      },new CancelCallback() {
          @Override
          public void handle(String s) throws IOException {
              System.out.println("接收消息失败");
          }
      }
      );
      

问题

1、连接超时

在这里插入图片描述

这里可能是NO access ,点击admin修改

在这里插入图片描述

命令方式给用户分配权限

rabbitmqctl set_permissions -p / admin '*' '.*' '.*' 给用户分配权限

发现并没有解决问题

  • 访问的端口时5672,因为15672是给web访问的所以需要访问5672
  • 需要开通安全组与端口号,即5672,15672都需要开启
#开启端口
[root@iZf8zhsqf64x47n1tpdy6oZ rabbitmq]# firewall-cmd --zone=public --add-port=15672/tcp --permanent    
#重启防火墙
firewall-cmd --reload
#需要开启远程安全组

思考

为什么基于channel而不是连接??????

一个应用有多个线程需要从rabbitmq中消费,或是生产消息,那么必然会建立很多个connection ,也就是多个tcp连接,对操作系统而言,建立和销毁tcp连接是很昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现,rabbitmq采用类似nio的做法,连接tcp连接复用,不仅可以减少性能开销,同时也便于管理