接下来,我们分析下redis中事件的处理逻辑。
在函数initServer中调用aeCreateEventLoop完成初始化后,在main函数中调用ae_main,该函数是一个死循环:
static void initServer() { --- server.el = aeCreateEventLoop(); --- } int main(int argc, char **argv) { --- initServer(); --- aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); --- } void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
尽管aeMain函数有退出条件,但除了基准测试中会调用aeStop修改该值,该条件不会被改变。
aeMain在处理event之前,先调用beforeSleep,该函数先处理已ready的client,然后刷新aof缓冲区(aof机制后续章节会详细分析):
static void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; listNode *ln; listRewind(server.io_ready_clients,&li); while((ln = listNext(&li))) { redisClient *c = ln->value; struct redisCommand *cmd; /* Resume the client. */ listDelNode(server.io_ready_clients,ln); c->flags &= (~REDIS_IO_WAIT); server.vm_blocked_clients--; aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c); cmd = lookupCommand(c->argv[0]->ptr); assert(cmd != NULL); call(c,cmd); resetClient(c); /* There may be more data to process in the input buffer. */ if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c); } } /* Write the AOF buffer on disk */ flushAppendOnlyFile(); }
aeMain调用aeProcessEvents处理文件事件和timer事件。aeProcessEvents 先获得最先超时的timer,并记下该timer距此时的时间段,将该时间段作为aeApiPoll的超时时间(以能尽快调用timer处理,因为是先处理file事件,后处理timer事件),aeApiPoll返回后将调用注册的read、write函数进行读写:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to se the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // tvp为最近的一个timer numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
接着,aeProcessEvents调用 processTimeEvents处理timer事件(此时至少有一个超时),processTimeEvents循环处理已超时的timer。注意,processTimeEvent并不一定会删除超时的timer,代码如下:
static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; // 中间注册的id必然比maxid大 while(te) { long now_sec, now_ms; long long id; if (te->id > maxId) { te = te->next; continue; } aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; retval = te->timeProc(eventLoop, id, te->clientData); processed++; --- if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { aeDeleteTimeEvent(eventLoop, id); } te = eventLoop->timeEventHead; } else { te = te->next; } } return processed; }
当timer超时时,会调用timer创建时注册的timeProc,根据timerProc的返回值,是删除还是继续修改超时时间。注意,redis的主要循环处理函数serverCron就是靠这种定时机制得以反复运行的,该定时处理函数就一直返回100,这样就使得redis每隔100ms执行一次serverCron函数。
因此,redis的主要循环逻辑为一开始使用beforeSleep处理ready的client,然后处理相关的文件event,最后调用serverCron做一些工作。
下面一节分析下serverCron所做的工作。
你好,请问:“中间注册的id必然比maxid大”这句话如何理解?eventLoop->timeEventNextId不是每次注册一个timeEvent事件就加1吗?为何这里还会有te->id大于maxId?
呃 是我没看仔细。现在明白了,之前没考虑到te->timeProc(eventLoop, id, te->clientData)可能会注册timeEvent事件。
嗯~~ 是这样的~~
请教下,redis的实现的框架中,定时器事件还是有死循环的可能啊,假设有两个事件A和B,A是链表头,A每100ms执行一次,B执行需要150ms。
那么首先执行A,A执行完后,设置下次超时时间,然后从A开始执行,此时A还没超时跳过执行,执行B,B执行完后又从头开始执行,此时A已经超时,执行A,这时不是死循环了吗
不知道我的理解对不对,A循环执行并且该循环时间比较短(就是你说的100ms),而且执行很慢(你插入那个B好像也就是这个意思吧,其实这个无所谓,可以还有B、C、D,无非就是模拟中间还有其他函数费了不少时间),导致的结果就是A超时后处理然后继续超时,继续调用A的超时函数导致循环,因为redis在处理完一个超时事件后会返回到链表头重新检查。
这个确实形成了死循环。但这没有逻辑问题。从要求上讲,A要求定时执行,既然超时了(每次返回到链表头检查时),当然要执行罗。。。
只能说redis不适合这种场景。redis适合于IO密集型,不适合计算密集型。问题中由于A、又有B,导致B处理完A就超时了,典型的计算密集型,CPU根本就没有空出来啊~ 网络操作是IO密集型,redis才有用武之地。另一方面,记得redis整个源码中也只添加了serverCron一个定时器,而且这个函数的执行应该需要严格控制,比如那个redis中的db数就不应太多,so其实那个定时器链表其实没什么用,也难怪查找时作者说没有优化的必要了。。。
想起了之前别人问过我的一个问题,也是关于这个time链表。这个链表是无序的,如果链表上有大量的超时time,按照这个顺序处理逻辑就可能不对了。因为按理说至少要排个序嘛,超时也得有个先后。。。当时很囧,没有代码在手边,没回答上这样做在redis中会不会存在问题。道理其实很简单,因为redis中暂时就一个time嘛。。。当然,如果要迁移这个框架为你所用,是需要改改的。。。比如可以用跳表、堆什么的优化一下。。。。
你好,REDIS_NOTUSED(eventLoop),如何理解
您好,请问redis中的epoll模型没有采用更为高效的ET模式有什么原因吗?
可以参考下zhihu上的讨论:
http://www.zhihu.com/question/20502870