redis实现消息队列

时间:2022-05-28 17:40:39

原文链接:Redis小案例(二):redis实现消息队列

一、任务队列概述

消息队列,顾名思义就是一个用来传递任务的队列。消息队列在开发中十分常见,经常用在页面后台处理需要很长时间的操作时,例如发送邮件、短信以及进行复杂数据运算操作等,这些操作通常会阻塞页面相当长的时间,为了避免用户等待太久,一般会先给用户页面进行相应,然后在后台使用独立的线程或者进程来处理这些复杂的操作。

消息队列分为两个部分,生产者和消费者。生产者负责把任务放进队列,消费者则负责从队列中取出任务执行。最常见的一个场景是:当我们在某个站点注册账号时,一般都会给我们的邮箱发送邮件验证,由于发送邮件比较耗时,并且邮件的实时性要求也不是很高,所以这里就可以使用消息队列来完成。先把发送邮件放到队列中,然后开启另外的一个线程专门读取任务,读取邮件并发送出去。

二、使用redis实现一个简单的任务队列

可以使用redis中的列表来实现一个任务队列,开启两个程序,一个作为生产者使用LPUSH写队列,一个作为消费者使用RPOP读队列,由于消费者并不知道什么时候会有数据过来,所以消费者需要一直循环读取数据。两者的消息使用json进行封装协议传输。

生产者:

# -*- coding: utf8 -*-

""" 生产者模型 """

import json
import redis

# 消息类型
MSG_TYPE_READ_BOOK = 0
MSG_TYPE_PLAY_GAME = 1
MSG_TYPE_SING_SONG = 2


def make_message(m_id, m_type):
    """ 产生一个消息 :param m_id: 消息的id :param m_type: 消息类型 :return: json字符串 """
    mess_dict = {"id": m_id, "type": m_type}
    return json.dumps(mess_dict)


def creator():
    """ 生产消息并放入消息队列 """
    conn = redis.StrictRedis()
    for i in range(1, 10):
        js_data = make_message(i, i % 3)
        print "push message: %s" % js_data
        conn.lpush("msgQueue", js_data)


if __name__ == "__main__":
    creator()

消费者:

# -*- coding: utf8 -*-

""" 消费者模型 """

import json
import redis

# 消息类型
MSG_TYPE_READ_BOOK = 0
MSG_TYPE_PLAY_GAME = 1
MSG_TYPE_SING_SONG = 2


def parse_message(js_data):
    """ 把消息队列中的消息解析成字典 :param js_data: json字符串 :return: 字典 """
    return json.loads(js_data)


def handle_message():
    """ 从消息队列读取消息并执行 """
    conn = redis.StrictRedis()
    print "start handle message!"
    while True:
        msg = conn.rpop("msgQueue")
        if msg is None:
            continue
        msg_dict = parse_message(msg)
        m_id = msg_dict["id"]
        m_type = msg_dict["type"]
        if m_type == MSG_TYPE_PLAY_GAME:
            print "消息%d:我要打游戏" % m_id
        elif m_type == MSG_TYPE_READ_BOOK:
            print "消息%d:我要读书" % m_id
        else:
            print "消息%d:我要唱歌" % m_id


if __name__ == "__main__":
    handle_message()

先运行消费者的代码,会输出一下信息:

start handle message!

由于此时队列没有消息,所以不会有其他的消息被打印,此时运行生产者,生产者会把消息插入到消息队列:

push message: {"type": 1, "id": 1}
push message: {"type": 2, "id": 2}
push message: {"type": 0, "id": 3}
push message: {"type": 1, "id": 4}
push message: {"type": 2, "id": 5}
push message: {"type": 0, "id": 6}
push message: {"type": 1, "id": 7}
push message: {"type": 2, "id": 8}
push message: {"type": 0, "id": 9}

消费者则会读取消息队列中的消息:

start handle message! 消息1:我要打游戏 消息2:我要唱歌 消息3:我要读书 消息4:我要打游戏 消息5:我要唱歌 消息6:我要读书 消息7:我要打游戏 消息8:我要唱歌 消息9:我要读书

二、改进

上面的代码中,消费者在没有读到数据情况下会一直循环读取,对电脑来说十分占资源,此时可以利用redis的阻塞读取命令BRPOP来进行改进,修改消费者代码:

def handle_message():
    ...
    while True:
        msg = conn.brpop("msgQueue")[1]
        msg_dict = parse_message(msg)
        ...

同样也能和上面一样完成同样的功能,只是和上面不同的是,这里读取消息不会一直循环去读取,而是一直阻塞,等到有消息过来才读取。

三、优先级队列

某些时候会有一些需求要把不同的需求根据不同优先级来执行,例如给用户发送邮件的途中突然发现用户账户异常,需要发送短信优先提醒,这时就需要用到优先级队列。

优先级队列依旧使用BRPOP命令完成,BPOP命令后面可以跟多个参数:

BRPOP queue1 queue2

redis会先读取queue1中的数据,只有queue1中的数据读完之后才会读queue2中的数据。

# -*- coding: utf8 -*-

import redis

conn = redis.StrictRedis()


def creator():
    """ 生产者,插入两个队列 """
    msg1 = ["msg1: 1", "msg1: 2", "msg1: 3"]
    msg2 = ["msg2: 1", "msg2: 2", "msg2: 3"]

    for i in msg1:
        conn.lpush("msg1", i)

    for i in msg2:
        conn.lpush("msg2", i)


def customer():
    """ 循环读取消息队列 :return: """
    while True:
        # msg2 优先级高
        msg = conn.brpop(["msg2", "msg1"])
        print msg[1]


if __name__ == "__main__":
    creator()
    customer()

输出:

msg2: 1
msg2: 2
msg2: 3
msg1: 1
msg1: 2
msg1: 3