这一节主要介绍下val的加载。
对于某些命令,比如get somekey,当运行到processCommand时可能key对应的val不在内存中。在运行命令绑定的处理函数之前,redis会提前加载其val。
在 processCommand中,在vm开启并启用多线程时,会调用 blockClientOnSwappedKeys来加载可能已swap的val,如果blockClientOnSwappedKeys返回0,说明有 swap的val没被加载,则返回不调用call了(此时client会设置 REDIS_IO_WAIT标志,并已放到等待列表中)。代码如下:
static int processCommand(redisClient *c) { --- if (server.vm_enabled && server.vm_max_threads > 0 && blockClientOnSwappedKeys(c,cmd)) return 1; call(c,cmd); --- }
在blockClientOnSwappedKeys函数中,如果命令设置了预加载函数,比如zunionstore和zinterstore就设置了预加载函数 zunionInterBlockClientOnSwappedKeys,则使用设置的预加载函数加载swap的val,否则使用 waitForMultipleSwappedKeys加载swap的val,不过查看 zunionInterBlockClientOnSwappedKeys和waitForMultipleSwappedKeys的实现就可以发现, 这些函数最终都调用waitForSwappedKey。在预加载函数返回后,若client的io_keys链表非空(io_keys是该client 不在内存中的val的链表),则设置client的REDIS_IO_WAIT标志,取消client的read 事件(在这之前,client已放到对应key的阻塞队列中了)。
static int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) { if (cmd->vm_preload_proc != NULL) { cmd->vm_preload_proc(c,cmd,c->argc,c->argv); } else { waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv); } /* If the client was blocked for at least one key, mark it as blocked. */ if (listLength(c->io_keys)) { c->flags |= REDIS_IO_WAIT; aeDeleteFileEvent(server.el,c->fd,AE_READABLE); server.vm_blocked_clients++; return 1; } else { return 0; } }
我们看看waitForMultipleSwappedKeys的实现。waitForMultipleSwappedKeys会根据命令字表中设置的预加载参数,加载需要加载的val。
static void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { int j, last; if (cmd->vm_firstkey == 0) return; last = cmd->vm_lastkey; if (last < 0) last = argc+last; for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) { redisAssert(j < argc); waitForSwappedKey(c,argv[j]); } }
比如我们查看get命令字的设置。后面的1,1,1就是表示加载的val在argv中的位置,每个get命令最多需要预加载1个val。
{"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
waitForSwappedKey 涉及到vm的多线程,建议先粗略理解下,并在阅读vm章节后再返回此处阅读。该函数所做的主要工作就是将(c, key) 加到c->db->io_keys中,而db其实指向全局server的db,然后创建一个job,插入到工作线程中,让工作线程完成val 的加载。
static int waitForSwappedKey(redisClient *c, robj *key) { struct dictEntry *de; robj *o; list *l; /* If the key does not exist or is already in RAM we don't need to * block the client at all. */ de = dictFind(c->db->dict,key); if (de == NULL) return 0; o = dictGetEntryKey(de); if (o->storage == REDIS_VM_MEMORY) { return 0; } else if (o->storage == REDIS_VM_SWAPPING) { /* We were swapping the key, undo it! */ vmCancelThreadedIOJob(o); return 0; } /* OK: the key is either swapped, or being loaded just now. */ /* Add the key to the list of keys this client is waiting for. * This maps clients to keys they are waiting for. */ listAddNodeTail(c->io_keys,key); incrRefCount(key); /* Add the client to the swapped keys => clients waiting map. */ de = dictFind(c->db->io_keys,key); if (de == NULL) { int retval; /* For every key we take a list of clients blocked for it */ l = listCreate(); retval = dictAdd(c->db->io_keys,key,l); incrRefCount(key); assert(retval == DICT_OK); } else { l = dictGetEntryVal(de); } listAddNodeTail(l,c); /* Are we already loading the key from disk? If not create a job */ if (o->storage == REDIS_VM_SWAPPED) { iojob *j; o->storage = REDIS_VM_LOADING; j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_LOAD; j->db = c->db; j->key = o; j->key->vtype = o->vtype; j->page = o->vm.page; j->val = NULL; j->canceled = 0; j->thread = (pthread_t) -1; lockThreadedIO(); queueIOJob(j); unlockThreadedIO(); } return 1; }
插入工作线程的job在运行完后,会调用 vmThreadedIOCompletedJob,在该函数中会调用handleClientsBlockedOnSwappedKey处理阻塞的 client,而handleClientsBlockedOnSwappedKey所做的主要工作就是将所有val已加载的client放到 server.io_ready_clients中,此时client已ready好了,但还没有加入read事件循环(因为之前因为等待val已删除其 read事件)。
static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { struct dictEntry *de; list *l; listNode *ln; int len; de = dictFind(db->io_keys,key); if (!de) return; l = dictGetEntryVal(de); len = listLength(l); /* Note: we can't use something like while(listLength(l)) as the list * can be freed by the calling function when we remove the last element. */ while (len--) { ln = listFirst(l); redisClient *c = ln->value; if (dontWaitForSwappedKey(c,key)) { /* Put the client in the list of clients ready to go as we * loaded all the keys about it. */ listAddNodeTail(server.io_ready_clients,c); } } }
最后还剩下一个问题,那就是处于server.io_ready_clients的clint会在什么时候增加read事件,从而继续让其接收客户端的输入了。这个工作在beforeSleep函数中完成(前面的事件循环中有详细介绍)。beforeSleep会为server.io_ready_clients中的client增加read事件,调用processInputBuffer处理其输入。
另外注意,如果client需要的val在检查时都在内存中,但当执行命令处理函数时,该val被swap出去了,则只能使用vmLoadObject直接加载了(阻塞方式)。对此种情况,redis的解释是In practical terms this should onlyhappen with SORT BY command or if there is a bug in this function(参考blockClientOnSwappedKeys前的注释)。
Pingback 引用通告: redis源代码分析15–val加载机制 | Linux C++ 中文网