接下来的三节我们介绍client连接,按接受client连接、client连接的3个核心函数、client连接的几个标志3个部分顺序介绍。
这一节我们介绍下redis如何接受client连接。
在main函数中调用的initServer中,可看到如下代码:
static void initServer() { --- server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); --- if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, acceptHandler, NULL) == AE_ERR) oom("creating file event"); --- }
anetTcpServer 也就是socket、bind、listen的封装,返回listen后的fd,并保存在全局的server.fd中,然后调用 aeCreateFileEvent,使server.fd监听read事件,并在read事件响应后(也就是有新的client来到时),调用 acceptHandler来处理。可以看到acceptHandler最终调用系统api accept来处理。在acceptHandler返回后(尽管acceptHandler仅等待一个client连接,但由于server.fd的read事件一直被监听,所以会在aeProcessEvents反复被处理,从而导致acceptHandler反复被调用),acceptHandler调用createClient返回一个redisClient结构,保存新的client连接的状态。如果客户端连接数过多,则向客户端返回出错信息,并释放连接。从这里可以看出,redis对client连接的处理完全是使用事件来处理的,没有多进程,没有多线程。
static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { --- cfd = anetAccept(server.neterr, fd, cip, &cport); --- if ((c = createClient(cfd)) == NULL) { redisLog(REDIS_WARNING,"Error allocating resoures for the client"); close(cfd); /* May be already closed, just ingore errors */ return; } /* If maxclient directive is set and this is one client more... close the * connection. Note that we create the client instead to check before * for this condition, since now the socket is already set in nonblocking * mode and we can send an error for free using the Kernel I/O */ if (server.maxclients && listLength(server.clients) > server.maxclients) { char *err = "-ERR max number of clients reached\r\n"; /* That's a best effort error message, don't check write errors */ if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ } freeClient(c); return; } server.stat_numconnections++; } int anetAccept(char *err, int serversock, char *ip, int *port) { int fd; struct sockaddr_in sa; unsigned int saLen; while(1) { saLen = sizeof(sa); fd = accept(serversock, (struct sockaddr*)&sa, &saLen); if (fd == -1) { if (errno == EINTR) continue; else { anetSetError(err, "accept: %s\n", strerror(errno)); return ANET_ERR; } } break; } if (ip) strcpy(ip,inet_ntoa(sa.sin_addr)); if (port) *port = ntohs(sa.sin_port); return fd; }