A. AMQP基础
RabbitMQ 并不是基于 Java 开发人员熟悉的 JMS 规范设计开发的,而是基于一个比 JMS 更新更合理的 AMQP (Advanced Message Queuing Protocol) 协议。所以,在开始 RabbitMQ 之旅前,需要先对 AMQP 有一定的了解。毕竟,RabbitMQ 就是 AMQP 协议的第一个开源实现。
a. AMQP messaging 中的基本概念
以下内容来自 http://www.cnblogs.com/frankyou/p/5283539.html
- Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之间的TCP连接。断开连接的操作只会在 client 端进行,broker不会断开连接,除非出现网络故障或 broker 服务出现问题
- Channel:如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最终被送到这里等待 consumer 取走。一个 message 可以被同时拷贝到多个 queue 中
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
简而言之,Publisher 发送的消息通过 Connection 中的 Channel 到达 Broker 的某个 Virtual Host,消息经过指定的 Exchange,根据 Binding 提供的分发依据,分发到 0~n 个 Queue 中;Queue 中的消息等待 Consumer 消费。
b. AMQP 模型解释
官网有一篇更详尽的模型解释:https://www.rabbitmq.com/tutorials/amqp-concepts.html
这篇文章一看就知道是好东西,可惜读起来太累了,以后再来回顾。
c. AMQP 规范
官网有 AMQP 规范的快速参考:https://www.rabbitmq.com/amqp-0-9-1-quickref.html
AMQP 规范全部内容可在 AMQP 官网查阅:http://www.amqp.org/
这些内容短期不会接触,留个脚印以后有机会再研究。
B. 你好,世界
参考资料:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
示例代码:https://github.com/gordonklg/study,rabbitmq module
gordon.study.rabbitmq.helloworld.Sender.java
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
private static final String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 10;) {
String message = "NO. " + ++i;
TimeUnit.MILLISECONDS.sleep(1000);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.printf("(%1$s)[===>%2$s ] %3$s\n", "Sender", ":" + QUEUE_NAME, message);
}
channel.close();
connection.close();
}
}
代码第12-14行通过 ConnectionFactory 创建了一个 Connection。Broker Host 地址为 localhost,端口是默认的 5672,用户密码是默认的 guest,VHOST 是默认的 "/"。(实际上默认的 Host 就是 localhost,第13行可以省略)
第15行在 Connection 中创建了一个虚拟的 Channel。
第17行申明了一个名字为 hello 的 Queue。不同于 JMS,AMQP 协议允许通过编程方式创建绝大多数的模型,例如 Exchange、Queue 等等。之所以是申明,是因为该方法会先判断当前 VHOST 中是否已存在一个同名的 Queue,如果不存在,则创建 Queue;如果存在,可能会直接使用已存在的 Queue,也可能返回异常(例如申明的 Queue 的属性与已存在的 Queue 的属性不一致时)。
queueDeclare 方法中三个 boolean 型参数指定了 Queue 的属性:
- durable:队列本身是否需要持久化。如果为 false,则 RabbitMQ 重启后该 Queue 消失。
- exclusive:排他性队列确保该队列只对申明它的连接可见,注意是连接而不是 Channel。当相应连接关闭时,该队列自动删除。
- autoDelete:设置为 true 的队列会在所有 Consumer 都断开连接时自动删除(不管是否是 durable)。队列被第一个 Consumer 连接前不会被删除。
第22行发布一个消息。basicPublish 方法第一个参数指定发布该消息所使用的 Exchange 名称,空字符串为系统预先定义的默认 Exchange,类型为 direct。第二个参数指定路由键,对于 direct 类型的 Exchange,会将该消息发送到所有绑定到该 Exchange 并且也设定了相同的路由键的 Queue 中。
gordon.study.rabbitmq.helloworld.Receiver.java
public class Receiver {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.printf(" [ %2$s<===](%1$s) %3$s\n", "Receiver", QUEUE_NAME, message);
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
代码从第13行开始定义一个 Consumer,通过 handleDelivery 回调接口,我们可以处理从队列中获取的消息。
第25行启动一个 Consumer,basicConsume 方法第一个参数指定 Consumer 消费的队列,即 hello 队列;第二个参数 autoAck 指定消息确认模式,true 表示消息确认是自动完成的(至少在进入 handleDelivery 方法前就已经自动确认了),false 表示必须由 Consumer 自己确认;第三个参数指定 Consumer
现在,可以花式运行两个类,看看发生了什么了。
RabbitMQ入门_02_HelloWorld的更多相关文章
-
2.RABBITMQ 入门 - WINDOWS - 生产和消费消息 一个完整案例
关于安装和配置,见上一篇 1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置 公司有需求,要求使用winform开发这个东西(消息中间件),另外还要求开发一个日志中间件,但是也是要求 ...
-
RabbitMQ入门-从HelloWorld开始
从读者的反馈谈RabbitMQ 昨天发完<RabbitMQ入门-初识RabbitMQ>,我陆陆续续收到一些反馈.鉴于部分读者希望结合实例来讲 期待下篇详细,最好结合案例.谢谢! 哪都好,唯 ...
-
RabbitMQ入门-高效的Work模式
扛不住的Hello World模式 上篇<RabbitMQ入门-从HelloWorld开始>介绍了RabbitMQ中最基本的Hello World模型.正如其名,Hello World模型 ...
-
RabbitMQ入门-消息订阅模式
消息派发 上篇<RabbitMQ入门-消息派发那些事儿>发布之后,收了不少反馈,其中问的最多的还是有关消息确认以及超时等场景的处理. 楼主,有遇到消费者后台进程不在,但consumer连接 ...
-
RabbitMQ入门-Topic模式
上篇<RabbitMQ入门-Routing直连模式>我们介绍了可以定向发送消息,并可以根据自定义规则派发消息.看起来,这个Routing模式已经算灵活的了,但是,这还不够,我们还有更加多样 ...
-
RabbitMQ入门与使用篇
介绍 RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue)协议的开源实现.用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面都非常的优秀 ...
-
[转]RabbitMQ入门教程(概念,应用场景,安装,使用)
原文地址:https://www.jianshu.com/p/dae5bbed39b1 RabbitMQ 简介 RabbitMQ是一个在AMQP(Advanced Message Queuing Pr ...
-
RabbitMQ 入门指南——安装
RabbitMQ好文 Rabbitmq Java Client Api详解 tohxyblog-博客园-rabbitMQ教程系列 robertohuang-CSDN-rabbitMQ教程系列 Rabb ...
-
RabbitMQ入门:总结
随着上一篇博文的发布,RabbitMQ的基础内容我也学习完了,RabbitMQ入门系列的博客跟着收官了,以后有机会的话再写一些在实战中的应用分享,多谢大家一直以来的支持和认可. RabbitMQ入门系 ...
随机推荐
-
git初体验(三)git分支
分支的理念就是分身,就像孙悟空拔出猴毛变出很多跟自己一模一样的猴子,然后每个猴子做自己的事情互不干涉,等到所有猴子做完之后,猴子集合来合并劳动成果,然后悟空就把那些猴子猴孙门统统收回了. 你创建了一个 ...
-
Bash实用技巧:同时循环两个列表
摘要: 你会学到一种原创的同时循环两个列表的方法.类似于Python或者Haskell的zip函数,非常简洁直观,效果如下: $ paste <( ) <( ) | while read ...
-
#技塑人生# windows2008无法远程— 注册表缺失键值导致高级防火墙服务异常
windows2008无法远程— 注册表缺失键值导致高级防火墙服务异常 阿里云技术支持中心:章阿贵 一.远程无法访问(windows server 2008) 症状:无法远程但是系统内网络正常,防火墙 ...
-
执行发送邮件Send方法时,报错:邮箱不可用。 服务器响应为: 5.7.1 Unable to relay for xxx@xxx.com
.net代码在执行发送邮件Send方法时,往往出现这个的报错: 邮箱不可用. 服务器响应为: 5.7.1 Unable to relay for xxx@xxx.com 这个问题应该是smtp的设置问 ...
-
基于C#实现的自动化测试框架:发布自动触发自动化回归测试
接口自动化测试用例完成以后,以前都是发布以后手动运行测试用例.虽然手动运行下脚本也就是一个F5的事情,但是离自动化测试的标准差得很远.这两天有了个大胆的想法,想要实现以下发布时直接触发自动化回归测试用 ...
-
Linux&#160;学习笔记之超详细基础linux命令&#160;Part&#160;2
Linux学习笔记之超详细基础linux命令 by:授客 QQ:1033553122 ---------------------------------接Part 1----------------- ...
-
ASCII UTF-8 编码
1. ASCII码 我们知道,在计算机内部,所有的信息最终都表示为一个二进制的字符串.每一个二进制位(bit)有0和1两种状态,因此八个二进制位就可以组合出256种状态,这被称为一个字节(byte). ...
-
解析Google集群资源管理系统Omega
1. 背景 Google的第一代/第二代集群(资源)管理系统被称为Borg,Borg设计细节因零零星星出现在各种文章中而知名,但一直未公开(比如发一篇paper).然而,我们可从腾讯公布的Torca( ...
-
使用 vux 框架
1)vux官网:https://vux.li/#/ 2)通过 vue-cli 工具使用 vux 1.如果没有安装 nodejs,请先前往 nodejs 官网下载并安装 nodejs,传送门:https ...
-
Centos7安装killall,fuser, killall,pstree和pstree.x11
centos7精简安装后,使用中发现没有killall命令. 经查找,可以通过以下命令解决: yum -y install psmisc 简单介绍一下 psmisc : Psmisc软件包包含三个帮助 ...