RabbitMQ学习笔记3-使用topic交换器

时间:2022-08-27 23:04:57

topic的路由规则里使用【.】号分隔单词,使用【*】号匹配1个单词,使用【#】匹配多个.和多个*。

在下面的例子中:

logger.*可以匹配logger.error和logger.warning,但logger*.error只能匹配logger.error

logger#可以匹配到logger.error和logger.warning。

下面的例子使用topic接收警告、错误的日志,并根据匹配的路由规则发送给不同的Queue队列来处理的例子:

日志生产者SenderWithTopicExchange

 package com.yzl.test2;

 import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; /**
* 使用topic交换器发送消息
* 分为警告和错误2种日志
* @author: yzl
* @date: 2016-10-22
*/
public class SenderWithTopicExchange {
//交换器名称
private static final String EXCHANGE_NAME = "myTopicExchange";
//路由键的前缀
private static final String BASE_ROUTING_KEY = "logger."; public static void main(String[] args) throws Exception {
//使用CountDownLatch控制2个线程一起运行
final CountDownLatch cdl = new CountDownLatch(2);
//连接到rabbitmq服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
//创建一个信道
final Channel channel = connection.createChannel();
//定义一个名字为topicExchange的topic类型的exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); ExecutorService pool = Executors.newFixedThreadPool(2);
pool.submit(new Runnable() {
@Override
public void run() {
try {
cdl.await();
//发送警告日志,绑定路由键:logger.warning
String warningMsg = "warning message is :";
for(int i=1; i<800; i++){
System.out.println("发送警告消息:" + warningMsg+i);
channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "warning", null, (warningMsg+i).getBytes());
Thread.sleep(2000L);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
pool.submit(new Runnable() {
@Override
public void run() {
try {
cdl.await();
//发送错误日志,绑定路由键:logger.error
String errorMsg = "error message is :";
for(int i=1; i<1000; i++){
System.out.println("发送错误消息:" + errorMsg+i);
channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "error", null, (errorMsg+i).getBytes());
Thread.sleep(2000L);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}); cdl.countDown();
cdl.countDown();
}
}

消息消费者ReceiverWithTopicExchange

 package com.yzl.test2;

 import java.io.IOException;

 import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope; /**
* 使用topic交换器接收消息
*
* @author: yzl
* @date: 2016-10-22
*/
public class ReceiverWithTopicExchange {
// 交换器名称
private static final String EXCHANGE_NAME = "myTopicExchange"; public static void main(String[] args) throws Exception {
// 连接到rabbitmq服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
// 创建一个信道
final Channel channel = connection.createChannel();
// 定义一个名字为topicExchange的topic类型的exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //定义接收警告消息的队列
channel.queueDeclare("warningQueue", false, false, false, null);
//定义接收错误消息的队列
channel.queueDeclare("errorQueue", false, false, false, null);
//定义接收所有级别日志消息的队列
channel.queueDeclare("allLoggerQueue", false, false, false, null); //使用logger.warning路由键绑定myTopicExchange与warningQueue
channel.queueBind("warningQueue", EXCHANGE_NAME, "logger.warning");
//使用logger.error路由键绑定myTopicExchange与errorQueue
channel.queueBind("errorQueue", EXCHANGE_NAME, "logger.error");
//使用logger.*路由规则绑定myTopicExchange与allLoggerQueue
channel.queueBind("allLoggerQueue", EXCHANGE_NAME, "logger.*"); //只处理警告日志,使用手动ack确认
channel.basicConsume("warningQueue", false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body);
System.out.println("warningQueue accept a warning msg :" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
//只处理错误日志,使用手动ack确认
channel.basicConsume("errorQueue", false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body);
System.out.println("errorQueue accept a error msg :" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
//处理警告和错误日志,使用手动ack确认
channel.basicConsume("allLoggerQueue", false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body);
System.out.println("allLoggerQueue accept a logger msg :" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

结果输出:

发送警告消息:warning message is :1
发送错误消息:error message is :1
发送警告消息:warning message is :2
发送错误消息:error message is :2
发送错误消息:error message is :3
发送警告消息:warning message is :3
allLoggerQueue accept a logger msg :error message is :1
allLoggerQueue accept a logger msg :warning message is :1
errorQueue accept a error msg :error message is :1
warningQueue accept a warning msg :warning message is :1
warningQueue accept a warning msg :warning message is :2
errorQueue accept a error msg :error message is :2
allLoggerQueue accept a logger msg :warning message is :2
allLoggerQueue accept a logger msg :error message is :2
allLoggerQueue accept a logger msg :warning message is :3
errorQueue accept a error msg :error message is :3
warningQueue accept a warning msg :warning message is :3
allLoggerQueue accept a logger msg :error message is :3

消息处理流程:

RabbitMQ学习笔记3-使用topic交换器

RabbitMQ学习笔记3-使用topic交换器的更多相关文章

  1. RabbitMQ学习笔记4-使用fanout交换器

    fanout交换器会把发送给它的所有消息发送给绑定在它上面的队列,起到广播一样的效果. 本里使用实际业务中常见的例子, 订单系统:创建订单,然后发送一个事件消息 积分系统:发送订单的积分奖励 短信平台 ...

  2. RabbitMQ学习笔记(五) Topic

    更多的问题 Direct Exchange帮助我们解决了分类发布与订阅消息的问题,但是Direct Exchange的问题是,它所使用的routingKey是一个简单字符串,这决定了它只能按照一个条件 ...

  3. 官网英文版学习——RabbitMQ学习笔记(一)认识RabbitMQ

    鉴于目前中文的RabbitMQ教程很缺,本博主虽然买了一本rabbitMQ的书,遗憾的是该书的代码用的不是java语言,看起来也有些不爽,且网友们不同人学习所写不同,本博主看的有些地方不太理想,为此本 ...

  4. RabbitMQ学习笔记1-hello world

    安装过程略过,一搜一大把. rabbitmq管理控制台:http://localhost:15672/   默认账户:guest/guest RabbitMQ默认监听端口:5672 JAVA API地 ...

  5. 官网英文版学习——RabbitMQ学习笔记(十)RabbitMQ集群

    在第二节我们进行了RabbitMQ的安装,现在我们就RabbitMQ进行集群的搭建进行学习,参考官网地址是:http://www.rabbitmq.com/clustering.html 首先我们来看 ...

  6. (转) Rabbitmq学习笔记

    详见原文: http://blog.csdn.net/shatty/article/details/9529463 Rabbitmq学习笔记

  7. RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

    RabbitMQ优先级队列注意点: 1.只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效 2.RabbitMQ3.5以后才支持优先级队列 代码在博客:RabbitMQ学习笔记三:Java ...

  8. 官网英文版学习——RabbitMQ学习笔记(七)Topic

    在上一篇中使用直接交换器改进了我们的系统,使得它能够有选择的进行接收消息,但它仍然有局限性——它不能基于多个条件进行路由.本节我们就进行能够基于多个条件进行路由的topics exchange学习. ...

  9. RabbitMQ学习笔记(六) RPC

    什么RPC? 这一段是从度娘摘抄的. RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的 ...

随机推荐

  1. &lpar;1&rpar; 深入理解Java虚拟机到底是什么&quest;

    好文转载:http://blog.csdn.net/zhangjg_blog/article/details/20380971 什么是Java虚拟机   作为一个Java程序员,我们每天都在写Java ...

  2. 李洪强iOS开发之通知的使用

    李洪强iOS开发之通知的使用 01 - 在A中发送通知 02 - 在B中监听通知 03 - 在B中通知出发的方法 04 - 在B控制器viewDidLoad调用通知

  3. 【宽搜】【并查集】Vijos P1015 十字绣

    题目链接: https://vijos.org/p/1015 题目大意: n*m的网格,线只能在网格的顶点处才能从布的一面穿到另一面.每一段线都覆盖一个单位网格的两条对角线之一,而在绣的过程中,一针中 ...

  4. 果酷:80后IT男&OpenCurlyDoubleQuote;鲜果切”年入千万 &lowbar; 财经频道 &lowbar; 东方财富网&lpar;Eastmoney&period;com&rpar;

    果酷:80后IT男"鲜果切"年入千万 _ 财经频道 _ 东方财富网(Eastmoney.com) 果酷:80后IT男"鲜果切"年入千万

  5. Effective C&plus;&plus;规定45 附加代码

    这部分是额外的代码的博客.键45条款想法已经实现. #include<iostream> using namespace std; template<typename T> c ...

  6. iOS-SQLite&lpar;FMDB&rpar;

    在已经存在的表中,添加字段,更新表结构 /** Test to see if particular column exists for particular table in database @pa ...

  7. ASP&period;NET前台table通过Ajax获取绑定后台查询的json数据

    上一篇<ASP.NET前台html页面AJAX提交数据后台ashx页面接收数据>写了前台提交数据后台保存到数据库,数据处理以后用户肯定要查询.接下来就写一个前台table通过ajax  J ...

  8. springboot秒杀课程学习整理1-2

    1)从数据库到前端,做了三层转换,最后统一返回给前端的格式 DO-> model: 放在service中,目的是为了组装来自于数据库的数据,有些字段来自于不同的表的取,这一层相当于真正的业务模型 ...

  9. GenyMotion the virtual device got no ip address 问题解决

    不要再找答案了 升级你的virtual box到最新版本(目前是 5.0.26,已通过) 如果你是windows 10系统 必须关闭hyper-v 在管理员命令行下运行bcdedit /set hyp ...

  10. Supervised Hashing with Kernels&comma; KSH

    Notation 该论文中应用到较多符号,为避免混淆,在此进行解释: n:原始数据集的大小 l:实验中用于监督学习的数据集大小(矩阵S行/列的大小) m:辅助数据集,用于得到基于核的哈希函数 r:比特 ...