static void initServer() { --- server.el = aeCreateEventLoop(); --- } int main(int argc, char **argv) { --- initServer(); --- aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); --- } void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
static void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; listNode *ln; listRewind(server.io_ready_clients,&li); while((ln = listNext(&li))) { redisClient *c = ln->value; struct redisCommand *cmd; /* Resume the client. */ listDelNode(server.io_ready_clients,ln); c->flags &= (~REDIS_IO_WAIT); server.vm_blocked_clients--; aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c); cmd = lookupCommand(c->argv[0]->ptr); assert(cmd != NULL); call(c,cmd); resetClient(c); /* There may be more data to process in the input buffer. */ if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c); } } /* Write the AOF buffer on disk */ flushAppendOnlyFile(); }
aeMain调用aeProcessEvents处理文件事件和timer事件。aeProcessEvents 先获得最先超时的timer,并记下该timer距此时的时间段,将该时间段作为aeApiPoll的超时时间(以能尽快调用timer处理,因为是先处理file事件,后处理timer事件),aeApiPoll返回后将调用注册的read、write函数进行读写:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to se the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // tvp为最近的一个timer numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
接着,aeProcessEvents调用 processTimeEvents处理timer事件(此时至少有一个超时),processTimeEvents循环处理已超时的timer。注意,processTimeEvent并不一定会删除超时的timer,代码如下:
static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; // 中间注册的id必然比maxid大 while(te) { long now_sec, now_ms; long long id; if (te->id > maxId) { te = te->next; continue; } aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; retval = te->timeProc(eventLoop, id, te->clientData); processed++; --- if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { aeDeleteTimeEvent(eventLoop, id); } te = eventLoop->timeEventHead; } else { te = te->next; } } return processed; }
呃 是我没看仔细。现在明白了,之前没考虑到te->timeProc(eventLoop, id, te->clientData)可能会注册timeEvent事件。
嗯~~ 是这样的~~
只能说redis不适合这种场景。redis适合于IO密集型,不适合计算密集型。问题中由于A、又有B,导致B处理完A就超时了,典型的计算密集型,CPU根本就没有空出来啊~ 网络操作是IO密集型,redis才有用武之地。另一方面,记得redis整个源码中也只添加了serverCron一个定时器,而且这个函数的执行应该需要严格控制,比如那个redis中的db数就不应太多,so其实那个定时器链表其实没什么用,也难怪查找时作者说没有优化的必要了。。。