介绍
Redis是一个内存数据结构存储库,用于缓存,高速数据摄取,处理消息队列,分布式锁定等等。
使用Redis优于其他内存存储的优点是Redis提供持久性和数据结构,如列表,集合,有序集和散列。
在本文中,我想简要介绍一下Redis键空间通知。我将解释键空间通知是什么,并演示如何配置Redis以接收它们。然后我将向您展示如何在python中订阅Redis通知。
在我们开始之前,请按照此处所述安装并启动Redis服务器:https://redis.io/topics/quickstart。
启用键空间通知
默认情况下,禁用键空间事件通知。我们可以在redis.conf或redis-cli中启用它们,如下所示:
$ redis-cli config set notify-keyspace-events KEA
OK
该KEA
字符串意味着每一个可能的事件被启用。要查看每个字符的含义,请查看文档。
该CLI可以在特殊模式下,它允许您订阅的频道,以接收邮件的工作。
现在让我们检查事件是否正常:
$ redis-cli --csv psubscribe '*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","*",1
psubscribe '*'
意味着我们想要使用模式订阅所有事件*
。在新的终端输入redis-cli和SET key1
to value1
。
127.0.0.1:6379> set key1 value1
OK
在第一个终端,您将看到:
$ redis-cli --csv psubscribe '*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","*",1
"pmessage","*","__keyspace@0__:key1","set"
"pmessage","*","__keyevent@0__:set","key1
通知正在运行:)
Redis键空间通知
Redis密钥空间通知自2.8.0版开始提供。对于每个更改任何Redis密钥的操作,我们可以配置Redis将消息发布到Pub / Sub。然后我们可以订阅这些通知。值得一提的是,只有在真正修改了密钥时才会生成事件。例如,删除不存在的密钥不会生成事件。
上面你收到了三个事件:
"psubscribe","*",1
"pmessage","*","__keyspace@0__:key1","set"
"pmessage","*","__keyevent@0__:set","key1
第一个事件意味着我们已成功订阅作为回复中第二个元素的通道。1表示我们当前订阅的频道数。第二个事件是密钥空间通知。在密钥空间信道中,我们收到了事件的名称set
作为消息。第三个事件是关键事件通知。在keyevent频道中,我们收到了密钥的名称key1
作为消息。
Redis Pub / Sub
使用Redis的Pub / Sub图层传递事件。
为了订阅频道channel1
和channel2
,客户端发出一个订阅与频道的名称命令:
SUBSCRIBE channel1 channel2
其他客户(发布者)发送到这些频道的消息将由Redis推送到所有订阅的客户端(订阅者)。
Redis Pub / Sub实现支持模式匹配。客户端可以订阅glob样式模式,以便使用PSUBSCRIBE接收发送到与给定模式匹配的通道名称的所有消息。
例如:
PSUBSCRIBE channel*
将接收所有发来的短信channel1
,channel.b
等等。
如果您的发布/订阅客户端断开连接并稍后重新连接,则在客户端断开连接期间传递的所有事件都将丢失。
Redis为每个客户端维护一个客户端输出缓冲区。Pub / Sub的客户端输出缓冲区的默认限制设置为:
client-output-buffer-limit pubsub 32mb 8mb 60
Redis将强制客户端在两种情况下断开连接:如果输出缓冲区增长超过32MB,或者输出缓冲区持续保持8MB数据60秒。
这些迹象表明客户消费数据的速度比发布时慢。
将来有计划允许更可靠的事件传递,但可能会在更一般的层面上解决,要么为Pub / Sub本身带来可靠性,要么允许Lua脚本拦截Pub / Sub消息以执行推送等操作把事件放到一个清单中。
订阅python中的通知
首先我们需要Redis redis-py的python客户端,所以让我们安装它:
$ pip install redis
事件循环
看看下面的代码。它订阅所有键空间通知并打印任何收到的。
import time
from redis import StrictRedis
redis = StrictRedis(host='localhost', port=6379)
pubsub = redis.pubsub()
pubsub.psubscribe('__keyspace@0__:*')
print('Starting message loop')
while True:
message = pubsub.get_message()
if message:
print(message)
else:
time.sleep(0.01)
这就是我们创建Redis连接的方式:
redis = StrictRedis(host='localhost', port=6379)
默认情况下,所有响应都以字节形式返回。用户负责解码它们。如果应解码来自客户端的所有字符串响应,则用户可以将SID_responses = True指定为StrictRedis。在这种情况下,任何返回字符串类型的Redis命令都将使用指定的编码进行解码。
接下来,我们创建一个pubsub对象,该对象订阅一个频道并侦听新消息:
pubsub = redis.pubsub()
pubsub.psubscribe('__keyspace@0__:*')
然后我们通过无限循环等待事件:
while True:
message = pubsub.get_message()
...
如果有数据,get_message()将读取并返回它。如果没有数据,则该方法将返回None。
从pubsub实例读取的每条消息都是一个包含以下键的字典:
-
键入:下列之一:
subscribe
,unsubscribe
,psubscribe
,punsubscribe
,message
,pmessage
- channel:订阅的频道或发布消息的频道
-
pattern:匹配已发布消息的通道的模式(除类型外在所有情况下均为None
pmessage
) - data:消息数据
现在启动python脚本,在另一个终端输入带有值的redis-cli和SET键mykey
myvalue
127.0.0.1:6379> set mykey myvalue
OK
您将看到脚本的以下输出:
$ python subscribe.py
Starting message loop
{'type': 'psubscribe', 'data': 1, 'channel': b'__keyspace@0__:*', 'pattern': None}
{'type': 'pmessage', 'data': b'set', 'channel': b'__keyspace@0__:mykey', 'pattern': b'__keyspace@0__:*'}
回调
也可以注册回调函数来处理已发布的消息。消息处理程序只接受一个参数即消息。要使用消息处理程序订阅通道或模式,请将通道或模式名称作为关键字参数传递,其值为回调函数。当使用消息处理程序在通道或模式上读取消息时,将创建消息字典并将其传递给消息处理程序。在这种情况下,从get_message()返回None值,因为消息已经处理完毕。
import time
from redis import StrictRedis
redis = StrictRedis(host='localhost', port=6379)
pubsub = redis.pubsub()
def event_handler(msg):
print('Handler', msg)
pubsub.psubscribe(**{'__keyspace@0__:*': event_handler})
print('Starting message loop')
while True:
message = pubsub.get_message()
if message:
print(message)
else:
time.sleep(0.01)
127.0.0.1:6379> set mykey myvalue
OK
如您所见,set事件mykey
由event_handler回调处理。
$ python subscribe2.py
Starting message loop
{'pattern': None, 'channel': b'__keyspace@0__:*', 'data': 1, 'type': 'psubscribe'}
Handler {'pattern': b'__keyspace@0__:*', 'channel': b'__keyspace@0__:mykey', 'data': b'set', 'type': 'pmessage'}
单独线程中的事件循环
另一种选择是在单独的线程中运行事件循环:
import time
from redis import StrictRedis
redis = StrictRedis(host='localhost', port=6379)
def event_handler(msg):
print(msg)
thread.stop()
pubsub = redis.pubsub()
pubsub.psubscribe(**{'__keyevent@0__:expired': event_handler})
thread = pubsub.run_in_thread(sleep_time=0.01)
上面的代码创建了一个新线程并启动了事件循环。处理完第一个过期事件后,我们使用该thread.stop()
方法关闭事件循环和线程。
在幕后,这只是一个围绕get_message()的包装器,它在一个单独的线程中运行。run_in_thread()
采用可选sleep_time
参数。如果指定,则事件循环将使用循环的每次迭代中的值调用time.sleep()。
127.0.0.1:6379> set mykey myvalue ex 1
OK
预期产量:
$ python subscribe3.py
{'type': 'pmessage', 'channel': b'__keyevent@0__:expired', 'pattern': b'__keyevent@0__:expired', 'data': b'mykey'}
概要
Redis的一个常见用例是,当应用程序需要能够响应存储在特定密钥或密钥中的值可能发生的更改时。感谢密钥空间通知和Pub / Sub,我们可以响应Redis数据中的更改。通知非常容易使用,而事件处理器可以在地理上分布。
最大的缺点是Pub / Sub实现要求发布者和订阅者一直处于启动状态。订阅服务器在停止或连接丢失时会丢失数据。
链接
- https://redis.io/topics/notifications - Redis密钥空间通知文档
- https://redis.io/topics/pubsub - Redis Pub / Sub文档
- https://github.com/andymccurdy/redis-py- Redis的Python客户端
- https://www.infoworld.com/article/3212768/database/how-to-use-redis-for-real-time-stream-processing.html - 如何使用Redis进行实时流处理
- https://matt.sh/advanced-redis-pubsub-scripts - 将脚本订阅到Pub / Sub通道