(一)RabbitMQ基本概念
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。我曾经对这门语言挺有兴趣,学过一段时间,后来没坚持。RabbitMQ是 AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。
RabbitMQ的结构图如下:
1、几个概念说明:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
2、消息队列的使用过程大概如下:
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符 号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还 有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
3、关联关系
从示意图可以看出消息生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange的Channel,将消息发送给Exchange,Exchange根据规则,将消息转发给指定的消息队列。消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。
这里谈到的Channel可以理解为建立在生产者/消费者和RabbitMQ服务器之间的TCP连接上的虚拟连接,一个TCP连接上可以建立多个Channel。 RabbitMQ服务器的Exchange对象可以理解为生产者发送消息的邮局,消息队列可以理解为消费者的邮箱。Exchange对象根据它定义的规则和消息包含的routing key以及header信息将消息转发到消息队列。channel下图中浅红色框起来的两块所示:
根据转发消息的规则不同,RabbitMQ服务器中使用的Exchange对象有四种,Direct Exchange, Fanout Exchange, Topic Exchange, Header Exchange,如果定义Exchange时没有指定类型和名称, RabbitMQ将会为每个消息队列设定一个Default Exchange,它的Routing Key是消息队列名称。
RabbitMQ Java Client的官网示例有6个,本篇只使用三个例程,分别是使用默认Default Exchange的消息生产/消费,使用Direct Exchange的消息生产/消费,以及RPC方式的消息生产/消费。
为了测试方便,我们新定义了一个virutal host,名字是test_vhosts,定义了两个用户rabbitmq_producer和rabbitmq_consumer, 设置其user_tag为administrator(可以进行远程连接), 为它们设置了访问test_vhosts下所有资源的权限。
创建virutal host,在Admin-->Virtual Hosts(右侧的导航栏上)打开:
创建用户:
为用户设置权限:(在用户列表上点击某个用户进入设置页面)
使用默认Default Exchange的消息生产/消费
我们定义一个生产者程序,一个消费者程序。
生产者程序代码如下:
package com.gl365.payment.util.rabbitmq.demo1; import java.io.IOException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import io.netty.handler.timeout.TimeoutException; public class ProducerApp {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("rabbitmq_producer");
factory.setPassword("rabbitmq_producer");
factory.setVirtualHost("test_vhosts"); //创建与RabbitMQ服务器的TCP连接
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("firstQueue", true, false, false, null);
String message = "First Message";
channel.basicPublish("", "firstQueue", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
catch(Exception ex)
{
ex.printStackTrace();
}
finally
{
if(channel != null)
{
try {
channel.close();
} catch (java.util.concurrent.TimeoutException e) {
e.printStackTrace();
}
}
if(connection != null)
{
connection.close();
}
}
}
}
关于生产者的代码有几点说明:
1) RabbitMQ Java Client示例提供的ConnectionFactory属性设置的代码只有一句:
factory.setHost("localhost");
这句代码表示使用rabbitmq服务器默认的virutal host(“/”),默认的用户guest/guest进行连接,但是如果这段代码运行在远程机器上时, 将因为guest用户不能用于远程连接RabbitMQ服务器而运行失败,上面提供的代码是可以进行建立远程连接的代码。
2)Channel建立后,调用Channel.queueDeclare方法创建消息队列firstQueue。
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
这个方法的第二个参数durable表示建立的消息队列是否是持久化(RabbitMQ重启后仍然存在,并不是指消息的持久化),第三个参数exclusive 表示建立的消息队列是否只适用于当前TCP连接,第四个参数autoDelete表示当队列不再被使用时,RabbitMQ是否可以自动删除这个队列。 第五个参数arguments定义了队列的一些参数信息,主要用于Headers Exchange进行消息匹配时。
3)生产者发送消息使用Channel.basicPublish方法。
void basicPublish(String exchange, String routingKey,
BasicProperties props, byte[] body) throws IOException;
第一个参数exchange是消息发送的Exchange名称,如果没有指定,则使用Default Exchange。 第二个参数routingKey是消息的路由Key,是用于Exchange将消息路由到指定的消息队列时使用(如果Exchange是Fanout Exchange,这个参数会被忽略), 第三个参数props是消息包含的属性信息。RabbitMQ的消息属性和消息体是分开的,不像JMS消息那样同时包含在javax.jms.Message对象中,这一点需要特别注意。 第四个参数body是RabbitMQ消息体。 我们这里调用basicPublish方法发送消息时,props参数为null,因而我们发送的消息是非持久化消息,如果要发送持久化消息,我们需要进行如下设置:
AMQP.BasicProperties props =
new AMQP.BasicProperties("text/plain",
"UTF-8",
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
channel.basicPublish("", "firstQueue", props, message.getBytes());
定义props时的参数2表示消息的类型为持久化消息。 运行生产者程序后,我们可以执行rabbitmqctl命令查看队列消息,我们看到firstQueue队列有一条消息。
消费者代码如下:
package com.gl365.payment.util.rabbitmq.demo1; import java.io.IOException; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope; public class ConsumerApp {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("rabbitmq_consumer");
factory.setPassword("rabbitmq_consumer");
factory.setVirtualHost("test_vhosts");
connection = factory.newConnection();
channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" Consumer have received '" + message + "'");
}
};
channel.basicConsume("firstQueue", true, consumer);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
消费者代码中,建立Connection,Channel的代码和生产者程序类似。它主要定义了一个Consumer对象,这个对象重载了DefaultCustomer类 的handleDelivery方法:
void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
handleDelivery方法的第一个参数consumerTag是接收到消息时的消费者Tag,如果我们没有在basicConsume方法中指定Consumer Tag,RabbitMQ将使用随机生成的Consumer Tag(如下图所示)
第二个参数envelope是消息的打包信息,包含了四个属性:
1._deliveryTag,消息发送的编号,表示这条消息是RabbitMQ发送的第几条消息,我们可以看到这条消息是发送的 第一条消息。
2._redeliver,重传标志,确认在收到对消息的失败确认后,是否需要重发这条消息,我们这里的值是false,不需要重发。
3._exchange,消息发送到的Exchange名称,正如我们上面发送消息时一样,exchange名称为空,使用的是Default Exchange。
4._routingKey,消息发送的路由Key,我们这里是发送消息时设置的“firstQueue”。
第三个参数properties就是上面使用basicPublish方法发送消息时的props参数,由于我们上面设置它为null,这里接收到的properties 是默认的Properties,只有bodySize,其他全是null。
第四个参数body是消息体.
我们这里重载的handleDelivery方法仅仅打印出了生产者发送的消息内容,实际使用时可以转发给后台程序进行处理。
在Consumer对象定义后,我们调用了Channel.basicConsume方法将Consumer与消息队列绑定,否则Consumer无法从消息队列获取消息。
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException
basicConsume方法的第一个参数是Consumer绑定的队列名,第二个参数是自动确认标志,如果为true,表示Consumer接受到消息后,会自动发确认消息(Ack消息)给消息队列,消息队列会将这条消息从消息队列里删除,第三个参数就是Consumer对象,用于处理接收到的消息。
如果我们想让消费者接收到消息后对消息进行手动确认(Manual Ack),我们需要对代码进行两处改动:
1)在调用basicConsume方法时,将autoAck属性设置为false。
channel.basicConsume("firstQueue", false, consumer);
2)在handleDelivery方法中调用Channel.basicAck方法,发送手动确认消息给消息队列。
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException
{
this.getChannel().basicAck(envelope.getDeliveryTag(), false);
}
basicAck方法有两个参数,第一个参数deliverTag是消息的发送编号,第二个参数multiple是消息确认方式,如果值为true,表示对消息队列里所有编号小于或等于当前消息编号的未确认消息进行手动确认,如果为false,表示仅确认当前消息。
消费者代码执行后,我们可以看到消费者程序的控制台输出了这条消息的内容,而且使用rabbitmqctl命令查看队列消息时,队列里的消息数为0。
使用Direct Exchange的消息生产/消费
使用Direct Exchange的生产者/消费者代码与Default Exchange比较类似,不过生产者程序的代码需要添加创建Direct Exchange和 将Exchange和消息队列绑定的代码,具体添加和修改的代码如下:
package com.gl365.payment.util.rabbitmq.demo2; import java.io.IOException; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import io.netty.handler.timeout.TimeoutException; public class ProducerApp {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("rabbitmq_producer");
factory.setPassword("rabbitmq_producer");
factory.setVirtualHost("test_vhosts"); //创建与RabbitMQ服务器的TCP连接
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("directExchange", "direct");
channel.queueDeclare("directQueue", true, false, false, null);
channel.queueBind("directQueue", "directExchange", "directMessage");
String message = "First Direct Message"; channel.basicPublish("directExchange", "directMessage", null, message.getBytes());
System.out.println("Send Direct Message is:'" + message + "'");
}
catch(Exception ex)
{
ex.printStackTrace();
}
finally
{
if(channel != null)
{
try {
channel.close();
} catch (java.util.concurrent.TimeoutException e) {
e.printStackTrace();
}
}
if(connection != null)
{
connection.close();
}
}
}
}
首先我们调用Channel.exchangeDeclare方法创建名为“directExchange”的Direct Exchange。
Exchange.DeclareOk exchangeDeclare(String exchange, String type,boolean durable) throws IOException
exchangeDeclare方法的第一个参数exchange是exchange名称,第二个参数type是Exchange类型,有“direct”,“fanout”,“topic”,“headers”四种,分别对应RabbitMQ的四种Exchange。第三个参数durable是设置Exchange是否持久化( 即在RabbitMQ服务器重启后Exchange是否仍存在,如果没有设置,默认是非持久化的)
创建“directQueue”消息队列后,我们再调用Channel.queueBind方法,将我们创建的Direct Exchange和消息队列绑定。
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
queueBind方法第一个参数queue是消息队列的名称,第二个参数exchange是Exchange的名称,第三个参数routingKey是消息队列和Exchange之间绑定的路由key,我们这里绑定的路由key是“directMessage”。从Exchange过来的消息,只有routing key为“directMessage”的消息会被转到消息队列“directQueue”,其他消息将不会被转发,下面将证实这一点。
运行ProducerApp程序,使用rabbitmq_producer用户登录管理页面,我们可以看到名为“directExchange”的Direct Exchange被创建出来。
消息队列directQueue与它绑定,routing key为directMessage。
消息队列directQueue里有一条消息
我们修改ProducerApp的程序,将消息的routing key改为“indirectMessage”
package com.gl365.payment.util.rabbitmq.demo2; import java.io.IOException; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import io.netty.handler.timeout.TimeoutException; public class ProducerApp {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("rabbitmq_producer");
factory.setPassword("rabbitmq_producer");
factory.setVirtualHost("test_vhosts"); //创建与RabbitMQ服务器的TCP连接
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("directExchange", "direct");
channel.queueDeclare("directQueue", true, false, false, null);
channel.queueBind("directQueue", "directExchange", "directMessage");
//String message = "First Direct Message";
String message = "First Indirect Message";
channel.basicPublish("directExchange", "indirectMessage", null, message.getBytes());
System.out.println("Send Indirect Message is:'" + message + "'"); //channel.basicPublish("directExchange", "indirectQueue", null, message.getBytes());
//System.out.println("Send Direct Message is:'" + message + "'");
}
catch(Exception ex)
{
ex.printStackTrace();
}
finally
{
if(channel != null)
{
try {
channel.close();
} catch (java.util.concurrent.TimeoutException e) {
e.printStackTrace();
}
}
if(connection != null)
{
connection.close();
}
}
}
}
再次运行程序后,打开管理页面,我们看到“directQueue”队列里仍然只有一条消息。
我们向Exchange发送的第二条消息由于和绑定的routing key不一致,没有被转发到“directQueue”消息队列,说明被RabbitMQ丢弃了。
我们通过管理界面再创建一个消息队列“indirectQueue”,在它和“directExchange”之间建立bind关系,routingkey为“indirectMessage” 。
再绑定一个
再次运行ProducerApp程序,我们可以看到“directQueue”消息队列消息数仍是1,但“indirectQueue”消息队列接收到了从Exchange转发来的消息。
使用RPC方式的消息生产/消费
RPC方式的消息生产和消费示意图如下:
在这种方式下,生产者和消费者之间的消息发送/接收流程如下:
1)生产者在发送消息的同时,将返回消息的消息队列名(replyTo中指定)以及消息关联Id(correlationId)附带在消息Properties中发送给消费者。
2)消费者在接收到消息,处理完成后,将结果作为返回消息发送到replyTo指定的返回消息队列中,同时附带接收消息中的corrleationId, 以便让生产者接收到到返回消息后,根据corrleationId确认是针对1)中发送消息的返回消息,如果correlationId确认一致,则将返回消息 取出,进行后续处理。
示意图中的生产者和消费者在发送消息时使用的都是Default Exchange,我们接下来的程序做一点改动,使用Direct Exchange。
在我们的程序中,生产者发送一个数字给消费者,消费者接收到消息后,计算这个数字的阶乘结果,返回给生产者。 生产者程序的主要代码如下:
- //创建RPC发送消息的Direct Exchange,消息队列和绑定关系。
- channel.exchangeDeclare("rpcSendExchange", "direct",true);
- channel.queueDeclare("rpcSendQueue", true, false, false, null);
- channel.queueBind("rpcSendQueue", "rpcSendExchange", "rpcSendMessage");
- //建立RPC返回消息的Direct Exchange, 消息队列和绑定关系
- channel.exchangeDeclare("rpcReplyExchange", "direct",true);
- channel.queueDeclare("rpcReplyQueue", true, false, false, null);
- channel.queueBind("rpcReplyQueue", "rpcReplyExchange", "rpcReplyMessage");
- //创建接收RPC返回消息的消费者,并将它与RPC返回消息队列相关联。
- QueueingConsumer replyCustomer = new QueueingConsumer(channel);
- channel.basicConsume("rpcReplyQueue", true,replyCustomer);
- String number = "10";
- //生成RPC请求消息的CorrelationId
- String correlationId = UUID.randomUUID().toString();
- //在RabbitMQ消息的Properties中设置RPC请求消息的CorrelationId以及
- //ReplyTo名称(我们这里使用的是Exchange名称,
- //而不是消息队列名称)
- BasicProperties props = new BasicProperties
- .Builder()
- .correlationId(correlationId)
- .replyTo("rpcReplyExchange")
- .build();
- System.out.println("The send message's correlation id is:" + correlationId);
- channel.basicPublish("rpcSendExchange", "rpcSendMessage", props, number.getBytes());
- String response = null;
- while(true)
- {
- //从返回消息中取一条消息
- Delivery delivery = replyCustomer.nextDelivery();
- //如果消息的CorrelationId与发送消息的CorrleationId一致,表示这条消息是
- //发送消息对应的返回消息,是阶乘运算的计算结果。
- System.out.println("The received reply message's correlation id is:" + messageCorrelationId);
- String messageCorrelationId = delivery.getProperties().getCorrelationId();
- if (!Strings.isNullOrEmpty(messageCorrelationId) && messageCorrelationId.equals(correlationId))
- {
- response = new String(delivery.getBody());
- break;
- }
- }
- //输出阶乘运算结果
- if(!Strings.isNullOrEmpty(response))
- {
- System.out.println("Factorial(" + number + ") = " + response);
- }
消费者程序的主要代码如下:
- Consumer consumer = new DefaultConsumer(channel)
- {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
- {
- //获取返回消息发送到的Exchange名称
- String replyExchange = properties.getReplyTo();
- //设置返回消息的Properties,附带发送消息的CorrelationId.
- AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
- .correlationId(properties.getCorrelationId())
- .build();
- String message = new String(body,"UTF-8");
- System.out.println("The received message is:" + message);
- System.out.println("The received message's correlation id is:" + properties.getCorrelationId());
- //计算阶乘,factorial方法是计算阶乘的方法。
- int number = Integer.parseInt(message);
- String response = factorial(number);
- //将阶乘消息发送到Reply Exchange
- this.getChannel().basicPublish(replyExchange, "rpcReplyMessage",replyProps, response.getBytes());
- }
- };
- channel.basicConsume("rpcSendQueue", true, consumer);
先运行生产者程序,发送请求消息到Send Exchange,然后等待消费者发送的返回消息。
再启动消费者程序,计算阶乘并返回结果给Reply Exchange。 两个程序的控制台信息如下图所示
生产者程序控制台
消费者程序控制台
从控制台信息可以看出生产者端根据返回消息中包含的Correlation Id判断出这是发送消息对应的返回消息,获取了阶乘的计算结果。
这个例子只是简单的生产者和消费者之间的方法调用,实际使用时,我们可以基于这个实例,实现更为复杂的操作。
RabbitMQ Client的重连机制
RabbitMQ Java Client提供了重连机制,不过在RabbitMQ Java Client 4.0版本之前,自动重连默认是关闭的。从Rabbit Client 4.0版本开始,自动重连默认是打开的。控制自动重连的属性是com.rabbitmq.client.ConnectionFactory类的automaticRecovery和topologyRecovery属性。
设置automaticRecovery属性为true时,会执行以下recovery:
1)Connection的重连。
2)侦听Connection的Listener的恢复。
3)重新建立在Connection基础上的Channel。
4)侦听Channel的Listener的恢复。
5)Channel上的设置,如basicQos,publisher confirm以及事务属性等的恢复。
当设置topologyRecovery属性为true时,会执行以下recovery:
1)exchange的重新定义(不包含预定义的exchange)
2)queue的重新定义(不包含预定义的queue)
3)binding的重新定义(不包含预定义的binding)
4)所有Consumer的恢复
我们定义一个带auto recovery的消费者程序,我们使用RabbitMQ Java Client 4.0.0版本,这个版本引入了AutorecoveringConnection和
AutorecoveringChannel类,可以添加RecoveryListener对Recovery过程进行监控。
- public class RecoveryConsumerApp
- {
- public static void main( String[] args ) throws IOException, TimeoutException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- ...................
- AutorecoveringConnection connection = (AutorecoveringConnection)connectionFactory.newConnection();
- String originalLocalAddress =
- connection.getLocalAddress() + ":" + connection.getLocalPort();
- System.out.println("The origin connection's local address is:" + originalLocalAddress);
- AutorecoveringChannel channel = (AutorecoveringChannel)connection.createChannel();
- System.out.println("The origin channel's channel number is:" + channel.getChannelNumber());
- channel.exchangeDeclare("recoveryExchange", BuiltinExchangeType.DIRECT, false, true ,null);
- channel.queueDeclare("recoveryQueue", false, false, true,null);
- channel.queueBind("recoveryQueue", "recoveryExchange", "recoveryMessage");
- connection.addRecoveryListener(new RecoveryListener() {
- public void handleRecovery(Recoverable recoverable) {
- System.out.println("Connection handleRecovery method is called");
- AutorecoveringConnection recoveredConnection =
- (AutorecoveringConnection)recoverable;
- String recoveredLocalAddress =
- recoveredConnection.getLocalAddress() + ":" + recoveredConnection.getLocalPort();
- System.out.println("The recovered connection's local address is:" + recoveredLocalAddress);
- }
- public void handleRecoveryStarted(Recoverable recoverable) {
- System.out.println("Connection handleRecoveryStarted method is called");
- }
- });
- channel.addRecoveryListener(new RecoveryListener() {
- public void handleRecovery(Recoverable recoverable) {
- System.out.println("Channel handleRecovery method is called");
- AutorecoveringChannel recoveryChannel =
- (AutorecoveringChannel)recoverable;
- System.out.println("The recovered Channel's number is:" + recoveryChannel.getChannelNumber());
- }
- public void handleRecoveryStarted(Recoverable recoverable) {
- System.out.println("Channel handleRecoveryStarted method is called");
- }
- });
- }
- }
这个程序中Exchange, Queue都是非持久化并且自动删除的。 我们为Connection和Channel分别添加了Recovery Listener匿名对象,
便于确认他们确实进行了Recovery操作。
启动程序后,我们可以看到recoveryExchange和recoveryQueue都被创建出来,且Binding关系建立了。
连接的本地地址是0.0.0.0:8109,Channel编号是1
此时我们关闭RabbitMQ服务器,再重启RabbitMQ服务器,我们可以从控制台界面看到有连接超时的警告信
息以及重连信息。
从重连日志信息中我们可以看出Channel的编号还是1,但是Connection的本地地址已经变成了0.0.0.0:8470,证明进行了重连。
连接到recoveryQueue队列上的Consumer Tag也进行了恢复,而且Consumer Tag与之前的Consumer Tag一致,这是因为设置了
topologyRecovery属性为true。
我们再在生产者程序中使用重连机制,依然使用Rabbit Java Client 4.0版本 生产者程序的片段如下:
- <span style="font-size: 17.5px;"> </span>factory.setAutomaticRecoveryEnabled(true);
- factory.setNetworkRecoveryInterval(60000);
- factory.setTopologyRecoveryEnabled(true);
- AutorecoveringConnection connection = (AutorecoveringConnection)factory.newConnection();
- AutorecoveringChannel channel = (AutorecoveringChannel)connection.createChannel();
- //设置Channel为Publish Confirm模式
- channel.confirmSelect(); <span style="font-size: 17.5px;"> </span>
登录管理界面,我们可以看到生产者建立的Channel是Confirm模式(图中Mode列用C表示)
我们关掉RabbitMQ服务器,再重启RabbitMQ服务器,可以看到生产者Channel被恢复,但是本地端口号已经从13684变成了13874,
说明这是重新创建的Channel,创建的Channel仍然是Confirm模式,和最初的Channel一致。
如果我们设置Channel为Transaction模式(调用Channel.txSelect()方法),重连后恢复的Channel的模式也仍然是Transaction模式。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
(1)exchange持久化,在声明时指定durable => 1
(2)queue持久化,在声明时指定durable => 1
(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
(二、基本概念介绍)
AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路层协议,而不是API(例如JMS),AMQP 客户端能够无视消息的来源任意发送和接受信息。AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件 (MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一 部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由。
AMQP当中有四个概念非常重要
-
virtual host
,虚拟主机 -
exchange
,交换机 -
queue
,队列 -
binding
,绑定
一个虚拟主机持有一组交换机、队列和绑定。
为什么需要多个虚拟主机呢?因为RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机/
。
何谓虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)
队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器。消息就一直在里面,直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止。不过,也可以将一个队列配置成这样的:一旦消息进入这个队列,此消息就被删除。
队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行工具。这没什么问题,如果一个消费者试图创建一个已经存在的队列,RabbitMQ会直接忽略这个请求。因此我们可以将消息队列的配置写在应用程序的代码里面。
而要把一个消息放进队列前,需要有一个交换机(Exchange)。
交换机(Exchange)可以理解成具有路由表的路由程序。每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机当中有一系列的绑定(binding),即路由规则(routes)。(例如,指明具有路由键 “X” 的消息要到名为timbuku的队列当中去。)
消费者程序(Consumer)要负责创建你的交换机。交换机可以存在多个,每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8核的服务器上,可以创建5个交换机来用5个核,另外3个核留下来做消息处理。类似的,在RabbitMQ的集群当中,你可以用类似的思路来扩展交换机一边获取更高的吞吐量。
交换机如何判断要把消息送到哪个队列?你需要路由规则,即绑定(binding)。一个绑定就是一个类似这样的规则:将交换机“desert(沙漠)”当中具有路由键“阿里巴巴”的消息送到队列“hideout(山洞)”里面去。换句话说,一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。例如,具有路由键“audit”的消息需要被送到两个队列,“log-forever”和“alert-the-big-dude”。要做到这个,就需要创建两个绑定,每个都连接一个交换机和一个队列,两者都是由“audit”路由键触发。在这种情况下,交换机会复制一份消息并且把它们分别发送到两个队列当中。交换机不过就是一个由绑定构成的路由表。
交换机有多种类型。他们都是做路由的,但是它们接受不同类型的绑定。为什么不创建一种交换机来处理所有类型的路由规则呢?因为每种规则用来做匹配分子的CPU开销是不同的。例如,一个“topic”类型的交换机试图将消息的路由键与类似“dogs.*”的模式进行匹配。匹配这种末端的通配符比直接将路由键与“dogs”比较(“direct”类型的交换机)要消耗更多的CPU。如果你不需要“topic”类型的交换机带来的灵活性,你可以通过使用“direct”类型的交换机获取更高的处理效率。那么有哪些类型,他们又是怎么处理的呢?
Exchange
-
Exchange Direct
Exchange Fanout
Exchange Topic
持久化
你花了大量的时间来创建队列、交换机和绑定,然后,服务器程序挂了。你的队列、交换机和绑定怎么样了?还有,放在队列里面但是尚未处理的消息们呢?
如果你是用默认参数构造的这一切的话,那么,他们都灰飞烟灭了。RabbitMQ重启之后会干净的像个新生儿。你必须重做所有的一切,亡羊补牢,如何避免将来再度发生此类杯具?
队列和交换机有一个创建时候指定的标志durable。durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。那么如何才能做到不只是队列和交换机,还有消息都是持久的呢?
但是首先需要考虑的问题是:是否真的需要消息的持久化?如果需要重启后消息可以回复,那么它需要被写入磁盘。但即使是最简单的磁盘操作也是要消耗时间的。所以需要衡量判断。
当你将消息发布到交换机的时候,可以指定一个标志“Delivery Mode”(投递模式)。根据你使用的AMQP的库不同,指定这个标志的方法可能不太一样。简单的说,就是将Delivery Mode设置成2,也就是持久的(persistent)即可。一般的AMQP库都是将Delivery Mode设置成1,也就是非持久的。所以要持久化消息的步骤如下:
- 将交换机设成 durable。
- 将队列设成 durable。
- 将消息的 Delivery Mode 设置成2 。
绑定(Bindings)怎么办?绑定无法在创建的时候设置成durable。没问题,如果你绑定了一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是durable),依赖它的绑定都会自动删除。
注意:
- RabbitMQ 不允许你绑定一个非坚固(non-durable)的交换机和一个durable的队列。反之亦然。要想成功必须队列和交换机都是durable的。
- 一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。因此,最好仔细检查创建的标志。