四.RabbitMQ之发布/订阅(Publish/Subscribe)

时间:2021-08-23 17:21:19

  一.基础知识点

  在上述章节中,我们理解的RabbitMQ是基于如下这种模式运作的。

  四.RabbitMQ之发布/订阅(Publish/Subscribe)

  而事实上,这只是我们简单化了的模型的结果,真正的模型应该是这样的。

  四.RabbitMQ之发布/订阅(Publish/Subscribe)

  P:Producer 生产者,生产消息,把它放进交换机

  X:Exchange 交换机,可以理解为存在于生产者和队列之间的一个桥梁。或者你可以将它理解为队列的一个父级,或者更形象的,你就把它理解为像局域网中的交换机,把队列理解为主机,它有direct, topic, headers 和fanout这几种类型,后面会做介绍。

  orange这些叫做binding,它是Exchange与Queue之间的纽带。对某些类型的Exchange它是无效的(比如fanout),我的理解它其实是将队列进一步的分类。如上图所示,orange被分为一类,而black和green被分为了另外一类。

  Q:Queue 队列,如果你想要在P和C之间共享指定队列的消息,你就得给队列指定显式的名称。它是直接与C(consumer)连接的。

  C:消费者,从队列中订阅消息,并且处理。

  二.fanout类别的Exchange

  下面通过一个日志消息的demo来介绍Exchange的用法。

  记得发布消息的方法吗?channel.basicPublish("", "hello", null, message.getBytes());第一个参数其实就是Exchange,只是当时我们并未注意它,它其实是一个无名字的默认的Exchange。既然是无名字,就无法通过它来区分Queue。所以在那些例子里,我们给Queue指定了名字以供消费者来寻找。(可以用数据库查询的思路去理解,比如可将Exchange理解为省份,Binding理解为城市,queue理解为地区。我们可以从指定省份,指定城市的集合里面去查询地区,相当于给Exchange命名,指定Binding的值,当然也可以直接指定一个确定的地区,指定queue的名字)。

  现在我们要做的日志消息的demo要求如下:

  1.我们希望接收所有的消息。

  2.我们对消息的一致性要求不高,只需要接收那些新近的消息,如果接收不到,就丢弃这个消息。

  所以我们需要使用fanout类型的Exchange,fanout类型的Exchange的特点是,它广播所有的消息给与它关联的所有队列,不加任何区分。所以这时候指定binding也没什么意义。当然,queue也无需指定名称。

  铺垫已久,现在来着手编写生产者类,如下。

  

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import net.sf.json.JSONObject; import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class EmitLog {
private final static String EXCHANGE_NAME="logs";//交换机名称为log public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.1.195");//服务器ip
factory.setPort(5672);//端口
factory.setUsername("xdx");//登录名
factory.setPassword("xxxxx");//密码
Connection connection=factory.newConnection();//建立连接
Channel channel=connection.createChannel();//建立频道
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//在频道里声明一个交换机,类型定位fanout
System.out.println(channel+"发布100条日志消息");
for(int i=0;i<100;i++){
JSONObject jsonObjet=new JSONObject();
jsonObjet.put("msgType", "log");//该消息是针对发送验证邮件的。
jsonObjet.put("content", "日志消息"+i);
String message=jsonObjet.toString();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());//发布消息,发布到EXCHANGE_NAME,此时它会到哪个queue里面是不确定的
System.out.println(jsonObjet.get("content"));
}
channel.close();
connection.close();
}
}

  我们运行这个生产者类,由于此时还没有queue,所以到RabbitMQ的控制台去查看,此时并没有queue,但是已经有了一个EXCHANGE。如图所示。

  四.RabbitMQ之发布/订阅(Publish/Subscribe)

  接下来编写消费者的类,如下所示。

  

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope; public class ReceiveLogs {
private final static String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
// 下面的配置与生产者相对应
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.195");// 服务器ip
factory.setPort(5672);// 端口
factory.setUsername("xdx");// 登录名
factory.setPassword("xxxxx");// 密码
Connection connection = factory.newConnection();// 连接
final Channel channel = connection.createChannel();// 频道
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
final String queueName = channel.queueDeclare().getQueue();// 生成一个独立的,非持久的,自动删除的queue
channel.queueBind(queueName, EXCHANGE_NAME, "");// 绑定queue和exchange。这样队列中就有通过EXCHANGE_NAME发布的消息。
System.out.println(" messages from channel:"+ channel+",queue:"+ queueName
+ ". To exit press CTRL+C");
// defaultConsumer实现了Consumer,我们将使用它来缓存生产者发送过来储存在队列中的消息。当我们可以接收消息的时候,从中获取,可理解为被一个一直运行着的线程调用。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("channel:"+channel+",queue:"+queueName+",consumer:"+this.getConsumerTag()+" Received '" + message + "'");
// channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(300);
} catch (Exception e) {
}
}
};
channel.basicConsume(queueName, true, consumer);//自动回复,消息发出后队列自动消除
} }

  消费者的channel一样声明相同的EXCHANGE,然后final String queueName = channel.queueDeclare().getQueue();这句话生成一个独立的,非持久的,自动删除的queue。

  接下来运行消费者程序,控制台打印出:

  messages from channel:AMQChannel(amqp://xdx@192.168.1.195:5672/,1),queue:amq.gen-ZUNKY5IQG3GQG2Y8lZOjUA. To exit press CTRL+C
  可见已经生成了一个名为amq.gen-ZUNKY5IQG3GQG2Y8lZOjUA的queue,事实上,去RabbitMQ后台查看,确实看到了一个queue.他的特点是AD(auto delete)和EXCL(exclusive)。

  auto delete:当他没用的时候,服务就会删除掉它。

  exclusive:独占,说明这个queue仅仅被这个connection独占。

四.RabbitMQ之发布/订阅(Publish/Subscribe)

  但是并未处理之前我们生产者程序发出来的消息啊。这是为什么呢?

  这是因为当我们的生产者发出消息的时候,那时我们的消费者程序还未运行,所以还未建立queue队列,那些消息自然无处容身,所以它们被丢弃了。

  下面我们在保持生产者程序运行的情况下,也就是有了queue的情况下,再次运行生产者程序,再发送100条信息。然后我们就可以看到消费者程序开始在处理消息了。

四.RabbitMQ之发布/订阅(Publish/Subscribe)

  接下来,我们在启动另外一个消费者,即把刚才这个生产者程序在运行一个。在eclipse上你可以看到有两个程序正在运行。

  四.RabbitMQ之发布/订阅(Publish/Subscribe)

  同时,在RabbitMQ的控制台,看到又增多了一个queue.

  四.RabbitMQ之发布/订阅(Publish/Subscribe)

    接着,我们再次运行生产者,按照预期,应该是两个消费者程序同时接受并处理生产者发送过来的消息。

  果然,运行结果截图如下。

  消费者1:

四.RabbitMQ之发布/订阅(Publish/Subscribe)

  消费者2:

四.RabbitMQ之发布/订阅(Publish/Subscribe)

  我们发现消费者1和消费者2都接受到了生产者发出来的消息,并且进行处理了。

  三.总结

  1.生产者先创建一个connection,然后通过该connection创建一个channel,再在该channel中创建一个Exchange。生产者不再指定的queue名称,而只是将消息广播到特定routingKey的Exchange中。通过channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());这个方法。

  2.消费者也创建一个新的connection,然后通过该connection创建一个channel,再在该channel中声明一个Exchange与生产者那边对应(经测试,如果在生产者中已声明,且先运行了,则这个Exchange已经存在,这一步可以省去。但一般情况下不省略,因为有时候我们可能先运行的消费者程序,就有可能找不到,也就是说生产者和消费者如果都声明了同一个Exchange,则只生成了一个这样的Exchange),再从channel中拿出一个临时的queue,通过queueBind方法与生产者那边所定义的Exchange和RouteKey发生关联。关联之后,消费者就可以从queue中取到所有广播到了满足该Exchange&&RouteKey下的消息。

  3.生产者程序只负责广播,不负责把消息送到指定的queue中,这部分工作是消费者程序来做的。一旦没有消费者来接收消息,这些消息就被丢弃了。而消费者通过queueBind方法订阅特定Exchange,特定routeKey中的消息并处理。我想这就是这章叫做广播/订阅的原因吧。

  4.细心的读者可以看到当我们启动两个消费者程序的时候,这两个消费者都接收了生产者的消息,且都是一模一样的消息。这是因为,这两个消费者从两个queue中去取消息,而这两个queue都关联了相同的Exchange和RouteKey。所以接收到的是相同的消息,我们完全可以在这两个消费者中编写不同的处理代码,比如消费者1就进行显示消息的工作,而消费者2则进行存储消息的动作。注意与前面章节的那种模式相区分,在前面章节的例子中,我们在生产者中指定了一个命名了的queue,所以以后不管扩展了几个新的消费者,他们都是从同一个queue中去取消息,所以不会取得重复的消息。