目录:
一、RabbitMQ部署
二、认识RabbitMQ
三、RabbitMQ快速入门
四、Spring AMQP
五、各种消息模型实例
六、消息转换器
一、RabbitMQ部署
- 1、在线拉取image
命令:docker pull rabbitmq:3-management
- 2、安装RabbitMQ
命令中对应的username
和password
修改为自己要设置的用户名及密码即可;
-p 15672:15672
中15672
端口是Management Plugin管理插件的访问端口,如:127.0.0.1:15672
,或者win访问虚拟机IP+端口:192.168.255.1:15672
;
-p 5672:5672
中的5672
端口是RabbitMQ容器的默认端口。
docker run \
-d \
-p 15672:15672 \
-p 5672:5672 \
--hostname my-rabbit \
--name mrmq \
-e RABBITMQ_DEFAULT_USER=username \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:3-management
这样,我们就已经完成RabbitMQ的部署,就可以开始使用了。
二、认识RabbitMQ
- publisher:生产者
- consumer:消费者
- exchange:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
- 2、RabbitMQ消息模型
Ⅰ 基本消息队列
① BasicQueue
Ⅱ 工作消息队列
② WorkQueue
Ⅲ 发布订阅模型
③ Fanout Exchange(广播)
④ Direct Exchange(路由)
⑤ Topic Exchange(主题)
MQ存在的好处就是,消息发送方和消息接收方之间可以实现异步通信,由中间的Broker帮助完成。
三、RabbitMQ快速入门
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.253.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("test");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.253.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("test");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
此处的VirtualHost可以通过Management Plugin查询,具体如下:
四、Spring AMQP
此处,我们以基本消息队列BasicQueue
为例
- 1、Spring AMQP的功能
- 自动声明队列、交换机及其绑定关系;
- 基于注解的监听器模式,异步接收消息;
- 封装了RabbitTemplate工具,用于发送消息 。
- 2、基础依赖
引入依赖:spring-boot-starter-amqp
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
① 消息发送方配置application.yml
# rabbitmq
spring:
rabbitmq:
host: 192.168.253.128
port: 5672
virtual-host: /
username: test
password: 123456
② 发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "Hello, Spring AMQP!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
此时,我们就可以通过管理插件查看已经发送的消息。如下:
① 消息接收方配置application.yml
此处配置与消费方一致。
② 接收消息
@Component
public class SpringRabbitMQListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMsg(String msg){
System.out.println(msg);
}
}
注意:此处消息被消费后,对应的simple.queue
中的消息就消失了
五、各种消息模型实例
基本消息队列BasicQueue
即为上方的代码,此处不再重复。
WorkQueue
与BasicQueue
不同之处,就是WorkQueue
支持一对多发布消息(不是一个消息发给多个消费者,一个消息只会被一个消费者消费),多个消费者可以提高消息消费速度,当然相同之处也是消息消费后就会从Queue中消失(后续的几种模型都是如此)。
① 模拟消息堆积
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "Message_";
for (int i = 1; i <= 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
② 接收消息
此处设置两个线程处理速度不同。
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
处理结果是2个消费者会均分消息。可以修改消费方的配置,以按照实际处理能力分配,如下:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
① 编写Fanout配置类
创建FanoutExchange,绑定队列Queue和交换机Exchange。
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("stone.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
② 发送消息
// 队列名称
String exchangeName = "stone.fanout";
// 消息
String message = "Hello, Fanout!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
③ 接收消息
@RabbitListener(queues = "fanout.queue1")
public void listen1FanoutQueueMsg(String msg){
System.out.println("Listener1 get :" + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listen2FanoutQueueMsg(String msg){
System.out.println("Listener2 get :" + msg);
}
不同于WorkQueue
,Fanout Exchange
广播模型下,绑定该交换机的消费者可以获取到对应的消息(即一条消息可以通过交换机被多个消费者消费)。
① 基于注解声明队列和交换机
@RabbitListener的使用
Ⅰ bindings = @QueueBinding()
配置绑定关系;
Ⅱ value = @Queue(name = "direct.queue1")
配置队列;
Ⅲ exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT)
配置交换机;
Ⅳ key = {"talkshow", "musicshow"}
配置订阅。
注意:type = ExchangeTypes.DIRECT
是默认类型,可以不做配置。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT),
key = {"talkshow", "musicshow"}
))
public void listenDirectQueue1(String msg){
System.out.println("DirectQueue1 :" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT),
key = {"talkshow", "news"}
))
public void listenDirectQueue2(String msg){
System.out.println("DirectQueue2 :" + msg);
}
② 发送消息
// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String messageNews = "乌俄冲突升级,昔日友邦冷眼旁观!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "news", messageNews);
// 消息
String messageTalks = "蜘蛛侠3英雄无归发布蓝光预告,主演再登SN宣传!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "talkshow", messageTalks);
此时:订阅news
主题的队列direct.queue1
可以消费messageNews
,订阅talkshow
主题的direct.queue1
和direct.queue2
均可以消费messageTalks
。
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符。通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
① 发送消息
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "建设更高水平法治中国";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
② 接收消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "stone.topic", type = ExchangeTypes.TOPIC),
key = {"China.#"}
))
public void listenTopicQueue1(String msg){
System.out.println("TopicQueue1 :" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "stone.topic", type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String msg){
System.out.println("TopicQueue2 :" + msg);
}
此时,由于消息Topic满足两个队列的订阅规则,所以两个队列都可以消费到消息。
六、消息转换器
① 发送消息
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "Jackson");
msg.put("age", 24);
// 发送消息
rabbitTemplate.convertAndSend("simple.queue","", msg);
此时,我们通过管理平台可以看到传送的消息是序列化后的消息,是Java原生的序列化类型。
② 接收消息
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(HashMap<String, Object> msg){
System.out.println("object.queue get msg is: " + msg);
}
接收到的消息也可以正常被反序列化:object.queue get msg is: {name=Jackson, age=24}
。
注意:在修改原生转换器时,要同时修改消息发送方和接收方的转换器,不然会报错。
① 基础依赖
在消息发送方和接收方,添加JSON转换器的依赖jackson-dataformat-xml
。
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
② 注册JSON转换器Bean
在消息发送方和接收方,注册JSON转换器到Spring容器中。
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
此时,我们再发送消息,管理平台显示的就是JSON类型。
七、结尾
以上即为RabbitMQ的基础内容