redis的发布/订阅(publish/subscribe)功能类似于传统的消息路由功能,发布者发布消息,订阅者接收消息,沟通发布者和订阅者之间的桥梁是订阅的channel或者pattern。发布者向指定的publish或者pattern发布消息,订阅者阻塞在订阅的channel或者pattern。可以看到,发布者不会指定哪个订阅者才能接收消息,订阅者也无法只接收特定发布者的消息。这种订阅者和发布者之间的关系是松耦合的,订阅者不知道是谁发布的消息,发布者也不知道谁会接收消息。
redis的发布/订阅功能主要通过SUBSCRIBE、UNSUBSCRIBE、PSUBSCRIBE、PUNSUBSCRIBE 、PUBLISH五个命令来表现。其中SUBSCRIBE、UNSUBSCRIBE用于订阅或者取消订阅channel,而PSUBSCRIBE、PUNSUBSCRIBE用于订阅或者取消订阅pattern,发布消息则通过publish命令。
对于发布/订阅功能的实现,我们先来看看几个与此相关的结构。
struct redisServer { --- /* Pubsub */ dict *pubsub_channels;/* Map channels to list of subscribed clients */ list *pubsub_patterns;/* A list of pubsub_patterns */ --- } typedef struct redisClient { --- dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ } redisClient;
在redis的全局server变量(redisServer类型)中,channel和订阅者之间的关系用字典pubsub_channels来保存,特定channel和所有订阅者组成的链表构成pubsub_channels字典中的一项,即字典中的每一项可表示为(channel,订阅者链表);pattern和订阅者之间的关系用链表pubsub_patterns来保存,链表中的每一项可表示成(pattern,redisClient)组成的字典。
在特定订阅者redisClient的结构中,pubsub_channels保存着它所订阅的channel的字典,而订阅的模式则保存在链表pubsub_patterns中。
从上面的解释,我们再来看看订阅/发布命令的最坏时间复杂度(注意字典增删查改一项的复杂度为O(1),而链表的查删复杂度为O(N),从链表尾部增加一项的复杂度为O(1))。
SUBSCRIBE:
订阅者用SUBSCRIBE订阅特定channel,这需要在订阅者的redisClient结构中的pubsub_channels增加一项(复杂度为 O(1)),然后在redisServer 的pubsub_channels找到该channel(复杂度为O(1)),并在该channel的订阅者链表的尾部增加一项(复杂度为O(1),注意,如果pubsub_channels中没找到该channel,则插入的复杂度也同为O(1)),因此订阅者用SUBSCRIBE订阅特定 channel的最坏时间复杂度为O(1)。
UNSUBSCRIBE:
订阅者取消订阅时,需要先从订阅者的redisClient结构中的pubsub_channels删除一项(复杂度为O(1)),然后在 redisServer 的pubsub_channels找到该channel(复杂度为O(1)),然后在channel的订阅者链表中删除该订阅者(复杂度为O(1)),因此总的复杂度为O(N),N为特定channel的订阅者数。
PSUBSCRIBE:
订阅者用PSUBSCRIBE订阅pattern时,需要先在redisClient结构中的pubsub_patterns先查找是否已存在该 pattern(复杂度为O(N)),并在不存在的情况下往redisClient结构中的pubsub_patterns和redisServer结构中的pubsub_patterns链表尾部各增加一项(复杂度都为O(1)),因此,总的复杂度为O(N),其中N为订阅者已订阅的模式。
PUNSUBSCRIBE:
订阅者用PUNSUBSCRIBE取消对pattern的订阅时,需要先在redisClient结构中的pubsub_patterns链表中删除该 pattern(复杂度为O(N)),并在redisServer结构中的pubsub_patterns链表中删除订阅者和pattern组成的映射(复杂度为O(M),因此,总的复杂度为O(N+M),其中N为订阅者已订阅的模式,而M为系统中所有订阅者和所有pattern组成的映射数。
PUBLISH:
发布消息时,只会向特定channel发布,但该channel可能会匹配某个pattern。因此,需要先在redisServer结构中的 pubsub_channels找到该channel的订阅者链表(O(1)),然后发送给所有订阅者(复杂度为O(N)),然后查看 redisServer结构中的pubsub_patterns链表中的所有项,看channel是否和该项中的pattern匹配(复杂度为O(M))(注意,这并不包括模式匹配的复杂度),因此,总的复杂度为O(N+M),。其中N为该channel的订阅者数,而M为系统中所有订阅者和所有 pattern组成的映射数。另外,从这也可以看出,一个订阅者是可能多次收到同一个消息的。
解释了发布/订阅的算法后,其代码就好理解了,这里仅给出PUBLISH命令的处理函数publishCommand的代码,更多相关命令的代码请参看redis的源代码。
static void publishCommand(redisClient *c) { int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); addReplyLongLong(c,receivers); } /* Publish a message */ static int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; struct dictEntry *de; listNode *ln; listIter li; /* Send to clients listening for that channel */ de = dictFind(server.pubsub_channels,channel); if (de) { list *list = dictGetEntryVal(de); listNode *ln; listIter li; listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { redisClient *c = ln->value; addReply(c,shared.mbulk3); addReply(c,shared.messagebulk); addReplyBulk(c,channel); addReplyBulk(c,message); receivers++; } } /* Send to clients listening to matching channels */ if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { addReply(pat->client,shared.mbulk4); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++; } } decrRefCount(channel); } return receivers; }
最后提醒一下,处于发布/订阅模式的client,是无法发布上述五种命令之外的命令(quit除外),这是在processCommand函数中检查的,可以参看前面命令处理章节对该函数的解释。
Pingback 引用通告: redis源代码分析20–发布/订阅 | Linux C++ 中文网