参考:
documentation: https://www.rabbitmq.com/documentation.html
demo: https://www.rabbitmq.com/getstarted.html
Rabbitmq入门: http://www.jianshu.com/p/a5f7fce67803
AMQP协议详解:http://www.cnblogs.com/frankyou/p/5283539.html
一、RabbitMQ介绍及安装
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用,关键就是异步,解耦。
RabbitMQ采用Erlang语言开发。Erlang语言由Ericson设计,专门为开发concurrent和distribution系统的一种语言,在电信领域使用广泛。OTP(Open Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件/库/工具,如mnesia/SASL,极大方便了Erlang应用的开发。OTP就类似于Python语言中众多的module,用户借助这些module可以很方便的开发应用。
技术亮点:
1.可靠性以及性能之间的*度
2.灵活的路由
3.支持集群
4. 高可用的队列
5. 多协议
...
常见的中间件:
Qpid, ActiveMQ, Rabbitmq, Rocketmq, ZeroMQ, Kafka...
rabbitmq的安装:
1. 官网下载rabbitmq-server.rpm
2. 下载erlang.rpm
3. 安装即可,先安装erlang
rabbitmq-plugins enable rabbitmq_management:开启web管理插件
启动服务(CentOS7):systemctl restart rabbitmq-server.service
访问http://host:15672访问,需要登录名和密码,同时要有远程访问的权限,默认有一个账号guest,密码guest,2种方式:
第一种:创建一个新用户,命令示例如下:
rabbitmqctl add_user carl 111111
rabbitmqctl set_user_tags carl administrator
rabbitmqctl set_permissions -p / carl ".*" ".*" ".*"
第二种:创建/etc/rabbitmq/rabbitmq.config并配置
[{rabbit, [{loopback_users, []}]}].
以允许guest用户远程登录
二、AMQP协议
上图包含了AMQP中的所有组件。
Channel:
1.信道是建立在TCP连接内的虚拟连接,每条信道会被指派一个唯一的ID.
2. 信道是更轻量级的连接,TCP每次连接需要3次握手,4次断开,是昂贵的。假设应用程序从队列中消费消息,并且根据服务需求合理调度线程。那么每个线程都需要自行连接到Rabbit,每个线程一个TCP连接没有必要的,我们用一条TCP连接多个信道方式以满足性能方面的要求,又能确保每个线程的私密性。
3.在TCP连接上创建多少条信道是没有限制的。
举个例子:对于生产者-消费者模型,生产者中应用程序通过多个信道去和broker进行通信,其中chan_recv信道用于服务接收消息的线程,chan_sendX(X是线程号)信道用于服务于每一个应答线程。AMQP使用多个信道来满足应用程序的需求,而不会有众多TCP连接的开销。
Queue:
1. 队列是AMQP消息通信的基础模块
2. 对于负载均衡而言,队列是绝佳方案,只需要让队列循环的分配即可
3. 队列是消息的终点
消费通过和队列交换来获取消息,一般流程:
- 队列将消息分发给消费者
- 消费者接收消息
- 消费者向broker进行确认
- 队列中移除该消息
如果消费者应用程序发生异常,rabbitmq会认为消息没有分发,重新发送。程序员可以通过断开连接或者显示调用reject api进行明确 拒绝该消息。
Exchang和Binding:
Message->Queue可以多种分发策略,服务器会根据路由键(routing key)将消息从交换器入由到队列。协议中定义了不同类型的交换器,一共有4种类型:direct,fanout,topic和headers。每一种协议实现了不同的路由算法。
headers交换器允许你匹配AMQP消息的header而不是routing key,除此之外,headers交换器和direct交换器完全一致,但是headers交换器的性能会差很多!不太实用,甚至几乎没用,因此,我们关注其他三种类型的交换器。
(1) Direct 交换器:
Message中的“routing key”如果和Binding中的“binding key”一致, Direct exchange则将message发到对应的queue中。
(2) Fanout 交换器:
每个发到Fanout类型Exchange的message都会分到所有绑定的queue上去,类似于组播。
(3) Topic
根据routing key,及通配规则,Topic exchange将分发到目标queue中。
Routing key中可以包含两种通配符,类似于正则表达式:
“#”通配任何零个或多个word
“*”通配任何单个word
虚拟主机和隔离:
1.每一个rabbitmq服务器都能创建虚拟消息服务器,称之为虚拟主机(vhost)。
2.每一个虚拟vhost本质上都是一个mini版的rabbitmq服务器,拥有自己的队列、交换器和绑定...... 更重要的是:它拥有自己的权限机制并以此来隔离多个不同应用,你不用担心一个应用会删除另外一个应用的队列。
rabbitmq的vhost就像服务器上的虚拟主机一样(不管httpd,nginx,tomcat的虚拟主机),通过各个实例间提供逻辑分离。
vhost是AMQP概念的基础,你必须在连接时进行指定。由于rabbitmq包含了开箱急用的默认的vhost:"/",因此使用非常简单。如果你不需要多个vhost,就使用默认的。
AMQP有一个有趣的地方在于它没有指定权限控制应该在vhost级别还是在服务器端级别实现,而是留给开发者去决定。
当你在rabbit里创建一个用户时,用户通常会被指派至少一个vhost,并且只能访问被指派vhost的队列、交换器和绑定。当你在设计消息通信架构时,记住vhost之间是绝对隔离的。
vhost的控制需要通过管理命令rabbitmqctl来实现,直接使用rabbitmqctl -h查看帮助即可
add_vhost <vhost>
delete_vhost <vhost>
list_vhosts [<vhostinfoitem> ...]
持久化:
关于队列和交换器,默认情况下你无法避免于服务器重启,即服务器重启之后,队列和交换器都消失了。
原因在于每个队列和交换器的durable属性。该属性默认是false,你可以设置为true,这样你可以不需要再服务器重启之后重新创建队列和交换器,但是这样不能让消息幸免...
能从AMQP服务器崩溃中恢复的消息,我们称之为持久化消息。在消息发布前,你可以把它的"投递模式"(delivery mode)设置为2,消息此时是持久化的,同时必须发布到持久化的队列和交换器中,否则重启之后队列和交换器都不存在,这就是孤儿消息。
因此,你想要消息从Rabbit崩溃中恢复,需要:
- 投递模式设置为2
- 持久化的交换器
- 持久化的队列
RabbitMQ为了确保持久化消息能重启后恢复的,将其写入持久化日志文件。当你发布一条消息-》持久化交换器时,Rabbit会在消息提交到日志文件之后才发送响应。记住,如果消息如果路由到非持久化队列中,会自动从持久性日志中移除。
一旦你消费了持久化消息,rabbitmq会在持久化日志中把消息标记为等待垃圾收集。
持久化当然会有性能损失,尤其是在集群环境中。
AMQP事务:如果发布了持久化消息,服务器没有响应,怎么知道服务器是否已经持久化了呢。在AMQP中,在把信道设置为事务模式,你通过信道发送那些想要确认的消息,之后还有多个AMQP命令。这些命令是执行还是忽略,取决于第一条消息发送是否成功。
一旦你发送完所有命令,就可以提交了。事务是规范的一部分,但是事务几乎吸干了性能...
因此,Rabbitmq使用更好的方案:发送方确认模式。 和事务一样,你需要将信道设置为confirm模式,而且你只能通过重新创建信道来关闭该模式。一旦信道进入confirm模式,所有在信道上发布的消息都会被指派一个唯一的ID号(从1开始)。
一旦消息被投递给所有匹配的队列后,信道会发送一个发送方确认模式给生成者应用程序(包含了消息的唯一fID)。这使得生产者知道消息已经安全到达目的队列了。如果消息和队列是可持久化的,消息会在队列将消息写入磁盘之后再发出。
因此,生产者可以异步确认消息,同时避免了事务。此时的发送方确认模式非常轻量级,对rabbit代理服务器的性能影响几乎可以忽略不计。
下面按照官方的demo一个个演示....
三、Hello World
Hello World简单的构建了一个生产者和消费者
完整pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>demo</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Send程序:生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; /**
* Created by carl.yu on 2016/11/14.
*/
public class Send {
private final static String QUEUE_NAME = "hello"; public static void main(String[] argv)
throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.211");
//1. 创建的物理TCP连接
Connection connection = factory.newConnection();
//2. TCP连接中创建一个信道
Channel channel = connection.createChannel();
//3. 声明channel信道的模式,参数说明
// the name of the queue: hello
// durable: false
// exclusive: true if we are declaring an exclusive queue (restricted to this connection)
// autoDelete - true if we are declaring an autodelete queue (server will delete it when no longer in use)
// arguments - other properties (construction arguments) for the queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
//4. 发布消息,空字符串""表示默认的交换器,参数说明:
//exchange - the exchange to publish the message to
//routingKey - the routing key
///props - other properties for the message - routing headers etc
//body - the message body
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//5. 关闭操作
channel.close();
connection.close();
}
}
Recv:消费者
import com.rabbitmq.client.*;
import java.io.IOException; /**
* Created by carl.yu on 2016/11/15.
*/
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.211");
final Connection connection = factory.newConnection();
//1. 消费者也需要创建信道
final Channel channel = connection.createChannel();
//2. 声明一个信道
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//3. 创建一个consumer,和channel进行绑定
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
System.exit(0);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
try {
System.out.println("关闭");
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
四、Work Queues
4.1 消费者负载均衡
rabbitmq天然支持消费者的负载均衡:
One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.
(注:这里没有使用官网的demo,有兴趣请去github上看源码)
NewTask.java 生产者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; /**
* Created by carl.yu on 2016/11/15.
*/
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.211");
Connection connection = factory.newConnection();
for(int i = 1;i<10;i++){
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
//1. 创建消息,其中有三个.号,每个.号处理需要100ms,详见消费端代码
String message = "消息"+i+".第一段.第二段.第三段";
channel.basicPublish("", "hello", null, message.getBytes());
channel.close();
}
System.out.println("发送消息完毕");
connection.close();
}
}
Worker.java
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { // private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.211");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
System.out.println("等待消息中......"); final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8"); System.out.println("消费者1号获取消息:" + message + "'");
try {
doWork(message);
} finally {
// channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume("hello", autoAck, consumer);
/* Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
try {
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
});*/
} private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(300);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
再创建一个Worker2.java代码和Worker基本一样
运行Worker先,再运行NewTask,结果如下:
4.2 Message ackonwledgement
上述的消费者代码中用的方式都自动ack机制,当消息发送给消费者之后,rabbitmq马上从内存中移除这条消息。
当消费者因为意外或故意主动拒绝这条消息的时候会出现消息丢失的问题,这严重影响了消息的可靠性,大多数情况下,我们并不希望消息自动丢失
于是,rabbitmq为我们提供了ack机制,类似于tcp通信的ack回应,消费端主动通知rabbitmq,hey,兄弟,我成功收到了消息,如果没有回应,rabbitmq就认为消费端没有收到,于是就继续抬走下一个。
我们可以做个试验,首先确认下消息一定会丢失...消费者做个小小的改变,在doWork的时候直接故意发生异常,而生产者的代码发送10条改为1条,去掉for循环防止有影响...
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { // private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.211");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicQos(1); // accept only one unack-ed message at a time (see below)
System.out.println("等待消息中......");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8"); System.out.println("消费者1号获取消息:" + message + "'");
try {
doWork(message);
} finally {
// channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume("hello", autoAck, consumer);
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
try {
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
});
} private static void doWork(String task) {
int count = 0;
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
count++;
Thread.sleep(300);
//发生了意外
if(count == 2){
System.out.println("发生了意外,中断了");
System.exit(0);
}
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
Worker连续运行2次,NewTask运行一次,发现消息丢失了....
如何使用ack机制呢?非常简单...手动ack
如果你忘记了手动ack,消息堆积问题,rabbitmq可没有rocketmq那么强悍的消息堆积能力... 此时可以通过运维命令去管理一下:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello
...done.
4.3 Message durability持久化消息
记得之前的持久化消息吗?如何保证持久化的消息呢?
持久化的Exchange和持久化的queue,因此上面的queue hello就不能用了,换个新的~
投递模式设置为2
修改代码:
消费者和生产端的queueDeclare方法可以把队列设置durable为true,默认的Exchange是持久化的不用管,注意生产者和消费者代码中队列都要修改为true,防止出现意外。
queueDeclare的方法参数在helloworld的demo中注释中有说明:
Declare a queue
Parameters:
queue - the name of the queue
durable - true if we are declaring a durable queue (the queue will survive a server restart)
exclusive - true if we are declaring an exclusive queue (restricted to this connection)
autoDelete - true if we are declaring an autodelete queue (server will delete it when no longer in use)
arguments - other properties (construction arguments) for the queue
Returns:
a declaration-confirm method to indicate the queue was successfully declared
Throws:
IOException - if an error is encountered
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
下面把投递模式设置为2:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
查看下源码:MessageProperties.PERSISTENT_TEXT_PLAIN的投递模式就是2
/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
修改完毕之后,修改消费端,手动ack,但是最后故意不提交。重启rabbitmq服务,发现task_queue依旧存在,而且消息也依旧存在,测试成功。
4.4 Fair dispatch
上面的负载均衡算法不一定完全符合我们的要求,举个例子,如果2个workers,当所有的奇数条信息非常重量级,而所有的偶数条信息非常轻量级,此时一个worker必然会很忙碌,而另一个很轻松。这不是我们所期望的,我们希望能够更公平。
而导致上述情形的原因是rabbitmq在消息入由到队列之后就分发了,根据rr平均分给了2个worker。
为了解决上面的问题,可以设置:
int prefetchCount = 1;
channel.basicQos(prefetchCount);
这告诉rabbitmq一次只给我一个消息处理,我没有返回ack的话,rabbitmq你别再继续给我压力了。因此,此时rabbitmq会把消息分发给那个不那么忙的worker。
五、Publish/Subscribe
注意到一个问题,我们之前的代码中根本没有任何一个片段去关注于Exchange...
我们只关注了以下概念:
- A producer is a user application that sends messages.
- A queue is a buffer that stores messages.
- A consumer is a user application that receives messages.
5.1 Exchanges
我们这里演示的fanout方式...
创建一个fanout的exchange方式为;
channel.exchangeDeclare("logs", "fanout");
使用管理命令可以列出所有的exchange:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
Nameless exchange?
之前我们的代码形如:
channel.basicPublish("", "hello", null, message.getBytes());
根本就没有指定任何的exchange,使用的是空字符串,它表示的是默认交换器(the default or nameless exchange): 消息会通过routingKey来匹配路由到队列。官方的原话是: messages are routed to the queue with the name specified byroutingKey, if it exists.
现在,我们可以在发送消息的时候指明特定的交换器
channel.basicPublish( "logs", "", null, message.getBytes());
5.2 Temporary queues 临时队列
我们前面的队列都指定了名称,例如hello或者task_queue,可以给queue命名这个机制对我们非常重要,可以帮助我们在生产者和消费者中根据名字来决定使用同一个队列。
但是他并不符合我们现在的需求,我们的消费者希望去关注于所有相关的消息,也只关注于当前的消息,而不是过去的消息,为了解决这个问题需要2件事情:
(1) 首先,当我们消费者连接rabbitmq的时候需要一个新的空队列
(2) 其次,当我们消费者断开连接的时候该队列自动被删除
String queueName = channel.queueDeclare().getQueue();
上面代码会自动帮我们创建一个non-durable(非持久),exclusive(和当前连接绑定的),autodelete(自动删除的)的队列并且生成一个随机的队列名,很可能是类似于amq.gen-JzTY20BRgKO-HjmUJj0wLg的造型。
5.3 Bindings
我们已经创建一个fanout的交换器和一个队列了,现在我们只需要将他们绑定在一起即可。
channel.queueBind(queueName, "logs", "");
5.4 演示
生产端:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv)
throws Exception { ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.211");
Connection connection = factory.newConnection();
for(int i=1;i<10;i++){
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "消息"+i+".第一段.第二段.第三段";
//消息发送给交换器
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
channel.close();
}
System.out.println("发送消息完毕");
connection.close();
}
}
消费端:
package helloworld; import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.211");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//信道声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//消费者生成一个临时队列
String queueName = channel.queueDeclare().getQueue();
//将临时队列和交换器绑定
channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
再来一个一样的ReceiveLog2
演示顺序如下:
- 启动ReceiveLog和ReceiveLog2
- 启动Emitlog
- 关闭ReceiveLog2
- 启动ReceiveLog2,发现之前的信息不再收到
- 修改Emitlog,稍微修改下发送内容,例如循环有1-10变为11-20,发现ReceiveLog2可以继续接收
- 使用web管理页面瞄一下随机队列名字
- 断开连接,发现刚刚的队列都消失了,测试成功。
六、Routing
上面的例子中演示了一个简单的日志系统,生产者可以生产消息并且组播到一组消费者上去。
此时消费者接受发送到该Exchange的全部消息,假设我们只对其中部分兴趣感兴趣...
6.1 Bindings
"A Binding is a relationship between an exchange and a queue."
之前的代码是,该信道在在queue和exchange之间建立一个映射关系,即binding
channel.queueBind(queueName, EXCHANGE_NAME, "");
该binding还可以增加一个额外参数,称之为binding key.
binding key的含义跟exchange有关,比如前面的fanout类型会直接忽略掉该参数...
6.2 direct exchange
之前介绍了Direct exchange,这种交换器比较消息中的routing key和上面的binding key,匹配的话就将消息分发到队列。
上图中,一个Exhange绑定了2个队列:
Q1和X之间拥有一个binding,key是orange
Q2和X之间拥有2个binding,key是black和green
此时,如果消息的routing key是orange,会被入由到Q1,如果消息的routing key是black ,green会被入由到Q2,其他的消息会被直接丢弃。
一个Queue和Exchange之间可以有多个bindings, 不同的binding还可以拥有相同的binding key,如下图所示,此时的效果类似于fanout,所有的消息会同时入由到Q1和Q2,由此可见,direct的exchange可以非常灵活.
6.3 代码演示
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; /**
* Created by carl.yu on 2016/11/16.
*/
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv)
throws Exception { ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.211");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//生产者声明了一个direct交换器direct_logs
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.basicPublish(EXCHANGE_NAME, "warn", null, "this is a warning.".getBytes());
channel.basicPublish(EXCHANGE_NAME, "info", null, "this is a info.".getBytes());
channel.basicPublish(EXCHANGE_NAME, "error", null, "this is a error.".getBytes());
channel.basicPublish(EXCHANGE_NAME, "fatal", null, "this is a fatal.".getBytes()); System.out.println("finish...");
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.211");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//创建一个临时队列
String queueName = channel.queueDeclare().getQueue(); //队列绑定到
// channel.queueBind(queueName, EXCHANGE_NAME, "info");
// channel.queueBind(queueName, EXCHANGE_NAME, "warn");
channel.queueBind(queueName, EXCHANGE_NAME, "error");
channel.queueBind(queueName, EXCHANGE_NAME, "fatal");
System.out.println(" 等待消息"); Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] 收到消息 '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
总结:
1. 上面同样适用临时队列实现了订阅功能,但是只订阅了部分信息,routing key 为error和fatal的才会被接受
2. 生产者虽然发送了warn和info信息,但是由于没有队列接受,消息被直接丢弃
七. Topics
7.1 Topic exchange
在上面的direct例子中,并不能"很方便"的实现多条件订阅,只能按照"特定"的"routeing key"进行路由,因此有了topic exchange.
Topic exchange支持以通配符的方式进行匹配:
- * (star) can substitute for exactly one word.
- # (hash) can substitute for zero or more words.
如上图所示:
1.有一个交换器X,它的类型是topic
2.有2个队列Q1,Q2
3.Q1和X之间有一个binding,所有route_key为*.orange.*的信息会被入由至Q1
4.Q2和X之间有2个bindings
举个例子:此时有个信息,它的routing key是"quick.orange.rabbit",会被同时发送到Q1和Q2
"lazy.orange.elephant"会被发送到Q1和Q2
"quick.orange.fox"会发送到Q1,而不是Q2
"lazy.pink.rabbit"会被发送到Q2,而不是Q1
"quick.brown.fox"不匹配Q1,也不匹配Q2,因此会被丢弃
"lazy.orange.male.rabbit"匹配Q2,因为#指代多个单词
Topic exchange非常强大:可以很轻易的像其他的exchange....
(1) 如果一个Queue的binding key是"#",它会接收所有的的messages,就像fanout
(2) 如果一个Queue的binding key没有*或#,是一个指定的字符串,此时就像direct..
实现:跟上面类似,只是exchange变成了topic