python 学习笔记十 rabbitmq(进阶篇)

时间:2024-01-04 14:57:08

RabbitMQ

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

安装

http://erlang.org/download/otp_win64_18.3.exe #依赖包erlang

http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1.exe

pip install pika

简单的通信

import pika
#producer端 #建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #创建channel
channel = connection.channel() #声明queue
channel.queue_declare(queue='testMQ') channel.basic_publish(exchange='',#Producer只能发送到exchange,它是不能直接发送到queue的,发送到默认exchange
routing_key="testMQ",#路由key发送指定队列
body="hello this is test!") #消息 print(" [x] Sent 'hello this is test!'") #关闭连接
connection.close()
import pika

#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #创建channel
channel = connection.channel() #如果生产者先运行并创建了队列这里就可以不用声明,但是有可能消费者先运行 下面的basic_consume就会因为没有队列报错。
channel.queue_declare(queue="testMQ") #定义回调函数用于取出队列中的数据
def callback(ch, method, properties, body):
print(" [x] Received %r" % body) channel.basic_consume(callback,
queue="testMQ",
no_ack=True)
#不用确认消息
print(" [*] Waiting for messages. To exit press Ctrl + C ")
channel.start_consuming()#监听数据

  默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方 式叫做round-robin。

acknowledgment 消息确认

  每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么 非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完 成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。

如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。

在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。

如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

import pika
#producer端 #建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #生成通道
channel = connection.channel() #声明queue 生产者必须声明
channel.queue_declare(queue='testMQ') channel.basic_publish(exchange='',
routing_key="testMQ",
body="hello this is test!") print(" [x] Sent 'hello this is test!'") #关闭连接
connection.close()
#client
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() #如果生产者先运行并创建了队列这里就可以不用声明,但是有可能消费者先运行 下面的basic_consume就会因为没有队列报错。
#channel.queue_declare(queue="testMQ")
#已经创建的队列不是durable再赋值durable也无法改变
channel.queue_declare(queue="task_mq",durable=True) #定义回调函数用于取出队列中的数据
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep()
print("ok")
ch.basic_ack(delivery_tag = method.delivery_tag) #消息持久化 channel.basic_consume(callback,
queue = "task_mq",
no_ack = False)
#no-ack = False,如果生产者遇到情况(its channel is closed, connection is closed,
#or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。但是生产者挂了,消息就没有了!!! print(" [*] Waiting for messages. To exit press Ctrl + C ")
channel.start_consuming()

Message durability消息持久化

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel() #确保队列不丢失
channel.queue_declare(queue='task_mq', durable=True) channel.basic_publish(exchange='',
routing_key='task_mq',
body="will i come back!",
properties=pika.BasicProperties(
delivery_mode = 2
,#make messages persistent
)) print(" [x] Sent 'will i come back!'")
connection.close()
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

channel = connection.channel()

#如果生产者先运行并创建了队列这里就可以不用声明,但是有可能消费者先运行 下面的basic_consume就会因为没有队列报错。
#channel.queue_declare(queue="testMQ")
#已经创建的队列,再赋值durable是无法改变的,rabbitmq已经再维护它了。
channel.queue_declare(queue="task_mq",durable=True) #定义回调函数用于取出队列中的数据
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep()
print("ok")
ch.basic_ack(delivery_tag = method.delivery_tag) #消息持久化 channel.basic_consume(callback,
queue = "task_mq",
no_ack = False)
#no-ack = False,如果生产者遇到情况(its channel is closed, connection is closed,
#or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
#也就是说如果生产者挂了,消息就没有了!!! print(" [*] Waiting for messages. To exit press Ctrl + C ")
channel.start_consuming()
为了数据不丢失,我们采用了:

    在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。
持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。
持久化Message,理由同上。 但是这样能保证数据100%不丢失吗? 答案是否定的。问题就在与RabbitMQ需要时间去把这些信息存到磁盘上,这个time window虽然短,但是它的确还是有。
在这个时间窗口内如果数据没有保存,数据还会丢失。还有另一个原因就是RabbitMQ并不是为每个Message都做fsync:
它可能仅仅是把它保存到Cache里,还没来得及保存到物理磁盘上。

消息公平分发

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel() #确保队列不丢失
channel.queue_declare(queue='task_mq', durable=True) channel.basic_publish(exchange='',
routing_key='task_mq',
body="will i come back!",
properties=pika.BasicProperties(
delivery_mode = ,#make messages persistent
)) print(" [x] Sent 'will i come back!'")
connection.close()
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel() # make message persistent
channel.queue_declare(queue='testMQ') def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep()
print('ok')
ch.basic_ack(delivery_tag = method.delivery_tag) #告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_qos(prefetch_count=) channel.basic_consume(callback,
queue='testMQ',
no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Publish\Subscribe(消息发布\订阅) 

 之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,

 Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。

  • fanout: 所有bind到此exchange的queue都可以接收消息
  • direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
  • topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
import pika
import sys #发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。
# 所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。 connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() #定义exchange 类型为fanout
channel.exchange_declare(exchange='logs',
type='fanout') #发送消息
message = ' '.join(sys.argv[:]) or "info: Hello World!" channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()

发布者fanout

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() #定义exchange
channel.exchange_declare(exchange='logs',
type='fanout') #定义队列每一个订阅者都有自己的队列,队列名是随机的
#Consumer关闭连接时,这个queue要被deleted。可以加个exclusive的参数。
result = channel.queue_declare(exclusive=True)
#获取队列名
queue_name = result.method.queue #将队列和exchangebanding 默认发布不是直接发送到队列而是先到exchange
#由exchange分发给订阅者
channel.queue_bind(exchange='logs',
queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r" % body) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

订阅者fanout

有选择的接收消息(exchange type=direct) 

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

python 学习笔记十 rabbitmq(进阶篇)

exchange X和两个queue绑定在一起。C1的binding key是error。C2的binding key是info,error和warning。当P publish key是info时,exchange会把它放到C2。如果是error那么就会到C1,C2。

import pika
import sys #连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(
host="localhost"
))
#生成channel
channel = connection.channel() #设置exchange为direct模式
channel.exchange_declare(exchange="direct_logs",
type = "direct") #队列关键字 自己输入或默认info
serverity = sys.argv[] if len(sys.argv) > else "info" #消息 自己输入或默认
message = ' '.join(sys.argv[:]) or "hello world" #发布
channel.basic_publish(exchange="direct_logs",
routing_key=serverity, #根据关键字发送到指定队列
body=message) print(" [x] Sent %r:%r" % (serverity, message))
connection.close()

发布者direct

import pika
import sys #建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
host="localhost"
))
#生成channel
channel = connection.channel() #声明exchang
channel.exchange_declare(exchange="direct_logs",
type="direct") #声明专用队列队列名随机
result = channel.queue_declare(exclusive=True)
#获取队列名
queue_name = result.method.queue #用户定义消息关键字
serverities = sys.argv[:]
if not serverities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[])
sys.exit() for serverity in serverities:
channel.queue_bind(exchange="direct_logs",
queue=queue_name,
routing_key=serverity)#关键字 print(' [*] Waiting for logs. To exit prees CTRL+C') #获取数据
def callback(ch, method, propertes, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

订阅者direct

Topic exchange

对于Message的routing_key是有限制的,不能使任意的。格式是以点号“."分割的字符表。比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') #用户输入关键字定义
routing_key = sys.argv[] if len(sys.argv) > else 'anonymous.info'
#自定义发送消息
message = ' '.join(sys.argv[:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close() #执行
> python rabbitmq_publisher_topic.py nginx.error nginx is down #c2接收
> python rabbitmq_publisher_topic.py login.info koka loggin #c1接收
> python rabbitmq_publisher_topic.py kernel.critical kernel fault #都收不到

发布者_topic

import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
#用户定义接收关键字
binding_keys = sys.argv[:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[])
sys.exit() for binding_key in binding_keys:
  #队列绑定关键字
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C')
#回调处理接收信息
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming() #开始监听 #执行
c1
python rabbitmq_subcriber_topic.py *.info
c2
python rabbitmq_subvriber_topic.py #.erro

订阅者_topic

远程过程调用 RPC

 1. 客户端接口 Client interface

为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class。 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果。代码如下:

fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call()
print(" [.] Got %r" % response)

 2.回调函数队列 Callback queue

总体来说,在RabbitMQ进行RPC远程调用是比较容易的。client发送请求的Message然后server返回响应结果。为了收到响应 client在publish message时需要提供一个”callback“(回调)的queue地址。code如下:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)

 AMQP 预定义了14个属性。它们中的绝大多很少会用到。以下几个是平时用的比较多的:

  • delivery_mode: 持久化一个Message(通过设定值为2)。其他任意值都是非持久化。
  • content_type: 描述mime-type 的encoding。比如设置为JSON编码:设置该property为application/json。
  • reply_to: 一般用来指明用于回调的queue(Commonly used to name a callback queue)。
  • correlation_id: 在请求中关联处理RPC响应(correlate RPC responses with requests)。

 3.关联ID (Correlation ID)

在上个小节里,实现方法是对每个RPC请求都会创建一个callback queue。这是不高效的。幸运的是,在这里有一个解决方法:为每个client创建唯一的callback queue。

这又有其他问题了:收到响应后它无法确定是否是它的,因为所有的响应都写到同一个queue了。上一小节的correlation_id在这种情况下就派上用场了:对于每个request,

    都设置唯一的一个值,在收到响应后,通过这个值就可以判断是否是自己的响应。如果不是自己的响应,就不去处理。

 4.概要(Summary)

python 学习笔记十 rabbitmq(进阶篇)
我们的RPC将会这样执行:
>  当客户端启动后,它创建一个匿名的唯一的回调队列
> 对一个RPC请求, 客户端发送一个消息包含两个属性: reply_to (用来设置回调队列)和 correlation_id(用来为每个请求设置一个唯一标识)
> 请求发送到 rpc_queue队列
> RPC worker( 服务端) 在那个队列中等待请求,当一个请求出现后,服务端就执行一个job并将结果消息发送给客户端,使用reply_to字段中的队列
> 客户端在callback 队列中等待数据, 当一个消息出现后,检查这个correlation_id属性,如果和请求中的值匹配将返回给应用
#server.py
import pika
#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
#生成通道
channel = connection.channel() #声明客户端请求队列
channel.queue_declare(queue='rpc_queue') #定义操作函数
def fib(n):
if n == :
return
elif n == :
return
else:
return fib(n-) + fib(n-) #处理请求
def on_request(ch, method, props, body):
#消息体
n = int(body)
#打印请求
print(" [.] fib(%s)" % (n,))
#回应结果
response = fib(n)
#回应请求
ch.basic_publish(exchange='',
routing_key=props.reply_to,#发送至客户端的回调队列 callback queue
properties=pika.BasicProperties(correlation_id = \
props.correlation_id), #回应消息关联corr_id
#消息体
body=str(response)) #确保客户端发送请求被收到
ch.basic_ack(delivery_tag = method.delivery_tag) #make message persistent #告诉RabbitMQ服务端当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_qos(prefetch_count=) #接收客户端请求,调用on_request函数处理
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming() #监听客户端请求
服务端代码详单简单:
> (4) 和往常一样我们建立一个连接并定义一个队列
> (11) 我们定义了  斐波纳契 函数,假定输入的都是合法正数
> (19) 我们定义了一个回调的 basic_consume, RPC服务的核心。 当收到请求后执行这个函数并返回结果
> (32) 我们可能会执行多个服务端,为了在多个服务端上均匀的分布负荷,我们需要这是 prefetch_count。
#client.py

import pika
import uuid class FibonacciRpcClient(object):
def __init__(self):
#连接rabbitmq
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
#定义通道
self.channel = self.connection.channel()
#定义专用队列,队列名随机,断开连接时删除队列
result = self.channel.queue_declare(exclusive=True)
#获取队列名
self.callback_queue = result.method.queue
#接收服务端回应的callback_queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
#接收消息
def on_response(self, ch, method, props, body):
#服务端回应的correlation_id等于请求的id 接收数据
if self.corr_id == props.correlation_id:
self.response = body
#发起请求
def call(self, n):
self.response = None
#生成corr_id
self.corr_id = str(uuid.uuid4())
#发起请求
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',#发送至rpc_queue队列
properties=pika.BasicProperties(
reply_to = self.callback_queue,#回调队列 #告诉服务端从这个队列回应请求
correlation_id = self.corr_id, #请求关联corr_id
),
body=str(n))#消息
#监听回应消息
while self.response is None:
self.connection.process_data_events()
return int(self.response) fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
#发送请求
response = fibonacci_rpc.call()
print(" [.] Got %r" % response)
客户端代码稍微复杂些:
> (7) 我们建立一个连接,通道并定义一个专门的’callback‘队列用来接收回复
> (16) 我们订阅了“callback”队列,因此我们能够接收 RPC 的返回结果
> (18) ’on_response'  在每个返回中执行的回调是一个简单的job, 对每个返回消息将检查是否correlation_id使我们需要查找的那个ID,如果是,将保存结果到 self.response 并终端consuming循环
> (23) 下一步,我们定义我们的main方法 - 执行实际的RPC请求
> (24) 在这方法中,首先我们生产一个唯一的 correlatin_id 号并保存 -- 'on_response"回调函数将用着号码来匹配发送和接收的消息值
> (25) 下一步,发布请求信息,使用两个属性: reply_to 和 correlation_id
> (32) 这一步我们可以坐等结果的返回
>(33) 最后我们返回结果给用户
执行
server端:
 [x] Awaiting RPC requests
 [.] fib(30)
client端:
[x] Requesting fib()
[.] Got
更多内容可以参考anzhsoft: http://blog.csdn.net/anzhsoft/article/details/19633107
memcache 和 redis 相关内容: http://www.cnblogs.com/wupeiqi/articles/5132791.html