redis源代码分析10–事件处理(下)

serverCron做的工作很多,后续的很多章节都与此有关。该函数较复杂,分段分析。
一开始将当前时间保存,方便后续vm等机制对当前时间的访问:

/* We take a cached value of the unix time in the global state because
* with virtual memory and aging there is to store the current time
* in objects at every object access, and accuracy is not needed.
* To access a global var is faster than calling time(NULL) */
// 缓存时间
server.unixtime = time(NULL);

接着,如果收到SIGTERM等信号,信号处理函数只会将server.shutdown_asap置为1(在信号处理函数内直接执行关闭操作并不安全),真正的关闭动作推迟到serverCron中进行:此时调用prepareForShutdown做结束运行前的收尾工作,成功则exit(0)退出,失败则记录一条警告日志并继续运行:

/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
if (server.shutdown_asap) {
    if (prepareForShutdown() == REDIS_OK) exit(0);
    redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
}

接着,每隔50次循环(serverCron每100ms执行一次,即约5秒),以VERBOSE级别输出各非空db的信息:key的数量、设置了过期时间(volatile)的key的数量以及哈希表的槽位数:

/* Show some info about non-empty databases */
for (j = 0; j < server.dbnum; j++) {
    long long size, used, vkeys;

    size = dictSlots(server.db[j].dict);
    used = dictSize(server.db[j].dict);
    vkeys = dictSize(server.db[j].expires);
    if (!(loops % 50) && (used || vkeys)) {
        redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
        /* dictPrintStats(server.dict); */
    }
}

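当没有进行save或者rewrite aof的后台子进程运行时,会调用tryResizeHashTables(每10次循环一次)和incrementallyRehash(仅在server.activerehashing开启时),以分别调整db哈希表的大小和对db进行增量rehash。当有后台子进程运行时则不做这些操作:子进程由fork()创建,与父进程以copy-on-write的方式共享内存页,此时在父进程中调整哈希表会引起大量的内存移动,导致大量页面被复制,使得系统使用较多的内存。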

/* We don't want to resize the hash tables while a bacground saving
* is in progress: the saving child is created using fork() that is
* implemented with a copy-on-write semantic in most modern systems, so
* if we resize the HT while there is the saving child at work actually
* a lot of memory movements in the parent will cause a lot of pages
* copied. */
if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) {
    if (!(loops % 10)) tryResizeHashTables();
    if (server.activerehashing) incrementallyRehash();
}

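接着,每隔50次循环显示一些client的信息(普通client的数量、slave的数量以及已使用的内存字节数),并关闭超时(timeout)的连接;超时检查在设置了server.maxidletime时每100次循环进行一次,而当server.blpop_blocked_clients非0时每次循环都会进行: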

/* Show information about connected clients */
if (!(loops % 50)) {
    redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
        listLength(server.clients)-listLength(server.slaves),
        listLength(server.slaves),
        zmalloc_used_memory());
}

/* Close connections of timedout clients */
if ((server.maxidletime && !(loops % 100)) || server.blpop_blocked_clients)
    closeTimedoutClients();

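如果有后台子进程进行save或者rewrite aof的工作,此时以非阻塞方式(wait3配合WNOHANG)检查其是否已经退出;若已退出,则调用backgroundSaveDoneHandler或者backgroundRewriteDoneHandler做些后续工作,并调用updateDictResizePolicy更新哈希表的resize策略。如果没有后台子进程,则依次检查各个save参数,只要修改次数(server.dirty)和距上次save的时间都满足其中某一条,就在后台save db: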

/* Check if a background saving or AOF rewrite in progress terminated */
if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
    int statloc;
    pid_t pid;

    if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
        if (pid == server.bgsavechildpid) {
            backgroundSaveDoneHandler(statloc);
        } else {
            backgroundRewriteDoneHandler(statloc);
        }
        updateDictResizePolicy();
    }
} else {
    /* If there is not a background saving in progress check if
     * we have to save now */
     time_t now = time(NULL);
     for (j = 0; j < server.saveparamslen; j++) {
        struct saveparam *sp = server.saveparams+j;

        if (server.dirty >= sp->changes &&
            now-server.lastsave > sp->seconds) {
            redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                sp->changes, sp->seconds);
            rdbSaveBackground(server.dbfilename);
            break;
        }
     }
}

释放过期(expire)的key:对每个db,每轮从db->expires字典中随机抽取最多REDIS_EXPIRELOOKUPS_PER_CRON个key,删除其中已经超时的key;如果一轮循环中超时被删除的key的数量超过了设置值(REDIS_EXPIRELOOKUPS_PER_CRON)的25%,则对该db继续进行下一轮释放:

/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace. */
for (j = 0; j < server.dbnum; j++) {
    int expired;
    redisDb *db = server.db+j;

    /* Continue to expire if at the end of the cycle more than 25%
     * of the keys were expired. */
    do {
        long num = dictSize(db->expires);
        time_t now = time(NULL);

        expired = 0;
        if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
            num = REDIS_EXPIRELOOKUPS_PER_CRON;
        while (num--) {
            dictEntry *de;
            robj *key;
            time_t t;

            if ((de = dictGetRandomKey(db->expires)) == NULL) break;
            t = (time_t) dictGetEntryVal(de);
            key = dictGetEntryKey(de);
            /* Don't expire keys that are in the contest of I/O jobs.
             * Otherwise decrRefCount will kill the I/O thread and
             * clients waiting for this keys will wait forever.
             *
             * In general this change will not have any impact on the
             * performance of the expiring algorithm but it's much safer. */
            if (server.vm_enabled &&
                (key->storage == REDIS_VM_SWAPPING ||
                 key->storage == REDIS_VM_LOADING)) continue;
            if (now > t) {
                deleteKey(db,dictGetEntryKey(de));
                expired++;
                server.stat_expiredkeys++;
            }
        }
    } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
}

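如果启用了VM且内存使用量超出阈值(server.vm_max_memory),则释放内存:先尝试从free list中释放对象,不成功时再将对象swap到磁盘(根据server.vm_max_threads是否为0选择阻塞方式或线程方式),前面的内存章节中已对此进行过分析: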

    /* Swap a few keys on disk if we are over the memory limit and VM
     * is enbled. Try to free objects from the free list first. */
    if (vmCanSwapOut()) {
        while (server.vm_enabled && zmalloc_used_memory() >
                server.vm_max_memory)
        {
            int retval;

            if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
            retval = (server.vm_max_threads == 0) ?
                        vmSwapOneObjectBlocking() :
                        vmSwapOneObjectThreaded();
            if (retval == REDIS_ERR && !(loops % 300) &&
                zmalloc_used_memory() >
                (server.vm_max_memory+server.vm_max_memory/10))
            {
                redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
            }
            /* Note that when using threade I/O we free just one object,
             * because anyway when the I/O thread in charge to swap this
             * object out will finish, the handler of completed jobs
             * will try to swap more objects if we are still out of memory. */
            if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
        }
    }

当redis作为slave且处于REDIS_REPL_CONNECT状态时,每隔10次循环尝试连接Master并与之同步;同步成功后,如果开启了appendonly,则在后台重写aof文件:

    /* Check if we should connect to a MASTER */
    if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (syncWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
            if (server.appendonly) rewriteAppendOnlyFileBackground();
        }
    }

返回100,表示serverCron将在100ms之后再次被调用:

    return 100;
}
此条目发表在 redis 分类目录。将固定链接加入收藏夹。

发表评论

电子邮件地址不会被公开。 必填项已被标记为 *

*

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>