这一节我们简略介绍下client连接的3个核心函数。涉及的很多参数跟redis的诸多特性有关,可在阅读后续章节后返回查看其相关值。
redis将新client的连接状态保存在redisClient结构体中,该结构体的代码如下:
typedef struct redisClient { int fd; // client对应的文件描述符 redisDb *db; // 当前操作的db int dictid; // sds querybuf; // 请求字符串 robj **argv, **mbargv; // 解析请求后的参数数组(mbargv用于multi命令) int argc, mbargc; int bulklen; /* bulk read len. -1 if not in bulk read mode */ int multibulk; /* multi bulk command format active */ list *reply; int sentlen; time_t lastinteraction; /* time of the last interaction, used for timeout */ int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */ int slaveseldb; /* slave selected db, if this client is a slave */ int authenticated; /* when requirepass is non-NULL */ int replstate; /* replication state if this is a slave */ int repldbfd; /* replication DB file descriptor */ long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ robj **blockingkeys; /* The key we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ int blockingkeysnum; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time * is >= blockingto then the operation timed out. */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ } redisClient;
新建client状态是用createClient函数完成,释放是用freeClient,超时是用closeTimedoutClients。
createClient 在上一节中的acceptHandler中被调用,一开始会将client设置成非阻塞、非延迟的,然后设置client有read事件时的处理函数 readQueryFromClient,接着默认选择db 0,清空命令缓冲区和参数数组等,最后将client加到全局的server.clients中。详细代码如下:
static redisClient *createClient(int fd) { redisClient *c = zmalloc(sizeof(*c)); anetNonBlock(NULL,fd); anetTcpNoDelay(NULL,fd); if (!c) return NULL; if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } selectDb(c,0); c->fd = fd; c->querybuf = sdsempty(); c->argc = 0; c->argv = NULL; c->bulklen = -1; c->multibulk = 0; c->mbargc = 0; c->mbargv = NULL; c->sentlen = 0; c->flags = 0; c->lastinteraction = time(NULL); c->authenticated = 0; c->replstate = REDIS_REPL_NONE; c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); c->blockingkeys = NULL; c->blockingkeysnum = 0; c->io_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); listSetFreeMethod(c->pubsub_patterns,decrRefCount); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); listAddNodeTail(server.clients,c); initClientMultiState(c); return c; }
freeClient用于释放client连接状态,在redis接收的client数超过了配置的 server.maxclients、读写client信息出错时、client连接超时时,都会被调用。
static void freeClient(redisClient *c) { listNode *ln; /* Note that if the client we are freeing is blocked into a blocking * call, we have to set querybuf to NULL *before* to call * unblockClientWaitingData() to avoid processInputBuffer() will get * called. Also it is important to remove the file events after * this, because this call adds the READABLE event. */ // 释放querybuf sdsfree(c->querybuf); c->querybuf = NULL; if (c->flags & REDIS_BLOCKED) unblockClientWaitingData(c); /* Unsubscribe from all the pubsub channels */ // 释放channel pubsubUnsubscribeAllChannels(c,0); pubsubUnsubscribeAllPatterns(c,0); dictRelease(c->pubsub_channels); listRelease(c->pubsub_patterns); /* Obvious cleanup */ aeDeleteFileEvent(server.el,c->fd,AE_READABLE); aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); listRelease(c->reply); freeClientArgv(c); close(c->fd); /* Remove from the list of clients */ ln = listSearchKey(server.clients,c); redisAssert(ln != NULL); listDelNode(server.clients,ln); /* Remove from the list of clients waiting for swapped keys, or ready * to be restarted, but not yet woken up again. */ // 从io等待队列中删除client if (c->flags & REDIS_IO_WAIT) { redisAssert(server.vm_enabled); if (listLength(c->io_keys) == 0) { ln = listSearchKey(server.io_ready_clients,c); /* When this client is waiting to be woken up (REDIS_IO_WAIT), * it should be present in the list io_ready_clients */ redisAssert(ln != NULL); listDelNode(server.io_ready_clients,ln); } else { while (listLength(c->io_keys)) { ln = listFirst(c->io_keys); dontWaitForSwappedKey(c,ln->value); } } server.vm_blocked_clients--; } listRelease(c->io_keys); /* Master/slave cleanup */ if (c->flags & REDIS_SLAVE) { if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) close(c->repldbfd); list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves; ln = listSearchKey(l,c); redisAssert(ln != NULL); listDelNode(l,ln); } if (c->flags & REDIS_MASTER) { server.master = NULL; server.replstate = REDIS_REPL_CONNECT; } /* Release memory */ zfree(c->argv); zfree(c->mbargv); freeClientMultiState(c); zfree(c); }
closeTimedoutClients在serverCron中被调用。
static void closeTimedoutClients(void) { redisClient *c; listNode *ln; time_t now = time(NULL); listIter li; listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { c = listNodeValue(ln); if (server.maxidletime && !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ !(c->flags & REDIS_MASTER) && /* no timeout for masters */ dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */ listLength(c->pubsub_patterns) == 0 && (now - c->lastinteraction > server.maxidletime)) { redisLog(REDIS_VERBOSE,"Closing idle client"); freeClient(c); } else if (c->flags & REDIS_BLOCKED) { if (c->blockingto != 0 && c->blockingto < now) { addReply(c,shared.nullmultibulk); unblockClientWaitingData(c); } } } }
Pingback 引用通告: redis源代码分析12–client连接(中) | Linux C++ 中文网