AOF

时间:2025-04-15 07:02:59
  • <pre name="code" class="cpp">/* Call() is the core of Redis execution of a command */  
  • void call(redisClient *c) {  
  •     long long dirty, start = ustime(), duration;  
  •   
  •     dirty = ;  
  •     c->cmd->proc(c);    //命令执行  
  •     dirty = -dirty;   //计算dirty值,更新操作将改变dirty值  
  •     duration = ustime()-start;  
  •     slowlogPushEntryIfNeeded(c->argv,c->argc,duration);  
  •     if ( && dirty > 0) //启动了AOF,并且数据有更新  
  •         feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);   
  •     if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&  
  •         listLength())  
  •         replicationFeedSlaves(,c->db->id,c->argv,c->argc);  
  •     if (listLength())  
  •         replicationFeedMonitors(,c->db->id,c->argv,c->argc);  
  •     server.stat_numcommands++;  
  • }</pre>  
  • <pre></pre>  
  • <pre name="code" class="cpp"><pre name="code" class="cpp">void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {  
  •   
  •     if (dictid != ) { //当前操作的数据库和之前的数据库不一致,则写一条改变数据库的命令  
  •         char seldb[64];  
  •   
  •         snprintf(seldb,sizeof(seldb),"%d",dictid);  
  •         buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",  
  •             (unsigned long)strlen(seldb),seldb);  
  •          = dictid;  
  •     }  
  •   
  •      .....  
  •   
  •      = sdscatlen(,buf,sdslen(buf)); //数据放入atobuf中  
  •   
  •     if ( != -1)  //如果子进程在进行Aof log rewrite,则同时将数据放入缓冲区bgrewritebuf  
  •          = sdscatlen(,buf,sdslen(buf));  
  •   
  •     sdsfree(buf);  
  • }</pre>  
  • <pre></pre>  
  • feedAppendOnlyFile并不真正写Aof log<strong>,写log操作发生在返回给用户请求之前的<span><span>flushAppendOnlyFile</span></span>函数</strong><pre name="code" class="cpp"><pre name="code" class="cpp">void flushAppendOnlyFile(int force) {  
  •     ssize_t nwritten;  
  •     int sync_in_progress = 0;  
  •   
  •     if (sdslen() == 0) return;  
  •   
  •     if ( == APPENDFSYNC_EVERYSEC)   //当appendfsync设置为everysec,check是否有数据等待fsync  
  •         sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;  
  •   
  •     //如果appendfsync设置为everysec,阻塞时如果主线程等待fsync时间不超过2s,则返回(数据缓存在aofbuf中)  
  •     if ( == APPENDFSYNC_EVERYSEC && !force) {  
  •         if (sync_in_progress) {  
  •             if (server.aof_flush_postponed_start == 0) {  
  •                 server.aof_flush_postponed_start = ;  
  •                 return;  
  •             } else if ( - server.aof_flush_postponed_start < 2) {  
  •                 return;  
  •             }  
  •             redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");  
  •         }  
  •     }  
  •     server.aof_flush_postponed_start = 0;  
  •      
  •     //写Aof log  
  •     nwritten = write(,,sdslen());  
  •     if (nwritten != (signed)sdslen()) {  
  •         if (nwritten == -1) {  
  •             redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));  
  •         } else {  
  •             redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));  
  •         }  
  •         exit(1);  
  •     }  
  •     server.appendonly_current_size += nwritten; //记录log文件大小  
  •   
  •     //清空aofbuf,如果aofbuf较小时,复用之  
  •     if ((sdslen()+sdsavail()) < 4000) {  
  •         sdsclear();  
  •     } else {  
  •         sdsfree();  
  •          = sdsempty();  
  •     }  
  •   
  •     //如果设置no-appendfsync-on-rewrite 为yes,且当前有子进程进行rewrite,则直接返回  
  •     if (server.no_appendfsync_on_rewrite &&  
  •         ( != -1 ||  != -1))  
  •             return;  
  •   
  •     /* Perform the fsync if needed. */  
  •     if ( == APPENDFSYNC_ALWAYS) {  
  •         /* aof_fsync is defined as fdatasync() for Linux in order to avoid 
  •          * flushing metadata. */  
  •         aof_fsync(); /* Let's try to get this data on the disk */  
  •          = ;  
  •     } else if (( == APPENDFSYNC_EVERYSEC &&  
  •                  > )) {  
  •         if (!sync_in_progress) aof_background_fsync();  
  •          = ;  
  •     }  
  • }</pre>  
  • <pre></pre>  
  • <pre name="code" class="cpp">void aof_background_fsync(int fd) {  
  •     bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);  
  • }</pre><pre name="code" class="cpp"><pre name="code" class="cpp">void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {  
  •     struct bio_job *job = zmalloc(sizeof(*job));  
  •   
  •     job->time = time(NULL);  
  •     job->arg1 = arg1;  
  •     job->arg2 = arg2;  
  •     job->arg3 = arg3;  
  •     pthread_mutex_lock(&bio_mutex[type]);  
  •     listAddNodeTail(bio_jobs[type],job);  
  •     bio_pending[type]++;  
  •     pthread_cond_signal(&bio_condvar[type]);  
  •     pthread_mutex_unlock(&bio_mutex[type]);  
  • }</pre>  
  • <pre></pre>  
  • <strong>对于<span style="color:#FF0000">appendfsync</span>设置为everysec的情况,fsync是由独立线程完成</strong><span style="font-size:16px"><strong>rewrite Aof log:</strong></span>在两种情况下,redis会对aof log做rewrite1.  配置自动rewrite的阈值出现2.  客户端发送bgrewriteaof命令  
  • <p>接收到bgrewriteaof命令执行函数:</p>  
  • <pre name="code" class="cpp">void bgrewriteaofCommand(redisClient *c) {  
  •     if ( != -1) {  
  •         addReplyError(c,"Background append only file rewriting already in progress");  
  •     } else if ( != -1) {  
  •         server.aofrewrite_scheduled = 1;  
  •         addReplyStatus(c,"Background append only file rewriting scheduled");  
  •     } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) {  
  •         addReplyStatus(c,"Background append only file rewriting started");  
  •     } else {  
  •         addReply(c,);  
  •     }  
  • }</pre><br>  
  • <pre name="code" class="cpp">int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {  
  •     ......  
  •       
  •     /* Start a scheduled AOF rewrite if this was requested by the user while 
  •      * a BGSAVE was in progress. */  
  •     if ( == -1 &&  == -1 &&  
  •         server.aofrewrite_scheduled)  
  •     {  
  •         rewriteAppendOnlyFileBackground();  
  •     }  
  •   
  •     //等待后台rewrite子进程结束并做后处理  
  •     if ( != -1 ||  != -1) {  
  •         int statloc;  
  •         pid_t pid;  
  •   
  •         if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {  
  •             if (pid == ) {  
  •                 backgroundSaveDoneHandler(statloc);  
  •             } else {  
  •                 backgroundRewriteDoneHandler(statloc);  
  •             }  
  •             updateDictResizePolicy();  
  •         }  
  •     } else {  
  •          time_t now = time(NULL);  
  •   
  •         ......  
  •   
  •          //check重写aof log的配置条件是否出现  
  •          if ( == -1 &&  
  •               == -1 &&  
  •              server.auto_aofrewrite_perc &&  
  •              server.appendonly_current_size > server.auto_aofrewrite_min_size)  
  •          {  
  •             long long base = server.auto_aofrewrite_base_size ?  
  •                             server.auto_aofrewrite_base_size : 1;  
  •             long long growth = (server.appendonly_current_size*100/base) - 100;  
  •             if (growth >= server.auto_aofrewrite_perc) {  
  •                 redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);  
  •                 rewriteAppendOnlyFileBackground();  
  •             }  
  •         }  
  •     }  
  •     ......  
  • }</pre>执行重写:<br>  
  • <pre name="code" class="cpp"><pre name="code" class="cpp">int rewriteAppendOnlyFileBackground(void) {  
  •     ......  
  •     if ((childpid = fork()) == 0) { //创建子进程,由子进程  
  •         char tmpfile[256];  
  •   
  •         //以下为子进程执行  
  •         if (server.vm_enabled) vmReopenSwapFile();  
  •         if ( > 0) close();  
  •         if ( > 0) close();  
  •         snprintf(tmpfile,256,"temp-rewriteaof-bg-%", (int) getpid());  
  •         if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {  
  •             _exit(0);  
  •         } else {  
  •             _exit(1);  
  •         }  
  •     } else {  
  •         //以下父进程执行  
  •         ......  
  •          = childpid;  
  •         ......  
  •         return REDIS_OK;  
  •     }  
  •     return REDIS_OK; /* unreached */  
  • }</pre>  
  • <pre></pre>  
  • <span style="color:#FF0000"><span style="color:#333333">子进程重写采用<span style="color:#FF0000">copy on write</span>,将当前子进程看到的数据状态写入日志:</span></span><pre name="code" class="cpp"><span style="color:#333333;"></span><pre name="code" class="cpp">int rewriteAppendOnlyFile(char *filename) {  
  •     ......  
  •     //创建临时文件  
  •     snprintf(tmpfile,256,"temp-rewriteaof-%", (int) getpid());  
  •     fp = fopen(tmpfile,"w");  
  •     if (!fp) {  
  •         redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));  
  •         return REDIS_ERR;  
  •     }  
  •       
  •     //遍历所有数据库  
  •     for (j = 0; j < ; j++) {  
  •         char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";  
  •         redisDb *db = +j;  
  •         dict *d = db->dict;  
  •         if (dictSize(d) == 0) continue;  
  •         di = dictGetSafeIterator(d);  
  •         if (!di) {  
  •             fclose(fp);  
  •             return REDIS_ERR;  
  •         }  
  •   
  •         //写选择数据库命令  
  •         if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;  
  •         if (fwriteBulkLongLong(fp,j) == 0) goto werr;  
  •   
  •         //写数据库所有元素  
  •         while((de = dictNext(di)) != NULL) {  
  •             .....  
  •         }  
  •     }  
  •   
  •     //数据写入disk  
  •     fflush(fp);  
  •     aof_fsync(fileno(fp));  
  •     fclose(fp);  
  •     ......  
  • </pre>  
  • <pre></pre>  
  • <pre></pre>  
  • <pre></pre>  
  • <pre></pre>  
  • <pre></pre>  
  • <pre></pre>  
  • <pre></pre>  
  •   
  • </pre></pre></pre></pre></pre>  
  • 相关文章