最近项目需要一个可以实现发布/订阅 机制的消息队列, 首先想到的是Kafka, RabbitMQ之类的消息队列组件, 但是感觉我们的项目也许不需要引入那么复杂的组件, Redis也有一个比较轻量的订阅机制。 我们可以参考redis的 Publish/Subscribe 机制, 来得到比较好的问题解决方案。
publish/subscribe的用法
redis提供了如下6个命令来支持该功能:
序号 | 命令 | 描述 |
---|---|---|
1 | PSUBSCRIBE pattern [pattern …] | 订阅一个或多个符合给定模式的频道 |
2 | PUBSUB subcommand [argument [argument …]] | 查看订阅与发布系统状态 |
3 | PUBLISH channel message | 将消息发送到指定的频道 |
4 | PUNSUBSCRIBE [pattern [pattern …]] | 退订所有给定模式的频道 |
5 | SUBSCRIBE channel [channel …] | 订阅给定的一个或多个频道的信息 |
6 | UNSUBSCRIBE [channel [channel …]] | 指退订给定的频道 |
- 客户端可以一次性订阅一个或者多个channel,SUBSCRIBE channel1 channel2 channel3;
- PUBSUB返回当前publish/subscribe 系统的内部命令的活动状态, 内部命令包括:channels(列出当前活跃的channel),NUMSUB(返回指定channel的订阅数目),NUMPAT(返回订阅pattern的订阅数);
- 订阅多个channel,通配符 * 可以匹配上面所有的channel, PSUBSCRIBE chan* ;
- 消息发布,PUBLISH channel2 hello-test;
- 取消某一个channel消息订阅, UNSUBSCRIBE channel1;
- 取消某个pattern的消息订阅, PUNSUBSCRIBE chan* ;
publish/subscribe 的实现代码分析
基本上所有的代码都在pubsub.c里面, 都是通郭一个字典和一个链表来实现的, 字典里面包含了从一个channel名字, 关联到channel对应的订阅clients; 对于pattern模式的订阅, 使用了链表来保存所有的pattern, 以及pattern对应的订阅者。
struct redisClient {
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
}
struct redisServer {
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of REDIS_NOTIFY... flags. */
}
subscribe 实现
现将channel添加都redisClient.pubsub_channels字典内, 然后去RedisServer.pubsub_channels字典内驱查询, 如果没有该channel, 就添加一个到字典里面, 如果已经存在, 返回当前的值;最后返回客户端;
void subscribeCommand(redisClient *c) {
int j;
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
c->flags |= REDIS_PUBSUB;
}
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
struct dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
unsubscribe
将channel在RedisClient.pubsub_channels字典内的KV对删除, 用channel去redisServer.pubsub_channels 字典内查找该channel对应的clients的链表ln, 将指定的clients删除掉, 如果ln内的所有元素都被删除了, 就删除该字典内的KV对。
void unsubscribeCommand(redisClient *c) {
if (c->argc == 1) {
pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB;
}
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
struct dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
/* Remove the channel from the client -> channels hash table */
incrRefCount(channel); /* channel may be just a pointer to the same object
we have in the hash tables. Protect it... */
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
retval = 1;
/* Remove the client from the channel -> clients list hash table */
de = dictFind(server.pubsub_channels,channel);
redisAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
redisAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client, so that it will be possible to abuse
* Redis PUBSUB creating millions of channels. */
dictDelete(server.pubsub_channels,channel);
}
}
/* Notify the client */
if (notify) {
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
psubscribe 与punsubscribe
这两个与subscribe, unsubscribe的实现基本相同, 不同地方主要是其对应的KV对存在pubsub_patterns 链表里面;
publish
publish的实现是遍历server端的pubsub_channels 字典以及pubsub_patterns链表, 将message发送给他们对应的client;
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
struct dictEntry *de;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
/* Send to clients listening to matching channels */
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
pubsub
pubsub主要是查看订阅-发布系统的内部活动的状态, 相当于一个该系统的一个统计命令, 透过它, 用户可以检查当前的系统的发布订阅的状况。
void pubsubCommand(redisClient *c) {
if (!strcasecmp(c->argv[1]->ptr,"channels") &&
(c->argc == 2 || c->argc ==3))
{
/* PUBSUB CHANNELS [<pattern>] */
sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
dictIterator *di = dictGetIterator(server.pubsub_channels);
dictEntry *de;
long mblen = 0;
void *replylen;
replylen = addDeferredMultiBulkLength(c);
while((de = dictNext(di)) != NULL) {
robj *cobj = dictGetKey(de);
sds channel = cobj->ptr;
if (!pat || stringmatchlen(pat, sdslen(pat),
channel, sdslen(channel),0))
{
addReplyBulk(c,cobj);
mblen++;
}
}
dictReleaseIterator(di);
setDeferredMultiBulkLength(c,replylen,mblen);
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
/* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
int j;
addReplyMultiBulkLen(c,(c->argc-2)*2);
for (j = 2; j < c->argc; j++) {
list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
addReplyBulk(c,c->argv[j]);
addReplyLongLong(c,l ? listLength(l) : 0);
}
} else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
/* PUBSUB NUMPAT */
addReplyLongLong(c,listLength(server.pubsub_patterns));
} else {
addReplyErrorFormat(c,
"Unknown PUBSUB subcommand or wrong number of arguments for '%s'",
(char*)c->argv[1]->ptr);
}
}
总结
redis的Publish/Subscribe机制, 能够比较方便实现将消息从一个客户端传递到一个或者几个客户端的基本功能, 但是从上面的代码也能看出来, 它存在着一些问题:
- 无法保证发布的消息一定能被订阅者收到;
- 重启之后, 在重启过程中的消息丢失;