Spring boot集成RabbitMQ的示例代码

时间:2022-09-07 14:00:05

rabbitmq简介

rabbitmq是一个在amqp基础上完整的,可复用的企业消息系统

mq全称为message queue, 消息队列(mq)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

amqp就是一个协议,是一个高级抽象层消息通信协议。

虽然在同步消息通讯的世界里有很多公开标准(如 cobar的 iiop ,或者是 soap 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 msmq ,ibm 的 websphere mq 等),因此,在 2006 年的 6 月,cisco 、redhat、imatix 等联合制定了 amqp 的公开标准。也就是说amqp是异步通讯的一个协议。

rabbitmq使用场景

在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。不过大多数不仅仅是无需即时返回,甚至是执行是否成功都无所谓。如果需要即时返回则可以使用dubbo,spring boot与dubbo集成可以去看spring boot 集成dubbox

rabbitmq依赖

rabbitmq并不是直接一个简单的jar包(jar包只是提供一个基本的与rabbitmq本身通讯的一些功能),和dubbo相同,rabbitmq也需要其他软件来运行,以下是rabbitmq运行所需要的软件

1、erlang

由于rabbitmq软件本身是基于erlang开发的,所以想要运行rabbitmq必须要先按照erlang

erlang官网

erlang下载地址

rabbitmq

rabbitmq才是实现消息队列的核心

rabbitmq官网

rabbitmq下载

配置rabbitmq

安装完成后,需要完成一些配置才能使用rabbitmq,可以直接用cmd到rabbitmq的安装目录下的sbin目录通过命令配置,也可以直接在开始菜单中直接找到rabbitmq command prompt (sbin dir)运行直接到达rabbitmq的安装目录的sbin,为了方便,我们先启用管理插件,执行命令

?
1
rabbitmq-plugins.bat enable rabbitmq_management

即可,注意,这是在windows下面,如果是linux则没有bat后缀 然后我们添加一个用户,因为在外网环境没有用户的情况下是不能连接成功的,执行添加用户命令

?
1
rabbitmqctl.bat add_user springboot password

springboot是用户名,password是密码

然后为了方便演示,我们给springboot赋予管理员权限,方便登录管理页面

?
1
rabbitmqctl.bat set_user_tags springboot administrator

给账号赋予虚拟主机权限

?
1
rabbitmqctl.bat set_permissions -p / springboot .* .* .*

然后启动rabbitmq服务 访问rabbitmq管理页面http://localhost:15672即可看见登录页面,如果没有创建用户则可以用guest,guest登录,如果有创建用户则用创建的用户登录

创建springboot项目

因为创建spring boot项目在前面的文章已经说过很多次了,所以这里就不多说了

添加rabbitmq相关依赖

?
1
2
3
4
5
<!-- rabbitmq -->
<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

没错,就是点配置,不过这样可能有点不理解,我还是把全部配置贴出来吧

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
 xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelversion>4.0.0</modelversion>
 
 <groupid>wang.raye.rabbitmq</groupid>
 <artifactid>demo1</artifactid>
 <version>0.0.1-snapshot</version>
 <packaging>jar</packaging>
 
 <name>demo1</name>
 <url>http://maven.apache.org</url>
 
 <properties>
  <project.build.sourceencoding>utf-8</project.build.sourceencoding>
 </properties>
<parent>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-parent</artifactid>
    <version>1.4.0.release</version>
  </parent>
 
  <dependencies>
    <dependency>
      <groupid>junit</groupid>
      <artifactid>junit</artifactid>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- springboot -->
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-web</artifactid>
 
    </dependency>
    <!-- rabbitmq -->
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-amqp</artifactid>
    </dependency>
  </dependencies>
</project>

因为没有做其他操作,所以目前项目主要是依赖2个模块,一个sprig boot,一个rabbitmq

添加配置类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.acknowledgemode;
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.queue;
import org.springframework.amqp.rabbit.connection.cachingconnectionfactory;
import org.springframework.amqp.rabbit.connection.connectionfactory;
import org.springframework.amqp.rabbit.core.channelawaremessagelistener;
import org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
/**
 * rabbitmq 的配置类
 *
 * @author raye
 * @since 2016年10月12日10:57:44
 */
@configuration
public class rabbitmqconfig {
  /** 消息交换机的名字*/
  public static final string exchange = "my-mq-exchange";
  /** 队列key1*/
  public static final string routingkey1 = "queue_one_key1";
  /** 队列key2*/
  public static final string routingkey2 = "queue_one_key2";
 
  /**
   * 配置链接信息
   * @return
   */
  @bean
  public connectionfactory connectionfactory() {
    cachingconnectionfactory connectionfactory = new cachingconnectionfactory("127.0.0.1",5672);
 
    connectionfactory.setusername("springboot");
    connectionfactory.setpassword("password");
    connectionfactory.setvirtualhost("/");
    connectionfactory.setpublisherconfirms(true); // 必须要设置
    return connectionfactory;
  }
 
  /**
   * 配置消息交换机
   * 针对消费者配置
    fanoutexchange: 将消息分发到所有的绑定队列,无routingkey的概念
    headersexchange :通过添加属性key-value匹配
    directexchange:按照routingkey分发到指定队列
    topicexchange:多关键字匹配
   */
  @bean
  public directexchange defaultexchange() {
    return new directexchange(exchange, true, false);
  }
  /**
   * 配置消息队列1
   * 针对消费者配置
   * @return
   */
  @bean
  public queue queue() {
    return new queue("queue_one", true); //队列持久
 
  }
  /**
   * 将消息队列1与交换机绑定
   * 针对消费者配置
   * @return
   */
  @bean
  public binding binding() {
    return bindingbuilder.bind(queue()).to(defaultexchange()).with(rabbitmqconfig.routingkey1);
  }
 
  /**
   * 配置消息队列2
   * 针对消费者配置
   * @return
   */
  @bean
  public queue queue1() {
    return new queue("queue_one1", true); //队列持久
 
  }
  /**
   * 将消息队列2与交换机绑定
   * 针对消费者配置
   * @return
   */
  @bean
  public binding binding1() {
    return bindingbuilder.bind(queue1()).to(defaultexchange()).with(rabbitmqconfig.routingkey2);
  }
  /**
   * 接受消息的监听,这个监听会接受消息队列1的消息
   * 针对消费者配置
   * @return
   */
  @bean
  public simplemessagelistenercontainer messagecontainer() {
    simplemessagelistenercontainer container = new simplemessagelistenercontainer(connectionfactory());
    container.setqueues(queue());
    container.setexposelistenerchannel(true);
    container.setmaxconcurrentconsumers(1);
    container.setconcurrentconsumers(1);
    container.setacknowledgemode(acknowledgemode.manual); //设置确认模式手工确认
    container.setmessagelistener(new channelawaremessagelistener() {
      public void onmessage(message message, com.rabbitmq.client.channel channel) throws exception {
        byte[] body = message.getbody();
        system.out.println("收到消息 : " + new string(body));
        channel.basicack(message.getmessageproperties().getdeliverytag(), false); //确认消息成功消费
 
      }
 
    });
    return container;
  }
  /**
   * 接受消息的监听,这个监听会接受消息队列1的消息
   * 针对消费者配置
   * @return
   */
  @bean
  public simplemessagelistenercontainer messagecontainer2() {
    simplemessagelistenercontainer container = new simplemessagelistenercontainer(connectionfactory());
    container.setqueues(queue1());
    container.setexposelistenerchannel(true);
    container.setmaxconcurrentconsumers(1);
    container.setconcurrentconsumers(1);
    container.setacknowledgemode(acknowledgemode.manual); //设置确认模式手工确认
    container.setmessagelistener(new channelawaremessagelistener() {
 
      public void onmessage(message message, com.rabbitmq.client.channel channel) throws exception {
        byte[] body = message.getbody();
        system.out.println("queue1 收到消息 : " + new string(body));
        channel.basicack(message.getmessageproperties().getdeliverytag(), false); //确认消息成功消费
      }
    });
    return container;
  }
}

注意,为了更好的展示如何配置,我配置了2个消息队列,而本类除了链接配置哪里,其他都是针对消息消费者的,当然不管消息消费者和消息生产者都需要配置链接信息,而为了方便,所以本项目的消息消费者和生产者都在本项目,一般实际项目中不会在同一项目,由于注释很详细,我就不多说了

发送消息

为了方便发送消息,所以我直接写了一个controller,通过访问接口的形式来调用发送消息的方法,话不多说,上代码

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package wang.raye.rabbitmq.demo1;
import java.util.uuid;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.amqp.rabbit.support.correlationdata;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;
 
/**
 * 测试rabbitmq发送消息的controller
 * @author raye
 *
 */
@restcontroller
public class sendcontroller implements rabbittemplate.confirmcallback{
  private rabbittemplate rabbittemplate;
  /**
   * 配置发送消息的rabbittemplate,因为是构造方法,所以不用注解spring也会自动注入(应该是新版本的特性)
   * @param rabbittemplate
   */
  public sendcontroller(rabbittemplate rabbittemplate){
    this.rabbittemplate = rabbittemplate;
    //设置消费回调
    this.rabbittemplate.setconfirmcallback(this);
  }
  /**
   * 向消息队列1中发送消息
   * @param msg
   * @return
   */
  @requestmapping("send1")
  public string send1(string msg){
    string uuid = uuid.randomuuid().tostring();
    correlationdata correlationid = new correlationdata(uuid);
    rabbittemplate.convertandsend(rabbitmqconfig.exchange, rabbitmqconfig.routingkey1, msg,
        correlationid);
    return null;
  }
  /**
   * 向消息队列2中发送消息
   * @param msg
   * @return
   */
  @requestmapping("send2")
  public string send2(string msg){
    string uuid = uuid.randomuuid().tostring();
    correlationdata correlationid = new correlationdata(uuid);
    rabbittemplate.convertandsend(rabbitmqconfig.exchange, rabbitmqconfig.routingkey2, msg,
        correlationid);
    return null;
  }
  /**
   * 消息的回调,主要是实现rabbittemplate.confirmcallback接口
   * 注意,消息回调只能代表成功消息发送到rabbitmq服务器,不能代表消息被成功处理和接受
   */
  public void confirm(correlationdata correlationdata, boolean ack, string cause) {
    system.out.println(" 回调id:" + correlationdata);
    if (ack) {
      system.out.println("消息成功消费");
    } else {
      system.out.println("消息消费失败:" + cause+"\n重新发送");
    }
  }
}

需要注意的是消息回调只能代表消息成功发送到rabbitmq服务器

然后我们启动项目,访问http://localhost:8082/send1?msg=aaaa 会发现控制台输出了

收到消息 : aaaa
 回调id:correlationdata [id=37e6e913-835a-4eca-98d1-807325c5900f]
消息成功消费

当然回调id可能不同,如果我们访问http://localhost:8082/send2?msg=bbbb 则输出

queue1 收到消息 : bbbb 
 回调id:correlationdata [id=0cec7500-3117-4aa2-9ea5-4790879812d4]
消息成功消费

最后说两句

因为本文主要是说明如何从零到springboot集成rabbitmq,所以对于rabbitmq的很多信息和用法没有说明,如果对rabbitmq本身不太熟悉的可以去看看其他关于rabbitmq的文章,附上本文demo

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:http://raye.wang/2016/12/08/spring-bootji-cheng-rabbitmq/