Redis服务器负责与多个客户端建立网络连接,处理客户端发送的命令请求,在数据库中保存客户端执行命令所产生的数据,并通过资源管理来维持服务器自身的运行。
一. 命令请求的执行过程
一个命令请求从发送到获得回复过程中,客户端和服务器需要完成一系列操作。
举个例子:如果我们使用客户端执行命令set key value
那么客户端和服务器需要进行一下操作:
- 客户端向服务器发送命令请求set key value。
- 服务器接收并处理客户端发送的命令请求set key value,在数据库中进行数据操作,并产生命令回复ok。
- 服务器将命令回复ok发送给客户端。
- 客户端接收服务器返回的命令回复ok,并将这个回复打印给用户看。
下面是操作细节。
1.1 发送命令请求
Redis服务器的命令请求来自Redis客户端,当用户在客户端中键入一个命令请求时,客户端会将这个命令请求转换成协议格式,然后通过连接服务器得到的套接字,将协议格式的命令请求发送给服务器。
1.2 读取命令请求
当客户端与服务器之间连接套接字因为客户端的写入而变得可读时(就是客户端发送命令请求过来,套接字变成了可读状态),服务器将调用命令处理器来执行以下操作:
- 读取套接字中协议格式的命令请求,并将其保存到客户端状态的输入缓冲区中。
- 对输入缓冲区中的命令请求进行分析,提取出命令请求中包含的命令参数,以及命令参数个数,然后分别将参数和参数个数保存到客户端状态的argv属性和argc属性里面。
- 调用命令执行器,执行客户端指定的命令。
对于set key value命令请求,保存到客户端的输入缓冲区之后,客户端状态的样子:
服务器分析程序对输入缓冲区中协议进行分析后,并得出分析结果保存到客户端状态的argv属性和argc属性里面:
之后服务器调用命令执行器来完成执行命令所需的剩余步骤。
1.3 命令执行器(1):查找命令实现
命令执行器要做的第一件事就是根据客户端状态的argv[0]参数,在命令表(command table)中查找参数所指定的命令,并将找到的命令保存到客户端状态的cmd属性里面。
命令表是一个字典,字典的键是命令的名字,比如:"set","get","del"等,而字典的值则是一个个redisCommand结构,每一个redisCommand结构记录了一个Redis命令的实现信息,下表记录了这个结构各个主要属性的类型和作用。
下表列出了sflags属性可以使用的标识值,以及这些标识的意义:
下图展示了命令表,并以SET命令和GET命令作为例子,展示了redisCommand结构:
- SET命令的名字为"set",实现函数为setCommand函数,命令参数个数为-3,标识命令接受三个或以上数量的参数,命令标识为"wm",标识SET命令为写入命令,并且在执行这个命令之前,服务器应该对占用内存状况进行检查,因为这个命令可能会占用大量内存。
- GET命令的名字为"get",实现函数为getCommand函数,命令参数个数为2,表示命令只接受两个参数,命令标识为"r",表示这只是一个只读命令。
对于之前的例子set key value,当程序以argv[0]作为参数输入,在命令表中进行查找时,命令表将返回"set"键所对应的redisCommand结构,客户端状态的cmd指针会指向这个redisCommand结构,如下图。
注意:命令名字的大小写并不影响命令表的查找结果。
因为命令表使用的是与大小写无关的算法,无论输入的命令名字是大写,小写或者混合大小写,只要命令的名字正确,就能找到相应的redisCommand结构。
1.4 命令执行器(2):执行预备操作
到目前为止,服务器已经将执行命令所需的命令实现函数(保存在客户端状态的cmd属性),参数(保存在客户端状态的argv属性),参数个数(保存在客户端状态的argc属性)都收集齐了,但是在真正执行命令之前,程序还需要进行一些预备操作,从而可以确保命令可以正确,顺利的执行。
- 检查客户端状态的cmd指针是否指向NULL,如果是的话,那么说明用户输入的命令名字找不到相应的命令实现,服务器不再执行后续步骤,并向客户端返回一个错误。
- 根据客户端cmd属性指向的redisCommand结构的arity属性,检查命令请求参数个数是否正确,当参数个数不正确时,不再执行后续步骤,直接向客户端返回一个错误。
- 检查客户端是否通过身份验证,未通过身份验证的客户端只能执行AUTH命令,如果未通过身份验证的客户端试图执行除了AUTH命令之外的其他命令,那么服务器将向客户端返回一个错误。
- 如果服务器打开了maxmemory功能,在执行命令之前,先检查服务器的内存占用情况,并在有需要时进行内存回收,从而使得接下来的命令可以顺利执行。如果内存回收失败,那么不再执行后续步骤,向客户端返回一个错误。
- 如果服务器上一次执行BGSAVE命令时出错,并且服务器打开了stop-writes-on-bgsave-error功能,并且服务器即将要执行的命令是一个写命令,那么服务器将拒绝执行这个命令,并向客户端返回一个错误。
- 如果客户端当前正在用SUBSCRIBE命令订阅频道,或者正在用PSUBSCRIBE命令订阅模式,那么服务器只会执行客户端发来的SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE,PUNSUBSCRIBE四个命令,其它命令都会被服务器拒绝。
- 如果服务器正在进行数据载入,那么客户端发送的命令必须带有1标识(比如INFO,SHUTDOWN,PUBLISH等等)才会被服务器执行,其他命令都会被服务器拒绝。
- 如果服务器因为执行Lua脚本而超时并进入阻塞状态,那么服务器只会执行客户端发来的SHUTDOWN nosave命令和SCRIPT KILL命令,其他命令都会被服务器拒绝。
- 如果客户端正在执行事务,那么服务器只会执行客户端发来的EXEC,DISCARD,MULTI,WATCH四个命令,其他命令都会被放进事务队列中。
- 如果服务器打开了监视器功能,那么服务器会将要执行的命令和参数等信息发送给监视器。
当完成了以上预备操作之后,服务器就可以开始正在执行命令。
注意:以上只列出了服务器在单机模式下执行命令时的检查操作,当服务器在复制或者集群模式下执行命令时,预备操作会更多。
1.5 命令执行器(3):调用命令的实现函数
在前面操作中,服务器已经将要执行命令的实现保存到了客户端状态的cmd属性里,并将命令参数和参数个数分别保存到了客户端状态的argv属性个argc属性里面,当服务器决定要执行命令时,它只需要执行下面的语句:
//client是客户端状态的指针
client->cmd->proc(client);
//等价于
setCommand(client;
被调用的命令实现函数会执行指定操作,并产生相应的命令回复,这些回复会被保存在客户端状态的输出缓冲区里面(buf属性和reply属性),之后实现函数还会为客户端的套接字关联命令回复处理器,这些处理器负责将命令回复返回给客户端。
对于前面的set key value命令,函数嗲用setCommand(client)函数将产生一个"+OK\r\n"回复,这个回复保存在客户端状态的buf属性中:
1.6 命令执行器(4):执行后续操作
在执行完实现函数之后,服务器还需要执行一些后续操作:
- 如果服务器开启了慢查询日志功能,那么慢查询日志模块会检查是否需要为刚刚执行完的命令请求添加一条新的慢查询日志。
- 根据刚刚执行命令所耗费的时长,更新被执行命令的redisCommand结构的milliseconds属性,并将命令的redisCommand结构的calls计数器的值增一。
- 如果服务器开启了AOF持久化功能,那么AOF持久化模块会将刚刚执行的命令请求写入到AOF缓冲区里面。
- 如果其他从服务器正在复制当前服务器,那么服务器会将刚刚执行的命令传播给所有从服务器。
当以上操作执行完,服务器会与当前命令的执行到此告一段落,之后服务器就可以继续从文件时间处理器中取出并处理下一个命令请求。
1.7 将命令回复发送给客户端
命令实现函数会将命令回复保存到客户端状态的输出缓冲区中,并为客户端的套接字关联命令回复处理器,当客户端套接字变为了可写状态时,服务器就会执行命令回复处理器,将保存在客户端状态的输出缓冲区中的命令回复发送给客户端。
当命令回复完毕之后,回复处理器会清空客户端状态的输出缓冲区,为处理下一个命令请求做准备。
1.8 客户端接收并打印命令回复
当客户端接收到协议格式的命令回复之后,它会将这些回复转换成人类可读的格式,并打印给用户看(假设我们使用的是Redis自带的redis-cli客户端)。
二.serverCron函数
Redis服务器中的serverCron函数默认每隔100毫秒执行一次,这个函数负责管理服务器的资源,并保持服务器自身的良好运转。
下面介绍serverCron函数执行的操作进行完整介绍,并介绍redisServer结构(服务器状态)中和serverCron函数有关的属性。
2.1 更新服务器时间缓存
Redis服务器中有不少功能需要获取系统的当前时间,而每次获取系统的当前时间需要执行一次系统调用,为了减少系统调用的次数,服务器状态中unixtime属性和mstime属性被用作当前时间的缓存:
struct redisServer
{
//...
//保存了秒级精度的系统当前unix时间戳
time_t unixtime;
//保存了毫秒级精度的系统当前时间戳
long mstime;
//...
};
因为serverCron函数默认100毫秒一次的频率更新unixtime属性和mstime属性,所以这两个属性记录的时间精度不高。
- 服务器只会在打印日志,更新服务器的LRU时钟,决定是否执行持久化任务,计算服务器上线时间(uptime)这类对时间精度要求不高的功能上使用。
- 对于为键设置过期时间,添加慢查询日志这种需要高精度时间的功能来说,服务器还是会再次执行系统调用,从而获取最准确的系统当前时间。
2.2 更新LRU时钟
服务器状态中的lruclock属性保存了服务器的LRU时钟,这个属性和上面介绍的unixtime属性,mstime属性一样,都是服务器缓存时间的一种:
struct redisServer
{
//...
//默认每10秒更新一次
//用于计算键的空转时长
unsigned lruclock:22;
//...
};
每一个Redis对象都会有一个lru属性,这个属性保存了对象最后一次被命令访问的时间:
typedef struct redisObject
{
//...
unsigned lru:32;
//...
}robj;
当服务器要计算一个数据库键的空转时间(也就是数据库对应的值对象的空转时间),程序会用服务器的lruclock属性记录的时间减去对象的lru属性记录的时间,得出的计算结果就是这个对象的空转时间。使用OBJECT IDLETIME命令可以查看命令的空转时间。
127.0.0.1:6379> set key value
OK
127.0.0.1:6379> object idletime key
(integer) 59
127.0.0.1:6379> set key1 value1
OK
127.0.0.1:6379> object idletime key
(integer) 69
127.0.0.1:6379> object idletime key1
(integer) 5
127.0.0.1:6379> set key2 value2
OK
127.0.0.1:6379> object idletime key2
(integer) 2
127.0.0.1:6379>
可以使用info server命令来查看服务器的部分信息:
127.0.0.1:6379> info server
# Server
redis_version:7.0.5
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:45c238455365147b
redis_mode:standalone
os:Linux 3.10.0-1160.95.1.el7.x86_64 x86_64
arch_bits:64
monotonic_clock:POSIX clock_gettime
multiplexing_api:epoll
atomicvar_api:atomic-builtin
gcc_version:4.8.5
process_id:879
process_supervised:no
run_id:732d31e1b78cf9d74d4a38e931f5465659f3fc29
tcp_port:6379
server_time_usec:1709103280016862
uptime_in_seconds:3089976
uptime_in_days:35
hz:10
configured_hz:10
lru_clock:14604464
executable:/home/wy/redisfile/redis-stable/src/./redis-server
config_file:/home/wy/redisfile/redis-stable/redis.conf
io_threads_active:0
2.3 更新服务器每秒执行命令次数
serverCron函数中的trackInstantaneousMetric函数会以每100毫秒一次的频率执行,这个函数的功能是以抽样计算的方式,估算并记录服务器在最近一秒中处理的命令请求的数量,这个值可以通过命令INFO stats的instantaneous_ops_per_sec域查看:
上面命令结果显示,在最近一秒内,服务器处理了大概一个命令。
trackInstantaneousMetric函数和服务器状态中的下面四个属性有关:
struct redisServer
{
//...
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {
//上一次将进行抽样时间
long long last_sample_time; /* Timestamp of last sample in ms */
//上一次抽样,服务器已执行命令数量
long long last_sample_count;/* Count in last sample */
//数组中的每个项都记录了一次抽样结果
long long samples[STATS_METRIC_SAMPLES];
//samples数组索引值,每次抽样将值自增一
//在值等于STATS_METRIC_SAMPLES(16)时,重置为0
//使得samples构成环形数组
int idx;
} inst_metric[STATS_METRIC_COUNT];
//...
};
trackInstantaneousMetric函数每次运行,都会更具last_sample_time记录的上一次抽样时间和服务器当前时间,以及last_sample_count记录的上一次抽样的已执行命令数量和服务器当前的已执行命令的数量,计算出两次trackInstantaneousMetric调用之间,服务器平均每一毫秒处理了多少个命令请求,然后将这个平均值乘以1000,这就得到了服务器在一秒钟内能处理多少个命令请求的估计值,这个估计值会被放到samples环形数组中。
/* Add a sample to the operations per second array of samples. */
void trackInstantaneousMetric(int metric, long long current_reading) {
long long now = mstime();
long long t = now - server.inst_metric[metric].last_sample_time;
long long ops = current_reading -
server.inst_metric[metric].last_sample_count;
long long ops_sec;
ops_sec = t > 0 ? (ops*1000/t) : 0;
server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
ops_sec;
server.inst_metric[metric].idx++;
server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
server.inst_metric[metric].last_sample_time = now;
server.inst_metric[metric].last_sample_count = current_reading;
}
当客户端执行INFO stats命令时,服务器就会调用getInstantaneousMetric函数,根据simples
环形数组中的抽样结果,计算出instantaneous_ops_per_sec的值:
/* Return the mean of all the samples. */
long long getInstantaneousMetric(int metric) {
int j;
long long sum = 0;
for (j = 0; j < STATS_METRIC_SAMPLES; j++)
sum += server.inst_metric[metric].samples[j];
return sum / STATS_METRIC_SAMPLES;
}
根据上面的函数可以知道, instantaneous_ops_per_sec属性的值是通过计算最近STATS_MATRIC_SAMPLES次取样的平均值来计算得出的,它只是一个估算值。
2.4 更新服务器内存峰值记录
服务器状态中的stat_peak_memory属性记录了服务器的内存峰值大小:
struct redisServer
{
//...
//已使用内存峰值
size_t stat_peak_memory; /* Max used memory record */
//...
};
每次serverCron函数执行时,程序都会查看服务器当前使用的内存数量,并于stat_peak_memory保存的数值进行比较,如果当前使用的内存数量比stat_peak_memory属性记录的值要大,那么程序就将当前使用的内存数量记录到stat_peak_memory属性里面。
INFO memory命令的used_memory_peak和used_memory_peak_human两个域分别以两种格式记录了服务器的内存峰值。
2.5 处理SIGTERM信号
在启动服务器时,Redis会为服务器进程的SIGTERM信号关联处理器sigShutdownHandler函数,这个信号处理器负责在服务器接到SIGTERM信号时,打开服务器状态的shutdown_asap标识。
static void sigShutdownHandler(int sig) {
char *msg;
switch (sig) {
case SIGINT:
msg = "Received SIGINT scheduling shutdown...";
break;
case SIGTERM:
msg = "Received SIGTERM scheduling shutdown...";
break;
default:
msg = "Received shutdown signal, scheduling shutdown...";
};
/* SIGINT is often delivered via Ctrl+C in an interactive session.
* If we receive the signal the second time, we interpret this as
* the user really wanting to quit ASAP without waiting to persist
* on disk. */
if (server.shutdown_asap && sig == SIGINT) {
serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
rdbRemoveTempFile(getpid(), 1);
exit(1); /* Exit with an error since this was not a clean shutdown. */
} else if (server.loading) {
serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now.");
exit(0);
}
serverLogFromHandler(LL_WARNING, msg);
server.shutdown_asap = 1;
}
每次serverCron函数运行时,程序都会对服务器状态的shutdown_asap属性进行检查,并根据属性的值决定是否关闭服务器。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//......
/**
* 处理SIGTERM信号
*/
/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
if (server.shutdown_asap) {
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}
//......
}
下图展示了服务器收到SIGTERM信号之后,关闭服务器打印相关日志过程:
从日志中可以看出,服务器关闭自身之前会进行RDB持久化操作。这也就是为什么要为SIGTERM信号关联处理器的原因,如果服务器一收到SIGTERM信号就关闭服务器,就没办法执行持久化操作了。
2.6 管理客户端资源
serverCron函数每次执行都会调用clientCron函数,clientCron函数会对一定数量的客户端进行以下检查:
- 检查客户端与服务器之间连接是否超时(长时间没有和服务端互动),如果长时间没有互动,那么释放这个客户端
- 客户端输入缓冲区是否超过一定限制,如果超过限制,那么释放输入缓冲区,并创建一个默认大小的缓冲区,防止占用内存过多;
- 跟踪最近几秒钟内使用最大内存量的客户端。 这样可以给info命令提供相关信息,从而避免O(n)遍历client列表;
2.7 管理数据库资源
serverCron函数每次执行都会调用databasesCron函数,这个函数会对服务器中的一部分数据进行检查,删除其中的过期键,并在有需要的时候,对字典进行收缩操作。
2.8 执行延迟的BGREWRITEAOF
在服务器执行BGSAVE期间,如果客户端向服务器发来BGREWRITEAOF命令,那么服务器会将BGREWRITEAOF命令执行时间延迟到BGSAVE命令执行完毕之后。
服务器的aof_rewrite_scheduled属性记录了服务器是否延迟了BGREWRITEAOF命令:
struct redisServer
{
//...
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
//...
};
每次serverCron函数执行时,函数都会检查BGSAVE命令或者BGREWRITEAOF命令是否正在执行,如果两个命令都没在执行,并且aof_rewrite_scheduled属性的值为1,那么服务器会执行之前被推迟的BGREWRITEAOF命令。
2.9 检查持久化操作的运行状态
服务器使用rdb_child_pid属性和aof_child_pid属性记录正在执行的BGSAVE命令和BGREWRITEAOF命令子进程的id,这两个属性也可以用于检查BGSAVE命令或者BGREWRITEAOF命令是否正在执行。
如果服务器没有执行BGSAVE命令,那么rdb_child_pid属性值为-1,如果服务器没有执行BGREWRITEAOF命令,aof_child_pid值为-1。
每次serverCron函数执行时,程序都会检查rdb_child_pid和aof_child_pid两个属性的值,只要其中一个属性值不为-1,程序就会执行一次wait3函数,检查子进程是否有信号发来服务器进程:
- 如果有信号到达,那么表示新的RDB文件已经生成完毕(对于BGSAVE命令)或者AOF文件已经重写完毕(对于BGREWRITEAOF命令),服务器需要进行相应的后续操作,比如:新的RDB文件替换现有的RDB文件,或者用重写后的AOF文件替换现有的AOF文件。
- 如果信号没有达到,那么表示持久化操作未完成,程序不动作。
另一方面,如果rdb_child_pid和aof_child_pid两个属性的值都是-1,那么表示服务器没有在进行持久化操作,这种情况下,程序执行下面三个检查:
- 查看是否有BGREWRITEAOF被延迟了,如果有的话,那么开始一次新的BGREWRTEAOF操作。
- 检查服务器的自动保存条件是否已经满足,如果条件满足,并且服务器没有执行其他持久化操作,那么服务器开始一次新的BGSAVE操作(因为条件1可能会引发一次BGREWRITEAOF操作,所以在这个检查中,程序会再次确认服务器是否已经在执行持久化操作)。
- 检查服务器设置的AOF重写条件是否满足,如果条件满足,并且服务器没有执行其他持久化操作,那么服务器将开始新的BGREWRITEAOF操作(同理,这个操作会再次确认服务器是否已经正在执行持久化操作)。
在redis6版本中,Redis使用child_type属性和child_pid属性来保存aof或rdb子进程id。child_type表示子进程id类型, child_pid保存子进程id。
#define CHILD_TYPE_NONE 0
#define CHILD_TYPE_RDB 1
#define CHILD_TYPE_AOF 2
#define CHILD_TYPE_LDB 3
#define CHILD_TYPE_MODULE 4
struct redisServer
{
//...
pid_t child_pid; /* PID of current child */
int child_type; /* Type of current child */
//...
};
serverCron检查的操作原理同上:
/* Return true if there are active children processes doing RDB saving,
* AOF rewriting, or some side process spawned by a loaded module. */
int hasActiveChildProcess() {
return server.child_pid != -1;
}
void checkChildrenDone(void) {
int statloc = 0;
pid_t pid;
if ((pid = waitpid(-1, &statloc, WNOHANG)) != 0) {
int exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : -1;
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
/* sigKillChildHandler catches the signal and calls exit(), but we
* must make sure not to flag lastbgsave_status, etc incorrectly.
* We could directly terminate the child process via SIGUSR1
* without handling it */
if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {
bysignal = SIGUSR1;
exitcode = 1;
}
if (pid == -1) {
serverLog(LL_WARNING,"waitpid() returned an error: %s. "
"child_type: %s, child_pid = %d",
strerror(errno),
strChildType(server.child_type),
(int) server.child_pid);
} else if (pid == server.child_pid) {
if (server.child_type == CHILD_TYPE_RDB) {
backgroundSaveDoneHandler(exitcode, bysignal);
} else if (server.child_type == CHILD_TYPE_AOF) {
backgroundRewriteDoneHandler(exitcode, bysignal);
} else if (server.child_type == CHILD_TYPE_MODULE) {
ModuleForkDoneHandler(exitcode, bysignal);
} else {
serverPanic("Unknown child type %d for child pid %d", server.child_type, server.child_pid);
exit(1);
}
if (!bysignal && exitcode == 0) receiveChildInfo();
resetChildState();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long) pid);
}
}
/* start any pending forks immediately. */
replicationStartPendingFork();
}
}
void serverCron(void)
{
//...
/* Check if a background saving or AOF rewrite in progress terminated. */
if (hasActiveChildProcess() || ldbPendingChildren())
{
run_with_period(1000) receiveChildInfo();
checkChildrenDone();
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
//...
}
2.10 将AOF缓冲区中的内容写入到AOF文件中
如果服务器开启了AOF持久化功能,并且AOF缓冲区中还有待写入的数据,那么serverCron函数会调用相应的程序,将AOF缓冲区的内如写入到AOF文件中。
2.11 关闭异步客户端
在这一步,服务器会关闭那些输出缓冲区大小超出限制的客户端。
2.12 增加cronloops计数器的值
服务器状态cronloops属性记录了serverCron函数执行的次数。
cronloops属性目前唯一的作用是,在复制模块中实现"每次执行serverCron函数N次就执行一次指定代码"的功能。
伪代码:
if server.cronloops % N == 0:
#执行指定代码
#实际代码
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
三.初始化服务器
一个Redis服务器从启动到能够接受客户端的命令请求,需要经过一系列的初始化和设置过程。比如:初始化服务器状态,接受用户指定的服务器配置,创建相应的数据结构和网络连接等。
3.1 初始化服务器状态结构
初始化服务器的第一步就是创建一个struct redisServer类型的实例变量server作为服务器状态,并为结构中的各个属性设置默认值。
初始化server变量的工作由server.c/initServerConfig函数完成,需要完成的主要工作为:
- 设置服务器的运行id。
- 设置服务器的默认运行频率。
- 设置服务器的默认配置文件路径。
- 设置服务器的运行构架。
- 设置服务器的默认端口号。
- 设置服务器的默认RDB持久化条件和AOF持久化条件。
- 初始化服务器的LRU时钟。
- 创建命令表。
void initServerConfig(void) {
int j;
updateCachedTime(1);
//设置服务器运行ID
getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
server.runid[CONFIG_RUN_ID_SIZE] = '\0';
changeReplicationId();
clearReplicationId2();
//设置服务器默认运行频率
server.hz = CONFIG_DEFAULT_HZ; /* Initialize it ASAP, even if it may get
updated later after loading the config.
This value may be used before the server
is initialized. */
server.timezone = getTimeZone(); /* Initialized by tzset(). */
//默认配置文件路径
server.configfile = NULL;
server.executable = NULL;
//设置服务器运行架构
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
server.bindaddr_count = 0;
server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
server.ipfd.count = 0;
server.tlsfd.count = 0;
server.sofd = -1;
server.active_expire_enabled = 1;
server.skip_checksum_validation = 0;
server.saveparams = NULL;
server.loading = 0;
server.loading_rdb_used_mem = 0;
server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
server.aof_state = AOF_OFF;
server.aof_rewrite_base_size = 0;
server.aof_rewrite_scheduled = 0;
server.aof_flush_sleep = 0;
server.aof_last_fsync = time(NULL);
atomicSet(server.aof_bio_fsync_status,C_OK);
server.aof_rewrite_time_last = -1;
server.aof_rewrite_time_start = -1;
server.aof_lastbgrewrite_status = C_OK;
server.aof_delayed_fsync = 0;
server.aof_fd = -1;
server.aof_selected_db = -1; /* Make sure the first time will not match */
server.aof_flush_postponed_start = 0;
server.pidfile = NULL;
server.active_defrag_running = 0;
server.notify_keyspace_events = 0;
server.blocked_clients = 0;
memset(server.blocked_clients_by_type,0,
sizeof(server.blocked_clients_by_type));
server.shutdown_asap = 0;
server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
server.next_client_id = 1; /* Client IDs, start from 1 .*/
server.loading_process_events_interval_bytes = (1024*1024*2);
unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
/* Replication related */
server.masterauth = NULL;
server.masterhost = NULL;
//设置默认端口
server.masterport = 6379;
server.master = NULL;
server.cached_master = NULL;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_transfer_s = NULL;
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.master_repl_offset = 0;
/* Replication partial resync backlog */
server.repl_backlog = NULL;
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
server.repl_backlog_off = 0;
server.repl_no_slaves_since = time(NULL);
/* Failover related */
server.failover_end_time = 0;
server.force_failover = 0;
server.target_replica_host = NULL;
server.target_replica_port = 0;
server.failover_state = NO_FAILOVER;
/* Client output buffer limits */
for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
/* Linux OOM Score config */
for (j = 0; j < CONFIG_OOM_COUNT; j++)
server.oom_score_adj_values[j] = configOOMScoreAdjValuesDefaults[j];
/* Double constants initialization */
R_Zero = 0.0;
R_PosInf = 1.0/R_Zero;
R_NegInf = -1.0/R_Zero;
R_Nan = R_Zero/R_Zero;
/* Command table -- we initialize it here as it is part of the
* initial configuration, since command names may be changed via
* redis.conf using the rename-command directive. */
server.commands = dictCreate(&commandTableDictType,NULL);
server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
server.zpopminCommand = lookupCommandByCString("zpopmin");
server.zpopmaxCommand = lookupCommandByCString("zpopmax");
server.sremCommand = lookupCommandByCString("srem");
server.execCommand = lookupCommandByCString("exec");
server.expireCommand = lookupCommandByCString("expire");
server.pexpireCommand = lookupCommandByCString("pexpire");
server.xclaimCommand = lookupCommandByCString("xclaim");
server.xgroupCommand = lookupCommandByCString("xgroup");
server.rpoplpushCommand = lookupCommandByCString("rpoplpush");
server.lmoveCommand = lookupCommandByCString("lmove");
/* Debugging */
server.watchdog_period = 0;
/* By default we want scripts to be always replicated by effects
* (single commands executed by the script), and not by sending the
* script to the slave / AOF. This is the new way starting from
* Redis 5. However it is possible to revert it via redis.conf. */
server.lua_always_replicate_commands = 1;
/* Client Pause related */
server.client_pause_type = CLIENT_PAUSE_OFF;
server.client_pause_end_time = 0;
initConfigValues();
}
initServerConfig函数设置的服务器状态属性基本都是一些整数,浮点数,或者字符串属性,除了命令表之外,initServerConfig函数没有创建服务器状态的其他数据结构。数据库、慢查询日志、Lua环境、共享对象这些数据结构在之后的步骤才会被创建出来。
当initServerConfig函数执行完毕后,服务器就可以进入初始化的第二阶段——载入配置选项。
3.2 载入配置选项
在启动服务器时,用户可以通过给定配置参数或者指定配置文件来修改服务器的默认配置。
举个例子:
1. 如果我们在终端输入:redis-server --port 10086命令,那么我们就通过给定配置参数的方式,修改了服务器的运行端口号
2. 如果我们在终端输入:redis-server redis.conf,并且配置文件中设置databases 32(服务器的数据库数量设置为32个),rdbcompression no(关闭RDB文件压缩功能),那么我们就通过指定配置文件的方式修改了服务器的数据库数量,以及RDB持久化模块的压缩功能。
服务器在用initServerConfig函数初始化完server变量后,就会开始载入用户给定的配置参数和配置文件,并根据用户给定的配置,对server变量相关属性值进行修改。
int main(int argc, char **argv) {
//...
// 检查用户是否指定了配置文件,或者配置选项
if (argc >= 2) {
j = 1; /* First option to parse in argv[] */
sds options = sdsempty();
/* Handle special options --help and --version */
// 处理特殊选项 -h 、-v 和 --test-memory
if (strcmp(argv[1], "-v") == 0 ||
strcmp(argv[1], "--version") == 0) version();
if (strcmp(argv[1], "--help") == 0 ||
strcmp(argv[1], "-h") == 0) usage();
if (strcmp(argv[1], "--test-memory") == 0) {
if (argc == 3) {
memtest(atoi(argv[2]),50);
exit(0);
} else {
fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
exit(1);
}
}
/* Parse command line options
* Precedence wise, File, stdin, explicit options -- last config is the one that matters.
*
* First argument is the config file name? */
// 如果第一个参数(argv[1])不是以 "--" 开头
// 那么它应该是一个配置文件
if (argv[1][0] != '-') {
/* Replace the config file in server.exec_argv with its absolute path. */
server.configfile = getAbsolutePath(argv[1]);
zfree(server.exec_argv[1]);
server.exec_argv[1] = zstrdup(server.configfile);
j = 2; // Skip this arg when parsing options
}
// 对用户给定的其余选项进行分析,并将分析所得的字符串追加稍后载入的配置文件的内容之后
// 比如 --port 6380 会被分析为 "port 6380\n"
while(j < argc) {
/* Either first or last argument - Should we read config from stdin? */
if (argv[j][0] == '-' && argv[j][1] == '\0' && (j == 1 || j == argc-1)) {
config_from_stdin = 1;
}
/* All the other options are parsed and conceptually appended to the
* configuration file. For instance --port 6380 will generate the
* string "port 6380\n" to be parsed after the actual config file
* and stdin input are parsed (if they exist). */
else if (argv[j][0] == '-' && argv[j][1] == '-') {
/* Option name */
if (sdslen(options)) options = sdscat(options,"\n");
options = sdscat(options,argv[j]+2);
options = sdscat(options," ");
} else {
/* Option argument */
options = sdscatrepr(options,argv[j],strlen(argv[j]));
options = sdscat(options," ");
}
j++;
}
// 载入配置文件, options 是前面分析出的给定选项
loadServerConfig(server.configfile, config_from_stdin, options);
if (server.sentinel_mode) loadSentinelConfigFromQueue();
sdsfree(options);
}
//...
}
- 如果用户为这些属性的相应选项知道了新值,那么服务器就使用用户指定的值来更新相应属性。
- 如果用户没有为属性的相应选项设置新的值,那么服务器就沿用之前initServerConfig函数为属性设置的默认值。
服务器在载入用户指定的配置选项,并对server状态进行更新后,服务器就可以进行初始化的第三阶段——初始化服务器数据结构。
3.3 初始化服务器数据结构
在之前执行initServerConfig函数初始化server状态时,程序只创建了命令表一个数据结构,不过除了命令表之外,服务器状态还包含其他数据结构:
- server.clients链表,这个链表记录了所有与服务器相连的客户端的状态结构,链表的每一个节点都包含一个redisClient结构实例。
- server.db数组,数组中包含了服务器的所有数据库。
- 用于保存频道订阅信息的server.pushsub_channels字典,以及用于保存模式订阅信息的server.pubsub_patterns链表。
- 用于执行Lua脚本的Lua环境server.lua。
- 用于保存慢查询日志的server.slowlog属性。
当初始化服务器进行到这一步,服务器将调用initServer函数,为以上提到的数据结构分配内存,并在有需要时,为这些数据结构设置或者关联初始化值。
服务器到现在才初始化数据结构的原因在于,服务器必须先载入用户指定的配置选项,然后才能正确的对数据结构进行初始化。如果在执行initServerConfig函数时就对数据结构进行初始化,那么一旦用户通过配置选项修改了和数据结构相关的服务器状态属性,服务器就要重新调整和修改已创建的数据结构。为了避免出现这种麻烦的的情况,服务器选择了将server状态初始化分为两步,initServerConfig函数主要负责初始化一般属性,而initServer函数主要负责初始化数据结构。
除了初始化服务器外,initServer还进行了一些非常重要的设置操作,其中包括:
- 为服务器设置进程信号处理器。
- 创建共享对象:这些对象包含Redis服务器经常用到的一些值,比如:包含"OK"回复的字符串对象,包含"ERR"回复的字符串对象,包含整数1到10000的字符串对象等,服务器通过重用这些共享对象来避免反复创建相同的对象。
- 打开服务器的监听端口,并为监听套接字关联连接应答事件处理器,等待服务器正式运行时接受客户端的连接。
- 为serverCron函数创建时间事件,等待服务器正是运行时执行serverCron函数。
- 如果AOF持久化功能已经打开,那么打开现有的AOF文件,如果AOF文件不存在,那么创建并打开一个新的AOF文件,为AOF写入做好准备。
- 初始化服务器的后台I/O模块(bio),为将来的I/O操作做好准备。
当initServer函数执行完毕之后,服务器将用ASCII字符在日志中打印出Redis的图标,以及Redis的版本信息。
3.4 还原数据库状态
在完成了对服务器状态server变量的初始化之后,服务器需要载入RDB文件或者AOF文件,并根据文件记录的内容来还原服务器的数据库状态。
根据服务器是否启用AOF持久化功能,服务器载入数据时所使用的目标文件会有所不同:
- 如果服务器启用了AOF持久化功能,那么服务器使用AOF文件来还原数据库状态。
- 如果服务器没有启用AOF持久化功能,那么服务器使用RDB文件来还原数据库状态。
当服务器完成数据库状态还原工作之后,服务器将在日志中打印出载入文件并还原数据库状态所耗费的时长。
3.5 执行事件循环
在初始化最后一步,服务器将打印出以下日志:
并开始执行服务器的事件循环(loop)。
至此,服务器的初始化工作完成,服务器现在可以接受客户端的连接请求,并处理客户端发来的命令请求。