这一节我们简略介绍下client连接的3个核心函数。涉及的很多参数跟redis的诸多特性有关,可在阅读后续章节后返回查看其相关值。
redis将新client的连接状态保存在redisClient结构体中,该结构体的代码如下:
typedef struct redisClient {
int fd; // client对应的文件描述符
redisDb *db; // 当前操作的db
int dictid; //
sds querybuf; // 请求字符串
robj **argv, **mbargv; // 解析请求后的参数数组(mbargv用于multi命令)
int argc, mbargc;
int bulklen; /* bulk read len. -1 if not in bulk read mode */
int multibulk; /* multi bulk command format active */
list *reply;
int sentlen;
time_t lastinteraction; /* time of the last interaction, used for timeout */
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
int slaveseldb; /* slave selected db, if this client is a slave */
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
int repldbfd; /* replication DB file descriptor */
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
robj **blockingkeys; /* The key we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
int blockingkeysnum; /* Number of blocking keys */
time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
} redisClient;
新建client状态是用createClient函数完成,释放是用freeClient,超时是用closeTimedoutClients。
createClient 在上一节中的acceptHandler中被调用,一开始会将client设置成非阻塞、非延迟的,然后设置client有read事件时的处理函数 readQueryFromClient,接着默认选择db 0,清空命令缓冲区和参数数组等,最后将client加到全局的server.clients中。详细代码如下:
static redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(*c));
anetNonBlock(NULL,fd);
anetTcpNoDelay(NULL,fd);
if (!c) return NULL;
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
selectDb(c,0);
c->fd = fd;
c->querybuf = sdsempty();
c->argc = 0;
c->argv = NULL;
c->bulklen = -1;
c->multibulk = 0;
c->mbargc = 0;
c->mbargv = NULL;
c->sentlen = 0;
c->flags = 0;
c->lastinteraction = time(NULL);
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
c->reply = listCreate();
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
c->blockingkeys = NULL;
c->blockingkeysnum = 0;
c->io_keys = listCreate();
listSetFreeMethod(c->io_keys,decrRefCount);
c->pubsub_channels = dictCreate(&setDictType,NULL);
c->pubsub_patterns = listCreate();
listSetFreeMethod(c->pubsub_patterns,decrRefCount);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
listAddNodeTail(server.clients,c);
initClientMultiState(c);
return c;
}
freeClient用于释放client连接状态,在redis接收的client数超过了配置的 server.maxclients、读写client信息出错时、client连接超时时,都会被调用。
static void freeClient(redisClient *c) {
listNode *ln;
/* Note that if the client we are freeing is blocked into a blocking
* call, we have to set querybuf to NULL *before* to call
* unblockClientWaitingData() to avoid processInputBuffer() will get
* called. Also it is important to remove the file events after
* this, because this call adds the READABLE event. */
// 释放querybuf
sdsfree(c->querybuf);
c->querybuf = NULL;
if (c->flags & REDIS_BLOCKED)
unblockClientWaitingData(c);
/* Unsubscribe from all the pubsub channels */
// 释放channel
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeAllPatterns(c,0);
dictRelease(c->pubsub_channels);
listRelease(c->pubsub_patterns);
/* Obvious cleanup */
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
listRelease(c->reply);
freeClientArgv(c);
close(c->fd);
/* Remove from the list of clients */
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
/* Remove from the list of clients waiting for swapped keys, or ready
* to be restarted, but not yet woken up again. */
// 从io等待队列中删除client
if (c->flags & REDIS_IO_WAIT) {
redisAssert(server.vm_enabled);
if (listLength(c->io_keys) == 0) {
ln = listSearchKey(server.io_ready_clients,c);
/* When this client is waiting to be woken up (REDIS_IO_WAIT),
* it should be present in the list io_ready_clients */
redisAssert(ln != NULL);
listDelNode(server.io_ready_clients,ln);
} else {
while (listLength(c->io_keys)) {
ln = listFirst(c->io_keys);
dontWaitForSwappedKey(c,ln->value);
}
}
server.vm_blocked_clients--;
}
listRelease(c->io_keys);
/* Master/slave cleanup */
if (c->flags & REDIS_SLAVE) {
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
close(c->repldbfd);
list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
ln = listSearchKey(l,c);
redisAssert(ln != NULL);
listDelNode(l,ln);
}
if (c->flags & REDIS_MASTER) {
server.master = NULL;
server.replstate = REDIS_REPL_CONNECT;
}
/* Release memory */
zfree(c->argv);
zfree(c->mbargv);
freeClientMultiState(c);
zfree(c);
}
closeTimedoutClients在serverCron中被调用。
static void closeTimedoutClients(void) {
redisClient *c;
listNode *ln;
time_t now = time(NULL);
listIter li;
listRewind(server.clients,&li);
while ((ln = listNext(&li)) != NULL) {
c = listNodeValue(ln);
if (server.maxidletime &&
!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
!(c->flags & REDIS_MASTER) && /* no timeout for masters */
dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
listLength(c->pubsub_patterns) == 0 &&
(now - c->lastinteraction > server.maxidletime))
{
redisLog(REDIS_VERBOSE,"Closing idle client");
freeClient(c);
} else if (c->flags & REDIS_BLOCKED) {
if (c->blockingto != 0 && c->blockingto < now) {
addReply(c,shared.nullmultibulk);
unblockClientWaitingData(c);
}
}
}
}
Pingback 引用通告: redis源代码分析12–client连接(中) | Linux C++ 中文网