redis源代码分析9–事件处理(中)

接下来,我们分析下redis中事件的处理逻辑。

在函数initServer中调用aeCreateEventLoop完成初始化后,在main函数中调用ae_main,该函数是一个死循环:

static void initServer() {
    ---
    server.el = aeCreateEventLoop();
    ---
}

int main(int argc, char **argv) {
   ---
   initServer();
   ---
   aeSetBeforeSleepProc(server.el,beforeSleep);
   aeMain(server.el);
   ---
}

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}


尽管aeMain函数有退出条件,但除了基准测试中会调用aeStop修改该值,该条件不会被改变。

aeMain在处理event之前,先调用beforeSleep,该函数先处理已ready的client,然后刷新aof缓冲区(aof机制后续章节会详细分析):

static void beforeSleep(struct aeEventLoop *eventLoop) {
    REDIS_NOTUSED(eventLoop);

    /* Awake clients that got all the swapped keys they requested */
    if (server.vm_enabled && listLength(server.io_ready_clients)) {
        listIter li;
        listNode *ln;

        listRewind(server.io_ready_clients,&li);
        while((ln = listNext(&li))) {
            redisClient *c = ln->value;
            struct redisCommand *cmd;

            /* Resume the client. */
            listDelNode(server.io_ready_clients,ln);
            c->flags &= (~REDIS_IO_WAIT);
            server.vm_blocked_clients--;
            aeCreateFileEvent(server.el, c->fd, AE_READABLE,
                readQueryFromClient, c);
            cmd = lookupCommand(c->argv[0]->ptr);
            assert(cmd != NULL);
            call(c,cmd);
            resetClient(c);
            /* There may be more data to process in the input buffer. */
            if (c->querybuf && sdslen(c->querybuf) > 0)
                processInputBuffer(c);
        }
    }
    /* Write the AOF buffer on disk */
    flushAppendOnlyFile();
}

aeMain调用aeProcessEvents处理文件事件和timer事件。aeProcessEvents 先获得最先超时的timer,并记下该timer距此时的时间段,将该时间段作为aeApiPoll的超时时间(以能尽快调用timer处理,因为是先处理file事件,后处理timer事件),aeApiPoll返回后将调用注册的read、write函数进行读写:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to se the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
        // tvp为最近的一个timer
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

        /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

接着,aeProcessEvents调用 processTimeEvents处理timer事件(此时至少有一个超时),processTimeEvents循环处理已超时的timer。注意,processTimeEvent并不一定会删除超时的timer,代码如下:

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    // 中间注册的id必然比maxid大
    while(te) {
        long now_sec, now_ms;
        long long id;
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            ---
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                aeDeleteTimeEvent(eventLoop, id);
            }
            te = eventLoop->timeEventHead;
        } else {
            te = te->next;
        }
    }
    return processed;
}

当timer超时时,会调用timer创建时注册的timeProc,根据timerProc的返回值,是删除还是继续修改超时时间。注意,redis的主要循环处理函数serverCron就是靠这种定时机制得以反复运行的,该定时处理函数就一直返回100,这样就使得redis每隔100ms执行一次serverCron函数。

因此,redis的主要循环逻辑为一开始使用beforeSleep处理ready的client,然后处理相关的文件event,最后调用serverCron做一些工作。

下面一节分析下serverCron所做的工作。

此条目发表在 redis 分类目录。将固定链接加入收藏夹。

redis源代码分析9–事件处理(中)》有 8 条评论

  1. timebug 说:

    你好,请问:“中间注册的id必然比maxid大”这句话如何理解?eventLoop->timeEventNextId不是每次注册一个timeEvent事件就加1吗?为何这里还会有te->id大于maxId?

  2. seaky 说:

    请教下,redis的实现的框架中,定时器事件还是有死循环的可能啊,假设有两个事件A和B,A是链表头,A每100ms执行一次,B执行需要150ms。

    那么首先执行A,A执行完后,设置下次超时时间,然后从A开始执行,此时A还没超时跳过执行,执行B,B执行完后又从头开始执行,此时A已经超时,执行A,这时不是死循环了吗

    • petermao 说:

      不知道我的理解对不对,A循环执行并且该循环时间比较短(就是你说的100ms),而且执行很慢(你插入那个B好像也就是这个意思吧,其实这个无所谓,可以还有B、C、D,无非就是模拟中间还有其他函数费了不少时间),导致的结果就是A超时后处理然后继续超时,继续调用A的超时函数导致循环,因为redis在处理完一个超时事件后会返回到链表头重新检查。

      这个确实形成了死循环。但这没有逻辑问题。从要求上讲,A要求定时执行,既然超时了(每次返回到链表头检查时),当然要执行罗。。。

      只能说redis不适合这种场景。redis适合于IO密集型,不适合计算密集型。问题中由于A、又有B,导致B处理完A就超时了,典型的计算密集型,CPU根本就没有空出来啊~ 网络操作是IO密集型,redis才有用武之地。另一方面,记得redis整个源码中也只添加了serverCron一个定时器,而且这个函数的执行应该需要严格控制,比如那个redis中的db数就不应太多,so其实那个定时器链表其实没什么用,也难怪查找时作者说没有优化的必要了。。。

      想起了之前别人问过我的一个问题,也是关于这个time链表。这个链表是无序的,如果链表上有大量的超时time,按照这个顺序处理逻辑就可能不对了。因为按理说至少要排个序嘛,超时也得有个先后。。。当时很囧,没有代码在手边,没回答上这样做在redis中会不会存在问题。道理其实很简单,因为redis中暂时就一个time嘛。。。当然,如果要迁移这个框架为你所用,是需要改改的。。。比如可以用跳表、堆什么的优化一下。。。。

  3. xiaogang 说:

    你好,REDIS_NOTUSED(eventLoop),如何理解

  4. timebug 说:

    您好,请问redis中的epoll模型没有采用更为高效的ET模式有什么原因吗?

xiaogang 发表评论 取消回复

电子邮件地址不会被公开。 必填项已被标记为 *

*

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>