Springboot 配置RabbitMQ文档的方法步骤

时间:2022-05-22 09:49:01

简介

rabbitmq是实现amqp(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗

概念:

  • 生产者 消息的产生方,负责将消息推送到消息队列
  • 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
  • 队列 消息的寄存器,负责存放生产者发送的消息
  • 交换机 负责根据一定规则分发生产者产生的消息
  • 绑定 完成交换机和队列之间的绑定

模式:

  • direct:直连模式,用于实例间的任务分发
  • topic:话题模式,通过可配置的规则分发给绑定在该exchange上的队列
  • headers:适用规则复杂的分发,用headers里的参数表达规则
  • fanout:分发给所有绑定到该exchange上的队列,忽略routing key

springboot集成rabbitmq

一、引入maven依赖

?
1
2
3
4
5
<dependency>
 <groupid>org.springframework.boot</groupid>
 <artifactid>spring-boot-starter-amqp</artifactid>
 <version>1.5.2.release</version>
</dependency>

二、配置application.properties

?
1
2
3
4
5
6
# rabbitmq
spring.rabbitmq.host = dev-mq.a.pa.com
spring.rabbitmq.port = 5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = admin
spring.rabbitmq.virtualhost = /message-test/

三、编写amqpconfiguration配置文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package message.test.configuration;
import org.springframework.amqp.core.acknowledgemode;
import org.springframework.amqp.core.amqptemplate;
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.queue;
import org.springframework.amqp.rabbit.config.simplerabbitlistenercontainerfactory;
import org.springframework.amqp.rabbit.connection.cachingconnectionfactory;
import org.springframework.amqp.rabbit.connection.connectionfactory;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.qualifier;
import org.springframework.boot.autoconfigure.amqp.rabbitproperties;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
 
@configuration
public class amqpconfiguration {
/**
 * 消息编码
 */
 public static final string message_encoding = "utf-8";
 public static final string exchange_issue = "exchange_message_issue";
 public static final string queue_issue_user = "queue_message_issue_user";
 public static final string queue_issue_all_user = "queue_message_issue_all_user";
 public static final string queue_issue_all_device = "queue_message_issue_all_device";
 public static final string queue_issue_city = "queue_message_issue_city";
 public static final string routing_key_issue_user = "routing_key_message_issue_user";
 public static final string routing_key_issue_all_user = "routing_key_message_issue_all_user";
 public static final string routing_key_issue_all_device = "routing_key_message_issue_all_device";
 public static final string routing_key_issue_city = "routing_key_message_issue_city";
 public static final string exchange_push = "exchange_message_push";
 public static final string queue_push_result = "queue_message_push_result";
 
 @autowired
 private rabbitproperties rabbitproperties;
 
 @bean
 public queue issueuserqueue() {
  return new queue(queue_issue_user);
 }
 
 @bean
 public queue issuealluserqueue() {
  return new queue(queue_issue_all_user);
 }
 
 @bean
 public queue issuealldevicequeue() {
  return new queue(queue_issue_all_device);
 }
 
 @bean
 public queue issuecityqueue() {
  return new queue(queue_issue_city);
 }
 
 @bean
 public queue pushresultqueue() {
  return new queue(queue_push_result);
 }
 
 @bean
 public directexchange issueexchange() {
  return new directexchange(exchange_issue);
 }
 
 @bean
 public directexchange pushexchange() {
  // 参数1:队列
  // 参数2:是否持久化
  // 参数3:是否自动删除
  return new directexchange(exchange_push, true, true);
 }
 
 @bean
 public binding issueuserqueuebinding(@qualifier("issueuserqueue") queue queue,
    @qualifier("issueexchange") directexchange exchange) {
   return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_user);
 }
 
 @bean
 public binding issuealluserqueuebinding(@qualifier("issuealluserqueue") queue queue,
    @qualifier("issueexchange") directexchange exchange) {
  return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_user);
 }
 
 @bean
 public binding issuealldevicequeuebinding(@qualifier("issuealldevicequeue") queue queue,
    @qualifier("issueexchange") directexchange exchange) {
  return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_device);
 }
 
 @bean
 public binding issuecityqueuebinding(@qualifier("issuecityqueue") queue queue,
    @qualifier("issueexchange") directexchange exchange) {
  return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_city);
 }
 
 @bean
 public binding pushresultqueuebinding(@qualifier("pushresultqueue") queue queue,
    @qualifier("pushexchange") directexchange exchange) {
  return bindingbuilder.bind(queue).to(exchange).withqueuename();
 }
 
 @bean
 public connectionfactory defaultconnectionfactory() {
  cachingconnectionfactory connectionfactory = new cachingconnectionfactory();
  connectionfactory.sethost(rabbitproperties.gethost());
  connectionfactory.setport(rabbitproperties.getport());
  connectionfactory.setusername(rabbitproperties.getusername());
  connectionfactory.setpassword(rabbitproperties.getpassword());
  connectionfactory.setvirtualhost(rabbitproperties.getvirtualhost());
  return connectionfactory;
 }
 
 @bean
 public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory(
    @qualifier("defaultconnectionfactory") connectionfactory connectionfactory) {
  simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory();
  factory.setconnectionfactory(connectionfactory);
  factory.setacknowledgemode(acknowledgemode.manual);
  return factory;
 }
 
 @bean
 public amqptemplate rabbittemplate(@qualifier("defaultconnectionfactory") connectionfactory connectionfactory)
 {
  return new rabbittemplate(connectionfactory);
 }
}

三、编写生产者

?
1
2
3
body = json.tojsonstring(issuemessage).getbytes(amqpconfiguration.message_encoding);
 rabbittemplate.convertandsend(amqpconfiguration.exchange_issue,
            amqpconfiguration.routing_key_issue_user, body);

四、编写消费者

?
1
2
3
4
5
@rabbitlistener(queues = amqpconfiguration.queue_push_result)
public void handlepushresult(@payload byte[] data, channel channel,
    @header(amqpheaders.delivery_tag) long deliverytag) {
    
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://segmentfault.com/a/1190000018555963