先说下Redis主从复制的特点。
官方文档ReplicationHowto中提到以下特点:
1. 一个master支持多个slave
2. slave可以接受其他slave的连接,作为其他slave的master,从而形成一个master-slave的多级结构
3. 复制在master端是非阻塞的,也就是master在向client复制时可处理其他client的命令,而slave在第一次同步时是阻塞的
4. 复制被利用来提供可扩展性,比如可以将slave端用作数据冗余,也可以将耗时的命令(比如sort)发往某些slave从而避免master的阻塞,另外也可以用slave做持久化,这只需要将master的配置文件中的save指令注释掉。
client可以在一开始时作为slave连接master,也可以在运行后发布sync命令,从而跟master建立主从关系。
接下来我们分别从slave和master的视角概述下redis的主从复制的运行机制。
如果redis作为slave运行,则全局变量server.replstate的状态有REDIS_REPL_NONE(不处于复制状态)、 REDIS_REPL_CONNECT(需要跟master建立连接)、REDIS_REPL_CONNECTED(已跟master建立连接)三种。在读入slaveof配置或者发布slaveof命令后,server.replstate取值为REDIS_REPL_CONNECT,然后在syncWithMaster跟master执行第一次同步后,取值变为REDIS_REPL_CONNECTED。
如果redis作为master运行,则对应某个客户端连接的变量slave.replstate的状态有REDIS_REPL_WAIT_BGSAVE_START(等待bgsave运行)、REDIS_REPL_WAIT_BGSAVE_END(bgsave已dump db,该bulk传输了)、REDIS_REPL_SEND_BULK(正在bulk传输)、REDIS_REPL_ONLINE(已完成开始的bulk传输,以后只需发送更新了)。对于slave客户端(发布sync命令),一开始slave.replstate都处于REDIS_REPL_WAIT_BGSAVE_START状态(后面详解syncCommand函数),然后在后台dump db后(backgroundSaveDoneHandler函数),处于REDIS_REPL_WAIT_BGSAVE_END 状态,然后updateSlavesWaitingBgsave会将状态置为REDIS_REPL_SEND_BULK,并设置write事件的函数 sendBulkToSlave,在sendBulkToSlave运行后,状态就变为REDIS_REPL_ONLINE了,此后master会一直调用replicationFeedSlaves给处于REDIS_REPL_ONLINE状态的slave发送新命令。
我们先看处于master端的redis会执行的代码。
slave端都是通过发布sync命令来跟master同步的,sync命令的处理函数syncCommand如下所示。
该函数中的注释足够明了。如果slave的client设置了REDIS_SLAVE标志,说明master已用syncCommand处理了该 slave。如果master还有对这个client的reply没有发送,则返回出错信息。此后若server.bgsavechildpid != -1且有slave处于REDIS_REPL_WAIT_BGSAVE_END状态,则说明dump db的后台进程刚结束,此时新的slave可直接用保存的rdb进行bulk传输(注意复制reply参数,因为master是非阻塞的,此时可能执行了一些命令,call函数会调用replicationFeedSlaves函数将命令参数保存到slave的reply参数中)。如果没有slave处于REDIS_REPL_WAIT_BGSAVE_END状态,但server.bgsavechildpid != -1,则说明bgsave后台进程没有运行完,需要等待其结束(bgsave后台进程结束后会处理等待的slave)。如果server.bgsavechildpid 等于 -1,则需要启动一个后台进程来dump db了。最后将当前client加到master的slaves链表中。
static void syncCommand(redisClient *c) { /* ignore SYNC if aleady slave or in monitor mode */ if (c->flags & REDIS_SLAVE) return; /* SYNC can't be issued when the server has pending data to send to * the client about already issued commands. We need a fresh reply * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ if (listLength(c->reply) != 0) { addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n")); return; } redisLog(REDIS_NOTICE,"Slave ask for synchronization"); /* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ if (server.bgsavechildpid != -1) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save */ redisClient *slave; listNode *ln; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; } if (ln) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. */ listRelease(c->reply); c->reply = listDup(slave->reply); c->replstate = REDIS_REPL_WAIT_BGSAVE_END; redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to * register differences */ c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } } else { /* Ok we don't have a BGSAVE in progress, let's start one */ redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n")); return; } c->replstate = REDIS_REPL_WAIT_BGSAVE_END; } c->repldbfd = -1; c->flags |= REDIS_SLAVE; c->slaveseldb = 0; listAddNodeTail(server.slaves,c); return; }
此后slave无论处于REDIS_REPL_WAIT_BGSAVE_START还是REDIS_REPL_WAIT_BGSAVE_END,都只能等 dump db的后台进程运行结束后才会被处理。该进程结束后会执行backgroundSaveDoneHandler函数,而该函数调用 updateSlavesWaitingBgsave来处理slaves。
updateSlavesWaitingBgsave和syncCommand一样,涉及到slave的几个状态变换。对于等待dump db的slave,master都会将其放入server.slaves 链表中。此时,若slave->replstate == REDIS_REPL_WAIT_BGSAVE_START,说明当前dump db不是该slave需要的,redis需要重新启动后台进程来dump db。若slave->replstate == REDIS_REPL_WAIT_BGSAVE_END,则说明当前dump db正是该slave所需要的,此时设置slave的write事件的处理函数sendBulkToSlave。
static void updateSlavesWaitingBgsave(int bgsaveerr) { listNode *ln; int startbgsave = 0; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { startbgsave = 1; slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { struct redis_stat buf; if (bgsaveerr != REDIS_OK) { freeClient(slave); redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); continue; } if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 || redis_fstat(slave->repldbfd,&buf) == -1) { freeClient(slave); redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); continue; } slave->repldboff = 0; slave->repldbsize = buf.st_size; slave->replstate = REDIS_REPL_SEND_BULK; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { freeClient(slave); continue; } } } if (startbgsave) { if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { listIter li; listRewind(server.slaves,&li); redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed"); while((ln = listNext(&li))) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) freeClient(slave); } } } }
sendBulkToSlave 的逻辑不复杂。它根据slave->repldbfd指向的db,先从dump后的rdb文件中读入db数据,然后发送。发送完后会删除write 事件,设置slave->replstate状态为REDIS_REPL_ONLINE,此后master就会在收到命令后调用call函数,然后使用replicationFeedSlaves同步更新该slave了。replicationFeedSlaves也是遍历slave链表,对处于REDIS_REPL_ONLINE状态的slave,发送当前命令及其参数。
static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *slave = privdata; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); char buf[REDIS_IOBUF_LEN]; ssize_t nwritten, buflen; if (slave->repldboff == 0) { /* Write the bulk write count before to transfer the DB. In theory here * we don't know how much room there is in the output buffer of the * socket, but in pratice SO_SNDLOWAT (the minimum count for output * operations) will never be smaller than the few bytes we need. */ sds bulkcount; bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) slave->repldbsize); if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount)) { sdsfree(bulkcount); freeClient(slave); return; } sdsfree(bulkcount); } lseek(slave->repldbfd,slave->repldboff,SEEK_SET); buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); if (buflen <= 0) { redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", (buflen == 0) ? "premature EOF" : strerror(errno)); freeClient(slave); return; } if ((nwritten = write(fd,buf,buflen)) == -1) { redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s", strerror(errno)); freeClient(slave); return; } slave->repldboff += nwritten; if (slave->repldboff == slave->repldbsize) { close(slave->repldbfd); slave->repldbfd = -1; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); slave->replstate = REDIS_REPL_ONLINE; if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendReplyToClient, slave) == AE_ERR) { freeClient(slave); return; } addReplySds(slave,sdsempty()); redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); } }
接下来我们看看redis作为slave是如何运行的。
redis 作为slave(当然也可以使用普通的client作为slave端,这样则跟具体client的实现有关了)时,需要在配置文件中指明master的位置,在loadServerConfig读取配置参数时,会将server.replstate设置为REDIS_REPL_CONNECT状态。处于此状态的redis需要运行到serverCron后才能使用syncWithMaster来和master进行初始同步。查看syncWithMaster的代码可知,其实也向master发布sync命令来建立主从关系的,另外,该函数接收、发送数据时使用的是syncRead、syncWrite函数,而这些函数是阻塞的,因此,redis作为slave运行时,建立最初的主从关系时也是阻塞的。
/* Check if we should connect to a MASTER */ if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) { redisLog(REDIS_NOTICE,"Connecting to MASTER..."); if (syncWithMaster() == REDIS_OK) { redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded"); if (server.appendonly) rewriteAppendOnlyFileBackground(); } }
另外跟主从复制有关的一个命令就是slaveof命令。此命令是redis主从状态的转换函数,通过前面的分析可知,这只需要更改几个状态即可。
static void slaveofCommand(redisClient *c) { if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) { if (server.masterhost) { sdsfree(server.masterhost); server.masterhost = NULL; if (server.master) freeClient(server.master); server.replstate = REDIS_REPL_NONE; redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)"); } } else { sdsfree(server.masterhost); server.masterhost = sdsdup(c->argv[1]->ptr); server.masterport = atoi(c->argv[2]->ptr); if (server.master) freeClient(server.master); server.replstate = REDIS_REPL_CONNECT; redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", server.masterhost, server.masterport); } addReply(c,shared.ok); }
Pingback 引用通告: redis源代码分析19–主从复制 | Linux C++ 中文网