根据以上对应关系进行代码编写,其中包括两个队列:disk和console,一个生产者,两个消费者,一个direct类型的交换,与disk队列通过error进行绑定,与console队列通过info和warning两个routingkey进行绑定。最终进行测试,查看结果。
1、消费者代码
同样的,代码整体逻辑与之前的案例没什么不同,只是交换机类型变更,以及需要指定routingkey参数。
消费者01代码如下:
/**
* Description: 路由模式消费者01
*/
public class ReceiveLogsDirect01 {
//设置要创建的交换机的名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//创建fanout交换机
/**
* 参数1:交换机名
* 参数2:交换机类型,本次设置为direct
* 参数3:交换机是否持久化
*/
channel.exchangeDeclare(EXCHANGE_NAME, "direct", false);
channel.queueDeclare("disk", false, false, false, null);
//将交换机与队列进行绑定(binding)
/**
* 参数1:队列名
* 参数2:交换机名
* 参数3:路由关键字,发布订阅模式写""空串即可
*/
channel.queueBind("disk", EXCHANGE_NAME, "error");
//接收消息
channel.basicConsume("disk", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogsDirect01控制台打印接收到的消息: " + message);
}
});
}
}
消费者02代码如下:
/**
* Description: 路由模式消费者02
*/
public class ReceiveLogsDirect02 {
//设置要创建的交换机的名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct", false);
channel.queueDeclare("console", false, false, false, null);
//多重绑定
channel.queueBind("console", EXCHANGE_NAME, "info");
channel.queueBind("console", EXCHANGE_NAME, "warning");
//接收消息
channel.basicConsume("console", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogsDirect02控制台打印接收到的消息: " + message);
}
});
}
}
2、生产者代码
/**
* Description: 路由模式生产者
*/
public class EmitLogDirect {
//交换机名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//准备3条消息
String message1 = "这是一条info信息";
String message2 = "这是一条warning信息";
String message3 = "这是一条error信息";
channel.basicPublish(EXCHANGE_NAME, "info", null, message1.getBytes());
channel.basicPublish(EXCHANGE_NAME, "warning", null, message2.getBytes());
channel.basicPublish(EXCHANGE_NAME, "error", null, message3.getBytes());
//关闭资源
channel.close();
}
}
3、运行结果分析
先将两个消费者运行,因为它们负责声明交换机创建队列以及绑定关系,再启动生产者发送消息,此时会看到message1和message2发给了ReceiveLogsDirect02
,而message3发送给了ReceiveLogsDirect01
在此过程中,生产者在发送消息时指定了EXCHANGE_NAME
,无论是什么消息都先发送给direct类型的交换机,它的名字设置为了direct_logs
,然后交换机会根据routingkey的路由规则决定该消息转发给哪个队列,以及是否要丢弃。假如此时我们发送一个routingkey为debug的消息,交换机由于找不到转发目标会将该消息丢弃。