1.1本部分内容简介
这部分我们将要发送一个消息到多个Consumer,这部分称之为“publish/subscribe”
我们实现的方式就是发送端,发送一个消息,与此同时,多个接收端将同时接收到消息并打印在屏幕上面。
1.2exchange简介
在前面的博文中,我们的讲解是:发送端发送消息至消息队列,接收端从消息队列获取消息。现在我们来介绍一下rabbitmq的完整消息传送模型。
>Producer:用来发送消息的应用程序
>queue:用来存储消息的缓存
>Consumer:用来接收消息的应用程序
消息传送模型的核心是,Producer从不会直接将消息传送给queue,而是,将消息传送给exchange,exchange是个很简单的东西,在一侧,他接收来自Producer的消息,另一侧将消息传送给queue。exchange将消息传送给你个queue,还是传送给多个queue,这主要是由exchange的type决定。模型图如下:
exchange有很多type可用,如下:direct、topic、headers、fanout。本博客针对fanout讲解,后续博文对其他类型有所讲解,让我们创建一个exchange,type为fanout,名字为logs,代码如下:
channel.exchange(exchange='logs',type='fanout')
对于type为fanout的exchange,理解起来非常简单,它将接收到的消息,广播给他所知道的所有的queue,即所有和他建立连接的queue。前面的博文降到了命令行查看list_exchanges的命令如下:
$ :sudo rabbitmqctl list_exchanges Listing exchanges ... logs fanout amq.direct direct amq.topic topic amq.fanout fanout amq.headers headers ...done.
对于上图中,你会看到很多amq.*的exchange,这些是系统默认建立的,在你不建立exchange时,系统默认建立上面几个。
对于消息的发布函数basic_publish()也随之变为:
channel.basic_publish(exchange='logs',routing_key='',body=message)
1.3临时queue
正如你前面学到的,对于一个queue,会有自己的名字(hello什么的),
首先:
result = channel.queue_declare()
然后通过result.method.queue,系统会随机给queue命名。
如果我们想Producer与Consumer断开连接时,队列queue删除,那么需要改成下面的代码:
result = channel.queue_declare(exclusive=True)
1.4Bingings(将queue与exchange绑定)
模型图如下:
我们已经创建了一个type为fanout的exchange,现在,我们要告诉exchange,将消息发送给我们自己定义的queue,在exchange与queue之间建立连接的是binding,代码如下:
channel.queue_bind(exchange='logs',queue=result.method.queue)
在命令行查看binding的列表,命令如下:
$: sudo rabbitmqctl list_bindings
1.5最终代码
最终的模型如下:
send.py代码如下:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" #如果键盘有输入,message为键盘输入,如果键盘没有输入,消息message="info: Hello World!"; channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()
receive.py代码
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
1.6代码测试
开启一个命令行窗口,运行send.py:
$: python send.py #(此时你传送的内容为info: Hello World!)或者
$: python send.py message #message为你想发送的内容
开启两个命令行窗口,分别运行receive.py,两个窗口你会看到有相同的消息输出:
$: python receive.py