这篇文章主要讲基本的整合。先把代码跑起来,再说什么高级特性。
rabbitmq 中的一些术语
如果你打开 rabbitmq web 控制台,你会发现其中有一个 exhanges 不好理解。下面简单说明一下。
交换器(exchange)
交换器就像路由器,我们先是把消息发到交换器,然后交换器再根据路由键(routingkey)把消息投递到对应的队列。(明白这个概念很重要,后面的代码里面充分体现了这一点)
队列(queue)
队列很好理解,就不用解释了。
绑定(binding)
交换器怎么知道把这条消息投递到哪个队列呢?这就需要用到绑定了。大概就是:使用某个路由键(routingkey)把某个队列(queue)绑定到某个交换器(exchange),这样交换器就知道根据路由键把这条消息投递到哪个队列了。(后面的代码里面充分体现了这一点)
加入 rabbitmq maven 依赖
1
2
3
4
|
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
|
再加入另外一个依赖(这个依赖可省略,主要是用来简化代码)
1
2
3
4
5
|
<dependency>
<groupid>cn.hutool</groupid>
<artifactid>hutool-all</artifactid>
<version> 4.0 . 2 </version>
</dependency>
|
rabbitmqconfig.java 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
@configuration
public class rabbitmqconfig {
public final static string queue_name = "spring-boot-queue" ;
public final static string exchange_name = "spring-boot-exchange" ;
public final static string routing_key = "spring-boot-key" ;
// 创建队列
@bean
public queue queue() {
return new queue(queue_name);
}
// 创建一个 topic 类型的交换器
@bean
public topicexchange exchange() {
return new topicexchange(exchange_name);
}
// 使用路由键(routingkey)把队列(queue)绑定到交换器(exchange)
@bean
public binding binding(queue queue, topicexchange exchange) {
return bindingbuilder.bind(queue).to(exchange).with(routing_key);
}
@bean
public connectionfactory connectionfactory() {
cachingconnectionfactory connectionfactory = new cachingconnectionfactory( "127.0.0.1" , 5672 );
connectionfactory.setusername( "guest" );
connectionfactory.setpassword( "guest" );
return connectionfactory;
}
@bean
public rabbittemplate rabbittemplate(connectionfactory connectionfactory) {
return new rabbittemplate(connectionfactory);
}
}
|
生产者
直接调用 rabbittemplate 的 convertandsend 方法就可以了。从下面的代码里也可以看出,我们不是把消息直接发送到队列里面的,而是先发送到了交换器,交换器再根据路由键把我们的消息投递到对应的队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@restcontroller
public class producercontroller {
@autowired
private rabbittemplate rabbittemplate;
@getmapping ( "/sendmessage" )
public object sendmessage() {
new thread(() -> {
for ( int i = 0 ; i < 100 ; i++) {
string value = new datetime().tostring( "yyyy-mm-dd hh:mm:ss" );
console.log( "send message {}" , value);
rabbittemplate.convertandsend(rabbitmqconfig.exchange_name, rabbitmqconfig.routing_key, value);
}
}).start();
return "ok" ;
}
}
|
消费者
消费者也很简单,只需要对应的方法上加入 @rabbitlistener 注解,指定需要监听的队列名称即可。
1
2
3
4
5
6
7
8
|
@component
public class consumer {
@rabbitlistener (queues = rabbitmqconfig.queue_name)
public void consumemessage(string message) {
console.log( "consume message {}" , message);
}
}
|
运行项目
运行项目,然后打开浏览器,输入 http://localhost:9999/sendmessage
。在控制台就可以看到生产者在不停的的发送消息,消费者不断的在消费消息。
打开 rabbitmq web 控制台,也可以看到刚才我们在代码里面配置的交换器和队列,以及绑定信息。
点击进入交换器的详情
结语
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://my.oschina.net/u/3523423/blog/1618335