上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务
channel.basic_qos(prefetch_count=1)
带消息持久化+公平分发的完整代码
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.queue_declare(queue='task_queue', durable=True) 9 10 message = ' '.join(sys.argv[1:]) or "Hello World!" 11 channel.basic_publish(exchange='', 12 routing_key='task_queue', 13 body=message, 14 properties=pika.BasicProperties( 15 delivery_mode = 2, # make message persistent 16 )) 17 print(" [x] Sent %r" % message) 18 connection.close()
1 import pika 2 import time 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.queue_declare(queue='task_queue', durable=True) 9 print(' [*] Waiting for messages. To exit press CTRL+C') 10 11 def callback(ch, method, properties, body): 12 print(" [x] Received %r" % body) 13 time.sleep(body.count(b'.')) 14 print(" [x] Done") 15 ch.basic_ack(delivery_tag = method.delivery_tag) 16 17 channel.basic_qos(prefetch_count=1) 18 channel.basic_consume(callback, 19 queue='task_queue') 20 21 channel.start_consuming()