一、RabbitMQ的介绍
RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache).
消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下:
从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的消息,并且给予相应的处理.消息队列常用于分布式系统之间互相信息的传递.
对于RabbitMQ来说,除了这三个基本模块以外,还添加了一个模块,即交换机(Exchange).它使得生产者和消息队列之间产生了隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列.那么RabitMQ的工作流程如下所示:
紧接着说一下交换机.交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.
Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.
topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.
headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.
Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.
概念:
- 生产者 消息的产生方,负责将消息推送到消息队列
- 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
- 队列 消息的寄存器,负责存放生产者发送的消息
- 交换机 负责根据一定规则分发生产者产生的消息
- 绑定 完成交换机和队列之间的绑定
模式:
1、direct
直连模式,用于实例间的任务分发
2、topic
话题模式,通过可配置的规则分发给绑定在该exchange上的队列
3、headers
适用规则复杂的分发,用headers里的参数表达规则
4、fanout
分发给所有绑定到该exchange上的队列,忽略routing key
安装
单机版安装很简单,大概步骤如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# 安装erlang包
yum install erlang
# 安装socat
yum install socat
# 安装rabbit
rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm
# 启动服务
rabbitmq-server start
# 增加管理控制功能
rabbitmq-plugins enable rabbitmq_management
# 增加用户:
sudo rabbitmqctl add_user root password
rabbitmqctl set_user_tags root administrator
rabbitmqctl set_permissions -p / root '.*' '.*' '.*'
|
集群安装,可参考这篇文章:
以上就是rabbitmq的介绍,下面开始本文的正文:spring boot 集成rabbitmq ,本人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。
二、springboot配置
废话少说直接上代码:
配置参数
application.yml:
1
2
3
4
5
6
7
|
spring:
rabbitmq:
addresses: 192.168.1.1:5672
username: username
password: password
publisher-confirms: true
virtual-host: /
|
java config读取参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
/**
* RabbitMq配置文件读取类
*
* @author chenhf
* @create 2017-10-23 上午9:31
**/
@Configuration
@ConfigurationProperties (prefix = "spring.rabbitmq" )
public class RabbitMqConfig {
@Value ( "${spring.rabbitmq.addresses}" )
private String addresses;
@Value ( "${spring.rabbitmq.username}" )
private String username;
@Value ( "${spring.rabbitmq.password}" )
private String password;
@Value ( "${spring.rabbitmq.publisher-confirms}" )
private Boolean publisherConfirms;
@Value ( "${spring.rabbitmq.virtual-host}" )
private String virtualHost;
// 构建mq实例工厂
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(publisherConfirms);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
@Bean
@Scope (ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(){
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
}
|
三、rabbitmq生产者配置
主要配置了直连和话题模式,其中话题模式设置两个队列(queueTopicTest1、queueTopicTest2),此两个队列在和交换机绑定时分别设置不同的routingkey(.TEST.以及lazy.#)来验证匹配模式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
/**
* 用于配置交换机和队列对应关系
* 新增消息队列应该按照如下步骤
* 1、增加queue bean,参见queueXXXX方法
* 2、增加queue和exchange的binding
* @author chenhf
* @create 2017-10-23 上午10:33
**/
@Configuration
@AutoConfigureAfter (RabbitMqConfig. class )
public class RabbitMqExchangeConfig {
/** logger */
private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig. class );
/**
* @Author:chenhf
* @Description: 主题型交换机
* @Date:下午5:49 2017/10/23
* @param
* @return
*/
@Bean
TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
rabbitAdmin.declareExchange(contractTopicExchange);
logger.debug( "完成主题型交换机bean实例化" );
return contractTopicExchange;
}
/**
* 直连型交换机
*/
@Bean
DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
rabbitAdmin.declareExchange(contractDirectExchange);
logger.debug( "完成直连型交换机bean实例化" );
return contractDirectExchange;
}
//在此可以定义队列
@Bean
Queue queueTest(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug( "测试队列实例化完成" );
return queue;
}
//topic 1
@Bean
Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug( "话题测试队列1实例化完成" );
return queue;
}
//topic 2
@Bean
Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug( "话题测试队列2实例化完成" );
return queue;
}
//在此处完成队列和交换机绑定
@Bean
Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug( "测试队列与直连型交换机绑定完成" );
return binding;
}
//topic binding1
@Bean
Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug( "测试队列与话题交换机1绑定完成" );
return binding;
}
//topic binding2
@Bean
Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug( "测试队列与话题交换机2绑定完成" );
return binding;
}
}
|
在这里用到枚举类:RabbitMqEnum
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
/**
* 定义rabbitMq需要的常量
*
* @author chenhf
* @create 2017-10-23 下午4:07
**/
public class RabbitMqEnum {
/**
* @param
* @Author:chenhf
* @Description:定义数据交换方式
* @Date:下午4:08 2017/10/23
* @return
*/
public enum Exchange {
CONTRACT_FANOUT( "CONTRACT_FANOUT" , "消息分发" ),
CONTRACT_TOPIC( "CONTRACT_TOPIC" , "消息订阅" ),
CONTRACT_DIRECT( "CONTRACT_DIRECT" , "点对点" );
private String code;
private String name;
Exchange(String code, String name) {
this .code = code;
this .name = name;
}
public String getCode() {
return code;
}
public String getName() {
return name;
}
}
/**
* describe: 定义队列名称
* creat_user: chenhf
* creat_date: 2017/10/31
**/
public enum QueueName {
TESTQUEUE( "TESTQUEUE" , "测试队列" ),
TOPICTEST1( "TOPICTEST1" , "topic测试队列" ),
TOPICTEST2( "TOPICTEST2" , "topic测试队列" );
private String code;
private String name;
QueueName(String code, String name) {
this .code = code;
this .name = name;
}
public String getCode() {
return code;
}
public String getName() {
return name;
}
}
/**
* describe: 定义routing_key
* creat_user: chenhf
* creat_date: 2017/10/31
**/
public enum QueueEnum {
TESTQUEUE( "TESTQUEUE1" , "测试队列key" ),
TESTTOPICQUEUE1( "*.TEST.*" , "topic测试队列key" ),
TESTTOPICQUEUE2( "lazy.#" , "topic测试队列key" );
private String code;
private String name;
QueueEnum(String code, String name) {
this .code = code;
this .name = name;
}
public String getCode() {
return code;
}
public String getName() {
return name;
}
}
}
|
以上完成消息生产者的定义,下面封装调用接口
测试时直接调用此工具类,testUser类需自己实现
1
2
3
|
rabbitMqSender.sendRabbitmqDirect( "TESTQUEUE1" ,testUser);
rabbitMqSender.sendRabbitmqTopic( "lazy.1.2" ,testUser);
rabbitMqSender.sendRabbitmqTopic( "lazy.TEST.2" ,testUser);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
/**
* rabbitmq发送消息工具类
*
* @author chenhf
* @create 2017-10-26 上午11:10
**/
@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
/** logger */
private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender. class );
private RabbitTemplate rabbitTemplate;
@Autowired
public RabbitMqSender(RabbitTemplate rabbitTemplate) {
this .rabbitTemplate = rabbitTemplate;
this .rabbitTemplate.setConfirmCallback( this );
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
logger.info( "confirm: " + correlationData.getId());
}
/**
* 发送到 指定routekey的指定queue
* @param routeKey
* @param obj
*/
public void sendRabbitmqDirect(String routeKey,Object obj) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
logger.info( "send: " + correlationData.getId());
this .rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
}
/**
* 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
* @param routeKey
* @param obj
*/
public void sendRabbitmqTopic(String routeKey,Object obj) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
logger.info( "send: " + correlationData.getId());
this .rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
}
}
|
四、rabbitmq消费者配置
springboot注解方式监听队列,无法手动指定回调,所以采用了实现ChannelAwareMessageListener接口,重写onMessage来进行手动回调,详见以下代码,详细介绍可以在spring的官网上找amqp相关章节阅读
直连消费者
通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
/**
* 消费者配置
*
* @author chenhf
* @create 2017-10-30 下午3:14
**/
@Configuration
@AutoConfigureAfter (RabbitMqConfig. class )
public class ExampleAmqpConfiguration {
@Bean ( "testQueueContainer" )
public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames( "TESTQUEUE" );
container.setMessageListener(exampleListener());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean ( "testQueueListener" )
public ChannelAwareMessageListener exampleListener() {
return new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
//通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费
if ( "2" .equals(testUser.getUserName())){
System.out.println(testUser.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false );
}
if ( "1" .equals(testUser.getUserName())){
System.out.println(testUser.toString());
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , true );
}
}
};
}
}
|
topic消费者1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
/**
* 消费者配置
*
* @author chenhf
* @create 2017-10-30 下午3:14
**/
@Configuration
@AutoConfigureAfter (RabbitMqConfig. class )
public class TopicAmqpConfiguration {
@Bean ( "topicTest1Container" )
public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames( "TOPICTEST1" );
container.setMessageListener(exampleListener1());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean ( "topicTest1Listener" )
public ChannelAwareMessageListener exampleListener1(){
return new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
System.out.println( "TOPICTEST1:" +testUser.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false );
}
};
}
}
|
topic消费者2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/**
* 消费者配置
*
* @author chenhf
* @create 2017-10-30 下午3:14
**/
@Configuration
@AutoConfigureAfter (RabbitMqConfig. class )
public class TopicAmqpConfiguration2 {
@Bean ( "topicTest2Container" )
public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames( "TOPICTEST2" );
container.setMessageListener(exampleListener());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean ( "topicTest2Listener" )
public ChannelAwareMessageListener exampleListener() {
return new ChannelAwareMessageListener() {
@Override
public void
|
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对服务器之家的支持。
原文链接:https://segmentfault.com/a/1190000011797667