Redis中单机数据库的实现

时间:2022-04-04 22:09:08

1. 内存操作层 zmalloc 系接口

redis为了优化内存操作, 封装了一层内存操作接口. 默认情况下, 其底层实现就是最简朴的libc中的malloc系列接口. 如果有定制化需求, 可以通过配置方式, 将底层内存操作的实现更换为tcmallocjemalloc库.

redis封装的这一层接口, 其接口定义与默认实现在zmalloc.hzmalloc.c中. 其默认实现支持在O(1)复杂度的情况下返回内存块的大小. 具体实现上的思路也十分简朴: 就是在内存块头部多分配一个long的空间, 将内存块的大小记录在此.

Redis中单机数据库的实现

zmalloc.c中可以看到, 底层接口的具体实现可选的有tcmallocjemalloc两种:

/* Explicitly override malloc/free etc when using tcmalloc. */
#if defined(USE_TCMALLOC)
#define malloc(size) tc_malloc(size)
#define calloc(count,size) tc_calloc(count,size)
#define realloc(ptr,size) tc_realloc(ptr,size)
#define free(ptr) tc_free(ptr)
#elif defined(USE_JEMALLOC)
#define malloc(size) je_malloc(size)
#define calloc(count,size) je_calloc(count,size)
#define realloc(ptr,size) je_realloc(ptr,size)
#define free(ptr) je_free(ptr)
#define mallocx(size,flags) je_mallocx(size,flags)
#define dallocx(ptr,flags) je_dallocx(ptr,flags)
#endif

内存分配接口的实现如下, 从代码中可以看出, 其头部多分配了PREFIX_SIZE个字节用于存储数据区的长度.

void *zmalloc(size_t size) {
void *ptr = malloc(size+PREFIX_SIZE); if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(zmalloc_size(ptr));
return ptr;
#else
*((size_t*)ptr) = size;
update_zmalloc_stat_alloc(size+PREFIX_SIZE);
return (char*)ptr+PREFIX_SIZE;
#endif
}

2. 事件与IO多路复用

redis中的事件处理机制和内存分配一样, 也是糊了一层接口. 其底层实现是可选的. 默认情况下, 会自动选取当前OS平台上速度最快的多路IO接口, 比如在Linux平台上就是epoll, 在Sun/Solaris系列平台上会选择evport, 在BSD/Mac OS平台上会选择kqueue, 实再没得选了, 会使用POSIX标准的select接口. 保证起码能跑起来

事件处理对各个不同底层实现的包裹, 分别在ae_epoll.c, ae_evport.c, ae_kqueue, ae_select.c中, 事件处理器的定义与实现在ae.hae.c中.

ae.h中, 定义了

  1. 事件处理回调函数的别名: ae***Proc函数指针类型别名
  2. 文件事件与定时事件两种结构: aeFileEventaeTimeEvent
  3. 结构aeFiredEvent, 代表被触发的事件
  4. 结构aeEventLoop, 一个事件处理器. 包含一个事件循环.

分别如下:

#define AE_NONE 0       /* No events registered. */
#define AE_READABLE 1 /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the
READABLE event already fired in the same event
loop iteration. Useful when you want to persist
things to disk before sending replies, and want
to do that in a group fashion. */ // ..... /* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); /* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc; // 可读回调
aeFileProc *wfileProc; // 可写回调
void *clientData; // 自定义数据
} aeFileEvent; /* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */ // 定时事件的编号
long when_sec; /* seconds */ // 触发时间(单位为秒)
long when_ms; /* milliseconds */ // 触发时间(单位为毫秒)
aeTimeProc *timeProc; // 定时回调
aeEventFinalizerProc *finalizerProc; // 事件自杀析构回调
void *clientData; // 自定义数据
struct aeTimeEvent *prev; // 链表的前向指针与后向指针
struct aeTimeEvent *next;
} aeTimeEvent; /* A fired event */
typedef struct aeFiredEvent { // 描述了一个被触发的事件
int fd; // 文件描述符
int mask; // 触发的类型
} aeFiredEvent; /* State of an event based program */
typedef struct aeEventLoop { // 描述了一个热火朝天不停循环的事件处理器
int maxfd; // 当前监控的文件事件中, 文件描述符的最大值
int setsize; // 事件处理器的最大容量(能监听的文件事件的个数)
long long timeEventNextId; // 下一个到期要执行的定时事件的编号
time_t lastTime; // 上一次有回调函数执行的时间戳, 用以防止服务器时间抖动或溢出
aeFileEvent *events; // 所有注册的文件事件
aeFiredEvent *fired; // 所有被触发的文件事件
aeTimeEvent *timeEventHead; // 定时事件链表
int stop; // 急停按钮
void *apidata; // 底层多路IO接口所需的额外数据
aeBeforeSleepProc *beforesleep; // 一次循环结束, 所有事件处理结束后, 要执行的回调函数
aeBeforeSleepProc *aftersleep; // 一次循环开始, 在执行任何事件之前, 要执行的回调函数
} aeEventLoop;

对于文件事件来说, 除了可读AE_READABLE与可写AE_WRITABLE两种监听触发方式外, 还有一种额外的AE_BARRIER触发方式. 若监听文件事件时, 将AE_BARRIERAE_WRITABLE组合时, 保证若当前文件如果正在处理可读(被可读触发), 就不再同时触发可写. 在一些特殊场景下这个特性是比较有用的.

事件处理器aeEventLoop中, 对于文件事件的处理, 走的都是老套路. 这里需要注意的是事件处理器中, 定时事件的设计:

  1. 所有定时事件, 像糖葫芦一样, 串成一个链表
  2. 定时事件不是严格定时的, 定时事件中不适宜执行耗时操作

事件处理器的核心接口有以下几个:

// 创建一个事件处理器实例
aeEventLoop *aeCreateEventLoop(int setsize);
// 销毁一个事件处理器实例
void aeDeleteEventLoop(aeEventLoop *eventLoop);
// 事件处理器急停
void aeStop(aeEventLoop *eventLoop);
// 创建一个文件事件, 并添加进事件处理器中
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
// 从事件处理器中删除一个文件事件.
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
// 通过文件描述符, 获取事件处理器中, 该文件事件注册的触发方式(可读|可写|BARRIER)
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
// 创建一个定时事件, 并挂在事件处理器的时间事件链表中. 返回的是创建好的定时事件的编号
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
// 删除事件处理器中的某个时间事件
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
// 调用多路IO接口, 处理所有被触发的文件事件, 与所有到期的定时事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
// tricky接口.内部调用poll接口, 等待milliseconds毫秒, 或直至指定fd上的指定事件被触发
int aeWait(int fd, int mask, long long milliseconds);
// 事件处理器启动器
void aeMain(aeEventLoop *eventLoop);
// 获取底层路IO接口的名称
char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);

核心代码如下:

首先是启动器, 启动器负责三件事:

  1. 响应急停
  2. 执行beforesleep回调, 如果设置了该回调的值, 那么在每次事件循环之前, 该函数会被执行
  3. 一遍一遍的轮大米, 调用aeProcessEvents, 即事件循环
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}

然后是事件循环

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents; // 无事快速退朝
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
// 在 "事件处理器中至少有一个文件事件" 或 "以阻塞形式执行事件循环, 且处理定时事件"
// 时, 进入该分支
// 这里有一点绕, 其实就是, 如果
// "事件处理器中没有监听任何文件事件", 且,
// "执行事件循环时指明不执行定时事件, 或虽然指明了执行定时事件, 但是要求以非阻塞形式运行事件循环"
// 都不会进入该分支, 不进入该分支时, 分两种情况:
// 1. 没监听任何文件事件, 且不执行定时事件, 相当于直接返回了
// 2. 没监听任何文件事件, 执行定时事件, 但要求以非阻塞形式执行. 则立即执行所有到期的时间事件, 如果没有, 则相当于直接返回 // 第一步: 计算距离下一个到期的定时事件, 还有多长时间. 计该时间为tvp
// 即便本次事件循环没有指明要处理定时事件, 也计算这个时间
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms; aeGetTime(&now_sec, &now_ms);
tvp = &tv; /* How many milliseconds we need to wait for the next
* time event to fire? */
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms; if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
} // 第二步: 以tvp为超时时间, 调用多路IO接口, 获取被触发的文件事件, 并处理文件事件
// 如果tvp为null, 即当前事件处理器中没有定时事件, 则调用aeApiPoll的超时时间是无限的
numevents = aeApiPoll(eventLoop, tvp); /* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop); for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */ // 通常情况下, 如果一个文件事件, 同时被可读与可写同时触发
// 都是先执行可读回调, 再执行可写回调
// 但如果事件掩码中带了AE_BARRIER, 就会扭转这个行为, 先执行可写回调, 再执行可读加高
int invert = fe->mask & AE_BARRIER; if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
} if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
} if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
} processed++;
}
} // 按需执行时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */
}

3. 网络库

anet.hanet.c中, 对*nix Tcp Socket相关接口进行了一层薄封装, 基本上就是在原生接口的基础上做了一些错误处理, 并整合了一些接口调用. 这部分代码很简单, 甚至很无聊.

4. 单机数据库的启动

在Redis进程中, 有一个全局变量 struct redisServer server, 这个变量描述的就是Redis服务端. (结构体定义位于server.h中, 全局变量的定义位于server.c中), 结构体的定义如下:

struct redisServer {
// ...
aeEventLoop * el; // 事件处理器
// ...
redisDb * db; // 数据库数组.
// ...
int ipfd[CONFIG_BINDADDR_MAX]; // listening tcp socket fd
int ipfd_count;
int sofd; // listening unix socket fd
// ...
list * clients; // 所有活动客户端
list * clients_to_close; // 待关闭的客户端
list * clients_pending_write; // 待写入的客户端
// ...
client * current_clients; // 当前客户端, 仅在crash报告时才使用的客户端
int clients_paused; // 暂停标识
// ...
list * clients_waiting_acks; // 执行WAIT命令的客户端
// ...
uint64_t next_client_id; // 下一个连接至服务端的客户端的编号, 自增编号
}

这个结构体中的字段实再是太多了, 这里仅列出一些与单机数据库相关的字段

结构体中定义了一个指针字段, 指向struct redisDb数组类型. 这个结构的定义如下(也位于server.h中)

typedef struct redisDb {
dict *dict; /* 数据库的键空间 */
dict *expires; /* 有时效的键, 以及对应的过期时间 */
dict *blocking_keys; /* 队列: 与阻塞操作有关*/
dict *ready_keys; /* 队列: 与阻塞操作有关 */
dict *watched_keys; /* 与事务相关 */
int id; /* 数据库编号 */
long long avg_ttl; /* 统计值 */
} redisDb;

在介绍阻塞操作与事务之前, 我们只需要关心三个字段:

  1. dict 数据库中的所有k-v对都存储在这里, 这就是数据库的键空间
  2. expires 数据库中所有有时效的键都存储在这里
  3. id 数据库的编号

从数据结构上也能看出来, 单机数据库的启动其实需要做以下的事情:

  1. 初始化全局变量server, 并初始化server.db数组, 数组中的每一个元素就是一个数据库
  2. 创建一个事件处理器, 用于监听来自用户的命令, 并进行处理

server.c文件中的main函数跟下去, 就能看到上面两步, 这里的代码很繁杂, 下面只选取与单机数据库启动相关的代码进行展示:

int main(int argc, char **argv) {
// 初始化基础设计
// ... server.sentinel_mode = checkForSentinelMode(argc,argv); // 从命令行参数中解析, 是否以sentinel模式启动server initServerConfig(); // 初始化server配置
moduleInitModulesSystem(); // 初始化Module system // ... if (server.sentinel_mode) { // sentinel相关逻辑
initSentinelConfig();
initSentinel();
} // 如果以 redis-check-rdb 或 redis-check-aof 模式启动server, 则就是调用相应的 xxx_main()函数, 然后终止进程就行了
if (strstr(argv[0],"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv,NULL);
else if (strstr(argv[0],"redis-check-aof") != NULL)
redis_check_aof_main(argc,argv); // 处理配置文件与命令行参数, 并输出欢迎LOGO...
// ... // 判断运行模式, 如果是后台运行, 则进入daemonize()中, 使进程守护化
server.supervised = redisIsSupervised(server.supervised_mode);
int background = server.daemonize && !server.supervised;
if (background) daemonize(); // 单机数据库启动的核心操作: 初始化server
initServer(); // 琐事: 创建pid文件, 设置进程名, 输出ASCII LOGO, 检查TCP backlog队列等
//... if(!server.sentinel_mode) {
// 加载自定义Module
// ...
} else {
sentinelIsRunning();
} // 检查内存状态, 如有必要, 输出警告日志
// ... // 向事件处理器注册事前回调与事后回调, 并开启事件循环
// 至此, Redis服务端已经启动
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el) // 服务端关闭的善后工作
aeDeleteEventLoop(server.el);
return 0;
}

main函数中能大致了解启动流程, 也基本能猜出来, 在最核心的initServer函数的调用中, 至少要做两件事:

  1. 初始化事件处理器server.el
  2. 初始化服务端的数据库server.db
  3. 初始化监听socket, 并将监听socket fd加入事件处理回调

如下:

void initSetver(void) {
// 处理 SIG_XXX 信号
// ... // 初始化server.xxx下的一部分字段, 值得关注的有:
server.pid = getpid()
server.current_client = NULL;
server.clients = listCreate();
server.clients_to_close = listCreate();
//...
server.clients_pending_write = listCreate();
//...
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
//...
server.clients_paused = 0
//... // 创建全局共享对象
createSharedObjects(); // 尝试根据最大支持的客户端连接数, 调整最大打开文件数
adjustOpenFilesLimit(void) // 创建事件处理器
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
if (server.el == NULL) {
// ...
exit(1);
} // 初始化数据库数组
server.db = zmalloc(sizeof(redisDb) * server.dbnum); // 监听tcp端口, tcp 监听文件描述符存储在 server.ipfd 中
if (server.port != 0 && listenToPort(server.port, server.ipfd, &server.ipfd_count) == C_ERR) exit(1); // 监听unix端口
if (server.unixsocket != NULL) {
//...
} // 如果没有任何tcp监听fd存在, abort
if (server.ipfd_count == 0 && server.sofd < 0) {
//...
exit(1);
} // 初始化数据库数组中的各个数据库
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType, NULL); // 初始化键空间
server.db[j].expires = dictCreate(&keyptrDictType, NULL); // 初始化有时效的键字典. key == 键, value == 过期时间戳
server.db[j].blocking_keys = dictCreate(&keylistDictType, NULL);// 初始化阻塞键字典.
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType, NULL); // 初始化解除阻塞键字典
server.db[j].watched_keys = dictCreate(&keylistDictType, NULL); // 初始化与事务相关的一个字典
server.db[j].id = j; // 初始化数据库的编号
server.db[j].avg_ttl = 0; // 初始化统计数据
} // 其它一大堆细节操作
// ... // 向事件处理器中加一个定时回调, 这个回调中处理了很多杂事. 比如清理过期客户端连接, 过期key等
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
//...
exit(1);
} // 将tcp与unix监听fd添加到事件处理器中
// 回调函数分别为: acceptTcpHandler和acceptUnixHandler
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.ipfd file event");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el, server.sofd, AE_READABLE, acceptUnixHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.sofd file event.");
} // 注册一个额外的事件回调: 用于唤醒一个被module操作阻塞的客户端
// ... // 如有需要, 打开AOF文件
// ... // 对32位机器, 且没有配置maxmemory时, 自动设置为3GB
// ... // 其它与单机数据库无关的操作
// ...
}

以上就是单机数据库的启动流程

总结:

0. Redis服务端进程中, 以server这个全局变量来描述服务端

0. server全局变量中, 持有着所有数据库的指针(server.db), 一个事件处理器(server.el), 与所有与其连接的客户端(server.clients), 监听的tcp/unix socket fd(server.ipfd, server.sofd)

0. 在Redis服务端启动过程中, 会初始化监听的tcp/unix socket fd, 并把它们加入到事件处理器中, 相应的事件回调分别为acceptTcpHandleracceptUnixHandler

5. 客户端与Redis服务端建立连接的过程

上面已经讲了Redis单机数据库服务端的启动过程, 下面来看一看, 当一个客户端通过tcp连接至服务端时, 服务端会做些什么. 显然要从listening tcp socket fd在server.el中的事件回调开始看起.

逻辑上来讲, Redis服务端需要做以下几件事:

  1. 建立连接, 接收请求
  2. 协议交互, 命令处理

这里我们先只看建立连接部分, 下面是acceptTcpHandler函数, 即listening tcp socket fd的可读事件回调:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata); while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); // 建立数据连接
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd,0,cip); // 处理请求
}
}

这个函数中只做了两件事:

  1. 建立tcp数据连接
  2. 调用acceptCommonHandler

下面是acceptCommonHandler

#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c; // 创建一个client实例
if ((c = createClient(fd)) == NULL) {
// ...
return;
}
// 如果设置了连接上限, 并且当前数据连接数已达上限, 则关闭这个数据连接, 退出, 什么也不做
if (listLength(server.clients) > server.maxclients) {
//...
return;
} // 如果服务端运行在保护模式下, 并且没有登录密码机制, 那么不接受外部tcp请求
if (server.protected_mode &&
server.bindaddr_count == 0 &&
server.requirepass == NULL &&
!(flags & CLIENT_UNIX_SOCKET) &&
ip != NULL)
{
if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
// ...
return;
}
} server.stat_numconnections++;
c->flags |= flags;
}

这里可以看到, 对每一个数据连接, Redis中将其抽象成了一个 struct client 结构, 所有创建struct client实例的细节被隐藏在createClient函数中, 下面是createClient的实现:

client *createClient(int fd) {
client * c = zmalloc(sizeof(client)) if (fd != -1) {
anetNonBlock(NULL, fd); // 设置非阻塞
anetEnableTcpNoDelay(NULL, fd); // 设置no delay
if (server.tcpkeepalive) anetKeepAlive(NULL, fd, server.tcpkeepalive); // 按需设置keep alive
if (aeCreateFileEvent(server.el, fd, AE_READABLE, readQueryFromClient, c) == AE_ERR) { // 注册事件回调
close(fd); zfree(c); return NULL;
}
} selectDb(c, 0) // 默认选择第一个数据库 uint64_t client_id;
atomicGetIncr(server.next_client_id, client_id, 1); // 原子自增 server.next_client_id // 其它字段初始化
c->id = client_id;
//... // 把这个client实例添加到 server.clients尾巴上去
if (fd != -1) listAddNodeTail(server.clients,c); return c;
}

6 Redis通信协议

6.1 通信方式

客户端与服务器之间的通信方式有两种

  1. 一问一答式的(request-response pattern). 由客户端发起请求, 服务端处理逻辑, 然后回发回应
  2. 若客户端订阅了服务端的某个channel, 这时通信方式为单向的传输: 服务端会主动向客户端推送消息. 客户端无需发起请求

在单机数据库范畴中, 我们先不讨论Redis的发布订阅模式, 即在本文的讨论范围内, 客户端与服务器的通信始终是一问一答式的. 这种情况下, 还分为两种情况:

  1. 客户端每次请求中, 仅包含一个命令. 服务端在回应中, 也仅给出这一个命令的执行结果
  2. 客户端每次请求中, 包含多个有序的命令. 服务端在回应中, 按序给出这多个命令的执行结果

后者在官方文档中被称为 pipelining

6.2 通信协议

自上向下描述协议:

  1. 协议描述的是数据
  2. 数据有五种类型. 分别是短字符串, 长字符串, 错误信息, 数值, 数组. 其中数组是复合类型. 协议中, 数据与数据之间以\r\n两个字节作为定界符
  3. 短字符串: +Hello World\r\n+开头, 末尾的\r\n是定界符, 所以显然, 短字符串本身不支持表示\r\n, 会和定界符冲突
  4. 错误信息: -Error message\r\n-开头, 末尾的\r\n是定界符. 显然, 错误信息里也不支持包含\r\n
  5. 数值: :9527\r\n:开头, ASCII字符表示的数值, 仅支持表示整数
  6. 长字符串: $8\r\nfuck you\r\n 先以$开头, 后面跟着字符串的总长度, 以ASCII字符表示. 再跟一个定界符, 接下来的8个字节就是长字符串的内容, 随后再是一个定界符.
  7. 数组: 这是一个复合类型, 以*开头, 后面跟着数组的容量, 以ASCII字符表示, 再跟一个定界符. 然后后面跟着多个其它数据. 比如 *2\r\n$3\r\nfoo\r\n+Hello World\r\n表示的是一个容量为2的数组, 第一个数组元素是一个长字符串, 第二个数组元素是一个短字符串.

注意:

0. 一般情况下, 协议体是一个数组

0. 长字符串在表示长度为0的字符串时, 是$0\r\n\r\n

0. 还有一个特殊值, $-1\r\n, 它用来表示语义中的null

协议本身是支持二进制数据传输的, 只需要将二进制数据作为长字符串传输就可以. 所以这并不能算是一个严格意义上的字符协议. 但如果传输的数据多数是可阅读的数据的话, 协议本身的可读性是很强的

6.3 请求体

客户端向服务端发送请求时, 语法格式基本如下:

6.3.1 单命令请求

单命令请求体, 就是一个数组, 数组中将命令与参数各作为数组元素存储, 如下是一个 LLEN mylist 的请求

*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n

这个数组容量为2, 两个元素均为长字符串, 内容分别是LLENmylist

6.3.2 多命令请求(pipelining request)

多命令请求体, 直接把多个单命令请求体拼起来就行了, 每个命令是一个数组, 即请求体中有多个数组

6.3.3 内联命令

如果按协议向服务端发送请求, 可以看到, 请求体的第一字节始终要为*. 为了方便一些弱客户端与服务器交互, REDIS支持所谓的内联命令. 内联命令仅是请求体的另一种表达方式.

内联命令的使用场合是:

  1. 手头没有redis-cli
  2. 手写请求体太麻烦
  3. 只想做一些简单操作

这种情况下, 直接把适用于redis-cli的字符命令, 发送给服务端就行了. 内联命令支持通过telnet发送.

6.4 回应体

Redis服务端接收到任何一个有效的命令时, 都会给服务端写回应. 请求-回应的最小单位是命令, 所以对于多命令请求, 服务端会把这多个请求的对应的回应, 按顺序返回给客户端.

回应体依然遵守通信协议规约.

6.5 手撸协议示例

单命令请求

[root@localhost ~]# echo -e '*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n' | nc localhost 6379
:2
[root@localhost ~]#

多命令请求

[root@localhost ~]# echo -e '*1\r\n$4\r\nPING\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n' | nc localhost 6379
+PONG
:4
:5
:6
[root@localhost ~]#

内联命令请求

[root@localhost ~]# echo -e 'PING' | nc localhost 6379
+PONG
[root@localhost ~]# echo -e 'INCR counter' | nc localhost 6379
:7
[root@localhost ~]# echo -e 'INCR counter' | nc localhost 6379
:8
[root@localhost ~]#

通过telnet发送内联命令请求

[root@localhost ~]# telnet localhost 6379
Trying ::1...
Connected to localhost.
Escape character is '^]'. PING
+PONG INCR counter
:9 INCR counter
:10 INCR counter
:11 ^]
telnet> quit
Connection closed.
[root@localhost ~]#

6.6 协议总结及注意事项

优点:

  1. 可读性很高, 也可以容纳二进制数据(长字符串类型)
  2. 协议简单, 解析容易, 解析性能也很高. 得益于各种数据类型头字节的设计, 以及长字符串类型自带长度字段, 所以解析起来基本飞快, 就算是二流程序员写出来的协议解析器, 跑的也飞快
  3. 支持多命令请求, 特定场合下充分发挥这个特性, 可以提高命令的吞吐量
  4. 支持内联命令请求. 这种请求虽然限制颇多(不支持二进制数据类型, 不支持\r\n这种数据, 不支持多命令请求), 但对于运维人员来说, 可读性更高, 对阅读更友好

注意事项:

  1. 非内联命令请求, 请求体中的每个数据都是长字符串类型. 即便是在表达数值, 也需要写成长字符串形式(数值的ASCII表示). 这给服务端解析协议, 以及区分请求命令是否为内联命令带来了很大的便利
  2. 短字符串, 数值, 错误信息这三种数据类型, 仅出现在回应体中

7. 服务端处理请求及写回应的过程

当客户端与服务端建立连接后, 双方就要进行协议交互. 简单来说就是以下几步:

  1. 客户端发送请求
  2. 服务端解析请求, 进行逻辑处理
  3. 服务端写回包

显然, 从三层(tcp或unix)上接收到的数据, 首先要解析成协议数据, 再进一步解析成命令, 服务端才能进行处理. 三层上的协议是流式协议, 一个请求体可能要分多次才能完整接收, 这必然要涉及到一个接收缓冲机制. 先来看一看struct client结构中与接收缓冲机制相关的一些字段:

type struct client {
uint64_t id; /* Client incremental unique ID. */ // 客户端ID, 自增, 唯一
int fd; /* Client socket. */ // 客户端数据连接的底层文件描述符
redisDb *db; /* Pointer to currently SELECTed DB. */ // 指向客户端当前选定的DB
robj *name; /* As set by CLIENT SETNAME. */ // 客户端的名称. 由命令 CLIENT SETNAME 设定
sds querybuf; /* Buffer we use to accumulate client queries. */ // 请求体接收缓冲区
sds pending_querybuf; /* If this is a master, this buffer represents the // cluster特性相关的一个缓冲区, 暂忽视
yet not applied replication stream that we
are receiving from the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ // 近期(100ms或者更长时间中)接收缓冲区大小的峰值
int argc; /* Num of arguments of current command. */ // 当前处理的命令的参数个数
robj **argv; /* Arguments of current command. */ // 当前处理的命令的参数列表
struct redisCommand *cmd, *lastcmd; /* Last command executed. */ // 当前正在处理的命令字, 以及上一次最后执行的命令字
int reqtype; /* Request protocol type: PROTO_REQ_* */ // 区分客户端是否以内联形式上载请求
int multibulklen; /* Number of multi bulk arguments left to read. */ // 命令字剩余要读取的参数个数
long bulklen; /* Length of bulk argument in multi bulk request. */ // 当前读取到的参数的长度
} client;

7.1 通过事件处理器, 在传输层获取客户端发送的数据

我们先从宏观上看一下, 服务端是如何解析请求, 并进行逻辑处理的. 先从服务端数据连接的处理回调readQueryFromClient看起:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
size_t qblen;
UNUSED(el);
UNUSED(mask); readlen = PROTO_IOBUF_LEN;
// 对于大参数的优化
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
// 如果读取到了一个超大参数, 那么先把这个超大参数读进来
// 保证缓冲区中超大参数之后没有其它数据
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
} // 接收数据
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
// ... sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
// client的缓冲区中数据如果超长了, 最有可能的原因就是服务端处理能力不够, 数据堆积了
// 或者是受恶意攻击了
// 这种情况下, 干掉这个客户端
// ...
return;
} // 处理缓冲区的二进制数据
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
// cluster特性相关的操作, 暂时忽略
// ...
}
}

这里有一个优化写法, 是针对特大参数的读取的. 在介绍这个优化方法之前, 先大致的看一下作为单机数据库, 服务端请求客户端请求的概览流程(你可能需要在阅读完以下几节之后再回头来看这里):

一般情况下, 请求体的长度不会超过16kb(即使包括多个命令), 那么处理流程(假设请求是协议数据, 而非内联命令)是这样的:

Redis中单机数据库的实现

而正常情况下, 一个请求体的长度不应该超过16kb, 而如果请求体的长度超过16kb的话, 有下面两种可能:

  1. 请求中的命令数量太多, 比如平均一个命令及其参数, 占用100字节, 而这个请求体中包含了200个命令
  2. 请求中命令数量不多, 但是有一个命令携带了超大参数, 比如, 只是一个SET单命令请求, 但其值参数是一个长度为40kb的二进制数据

附带多个命令, 导致请求体超过16kb时

在第一种情况下, 如果满足以下条件的话, 这个请求会被服务端当作两个请求去处理:

初次数据连接的可读回调接收了16kb的数据, 这16kb的数据恰好是多个请求. 没有请求被截断! 即是一个20kb的请求体, 被天然的拆分成了两个请求, 第一个请求恰好16kb, 一字节不多, 一字节不少, 第二个请求是剩余的4kb. 那么服务端看来, 这就是两个多命令请求

这种情况极其罕见, Redis中没有对这种情况做处理.

一般情况下, 如果发送一个20kb的多命令请求, 且请求中没有超大参数的话(意味着命令特别多, 上百个), 那么总会有一个命令被截断, 如果被阶段, 第一次的处理流程就会在处理最后一个被截断的半截命令时, 在如图红线处流程终止, 而仅在第二次处理流程, 即处理剩余的4kb数据时, 才会走到蓝线处. 即两次收包, 一次写回包:

Redis中单机数据库的实现

附带超大参数, 导致请求体超过16kb

在第二种情况下, 由于某几个参数特别巨大(大于PROTO_MBULK_BIG_ARG宏的值, 即32kb), 导致请求体一次不能接收完毕. 势必也要进行多次接收. 本来, 朴素的思想上, 只需要在遇到超大参数的时候, 调整16kb这个阈值即可. 比如一个参数为40kb的二进制数据, 那么在接收到这个参数时, 破例调用read时一次性把这个参数读取完整即可. 除此外不需要什么特殊处理.

即是, 处理流程应当如下:

Redis中单机数据库的实现

但这样会有一个问题: 当在processMultiBulkBuffer中, 将超大参数以string对象的形式放在client->argv中时, 创建这个string对象, 底层是拷贝操作:

int processMultibulkBuffer(client * c) {
// ....
c->argv[c->argc++] = createStringObject(c->querybuf+pos, c->bulklen);
// ....
}

对于超大参数来说, 这种string对象的创建过程, 涉及到的内存分配与数据拷贝开销是很昂贵的. 为了避免这种数据拷贝, Redis把对于超大参数的处理优化成了如下:

Redis中单机数据库的实现

7.2 对二进制数据进行协议解析

从传输层获取到数据之后, 下一步就是传递给processInputBuffer函数, 来进行协议解析, 如下:

void processInputBuffer(client * c) {
server.current_client = c;
// 每次循环, 解析并执行出一个命令字(及其参数), 并执行相应的命令
// 直到解析失败, 或缓冲区数据耗尽为止
while(sdslen(c->querybuf)) {
// 在某些场合下, 不做处理
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
if (c->flags & CLIENT_BLOCKED) break;
if (c->flag & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; // 如果client的reqtype字段中没有标明该客户端传递给服务端的请求类型(是内联命令, 还是协议数据)
// 那么分析缓冲区中的第一个字节来决定. 这一般发生在客户端向服务端发送第一个请求的时候
if (!c->reqtype) {
if (c->querybuf[0] == '*') c->reqtype = PROTO_REQ_MULTIBULK;
else c->reqtype = PROTO_REQ_INLINE;
} // 按类型解析出一个命令, 及其参数
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknow request type")
} // 执行命令逻辑
if (c->argc == 0) {
// 当argc的值为0时, 代表本次解析没有解析出任何东西, 数据已耗尽
resetClient(c);
} else {
// 执行逻辑
if(processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
c->reploff = c->read_reploff - sdslen(c->querybuf);
} if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) {
resetClient(c);
}
}
if (server.current_client == NULL) break;
}
} server.current_client = NULL;
}

这一步做了分流处理, 如果是内联命令, 则通过processInlineBuffer进行协议解析, 如果不是内联命令, 则通过processMultibulkBuffer进行协议解析. 协议解析完成后, 客户端上载的数据就被转换成了command, 然后接下来调用processCommand来执行服务端逻辑.

我们略过内联命令的解析, 直接看非内联命令请求的协议解析过程

int processMultibulkBuffer(client * c) {
//... // 解析一个请求的头部时, 即刚开始解析一个请求时, multibulklen代表着当前命令的命令+参数个数, 该字段初始值为0
// 以下条件分支将读取一个命令的参数个数, 比如 '*1\r\n$4\r\nPING\r\n'
// 以下分支完成后, 将有:
// c->multibulklen = 1
// c->argv = zmalloc(sizeof(robj*))
// pos = <指向$>
if (c->multibulklen == 0) {
// ... // 读取当前命令的参数个数, 即赋值c->multibulklen
newline = strchr(c->querybuf, '\r');
if (newline == NULL) {
// 请求体不满足协议格式
// ...
return C_ERR:
} // ... ok = string2ll(c->querybuf+1, newline-(c->querybuf+1), &ll)
if (!ok || ll > 1024 * 1024) {
// 解析数值出错, 或数值大于1MB
return C_ERR;
}
// 现在pos指向的是命令字
pos = (newline-c->querybuf) + 2
if (ll <= 0) {
// 解出来一个空命令字, 什么也不做, 把缓冲区头部移除掉, 并且返回成功
sdsrange(c->querybuf, pos, -1);
return C_OK;
} c->multibulklen = ll; // 为命令字及其参数准备存储空间: 将它们都存储在c->argv中, 且是以redisObject的形式存储着
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
} // ... // 读取解析命令字本身与其所有参数
while(c->multibulklen) {
// 读取当前长字符串的长度
if (c->bulklen == -1) {
// ... // 如果当前指针指向的参数, 是一个超大参数, 则把接收缓冲区修剪为该参数
pos += newline-(c->querybuf+pos) + 2; // 现在pos指向的是参数, 或命令字的起始位置
if (ll >= PROTO_MBULK_BIG_ARG) {
size_t qblen;
sdsrange(c->querybuf, pos, -1); // 将接收缓冲区修剪为该参数
pos = 0;
qblen = sdslen(c->querybuf);
if (qblen < (size_t)ll + 2) {
// 如果这个超大参数还未接收完毕, 则扩充接收缓冲区, 为其预留空间
c->querybuf = sdsMakeRoomFor(c->querybuf, ll+2-qblen);
}
} c->bulklen == ll;
} // 读取长字符串内容, 即是命令字本身, 或某个参数
if (sdslen(c->querybuf) - pos < (size_t)(c->bulklen+2)) {
// 当前缓冲区中, 该长字符串的内容尚未接收完毕, 跳出解析
break;
} else {
if (pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) {
// 对于超大参数, 为了避免数据复制的开销, 直接以接收缓冲区为底料, 创建一个字符串对象
// 而对于接收缓冲区, 为其重新分配内存
// 注意, 如果当前指针指向的是一个超大参数, 接收机制保证了, 这个超大参数之后, 缓冲区没有其它数据
c->argv[c->argc++] = createObject(OBJ_STRING, c->querybuf)
sdsIncrLen(c->querybuf, -2); // 移除掉超大参数末尾的\r\n
c->querybuf = sdsnewlen(NULL, c->bulklen + 2); // 为接收缓冲区重新分配内存
sdsclear(c->querybuf);
pos = 0;
} else {
// 对于普通参数或命令字, 以拷贝形式新建字符串对象
c->argv[c->argc++] = createStringObject(c->querybuf+pos, c->bulklen);
pos += c->bulklen+2; // pos指向下一个参数
} c->bulklen = -1;
c->multibulklen--;
}
} // 至此, 一个命令, 及其所有参数, 均被存储在 c->argv中, 以redisObject形式存储着, 对象的类型是string
// 扔掉所有已经解析过的二进制数据
if (pos) sdsrange(c->querybuf, pos, -1); if (c->multibulklen == 0) return C_OK return C_ERR;
}

7.3 命令的执行

一个命令及其所有参数如果被成功解析, 则processMultibulkBuffer函数会返回C_OK, 并在随后调用processCommand执行这个命令. processCommand内部包含了将命令的回应写入c->buf的动作(封装在函数addReply中. 如果命令或命令中的参数在解析过程出错, 或参数尚未接收完毕, 则会返回C_ERR, 退栈, 重新返回, 直至再次从传输层读取数据, 补齐所有参数, 再次回到这里.

接下来我们来看执行命令并写回包的过程, processCommand中的关键代码如下:

int processCommand(client * c) {
if (!strcasecmp(c->argv[0]->ptr, "quit")) {
// 对 quit 命令做单独处理, 调用addReply后, 返回
addReply(c, shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
} // 拿出c->argv[0]中存储的string对象中的ptr字段(其实是个sds字符串)
// 与server.commands中的命令字表进行比对, 查找出对应的命令, 即 struct redisCommand* 句柄
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
// 查无该命令, 进行错误处理
flagTransaction(c); // 如果这是一个在事务中的命令, 则将 CLIENT_DIRTY_EXEC 标志位贴到 c-flags 中, 指示着事务出错
addReplyErrorFormat(c, "unknown command '%s'", (char *)c->argv[0]->ptr)
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) {
// 对比命令所需要的参数数量, 是否和argc中的数量一致, 如果不一致, 说明错误
flagTransaction(c); // 依然同上, 事务出错, 则打 CLIENT_DIRTY_EXEC 标志位
addReplyErrorFormat(c, "wrong number of arguments for '%s' command", c->cmd->name);
return C_OK;
} // 检查用户的登录态, 即在服务端要求登录认证时, 若当前用户还未认证, 且当前命令不是认证命令的话, 则报错
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
flagTransaction(c);
addReply(c, shared.noautherr);
return C_OK;
} // 与集群相关的代码
// 如果集群模式开启, 则通常情况下需要将命令重定向至其它实例, 但在以下两种情况下不需要重定向:
// 1. 命令的发送者是master
// 2. 命令没有键相关的参数
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) { int hashslot, error_code;
clusterNode * n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code); if (n == NULL || n != server.cluster->myseld) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
} clusterRedirectClient(c, n, hashslot, error_code); // 重定向
return C_OK;
}
} // 在正式执行命令之前, 首先先尝试的放个屁, 挤点内存出来
if (server.maxmemory) {
int retval = freeMemoryIfNeeded(); if (server.current_client == NULL) return C_ERR;
if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR ) {
flagTransaction(c);
addReply(c, shared.oomerr);
return C_OK;
}
} // 在持久化出错的情况下, master不接受写操作
if (
(
(server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == C_ERR) ||
server.aof_last_write_status == C_ERR
) &&
server.masterhost == NULL &&
(c->cmdd->flags & CMD_WRITE || c->cmd->proc == pingCommand)
) { flagTransaction(c);
if (server.aof_last_write_status == C_OK) {
addReply(c, shared.bgsaveerr);
} else {
addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", strerror(server.aof_last_write_errno)));
}
return C_OK;
} // 还有几种不接受写操作的场景, 这里代码省略掉, 这几种场景包括:
// 1. 用户配置了 min_slaves-to_write配置项, 而符合要求的slave数量不达标
// 2. 当前是一个只读slave
// ... // 在发布-订阅模式中, 仅支持 (P)SUBSCRIBE/(P)UNSUBSCRIBE/PING/QUIT 命令 if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand
) {
addReplyError(c, "only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
} // 当用户配置slave-serve-stale-data为no, 且当前是一个slave, 且与master中断连接的情况下, 仅支持 INFO 和 SLAVEOF 命令
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_server_stale_data == 0 && !(c->cmd->flags & CMD_STALE)) {
flagTransaction(c);
addReply(c, shared.masterdownerr);
return C_OK;
} // 如果当前正在进行loading操作, 则仅接受 LOADING 相关命令
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr);
return C_OK;
} // 如果是lua脚本操作, 则支持受限数量的命令
// ... // 终于到了终点: 执行命令
// 对于事务中的命令: 进队列, 但不执行
// 对于事务结束标记EXEC: 调用call, 执行队列中的所有命令
// 对于非事务命令: 也是调用call, 就地执行
if (
c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != watchCommand) {
// 如果是事务, 则将命令先排进队列
queueMultiCommand(c);
addReply(c, shared.queued);
} else {
// 如果是非事务执行, 则直接执行
call (c, CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys)) {
handleClientsBlockedOnLists();
}
} return C_OK;
}

需要注意的点有:

  1. 服务端全局变量server中, 持有着一个命令表, 它本身是一个dict结构, 其key是为字符串, 即Redis命令的字符串, value则是为redisCommand结构, 描述了一个Redis中的命令. 可以看到processCommand一开始做的第一件事, 就是用c->argv[0]->ptr去查询对应的redisCommand句柄
  2. Redis中的事务是以一个MULTI命令标记开始, 由EXEC命令标记结束的. 如果一个命令处于事务之中, 那么processCommand内部并不会真正执行这个命令, 仅当最终EXEC命令来临时, 将事务中的所有命令全部执行掉. 而实际执行命令的函数, 是为call, 这是Redis命令执行流程中最核心的一个函数.
  3. 客户端可以以pipelining的形式, 通过一次请求发送多个命令, 但这不是事务, 如果没有将命令包裹在MULTIEXEC中, 那么这多个命令其实是一个个就地执行的.

有关事务的更详细细节我们会在稍后讨论, 但目前, 我们先来看一看这个最核心的call函数, 以下代码隐藏了无关细节

void call(client * c, int flags) {
// ... // 与主从复制相关的代码: 将命令分发给所有MONITOR模式下的从实例
// ... // 调用命令处理函数
// ...
c->cmd->proc(c);
// ... // AOF, 主从复制相关代码
// ...
}

拨开其它特性不管, 单机数据库在这一步其实很简单, 就是调用了对应命令字的回调函数来处理

我们挑其中一个命令SET来看一下, 命令处理回调中都干了些什么:

void setCommand(client * c) {
int j;
robj * expire = NULL;
int uint = UINT_SECONDS;
int flags = OBJ_SET_NO_FLAGS; for (j = 3; i < c->argc; j++) {
// 处理SET命令中的额外参数
// ...
} c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL)
} void setGenericCommand(client * c, int flags, robj * key, robj, * val, robj * expire, int unit, robj * ok_reply, robj * abort_reply) {
long long milliseconds = 0; if (expire) {
// 从string对象中读取数值化的过期时间
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return; if (milliseconds <= 0){
addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
return;
} if (unit == UNIT_SECONDS) milliseconds *= 1000;
} if (
(flags & OBJ_SET_NX && lookupKeyWrite(c->db, key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db, key) == NULL)
) {
// NX 与 XX 时, 判断键是否存在
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
} // 执行所谓的set操作, 即把键值对插入至数据库键空间去, 如果键已存在, 则替换值
setKey(c->db, key, val); server.dirty++; // 统计数据更新 // 设置过期时间
if(expire) setExpire(c, c->db, key, kstime() + milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
addReply(c, ok_reply ? ok_reply : shared.ok);
} void setKey(redisDb * db, robj * key, robj * val) {
if (lookupKeyWrite(db, key) == NULL) {
dbAdd(db, key, val); // 对 dictAdd 的封装
} else {
dbOverwrite(db, key, val); // 对 dictReplace的封装
} incrRefCount(val);
removeExpire(db, key);
signalModifiedKey(db, key);
}

setCommand->setGenericCommand->setKey->dictXXX这样一路追下来, 可以看到最终的操作, 是对于某个数据库实例中, 键空间的操作. 逻辑比较清晰. 其它的命令也基本类似, 鉴于Redis中的命令众多, 这里没有篇幅去一个一个的介绍, 也没有必要.

至此, 用户的命令经由服务端接收, 解析, 以及执行在了某个数据库实例上. 我们在一路跟代码的过程中, 随处可见的错误处理中都有addReplyXXX系列的函数调用, 在SET命令的最终执行中, 在setGenericCommand中, setKey操作成功后, 也会调用addReply. 显然, 这个函数是用于将服务端的回应数据写入到某个地方去的.

从逻辑上来讲, 服务端应当将每个命令的回应写入到一个缓冲区中, 然后在适宜的时刻, 序列化为回应体(符合协议规约的二进制数据), 经由网络IO回发给客户端. 接下来, 我们就从addReply入手, 看一看回包的流程.

7.4 写回包流程

addReply系列函数众多, 有十几个, 其它函数诸如addReplyBulk, addReplyError, addReplyErrorFormat等, 只是一些简单的变体. 这一系列函数完成的功能都是类似的: 总结起来就是两点:

  1. 将回包信息写入回包缓冲区

addReply如下:

void addReply(client * c, robj * obj) {
// 检查客户端是否可以写入回包数据, 这个函数在每个 addReplyXXX 函数中都会被首先调用
// 它的职责有:
// 0. 多数情况下, 对于正常的客户端, 它会返回 C_OK. 并且如果该客户端的可写事件没有在事件处理器中注册回调的话, 它会将这个客户端先挂在 server.clients_pending_write 这个链表上
// 0. 在特殊情况下, 对于不能接收回包的客户端(比如这是一个假客户端, 没有数据连接sockfd, 这种客户端在AOF特性中有有到), 返回 C_ERR
if (prepareClientToWrite(c) != C_OK) return; if (sdsEncodedObject(obj)) {
// 如果回应数据, 是RAW或EMBSTR编码的string对象
if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK) {
// 先尝试调用 _addReplyToBuffer, 将回应数据写进 client->buf 缓冲区中
// 如果失败了, 则把回应数据挂到 client->reply 这个链表上去
_addReplyObjectToList(c, obj);
}
} else if (obj->encoding == OBJ_ENCODING_INT)
// 如果回应数据是INT编码的string对象, 这里有优化操作
if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
char buf[32];
int len; // 由于INT编码的string对象内部是使用 redisObject->ptr 指针直接存储的数值
// 所以直接读这个指针的值, 将其转换为字符表示, 尝试存进 client->buf 缓冲区中
len = ll2string(buf, sizeof(buf), (long)obj->ptr); if (_addReplyToBuffer(c, buf, len) == C_OK) return;
} // 如果存进 client->buf 失败, 就把对应的数值转换为 EMBSTR 或RAW 编码的string对象
// 挂在 client->reply 链表上
obj = getDecodedObject(obj);
if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(c, obj); decrRefCount(obj);
} else {
serverPanic("Wrong obj->encoding in addReply()")
}
}

这里会看到, 回应缓冲区有两个:

  1. client->buf是一个二进制的回应缓冲区, 但它的长度是有限的. 默认长度也是16KB, 受宏PROTO_REPLY_CHUNK_BYTES的值限定
  2. client->reply是一个对象缓冲区, 它是一个链表, 无容量限制, 该链表中挂的应该都是以 RAW 或 EMBSTR 编码的 string 对象.

我们在addReply中会看到, 总是试图先把回应数据先通过_addReplyToBuffer添加到client->buf中去, 如果不成功的话, 再把数据通过_addReplyObjectToList挂在client->reply这个链表中去.

这个也很好理解, 当client->buf写满的时候, 再写不下下一个addReply要添加的数据时, 就会退而求其次, 将这个回应数据先挂在client->reply中去. client->buf就是每次调用网络IO时, 一次性写入的数据量. 如果超过这个量了, 会分多次写回应.

现在问题来了: addReplyXXX系列函数只是将回应数据写向二进制缓冲区或链表队列上, 并把客户端添加到server->clients_pending_write列表中去. 那么, 到底是什么时机, 将数据回写给客户端呢?

这个实现就非常苟了!!

按常理思维, 应该在整个 收包->解析->执行命令->写回包至缓冲区或队列这个流程中, 在整个请求体被处理结束后, 给客户端的数据连接的可写事件绑定一个事件回调, 用于写回包. 但Redis的实现, 并不是这样.

还记得, 在服务端启动流程中, 开启服务端事件循环的时候, 有这么一段代码:

int main(int argc, char ** argv) {
//... aeSetBeforeSleepProc(server.el, beforeSleep); // 就是这里
aeSetAfterSleepProc(server.el, afterSleep);
aeMain(server.el); aeDeleteEventLoop(server.el); return 0;
}

服务端的事件处理器, 有一个事前回调, 也有一个事后回调, 而处理向客户端写回包的代码, 就在事前回调中. 来看这个beforeSleep函数:

void beforeSleep(struct aeEventLoop * eventLoop) {
// ... handleClientsWithPendingWrites(); // 就是在这里, 处理所有客户端的写回包 // ...
}

这是什么操作? 这意味着服务端的事件处理器, 每次处理一个事件之前, 都要检查一遍是否还有回应需要给客户端写. 而为什么Redis在写回包的时候, 不使用传统的注册可写事件->在可写事件回调中写回包->写成功后注销可写事件这种经典套路, Redis在handleClientsWithPendingWrites函数实现的注释中, 给出了理由: 大意就是, 不使用事件注册这种套路, 就避免了事件处理机制中会产生的系统调用开销.

我们从handleClientsWithPendingWrites函数开始, 简单的捋一下写回包是怎么实现的


int handleClientsWithPendingWrites(void) {
// ... while((ln = listNext(&li))) {
client * c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write, ln); // 将client->buf及client->reply中的数据写至客户端
if (writeToClient(c->fd, c, 0) == C_ERR) continue; // 如果client->reply中还有数据, 则借助事件处理器来写后续回包
// 正常情况下, 在上面的writeToClient中, 该写的数据已经全写到客户端了
// 仅有在网络异常的情况下, writeToClient处理写回包失败, 导致数据没写完时, 才会进入这个分支
// 这个逻辑是正确的: 网络有异常的情况下, 通过注册事件的形式, 待稍后再重试写回包
// 网络正常的情况下, 一次性把所有数据都写完
if (clienetHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE; if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) {
ae_flags |= AE_BARRIER;
} if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) {
freeClientAsync(c);
}
}
} // ...
}

writeToClient函数中就写的很直白了, 就是调用write向客户端写回包. 如果client->reply上还有数据, 也一个个的写给客户端. 这里就不展示这个函数的实现了, 很简单, 没什么可说的.

sendReplyToClient几乎等同于writeToClient, 只是将后者包装成了符合事件处理回调函数签名的形式.

写回包过程中有两点需要注意:

  1. 如果writeToClient由于一些异常原因(特别是网络波动), 导致在写client->reply中的某个结点的数据失败(writeEAGAIN时), writeToClient是会返回 C_OK的, 但实际上数据未发送完毕. 这种情况下, 后续会再通过事件处理器, 来执行重试.
  2. 如果在writeToClient中, 把数据发送结束了, 在函数末尾, 是会主动删除掉对于client->fdWRITEABLE事件的监听的.

至此, 从客户端发送请求, 到服务端接收二进制数据, 解析二进制数据, 执行命令, 再写回包的整个流程就结束了.

8. 总结

  1. Redis服务端的运行机制的核心在于事件处理器. 其实现相当朴素, 不难读懂. 事件处理器会自动选择当前操作系统平台上最优的多路IO复用接口, 这就是是为什么Redis服务端虽然是单进程单线程在跑, 但依然快成狗的原因
  2. Redis服务端的核心代码是xxxCommand一系列命令处理函数. 这部分命令处理函数分布在t_hash.c, t_list.c, t_set.c, t_string.c, t_zset.cserver.c中, 分别是各种Redis Value Type相关的命令实现.
  3. Redis服务端的核心数据结构实例是全局变量server. 就单机范畴的讨论来说, clients字段及其它相关字段, 持有着客户端. el字段持有着事件处理器, db字段持有着数据库
  4. Redis协议是一个设计简单, 且阅读友好的协议
  5. Redis服务端的启动过程其实十分简单, 在单机数据库范畴里, 主要是初始化事件处理器el与数据库db字段. 其中db字段就是初始化了一坨dict, 然后监听服务端口.
  6. Redis服务端有一个优化点, 本文没有涉及, 即是服务端启动过程中会调用一个名为createSharedObjects()函数. 这个函数中创建了大量的string对象, 用于全局复用
  7. 客户端与服务端的交互, 主要涉及二进制数据读取, 协议解析, 命令执行, 与写回包. 其中对于超大参数的读取有特殊优化
  8. 服务端写回包没有使用事件处理器监听数据连接的可写事件, 而是以一种轮询的方式, 在每次事件处理器运行之前, 处理所有clients的待回包数据, 将其回写. 仅在网络波动等异常情况下导致需要重试发送的场合下, 写回包才涉及事件处理器.