static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask){ redisClient *c =(redisClient*)privdata; -- nread = read(fd, buf, REDIS_IOBUF_LEN); --- if(nread){ c->querybuf = sdscatlen(c->querybuf, buf, nread); c->lastinteraction = time(NULL); } else { return; } processInputBuffer(c); }
processInputBuffer只处理不在REDIS_BLOCKED 和REDIS_IO_WAIT状态的client,也就是已ready好的client。另外如果c->bulklen ==-1(对于一般命令,c->bulklen都为-1,对于用multibulk协议传输的命令,下一个函数有更详细的介绍),则按行解析querybuf,并将解析到的参数保存在argv中,然后调用processCommand进行下一步处理,并且如果processCommand返回非0,会继续处理client输入。
static void processInputBuffer(redisClient *c) { again: --- if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; if (c->bulklen == -1) { -- if (p) { --- if (c->argc) { --- if (processCommand(c) && sdslen(c->querybuf)) goto again; } -- } } else { -- int qbl = sdslen(c->querybuf); if (processCommand(c) && sdslen(c->querybuf)) goto again; return; } }
processCommand这段代码对multi-bulk 协议的解析写得真不敢恭维,转来转去的,真没劲。参看代码中的解释。解析完multibulk后,如果输入的命令是quit,则表示客户端退出了,释放其连接,返回0,表示不用继续处理了。接着使用 lookupCommand查看命令在cmdTable中对应的命令项,然后又是multbulk,接着检查安全认证情况,接着检查内存使用(前面内存章节中有介绍),接着查看 pubsub_channels、pubsub_patterns长度是否为0,若不为0,则表示处于订阅模式下(后文介绍),只允许命令 subscribeCommand、unsubscribeCommand、psubscribeCommand、 punsubscribeCommand。接着如果client处于事务模式下,则在命令不是execCommand、discardCommand的情况下将命令排队(事务处理后文也有介绍)。接着看看是否需要预先加载key,最后终于来到call函数中调用命令了。
static int processCommand(redisClient *c) { struct redisCommand *cmd; --- // 第一个字符是 * 表示后面是multi-bulk协议格式 // 解析得到后面的data 项数 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') { c->multibulk = atoi(((char*)c->argv[0]->ptr)+1); if (c->multibulk <= 0) { resetClient(c); return 1; } else { decrRefCount(c->argv[c->argc-1]); c->argc--; return 1; } } else if (c->multibulk) { // 解析时对于普通的命令: c->bulklen始终 = -1, //前面已获得 c->multibulk值, c->bulklen一开始为-1,随后在 if (c->bulklen == -1) 中置为需要读取的字符个数,然后返回到processInputBuffer的else中处理得到输入的参数,然后再到这儿时就会进入 if (c->bulklen == -1) 的else中,将参数保存到mbargv中,这样一直到 c->multibulk为0,才解析完multibulk协议,进行下一步处理。 if (c->bulklen == -1) { if (((char*)c->argv[0]->ptr)[0] != '$') { addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n")); resetClient(c); return 1; } else { int bulklen = atoi(((char*)c->argv[0]->ptr)+1); decrRefCount(c->argv[0]); if (bulklen < 0 || bulklen > 1024*1024*1024) { c->argc--; addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); resetClient(c); return 1; } c->argc--; c->bulklen = bulklen+2; /* add two bytes for CR+LF */ return 1; } } else { c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1)); c->mbargv[c->mbargc] = c->argv[0]; c->mbargc++; c->argc--; c->multibulk--; if (c->multibulk == 0) { robj **auxargv; int auxargc; /* Here we need to swap the multi-bulk argc/argv with the * normal argc/argv of the client structure. */ auxargv = c->argv; c->argv = c->mbargv; c->mbargv = auxargv; auxargc = c->argc; c->argc = c->mbargc; c->mbargc = auxargc; /* We need to set bulklen to something different than -1 * in order for the code below to process the command without * to try to read the last argument of a bulk command as * a special argument. */ c->bulklen = 0; /* continue below and process the command */ } else { c->bulklen = -1; return 1; } } } /* -- end of multi bulk commands processing -- */ --- if (!strcasecmp(c->argv[0]->ptr,"quit")) { freeClient(c); return 0; } --- cmd = lookupCommand(c->argv[0]->ptr); if (!cmd) { addReplySds(c, sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n", (char*)c->argv[0]->ptr)); resetClient(c); return 1; } else if ((cmd->arity > 0 && cmd->arity != c->argc) || (c->argc < -cmd->arity)) { addReplySds(c, sdscatprintf(sdsempty(), "-ERR wrong number of arguments for '%s' command\r\n", cmd->name)); resetClient(c); return 1; } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { /* This is a bulk command, we have to read the last argument yet. */ int bulklen = atoi(c->argv[c->argc-1]->ptr); decrRefCount(c->argv[c->argc-1]); if (bulklen < 0 || bulklen > 1024*1024*1024) { c->argc--; addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); resetClient(c); return 1; } c->argc--; c->bulklen = bulklen+2; /* add two bytes for CR+LF */ --- if ((signed)sdslen(c->querybuf) >= c->bulklen) { c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2); c->argc++; c->querybuf = sdsrange(c->querybuf,c->bulklen,-1); } else { /* Otherwise return... there is to read the last argument * from the socket. */ return 1; } } /* Let's try to encode the bulk object to save space. */ if (cmd->flags & REDIS_CMD_BULK) c->argv[c->argc-1] = tryObjectEncoding(c->argv[c->argc-1]); /* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); resetClient(c); return 1; } if (server.maxmemory) freeMemoryIfNeeded(); if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) && zmalloc_used_memory() > server.maxmemory) { addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n")); resetClient(c); return 1; } /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0) && cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand && cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) { addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n")); resetClient(c); return 1; } /* Exec the command */ if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) { queueMultiCommand(c,cmd); addReply(c,shared.queued); } else { if (server.vm_enabled && server.vm_max_threads > 0 && blockClientOnSwappedKeys(c,cmd)) return 1; call(c,cmd); } /* Prepare the client for the next command */ resetClient(c); return 1; }
call函数首先调用命令字绑定的处理函数,返回时检查是否修改数据,若有修改,则在aof启用的情况下,写aof log,并在数据改变或者强制复制的情况下向slaves复制,最后向monitors发送当前命令及参数。
/* Call() is the core of Redis execution of a command */ static void call(redisClient *c, struct redisCommand *cmd) { long long dirty; dirty = server.dirty; cmd->proc(c); dirty = server.dirty-dirty; if (server.appendonly && dirty) feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc); if ((dirty || cmd->flags & REDIS_CMD_FORCE_REPLICATION) && listLength(server.slaves)) replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc); if (listLength(server.monitors)) replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); server.stat_numcommands++; }
struct redisCommand { char *name; redisCommandProc *proc; int arity; int flags; /* Use a function to determine which keys need to be loaded * in the background prior to executing this command. Takes precedence * over vm_firstkey and others, ignored when NULL */ redisVmPreloadProc *vm_preload_proc; /* What keys should be loaded in background when calling this command? */ int vm_firstkey; /* The first argument that's a key (0 = no keys) */ int vm_lastkey; /* THe last argument that's a key */ int vm_keystep; /* The step between first and last key */ };
Pingback 引用通告: redis源代码分析14–命令处理的一般过程 | Linux C++ 中文网