一、任务队列概述
消息队列,顾名思义就是一个用来传递任务的队列。消息队列在开发中十分常见,经常用在页面后台处理需要很长时间的操作时,例如发送邮件、短信以及进行复杂数据运算操作等,这些操作通常会阻塞页面相当长的时间,为了避免用户等待太久,一般会先给用户页面进行相应,然后在后台使用独立的线程或者进程来处理这些复杂的操作。
消息队列分为两个部分,生产者和消费者。生产者负责把任务放进队列,消费者则负责从队列中取出任务执行。最常见的一个场景是:当我们在某个站点注册账号时,一般都会给我们的邮箱发送邮件验证,由于发送邮件比较耗时,并且邮件的实时性要求也不是很高,所以这里就可以使用消息队列来完成。先把发送邮件放到队列中,然后开启另外的一个线程专门读取任务,读取邮件并发送出去。
二、使用redis实现一个简单的任务队列
可以使用redis中的列表来实现一个任务队列,开启两个程序,一个作为生产者使用LPUSH
写队列,一个作为消费者使用RPOP
读队列,由于消费者并不知道什么时候会有数据过来,所以消费者需要一直循环读取数据。两者的消息使用json
进行封装协议传输。
生产者:
# -*- coding: utf8 -*-
""" 生产者模型 """
import json
import redis
# 消息类型
MSG_TYPE_READ_BOOK = 0
MSG_TYPE_PLAY_GAME = 1
MSG_TYPE_SING_SONG = 2
def make_message(m_id, m_type):
""" 产生一个消息 :param m_id: 消息的id :param m_type: 消息类型 :return: json字符串 """
mess_dict = {"id": m_id, "type": m_type}
return json.dumps(mess_dict)
def creator():
""" 生产消息并放入消息队列 """
conn = redis.StrictRedis()
for i in range(1, 10):
js_data = make_message(i, i % 3)
print "push message: %s" % js_data
conn.lpush("msgQueue", js_data)
if __name__ == "__main__":
creator()
消费者:
# -*- coding: utf8 -*-
""" 消费者模型 """
import json
import redis
# 消息类型
MSG_TYPE_READ_BOOK = 0
MSG_TYPE_PLAY_GAME = 1
MSG_TYPE_SING_SONG = 2
def parse_message(js_data):
""" 把消息队列中的消息解析成字典 :param js_data: json字符串 :return: 字典 """
return json.loads(js_data)
def handle_message():
""" 从消息队列读取消息并执行 """
conn = redis.StrictRedis()
print "start handle message!"
while True:
msg = conn.rpop("msgQueue")
if msg is None:
continue
msg_dict = parse_message(msg)
m_id = msg_dict["id"]
m_type = msg_dict["type"]
if m_type == MSG_TYPE_PLAY_GAME:
print "消息%d:我要打游戏" % m_id
elif m_type == MSG_TYPE_READ_BOOK:
print "消息%d:我要读书" % m_id
else:
print "消息%d:我要唱歌" % m_id
if __name__ == "__main__":
handle_message()
先运行消费者的代码,会输出一下信息:
start handle message!
由于此时队列没有消息,所以不会有其他的消息被打印,此时运行生产者,生产者会把消息插入到消息队列:
push message: {"type": 1, "id": 1}
push message: {"type": 2, "id": 2}
push message: {"type": 0, "id": 3}
push message: {"type": 1, "id": 4}
push message: {"type": 2, "id": 5}
push message: {"type": 0, "id": 6}
push message: {"type": 1, "id": 7}
push message: {"type": 2, "id": 8}
push message: {"type": 0, "id": 9}
消费者则会读取消息队列中的消息:
start handle message! 消息1:我要打游戏 消息2:我要唱歌 消息3:我要读书 消息4:我要打游戏 消息5:我要唱歌 消息6:我要读书 消息7:我要打游戏 消息8:我要唱歌 消息9:我要读书
二、改进
上面的代码中,消费者在没有读到数据情况下会一直循环读取,对电脑来说十分占资源,此时可以利用redis的阻塞读取命令BRPOP
来进行改进,修改消费者代码:
def handle_message():
...
while True:
msg = conn.brpop("msgQueue")[1]
msg_dict = parse_message(msg)
...
同样也能和上面一样完成同样的功能,只是和上面不同的是,这里读取消息不会一直循环去读取,而是一直阻塞,等到有消息过来才读取。
三、优先级队列
某些时候会有一些需求要把不同的需求根据不同优先级来执行,例如给用户发送邮件的途中突然发现用户账户异常,需要发送短信优先提醒,这时就需要用到优先级队列。
优先级队列依旧使用BRPOP
命令完成,BPOP
命令后面可以跟多个参数:
BRPOP queue1 queue2
redis会先读取queue1
中的数据,只有queue1
中的数据读完之后才会读queue2
中的数据。
# -*- coding: utf8 -*-
import redis
conn = redis.StrictRedis()
def creator():
""" 生产者,插入两个队列 """
msg1 = ["msg1: 1", "msg1: 2", "msg1: 3"]
msg2 = ["msg2: 1", "msg2: 2", "msg2: 3"]
for i in msg1:
conn.lpush("msg1", i)
for i in msg2:
conn.lpush("msg2", i)
def customer():
""" 循环读取消息队列 :return: """
while True:
# msg2 优先级高
msg = conn.brpop(["msg2", "msg1"])
print msg[1]
if __name__ == "__main__":
creator()
customer()
输出:
msg2: 1
msg2: 2
msg2: 3
msg1: 1
msg1: 2
msg1: 3