接下来的三节我们介绍client连接,按接受client连接、client连接的3个核心函数、client连接的几个标志3个部分顺序介绍。
这一节我们介绍下redis如何接受client连接。
在main函数中调用的initServer中,可看到如下代码:
static void initServer() {
---
server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
---
if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
acceptHandler, NULL) == AE_ERR) oom("creating file event");
---
}
anetTcpServer 也就是socket、bind、listen的封装,返回listen后的fd,并保存在全局的server.fd中,然后调用 aeCreateFileEvent,使server.fd监听read事件,并在read事件响应后(也就是有新的client来到时),调用 acceptHandler来处理。可以看到acceptHandler最终调用系统api accept来处理。在acceptHandler返回后(尽管acceptHandler仅等待一个client连接,但由于server.fd的read事件一直被监听,所以会在aeProcessEvents反复被处理,从而导致acceptHandler反复被调用),acceptHandler调用createClient返回一个redisClient结构,保存新的client连接的状态。如果客户端连接数过多,则向客户端返回出错信息,并释放连接。从这里可以看出,redis对client连接的处理完全是使用事件来处理的,没有多进程,没有多线程。
static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
---
cfd = anetAccept(server.neterr, fd, cip, &cport);
---
if ((c = createClient(cfd)) == NULL) {
redisLog(REDIS_WARNING,"Error allocating resoures for the client");
close(cfd); /* May be already closed, just ingore errors */
return;
}
/* If maxclient directive is set and this is one client more... close the
* connection. Note that we create the client instead to check before
* for this condition, since now the socket is already set in nonblocking
* mode and we can send an error for free using the Kernel I/O */
if (server.maxclients && listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
freeClient(c);
return;
}
server.stat_numconnections++;
}
int anetAccept(char *err, int serversock, char *ip, int *port)
{
int fd;
struct sockaddr_in sa;
unsigned int saLen;
while(1) {
saLen = sizeof(sa);
fd = accept(serversock, (struct sockaddr*)&sa, &saLen);
if (fd == -1) {
if (errno == EINTR)
continue;
else {
anetSetError(err, "accept: %s\n", strerror(errno));
return ANET_ERR;
}
}
break;
}
if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
if (port) *port = ntohs(sa.sin_port);
return fd;
}