【Spring】使用Spring和AMQP发送接收消息(上)

时间:2022-12-15 16:47:40

讲AMQP之前,先讲下传统的JMS的消息模型,JMS中主要有三个参与者:消息的生产者、消费者、传递消息的通道(队列或者主题),两种消息模型如下:
通道是队列:

通道是队列:

【Spring】使用Spring和AMQP发送接收消息(上)

通道是主题:

【Spring】使用Spring和AMQP发送接收消息(上)

在JMS中,虽然通道有助于解耦消息的生产者和消费者,但这两者依然会与通道相耦合。生产者会将消息发布到一个特定的队列或主题上,消费者从特定的队列或主题上接收这些消息,通道具有双重责任,就是传递数据和确定这些消息该发送到什么地方,队列的话会使用点对点算法发送,主题的话就使用发布-订阅方式。
而使用AMQP的话,生产者并不会直接将消息发布到队列中,AMQP的消息的生产者以及传递消息的队列之间引入间接机制Exchange,Exchange再与队列绑定。关系图如下:

【Spring】使用Spring和AMQP发送接收消息(上)

从图可以看出Exchange收到消息后,Exchange会将信息路由到队列上,消费者再从队列中取数据并处理。这里Exchange不是简单地把消息传递到队列中,AMQP定义了四种不同类型的Exchange,每种都有不同的路由算法,以此决定是否将信息放到队列中。根据Exchange算法的不同,它可能会使用消息的routing key和/或参数,并将其与Exchange和队列之间的binding和routing key和参数进行对比,如果对比结果满足相应的算法,那么消息就路由到该队列上。

AMQP中定义的四种不同类型的Exchange:

Direct:如果消息的routing key与binding的routing key直接匹配的话,消息将会路由到该队列上;
Topic:如果消息的routing key与binding的routing key符合通配符匹配的话,消息将会路由到该队列上;
Headers:如果消息参数表中的头信息和值都与binding参数表相匹配,消息将会路由到该队列上;
Fanout:不管消息的routing key和参数表的头信息/值是什么,消息将会路由到所有队列上。
这里要深入了解AMQP的到www.amqp.org查看,下面使用Spring、AMQP实现发送、接收消息。

先配置Spring支持AMQP消息

使用Spring AMQP前要先配置一个连接工厂,具体来讲,这里选择配置RabbitMQ连接工厂。RabbitMQ实现了AMQP,也是目前较常用的消息代理。使用RabbitMQ发送接收消息前,要先安装RabbitMQ,具体安装步骤可以在www.rabbitmq.com/download.html上找到安装指南,这里就不详细展开。

配置RabbitMQ连接工厂最简单的方式是使用Spring AMQP所提供的rabbit配置命名空间,要使用该功能,首先要确保Spring配置文件中声明了该模式:

<?xml version="1.0" encoding="UTF-8"?>  
<beans:beans xmlns="http://www.springframework.org/schema/beans"
xmlns:rabbit
="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation
="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"
>
...
</beans:beans>

这个配置不是必须的,这里选择将rabbit作为首选命名空间,将beans作为第二的命名空间,因为在这个配置中,会更多的声明rabbit而不是bean,这样的话,只会有少量的bean元素使用“beans:”前缀,而rabbit元素就能避免使用前缀了。

rabbit命名空间包含了多个在Spring中配置RabbitMQ的元素,用得最多的就是<connection-factory>,使用时最好给其设置一个bean ID,不然就难将连接工厂装配到需要它的bean中了。

<connectioin-factory id="connectionFactory">

连接工厂默认情况下会设置RabbitMQ服务器监听localhost的5672端口,用户名密码都为guest,一般在开发环境可以不作修改,在生产环境的修改方式如下 :

<connectioin-factory id="connectionFactory" 
host
="${rabbitmq.host}"
post
="${rabbitmq.post}"
username
="${rabbitmq.username}"
password
="${rabbitmq.password}"
/>

这样设置的好处是具体值可以到属性文件中配置。

接下来看看如何创建队列、Exchange、binding

在传统的JMS中,队列和主题的路由行为是通过规范建立的,而AMQP依赖于如何定义队列和Exchange以及如何将它们绑定在一起。声明队列、Exchange和binding的一种方式是使用RabbitMQ Channel接口的各种方法,不过这里使用rabbit命名空间更加方便,它包含了多个元素,可以帮我们声明队列、Exchange以及将它们结合在一起的binding。元素如下:

【Spring】使用Spring和AMQP发送接收消息(上)

这些配置元素要与<admin>元素一起使用,<admin>元素会创建一个RabbitMQ管理组件,如果上述表格中的元素在RabbitMQ代理中尚未存在的话,<admin>会自动创建它们。
比如现在想声明名为spittle.test.queue的队列,只需要在Spring配置中添加如下配置即可:

<admin connection-factory="connectionFactory" />
<queue id="spittleTestQueue" name="spittle.test" />

当如此配置时,Exchange默认是一个没有名称的direct Exchange,所有队列都会绑定到这个Exchange中,并且routing key与队列的名称相同。在这样的配置下,我们可以将消息发送到这个没有名称的Exchange上,并将routing key设定为spittle.test.queue,这样消息就会路由到这个队列中(其实这里就类似于JMS的点对点模型)。
其他有意思的路由需要我们自行声明一个或更多的Exchange,并将其绑定到队列上,比如不管routing key是什么, 要将消息路由到多个队列上,可以按照如下方式配置一个fanout以及多个队列:

<admin connection-factory="connectionFactory" />
<queue name="spittle.test.queue.1" >
<queue name="spittle.test.queue.2" >
<queue name="spittle.test.queue.3" >
<fanout-exchange name="spittle.fanout">
<bindings>
<binding queue="spittle.test.queue.1" />
<binding queue="spittle.test.queue.2" />
<binding queue="spittle.test.queue.3" />
</bindings>
</fanout-exchange>

其他类型的Exchange读者可以根据上面的表格自行尝试,下一篇将继续写如何发送消息。