【RabbitMQ】RabbitMQ在Windows的安装和简单的使用

时间:2022-09-12 22:05:23

版本说明

使用当前版本:3.5.4

安装与启动

在官网上下载其Server二进制安装包,在Windows上的安装时简单的,与一般软件没什么区别。

安装前会提示你,还需要安装Erlang,并打开下载页面。把他们都下载安装就ok了。(当然也可先行下载安装)

安装完,服务默认是启动的。

Erlang,应该是一个在并发编程方面很厉害的语言吧。

后期可通过开始菜单启动。

简单的Java客户端连接

编码中有些配置需要特别注意配置,比如:

  • 选择什么交换器,各种交换器的分发策略不一样。
  • 是否自动确认消息。如果RabbitMQ服务器收到该消息的确认消息,会认为该消息已经处理OK了,会把它从队列中删除。
  • 队列是否持久、消息是否持久,就是队列和消息在RabbitMQ服务器重启时是否恢复。
  • 队列是否自动删除,就是队列在无监听的情况下是否自动删除。

引入的客户端包:

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.6</version>
</dependency>

消费者:

package com.nicchagil.rabbit.No001MyFirstDemo;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope; public class Customer { private final static String QUEUE_NAME = "hello world"; public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException { /* 创建连接工厂 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
/* 创建连接 */
Connection connection = factory.newConnection();
/* 创建信道 */
Channel channel = connection.createChannel(); // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("Waiting for messages."); /* 定义消费者 */
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received the message -> " + message);
}
}; // 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

生产者:

package com.nicchagil.rabbit.No001MyFirstDemo;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer { private final static String QUEUE_NAME = "hello world"; public static void main(String[] argv) throws java.io.IOException, TimeoutException { Connection connection = null;
Channel channel = null;
try {
/* 创建连接工厂 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
/* 创建连接 */
connection = factory.newConnection();
/* 创建信道 */
channel = connection.createChannel(); // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
channel.queueDeclare(QUEUE_NAME, true, false, false, null); String message = "hello world..."; // 需发送的信息 /* 发送消息,使用默认的direct交换器 */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("Send message -> " + message); } finally {
/* 关闭连接、通道 */
channel.close();
connection.close();
System.out.println("Closed the channel and conn.");
} } }

如无意外,每运行一次生产者(发送一次消息),消费者都会执行一次业务(接收到一次消息)。

执行了两次生产者后,日志如下:

Waiting for messages.
Received the message -> hello world...
Received the message -> hello world...

注:

  • MQ服务器收到确认(ack)信息后,会在队列中删除该消息。如果未收到确认消息,则会继续等待(不存在超时的概念),它直到执行该消息的消费者挂了,才把此遗留的消息重新分发给其它的消费者。

发布与订阅(fanout交换器)

发布与订阅,类似于广播。

下面代码演示:两个消费者创建临时队列后绑定一个fanout类型的交换器,然后生产者往该交换器发送消息,消息被广播到两个绑定的队列中,队列将消息发送给各自的消费者,两个消费者接收到消息完成任务。

消费者A:

package com.nicchagil.rabbit.No003Fadout;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope; public class FanoutCustomerA { public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException { /* 创建连接工厂 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
/* 创建连接 */
Connection connection = factory.newConnection();
/* 创建信道 */
Channel channel = connection.createChannel(); // 创建一个临时的、私有的、自动删除、随机名称的临时队列
String queueName = channel.queueDeclare().getQueue();
System.out.println("queue : " + queueName);
channel.queueBind(queueName, "amq.fanout", "");
System.out.println(FanoutCustomerA.class.getName() + ", waiting for messages."); /* 定义消费者 */
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received the message -> " + message);
}
}; // 开始消费(设置自动确认消息)
channel.basicConsume("", true, consumer);
}
}

消费者B:

package com.nicchagil.rabbit.No003Fadout;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope; public class FanoutCustomerB { public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException { /* 创建连接工厂 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
/* 创建连接 */
Connection connection = factory.newConnection();
/* 创建信道 */
Channel channel = connection.createChannel(); // 创建一个临时的、私有的、自动删除、随机名称的临时队列
String queueName = channel.queueDeclare().getQueue();
System.out.println("queue : " + queueName);
channel.queueBind(queueName, "amq.fanout", "");
System.out.println(FanoutCustomerB.class.getName() + ", waiting for messages."); /* 定义消费者 */
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received the message -> " + message);
}
}; // 开始消费(设置自动确认消息)
channel.basicConsume("", true, consumer);
}
}

生产者:

package com.nicchagil.rabbit.No003Fadout;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class FanoutProducer { public static void main(String[] argv) throws java.io.IOException, TimeoutException { Connection connection = null;
Channel channel = null;
try {
/* 创建连接工厂 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
/* 创建连接 */
connection = factory.newConnection();
/* 创建信道 */
channel = connection.createChannel(); String message = "hello world..."; // 需发送的信息 /* 发送消息,使用默认的fanout交换器 */
channel.basicPublish("amq.fanout", "", null, message.getBytes());
System.out.println("Send message -> " + message); } finally {
/* 关闭连接、通道 */
channel.close();
connection.close();
System.out.println("Closed the channel and conn.");
} } }

先将两个消费者跑起来,然后运行生产者发送一条消息。

正常来说,消费者A、消费者B都收到消息并执行。

消费者A的日志:

queue : amq.gen-F3EYfr68AHvfZTIJUcN_Ug
com.nicchagil.rabbit.No003Fadout.FanoutCustomerA, waiting for messages.
Received the message -> hello world...

消费者B的日志:

queue : amq.gen-AV_XDQtB-LFPK8bDy31PTw
com.nicchagil.rabbit.No003Fadout.FanoutCustomerB, waiting for messages.
Received the message -> hello world...

管理控制台

我们可以通过以下命令启用管理控制台:

rabbitmq-plugins enable rabbitmq_management

然后由此地址(http://localhost:15672)进入,默认端口是15672,默认账户是guest/guest。

进入后,可以看到Overview、Connections、Channels、Exchanges、Queues、Admin几个页签,此控制台的功能各种强大,不仅可以查看信息,还可以增、删信息,非常棒。

消费者的异常处理器

如果消费者方法体中发生异常没被捕捉并处理,如果使用默认的异常处理器,消费者的信道会关闭,不继续执行任务。

比如以下例子,遇到空字符串则抛出运行时异常:

package com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope; public class Customer { private final static String QUEUE_NAME = "hello world"; public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException { /* 创建连接工厂、连接、通道 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); // 声明消息队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Waiting for messages."); /* 定义消费者 */
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8"); if (message == null || message.length() == 0) {
throw new RuntimeException("The input str is null or empty...");
} System.out.println("Received the message -> " + message);
}
}; // 将消费者绑定到队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

然后,我们用这个消费者监听一个队列,且此队列只有这个消费者,用于测试队列是否堵塞,也就是这个消费者是否不继续消费。

先用生产者发送“hello world...”(正常参数),再发送“”(异常参数),最后发送“hello world...”(正常参数)。

可见如下日志,消费者发生异常后,没有响应第三个“hello world...”的消息,也可进入控制台,会发现此消息为Ready状态,等待消费。

原因在于,默认的异常处理器为DefaultExceptionHandler,其继承StrictExceptionHandler,从源码看,遇到异常它会关闭信道。

日志如下:

Waiting for messages.
Received the message -> hello world...
com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办.Customer$1@630bd3f1 (amq.ctag-QzWI1jxh4h23rOFJM63cBA) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1):
java.lang.RuntimeException: The input str is null or empty...
at com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办.Customer$1.handleDelivery(Customer.java:40)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

遇到异常,如果需使用别的处理方式,可以设置自定义的异常处理器。

以下的异常处理器,只是Demo。各方法体中只打印相关信息供查看:

package com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ExceptionHandler;
import com.rabbitmq.client.TopologyRecoveryException; public class MyExceptionHandler implements ExceptionHandler { public void handleUnexpectedConnectionDriverException(Connection conn,
Throwable exception) {
System.out.println("MyExceptionHandler.handleUnexpectedConnectionDriverException");
} public void handleReturnListenerException(Channel channel,
Throwable exception) {
System.out.println("MyExceptionHandler.handleReturnListenerException");
} public void handleFlowListenerException(Channel channel, Throwable exception) {
System.out.println("MyExceptionHandler.handleFlowListenerException");
} public void handleConfirmListenerException(Channel channel,
Throwable exception) {
System.out.println("MyExceptionHandler.handleConfirmListenerException");
} public void handleBlockedListenerException(Connection connection,
Throwable exception) {
System.out.println("MyExceptionHandler.handleBlockedListenerException");
} public void handleConsumerException(Channel channel, Throwable exception,
Consumer consumer, String consumerTag, String methodName) {
// 正常渠道应该有专业的LOG框架打印,此处简单处理
exception.printStackTrace();
System.out.println("MyExceptionHandler.handleConsumerException");
} public void handleConnectionRecoveryException(Connection conn,
Throwable exception) {
System.out.println("MyExceptionHandler.handleConnectionRecoveryException");
} public void handleChannelRecoveryException(Channel ch, Throwable exception) {
System.out.println("MyExceptionHandler.handleChannelRecoveryException");
} public void handleTopologyRecoveryException(Connection conn, Channel ch,
TopologyRecoveryException exception) {
System.out.println("MyExceptionHandler.handleTopologyRecoveryException");
} }

设置自定义的异常处理器:

factory.setExceptionHandler(new MyExceptionHandler());

像上述那样,先传递“hello world...”,再传递“”(空字符串),最后传递“hello world...”。观察如下日志,可见发生异常后,消费者正常响应消息。

Waiting for messages.
Received the message -> hello world...
java.lang.RuntimeException: The input str is null or empty...
at com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办.Customer$1.handleDelivery(Customer.java:41)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
MyExceptionHandler.handleConsumerException
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Received the message -> hello world...

具体应该根据什么策略进行异常处理,这是个是值得深思的问题,与业务的性质有关。什么情况下消费者应不继续响应请求,什么情况下消费者应继续相应,这个在于业务的性质而定。

参考的优秀文章

【RabbitMQ】RabbitMQ在Windows的安装和简单的使用的更多相关文章

  1. RabbitMQ学习在windows下安装配置

    RabbitMQ学习一. 在windows下安装配置 1.下载并安装erlang,http://www.erlang.org/download.html,最新版是R15B01(5.9.1).由于我机器 ...

  2. RabbitMQ介绍及windows下安装使用

    RebbitMQ介绍 RabbitMQ是一个由 Erlang (一种通用的面向并发的编程语言)开发的AMQP(Advanced Message Queue )的开源实现,Rabbit MQ 是建立在E ...

  3. Redis Windows版安装及简单使用

    1.Redis简介及优势 Redis 是完全开源免费的,遵守BSD协议,是一个高性能的key-value数据库. 特点: Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次 ...

  4. windows下安装laravel简单步骤以及碰到的问题

    1.下载并安装composerhttp://pkg.phpcomposer.com/ ========================================================= ...

  5. 在 Windows 上安装 TensorFlow(转载)

    在 Windows 上安装 TensorFlow windows下配置安装Anaconda+tensorflow Spyder——科学的Python开发环境 Windows7 安装TensorFlow ...

  6. RabbitMQ消息队列系列教程(二)Windows下安装和部署RabbitMQ

    摘要 本篇经验将和大家介绍Windows下安装和部署RabbitMQ消息队列服务器,希望对大家的工作和学习有所帮助! 目录 一.Erlang语言环境的搭建 二.RabbitMQ服务环境的搭建 三.Ra ...

  7. Linux RabbitMQ的安装、环境配置、远程访问 , Windows 下安装的RabbitMQ远程访问

    Linux  RabbitMQ的安装和环境配置 1.安装 RabbitMQ是使用Erlang语言编写的,所以安装RabbitMQ之前,先要安装Erlang环境 #对原来的yum官方源做个备份 1.mv ...

  8. windows下安装rabbitmq的php扩展amqp

    最近研究rabbitmq队列,linux安装这样的软件一向都是很方便的,但是windows可能会比较麻烦,所以对windows的安装做个记录. windows上使用的php扩展为dll文件,首先去下载 ...

  9. Windows下安装RabbitMQ

    今天正好给自己机器安装rabbitmq,总结下安装经验. 现在国内访问erlang,和 RabbitMQ 官网好像都很难连上.我已下载好了资源,需要的朋友可以下载. 链接: https://pan.b ...

随机推荐

  1. SQL查出异常数据(ORA-01722&colon; 无效数字)

    -- Created on 2015/4/29 by MENGHU DECLARE -- Local variables here I INTEGER; BEGIN FOR OPEN_DATA IN ...

  2. Mybatis分页插件

    mybatis配置 <!-- mybatis分页插件 --> <bean id="pagehelper" class="com.github.pageh ...

  3. VC6&period;0中MFC界面换肤简例

    利用VC中的MFC进行界面设计时,发现界面上的各控件无法简易地进行调整,比如字体大小.颜色.格式等. 为了改变外观,小小地美化一下,今天决定动手一试. 网上提供的库和方法不计其数,我选择了SkinMa ...

  4. &lbrack;Papers&rsqb;MHD&comma; &dollar;&bsol;p&lowbar;3&bsol;pi&dollar;&comma; Lebesgue space &lbrack;Cao-Wu&comma; JDE&comma; 2010&rsqb;

    $$\bex \p_3\pi\in L^p(0,T;L^q(\bbR^3)),\quad \frac{2}{p}+\frac{3}{q}=\frac{12}{7},\quad \frac{12}{7} ...

  5. Python学习笔记23&colon;Django构建一个简单的博客网站(一个)

    在说如何下载和安装Django,本节将重点讨论如何使用Django站点. 一 新建project 命令:django-admin startproject mysite # 有的须要输入:django ...

  6. GCD HDU - 2588

    输入 N 和 M (2<=N<=1000000000, 1<=M<=N), 找出所有满足1<=X<=N 且 gcd(X,N)>=M 的 X 的数量. Inpu ...

  7. C&plus;&plus; GetSystemDirectory&lpar;&rpar;

    关于GetSystemDirectory function,参考:https://msdn.microsoft.com/en-us/library/windows/desktop/ms724373(v ...

  8. 最课程学员启示录:这么PL的小姐姐你要不要

    最课程学员启示录:这么PL的小姐姐给你做……你要不要? 想撒呢,给你做程序媛你要不要? 一句话,先上图,而且必须是经得住考验的素颜无码高清大图身份照: 我觉得未来我们可以搞个校花评选,你们不反对的话, ...

  9. 中国移动CMPP协议、联通SGIP协议、电信SMGP协议短信网关

    移动cmpp协议 英文缩写:CMPP (China Mobile Peer to Peer) 中文名称:中国移动通信互联网短信网关接口协议 说明:为中国移动通信集团公司企业规范.规范中描述了中国移动短 ...

  10. 《PHP和MySQL Web开发》读书笔记(上篇)

    最近过得太浮躁了,实在自己都看不下去了,看了PHP圣经之后,觉得非常有必要要总结一下. Chapter1.快速入门 ·PHP标记:总共有三种风格,常用的还是XML风格为主 <?php echo ...