redis现在只支持对list的阻塞式操作,相关的两个命令是brpop和blpop。
这两个命令在list中有元素时,跟普通的pop没有区别:弹出list的一个元素,然后返回。但在list没有元素时,会为redisClient设置REDIS_BLOCKED标志,使client进入阻塞状态(设置了REDIS_BLOCKED标志的redisClient会一直阻塞,参考命令处理章节),直到有新元素加入(push操作的处理函数pushGenericCommand)或者超时,才会返回。
这两个命令设置的处理函数brpopCommand和blpopCommand都会调用blockingPopGenericCommand。该函数依次检查参数中的每个key:只要发现某个list中有元素,就调用非阻塞的popGenericCommand来弹出一个元素并返回;如果所有list都为空(或key不存在),在MULTI/EXEC事务中会直接返回空的multi bulk应答(当作超时处理),否则调用blockForKeys来处理阻塞的情况。
/* Blocking RPOP/LPOP.
 *
 * Arguments 1 .. argc-2 of the client command are keys; the last argument
 * is the timeout. The keys are scanned in order: the first one holding a
 * non-empty list is popped immediately through popGenericCommand() (with
 * `where` forwarded unchanged). If no key can be served, the client is
 * either answered with a null multi bulk (inside MULTI/EXEC) or handed to
 * blockForKeys(). */
static void blockingPopGenericCommand(redisClient *c, int where) {
    long long requested;    /* timeout exactly as parsed from the command */
    time_t deadline;        /* timeout value handed to blockForKeys() */
    robj *val;
    list *items;
    robj *popargv[2];       /* temporary argv for popGenericCommand() */
    robj **saved_argv;
    int saved_argc;
    int idx;

    /* The trailing argument has to parse as an integer... */
    if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&requested,
            "timeout is not an integer") != REDIS_OK)
        return;

    /* ...and it must not be negative. */
    if (requested < 0) {
        addReplySds(c,sdsnew("-ERR timeout is negative\r\n"));
        return;
    }

    for (idx = 1; idx < c->argc-1; idx++) {
        val = lookupKeyWrite(c->db,c->argv[idx]);

        /* Missing key: nothing to pop here, try the next one. */
        if (val == NULL) continue;

        /* A key of the wrong type aborts the whole command. */
        if (val->type != REDIS_LIST) {
            addReply(c,shared.wrongtypeerr);
            return;
        }

        /* Empty list: same as a missing key. */
        items = val->ptr;
        if (listLength(items) == 0) continue;

        /* The list has elements, so fall back to the ordinary non-blocking
         * POP. popGenericCommand() works on a single key taken from
         * argv[1], so the client's argument vector is swapped for a
         * two-slot temporary one for the duration of the call. */
        saved_argv = c->argv;
        saved_argc = c->argc;
        popargv[1] = c->argv[idx];
        c->argv = popargv;
        c->argc = 2;

        /* The reply also differs from a plain POP: emit the multi bulk
         * header and the key name here, and let the "real" command append
         * the last element (the value). Admittedly a hack. */
        addReplySds(c,sdsnew("*2\r\n"));
        addReplyBulk(c,popargv[1]);
        popGenericCommand(c,where);

        /* Put the client's original arguments back. */
        c->argv = saved_argv;
        c->argc = saved_argc;
        return;
    }

    /* Inside a MULTI/EXEC with nothing to pop, the only option is to treat
     * the call as timed out (even with timeout 0). */
    if (c->flags & REDIS_MULTI) {
        addReply(c,shared.nullmultibulk);
        return;
    }

    /* Every list is empty or missing: block. A positive timeout is turned
     * into an absolute time by adding the current time; zero is passed
     * through unchanged. */
    deadline = requested;
    if (deadline > 0) deadline += time(NULL);
    blockForKeys(c,c->argv+1,c->argc-2,deadline);
}
blockForKeys会在db->blockingkeys记下client和等待的key的对应关系,然后给client设置REDIS_BLOCKED标志,这样client就一直阻塞了。
/* Put client `c` into the blocked state, waiting on `numkeys` keys.
 *
 * c        the client to block
 * keys     array of `numkeys` key objects the client waits on
 * numkeys  number of entries in `keys`
 * timeout  stored verbatim in c->blockingto
 *
 * Two mappings are maintained: client -> keys (c->blockingkeys) and
 * key -> list of waiting clients (c->db->blockingkeys). Each mapping takes
 * its own reference on the key object. */
static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
    dictEntry *de;
    list *l;
    int j;

    /* NOTE(review): presumably a negative fd means there is no real socket
     * behind this client, so it is never blocked -- confirm against the
     * callers. */
    if (c->fd < 0) return;

    c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
    c->blockingkeysnum = numkeys;
    c->blockingto = timeout;
    for (j = 0; j < numkeys; j++) {
        /* Add the key in the client structure, to map clients -> keys */
        c->blockingkeys[j] = keys[j];
        incrRefCount(keys[j]);

        /* And in the other "side", to map keys -> clients */
        de = dictFind(c->db->blockingkeys,keys[j]);
        if (de == NULL) {
            int retval;

            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            retval = dictAdd(c->db->blockingkeys,keys[j],l);
            /* Second reference: the dictionary now holds the key too. */
            incrRefCount(keys[j]);
            assert(retval == DICT_OK);
        } else {
            l = dictGetEntryVal(de);
        }
        listAddNodeTail(l,c);
    }
    /* Mark the client as a blocked client */
    c->flags |= REDIS_BLOCKED;
    server.blpop_blocked_clients++;
}
等待的client会一直阻塞,直到有push操作,此时会调用unblockClientWaitingData来解除client的阻塞。
/* Unblock a client that's waiting in a blocking operation such as BLPOP */ // 减少对所阻塞对象的引用 static void unblockClientWaitingData(redisClient *c) { dictEntry *de; list *l; int j; assert(c->blockingkeys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ for (j = 0; j < c->blockingkeysnum; j++) { /* Remove this client from the list of clients waiting for this key. */ de = dictFind(c->db->blockingkeys,c->blockingkeys[j]); assert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) dictDelete(c->db->blockingkeys,c->blockingkeys[j]); decrRefCount(c->blockingkeys[j]); } /* Cleanup the client structure */ zfree(c->blockingkeys); c->blockingkeys = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting * in the input buffer. Note that this is safe even if * unblockClientWaitingData() gets called from freeClient() because * freeClient() will be smart enough to call this function * *after* c->querybuf was set to NULL. */ if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c); }
Pingback 引用通告: redis源代码分析16–阻塞式命令 | Linux C++ 中文网