分布式消息通信之RabbitMQ Tutorials

时间:2023-03-08 22:00:52

官网

RabbitMQ Tutorials & 1 Hello World! | 2 Work queues | 3 Publish/Subscribe | 4 Routing | 5 Topics | 6 RPC

1 Hello World!

 RabbitMQ所需依赖

 <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>

1.1 生产者demo producer

package com.smallShen.distributed.rmq.cpt01HelloWorld;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "HelloWorld RMQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest"); try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 0; i < 1000; i++) { String msg = "Hello World, your RabbitMQ! " + i;
// String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.deliveryMode(2)// 2持久化
.build();
channel.basicPublish("", QUEUE_NAME, false, false, props, msg.getBytes()); System.out.println(String.format("Producer Sent msg[%s] at %s", msg, new Date()));
} } catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} } }

1.2 消费者demo consumer

package com.smallShen.distributed.rmq.cpt01HelloWorld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException; public class Receive { private static final String QUEUE_NAME = "HelloWorld RMQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest"); try { Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("Waiting for msg, To exit press Crtl+C"); DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String consumerTag, Delivery delivery) throws IOException {
String msg = new String(delivery.getBody(), "UTF-8");
System.out.println(msg);
System.out.println(String.format("Receive msg[%s] at %s", msg, new Date()));
}
};
// String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback
channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("Receive cancel!");
}
}); } catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} } }

1.3 查看queue队列中的信息

页面查看,可看到有4条消息

http://127.0.0.1:15672, 用户名密码默认为guest;打开页面客户端分布式消息通信之RabbitMQ Tutorials

命令查看

 Linux版本命令:sudo rabbitmqctl list_queues

 Window版命令:rabbitmqctl.bat list_queues,如下可看到queue name为hello的message有4条;

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>rabbitmqctl.bat list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 4 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>

2 Work queues

2.1 消费者设置消息回执

autoAck=false,执行任务需要花费一些时间,如果消费者客户端在处理一个需要较长时间完成的消息时,中途程序中断了。当前默认配置下,RabbitMQ在将消息分发给一个消费者后,会直接将消息删除。在这种情况下,不仅仅是当前还没处理完的这条消息,已经分发给当前消费者的其余消息也会随之丢失。但我们并不想要丢失消息,为了避免这种情况,我们可以设置手工回执。当将autoAck设置为false后,RabbitMQ服务端在将消息分发给消费者后,会将消息标记为Unacked状态,这在UI页面也可以看得到。如果没有收到回执,且消费者连接断开没有心跳的情况下,RabbitMQ会将消息回收重新发送给新的消费者。

 开启手工回执却忘记回执**是一种常见的错误,不过造成的后果确很严重。当生产者一直向队列中发送消息,消费者虽然分发到了也消费了,但并没有发送回执,相当于这部分消息还是一直在RabbitMQ内存之中,不被释放。可以使用UI管理界面或者自带命令来查看Unack的消息:

Linux : sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Window : rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged e.g
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
HelloWorld RMQ 0 662 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>

 code:

             DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String consumerTag, Delivery delivery) throws IOException {
String msg = new String(delivery.getBody(), "UTF-8"); System.out.println(String.format("Receive msg[%s] at %s", msg, new Date()));
try {
doWorker(msg);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(String.format("Receive msg[%s] and finish at %s", msg, new Date()));
// 当关闭了自动回执,必须添加手动回执
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
} }
};
// String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback
// 关闭自动回执,启用手工回执
Boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("Receive cancel!");
}
});

2.2 队列持久化及消息持久化

durable = true,当RabbitMQ服务器宕机,如果没有设置队列及消息的持久化,重启之后消息就会丢失;

// 设置队列持久化,RabbitMQ宕机或重启队列不会丢失
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null); //消息持久化
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // or // String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
// 2 消息持久化,RabbitMQ服务宕机或者重庆,消息不会删除
// 可满足一般情况,但并不能保证100%数据不丢失,因为假如生产者将消息推送至队列Queue,放入RabbitMQ内存中,还没有进行磁盘保存时服务宕机,内存中的这部分数据就丢失了
.deliveryMode(2)
.build(); channel.basicPublish("", QUEUE_NAME, false, false, props, msg.getBytes());

2.3 多消费者同时消费,公平分发资源合理利用

 当多消费者客户端消费同一队列中的消息时,默认会按消息数量均衡分配,比如队列中1000条消息,有A、B、C、D4个消费者,RabbitMQ会为每个消费者分配250条消息;

 但是当1000条消息中有些消息的处理逻辑比较复杂,耗时较大,就可能造成收到该类型消息的消费者A处理250条消息需要花费很长时间,而此时B、C、D消费者已经早早处理完他们自己的消息,闲置了N久;

 针对这种情况可以在消费者客户端中使用basicQos 设置prefetchCount = 1 ,告诉RabbitMQ每次只给消费者1条消息进行处理,换句话说,如果前一条消息的回执ack没有收到,不再给这个消费者推送消息;

code

            // 告诉RabbitMQ每次只给消费者1条消息进行处理,换句话说,如果前一条消息的回执ack没有收到,不再给这个消费者推送消息;
// 如果不配置此参数,队列中的消息会均衡的分配给所有消息者,不论消费者的处理速度快慢,每个消费者接收到的消息数量是大致一致的
// 比如 100条消息, 消费者A,B,C每个接收33-34条,但A处理消息效率很慢,B,C很快,就会有B,C很快就将消息处理完毕开始闲置,而A可能还需要很久才能处理完毕;
// 此时配置 prefetchCount 来提高利用率
int prefetchCount = 1;
channel.basicQos(prefetchCount);

2.4 完整的生产者消费者代码

 生产者:

package com.smallShen.distributed.rmq.cpt02WorkQueue;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException; public class NewTask { private static final String QUEUE_NAME = "Work Queue RMQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest"); try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
// 队列持久化,RabbitMQ服务宕机或者重庆,队列不会删除
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null); for (int i = 0; i < 100; i++) { String msg = args != null && args.length > 0 ? String.join(" ", args) + i : "Hello World, your RabbitMQ! " + i;
// String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
// 2 消息持久化,RabbitMQ服务宕机或者重庆,消息不会删除
// 可满足一般情况,但并不能保证100%数据不丢失,因为假如生产者将消息推送至队列Queue,放入RabbitMQ内存中,还没有进行磁盘保存时服务宕机,内存中的这部分数据就丢失了
.deliveryMode(2)
.build(); channel.basicPublish("", QUEUE_NAME, false, false, props, msg.getBytes());
//channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); System.out.println(String.format("Producer send msg[%s] at %s", msg, new Date()));
} } catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} } }

 消费者

package com.smallShen.distributed.rmq.cpt02WorkQueue;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; public class Receive { private static final String QUEUE_NAME = "Work Queue RMQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest"); try { Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
System.out.println("Waiting for msg, To exit press Crtl+C"); // 告诉RabbitMQ每次只给消费者1条消息进行处理,换句话说,如果前一条消息的回执ack没有收到,不再给这个消费者推送消息;
// 如果不配置此参数,队列中的消息会均衡的分配给所有消息者,不论消费者的处理速度快慢,每个消费者接收到的消息数量是大致一致的
// 比如 100条消息, 消费者A,B,C每个接收33-34条,但A处理消息效率很慢,B,C很快,就会有B,C很快就将消息处理完毕开始闲置,而A可能还需要很久才能处理完毕;
// 此时配置 prefetchCount 来提高利用率
int prefetchCount = 1;
channel.basicQos(prefetchCount); DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String consumerTag, Delivery delivery) throws IOException {
String msg = new String(delivery.getBody(), "UTF-8"); System.out.println(String.format("Receive msg[%s] at %s", msg, new Date()));
try {
doWorker(msg);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(String.format("Receive msg[%s] and finish at %s", msg, new Date()));
// 当关闭了自动回执,必须添加手动回执
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
} }
};
// String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback
// 关闭自动回执,启用手工回执
Boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("Receive cancel!");
}
}); } catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} } private static void doWorker(String msg) throws InterruptedException { for (char c : msg.toCharArray()) {
if (c == '.') {
TimeUnit.SECONDS.sleep(1);
}
} } }

3 Publish/Subscribe

3.1交换机

 在RabbitMQ中,生产者从来不会直接将消息发送至队列Queue中,实际上,生产者连消息该发给那个队列都不知道,生产者只是将消息发送给交换机EXCHANGE

Exchange交换机有两个作用,一是接收生产者发送的消息,二是将消息放入队列Queue中。交换机必须清楚接收到的消息该如何处理,是放到指定的队列 或者放到多个队列 或是 直接丢弃。

 交换机有如下四种类型:

public enum BuiltinExchangeType {

    DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
...
direct : 直连类型的交换机,完全匹配队列名称,然后将消息放入该队列
fanout : 广播类型的交换机,发布/订阅,1条消息同时发布给绑定到该交换机的多个队列
topic : 正则模式匹配的交换机,类似正则的规则模糊匹配队列,然后将消息放入这些匹配到的队列
headers : 很少使用

 可以通过UI页面或者sudo rabbitmqctl list_exchanges命令来查看有哪些交换机

3.2 临时队列

 在创建队列的时候,我们可以指定队列名称,也可以通过String queueName = channel.queueDeclare().getQueue(); 方式来创建,这样创建的队列由RabbitMQ随机指定名称,且是一种 非持久化,排他,自动删除的 临时队列。

  String queueName = channel.queueDeclare().getQueue();

   临时队列随机名称类似:  amq.gen-JzTY20BRgKO-HjmUJj0wLg

3.3广播Fanont类型交换机使用

fanont交换机模型

发布Pub端

package com.smallShen.distributed.rmq.cpt03PubSub;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException; public class ProducerPub { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest"); try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); for (int i = 0; i < 10; i++) {
String msg = "Producer publish msg " + i;
// fanout类型的交换机,不需要指定路由关键字 routingKey
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println(String.format("Producer send msg[%s] success at %s", msg, new Date()));
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} } }

订阅Sub端

package com.smallShen.distributed.rmq.cpt03PubSub;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException; public class ConsumerSub { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest"); try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 声明队列时候不适用参数,表示RabbitMQ创建一个默认 非持久化, 排他, 自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// 广播fanout类型的交换机,不需要指定路由关键字routingKey
channel.queueBind(queueName, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println(String.format("Consumer accept msg[%s] at %s", new String(delivery.getBody()), new Date())); }
}; //(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} } }

4 Routing

  之前一章使用的是fanout类型的交换机,生产者Producer将消息推送至交换机Exchange,这种模式不需要指定路由关键字Routing Key,消息将推送给所有和该交换机绑定的队列Queue中去,被消息者消费;

  本章使用direct类型的交换机,消息推送至交换机Exchange之后,可以根据指定的不同路由关键字routing key来匹配指定的队列,而不是所有绑定了该交换机的队列,然后被队列绑定的消费者消费。

4.1 路由关键字 Routing, 绑定关键字 Binding

 生产者发布消息,将消息推送至交换机,需要指定路由关键字routingKey

  // 生产者
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes("UTF-8"));

 消费者在接收消息时,是从队列中读取,队列中的消息是由交换机分发过来的,队列从交换机接收哪些类型的消息 也需要通过指明关键字routingKey来绑定

  // 消费者
channel.queueBind(defaultQueueName, EXCHANGE_NAME, routingKey);

 当使用direct直连类型的交换机时,消息发送的路由关键字必须和消息接收的绑定关键字一致。

 一个交换机可以绑定多个队列,交换机在绑定每个队列时也可以通过多个关键字进行绑定,只要一种关键字匹配上,交换机就会将消息分发到该队列。

直连交换机

4.2 直连类型交换机demo

生产者demo

package com.smallShen.distributed.rmq.cpt04Routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Producer { private static final String EXCHANGE_NAME = "MY_DIRECT_EXCHANGE"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest"); try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String routingKey = "log"; // "error", "warn", "info" for (int i = 0; i < 3; i++) {
String msg = String.format("This is a msg, type[%s], send to exchange[%s]", routingKey, EXCHANGE_NAME) + " " + i;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes("UTF-8"));
System.out.println(String.format("Producer send msg[%s] success", msg));
} } catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} } }

消费者demo

 第一个消费者,绑定info | warn | log三种关键字,三种类型的消息都能接收到

package com.smallShen.distributed.rmq.cpt04Routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException; public class ConsumerCommon { private static final String EXCHANGE_NAME = "MY_DIRECT_EXCHANGE"; public static void main(String[] args) { String[] routingKeys = new String[3];
routingKeys[0] = "log";
routingKeys[1] = "info";
routingKeys[2] = "warn"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest"); try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String defaultQueueName = channel.queueDeclare().getQueue(); for (String routingKey : routingKeys) {
// 一个队列绑定多个路由关键字
System.out.println(String.format("Queue[%s] is binging exchange[%s] with key[%s]", defaultQueueName, EXCHANGE_NAME, routingKey));
channel.queueBind(defaultQueueName, EXCHANGE_NAME, routingKey);
} DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println(String.format("Consumer bind key[%s] accept msg[%s] at %s", String.join("|", routingKeys),
new String(delivery.getBody(), "UTF-8"), new Date())); }
}; channel.basicConsume(defaultQueueName, true, deliverCallback, consumerTag -> {}); } catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} } }

 第二个消费者,绑定了error类型的关键字,只要发送消息时指定路由关键字是error的才能接收到

package com.smallShen.distributed.rmq.cpt04Routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException; public class ConsumerError { private static final String EXCHANGE_NAME = "MY_DIRECT_EXCHANGE"; public static void main(String[] args) { String[] params = new String[1];
params[0] = "error"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest"); try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String defaultQueueName = channel.queueDeclare().getQueue(); for (String routingKey : params) {
// 一个队列绑定多个路由关键字
System.out.println(String.format("Queue[%s] is binging exchange[%s] with key[%s]", defaultQueueName, EXCHANGE_NAME, routingKey));
channel.queueBind(defaultQueueName, EXCHANGE_NAME, routingKey);
} DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println(String.format("Consumer bind key[%s] accept msg[%s] at %s", String.join("|", params),
new String(delivery.getBody(), "UTF-8"), new Date())); }
}; channel.basicConsume(defaultQueueName, true, deliverCallback, consumerTag -> {}); } catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} } }

5 Topics

5.1 Topic Exchange

  第4章提到了Direct Exchange直连类型的交换机,必须生产者routingKey和消费者bindingKey完全匹配才能互相通信,发送消息;Topic Exchange正则类型的交换机可以提供更多的选择性,使用类似正则的方式匹配路由关键字routingKey和绑定关键字bindingKey,正则模糊匹配到后即可通信;

  *  匹配一个单词
# 匹配0到多个单词
. 单词之间使用.分割

Topic Exchange

 如上图, 队列Q1绑定 *.orange.**,队列Q2绑定*.*.rabbitlazy.#:

当生产者发送消息指定路由关键字是quick.orange.rabbit时,Q1,Q2都可匹配;

当生产者发送消息指定路由关键字是lazy.orange.elephant时,Q1,Q2都可匹配;

当生产者发送消息指定路由关键字是quick.orange.fox时,Q1可匹配;

当生产者发送消息指定路由关键字是lazy.brown.fox时,Q2可匹配;

当生产者发送消息指定路由关键字是lazy.brown.fox时,Q2可匹配;

当生产者发送消息指定路由关键字是lazy.pink.rabbit时,Q2都可匹配,且只匹配1次;

当是quick.orange.male.rabbit时,没有匹配队列,消息丢失。

5.2 Topic Exchange Demo

生产者 demo

package com.smallShen.distributed.rmq.cpt05Topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
* 发送消息 :
* 速度 + 颜色 + 动物种类
* 格式 : "<speed>.<colour>.<species>"
* 队列1 接收 :
* 红色的 (*.red.* )
* 队列2 接收 :
* 兔子类型的消息 (*.*.rabbit)
* 速度比较慢的消息 (lazy.#)
*/
public class ProducerSend { private static final String EXCHANGE_NAME = "TOPIC_EXCHANGE"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setUsername("guest"); try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 路由关键字
// String routingKey = "lazy.red.rabbit"; // 两个队列都能收到
// String routingKey = "lazy.RED.rabbit"; // 只有队列2收到
// String routingKey = "lazy.yellow.tiger.run"; // 队列2收到
String routingKey = "fast.yellow.tiger.run"; // 都收不到,丢弃
String msg = String.format("This is a %s, la la la", routingKey);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println(String.format("Producer send msg[%s] to exchange[%s] with routingKey[%s] at %s", msg, EXCHANGE_NAME, routingKey, new Date())); } catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} } }

消费者 demo

  队列1

package com.smallShen.distributed.rmq.cpt05Topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException; /**
* 发送消息 :
* 速度 + 颜色 + 动物种类
* 格式 : "<speed>.<colour>.<species>"
* 队列1 接收 :
* 红色的 (*.red.* )
* 队列2 接收 :
* 兔子类型的消息 (*.*.rabbit)
* 速度比较慢的消息 (lazy.#)
*/
public class ConsumerAccept01 { private static final String EXCHANGE_NAME = "TOPIC_EXCHANGE"; public static void main(String[] args) { String[] bindingKeys = new String[1];
bindingKeys[0] = "*.red.*"; ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setUsername("guest"); try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 使用默认的队列
String defaultQueueName = channel.queueDeclare().getQueue(); for (String bindingKey : bindingKeys) {
channel.queueBind(defaultQueueName, EXCHANGE_NAME, bindingKey);
System.out.println(String.format("Queue[%s] bind exchange[%s] with bindingKey[%s]", defaultQueueName, EXCHANGE_NAME, bindingKey));
} DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println(String.format("Consumer accept msg[%s] from queue[%s] at %s",
new String(delivery.getBody(),"UTF-8"), defaultQueueName, new Date()));
}
}; channel.basicConsume(defaultQueueName, true, deliverCallback, consumerTag -> {}); } catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} } }

 队列2

package com.smallShen.distributed.rmq.cpt05Topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException; /**
* 发送消息 :
* 速度 + 颜色 + 动物种类
* 格式 : "<speed>.<colour>.<species>"
* 队列1 接收 :
* 红色的 (*.red.* )
* 队列2 接收 :
* 兔子类型的消息 (*.*.rabbit)
* 速度比较慢的消息 (lazy.#)
*/
public class ConsumerAccept02 { private static final String EXCHANGE_NAME = "TOPIC_EXCHANGE"; public static void main(String[] args) { String[] bindingKeys = new String[2];
bindingKeys[0] = "*.*.rabbit";
bindingKeys[1] = "lazy.#"; ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setUsername("guest"); try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 使用默认的队列
String defaultQueueName = channel.queueDeclare().getQueue(); for (String bindingKey : bindingKeys) {
channel.queueBind(defaultQueueName, EXCHANGE_NAME, bindingKey);
System.out.println(String.format("Queue[%s] bind exchange[%s] with bindingKey[%s]", defaultQueueName, EXCHANGE_NAME, bindingKey));
} DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println(String.format("Consumer accept msg[%s] from queue[%s] at %s",
new String(delivery.getBody(),"UTF-8"), defaultQueueName, new Date()));
}
}; channel.basicConsume(defaultQueueName, true, deliverCallback, consumerTag -> {}); } catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} } }

6 RPC

 RabbitMQ实现RPC思路:建立两个队列Queue,通过第一个队列RPC客户端将消息发送至RPC服务器端,服务器端执行逻辑完毕将结果返回至第二个队列,供RPC客户端消费;其中RPC客户端每次发送请求,需要把消息内容,返回队列及信息唯一标识ID一起发送出去,RPC服务端处理完毕也根据这些发送来的额外信息将消息分发至指定队列;

Rabbit RPC

6.1 RPC Client

package com.smallShen.distributed.rmq.cpt06RPC;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException; public class RPCClient implements AutoCloseable { private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_sample_queue"; public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); connection = factory.newConnection();
channel = connection.createChannel();
} public static void main(String[] argv) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
} public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
}); String result = response.take();
channel.basicCancel(ctag);
return result;
} public void close() throws IOException {
connection.close();
}
}

6.2 RPC Server

package com.smallShen.distributed.rmq.cpt06RPC;

import com.rabbitmq.client.*;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_sample_queue";

    private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
} public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> { System.out.println(delivery.getProperties().getCorrelationId());
System.out.println(delivery.getProperties().getReplyTo()); AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build(); String response = ""; try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
}; channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}