目录
- 1. Redis 异步方式
- 1.1 同步连接
- 优点
- 缺点
- 示例:访问 Redis,并对 `counter` 实现自增1000次,统计用时
- 1.2 异步连接
- 优点
- 缺点
- 1.2.1 Redis 驱动
- 1.2.2 示例
- 第1步:实现 Reactor
- 第2步:实现 Redis 适配器
- 第3步:实现主体代码
- 参考
1. Redis 异步方式
Redis 提供了两种主要的连接方式:同步连接和异步连接。选择合适的连接方式可以显著影响应用的性能和响应能力。
1.1 同步连接
同步连接是指客户端与 Redis 服务器之间的一种简单的请求-响应模式。当使用同步连接时,客户端发送一个命令给 Redis 服务器,并在收到服务器响应之后才能继续执行后续的操作。在同步连接中,客户端在执行命令时会阻塞当前线程,等待服务器的响应,直到响应返回或超时。
优点
- 实现简单:代码逻辑直观,易于理解和维护。
- 业务逻辑连贯:操作按顺序执行,业务逻辑没有割裂。
缺点
- 阻塞当前线程:每个命令执行都需要等待响应,可能导致线程阻塞,影响性能。
- 效率较低:在高并发场景下,频繁的阻塞和唤醒线程会增加系统开销。
示例:访问 Redis,并对 counter
实现自增1000次,统计用时
以下是一个使用 hiredis 库的 C 程序示例,该程序连接到 Redis 服务器,对 counter
键进行 1000 次自增操作,并统计所用时间。
编译命令:
gcc redis-test-sync.c -o redis-test-sync -lhiredis
代码示例 (redis-test-sync.c
):
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <hiredis/hiredis.h>
// 获取当前时间的毫秒数
int current_tick() {
int t = 0;
struct timespec ti;
clock_gettime(CLOCK_MONOTONIC, &ti);
t = (int)ti.tv_sec * 1000;
t += ti.tv_nsec / 1000000;
return t;
}
int main(int argc, char **argv) {
redisContext *c;
redisReply *reply;
const char *hostname = "127.0.0.1";
int port = 6379;
// 设置连接超时时间为1.5秒
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
// 连接到Redis服务器
c = redisConnectWithTimeout(hostname, port, timeout);
if (c == NULL || c->err) {
if (c) {
printf("Connection error: %s\n", c->errstr);
redisFree(c);
} else {
printf("Connection error: can't allocate redis context\n");
}
exit(1);
}
// 获取自增次数,默认为1000
int num = (argc > 1) ? atoi(argv[1]) : 1000;
int before = current_tick();
// 执行自增操作
for (int i = 0; i < num; i++) {
reply = redisCommand(c, "INCR counter"); // +1操作
printf("INCR counter: %lld\n", reply->integer);
freeReplyObject(reply);
}
int used = current_tick() - before;
printf("After %d exec redis commands, used %d ms\n", num, used);
// 断开连接
redisFree(c);
return 0;
}
输出示例:
INCR counter: 1
INCR counter: 2
...
INCR counter: 1000
After 1000 exec redis commands, used 150 ms
解释:
-
连接Redis服务器:使用
redisConnectWithTimeout
函数连接到 Redis 服务器,设置超时时间为 1.5 秒。 -
执行自增操作:循环执行
INCR counter
命令,次数由命令行参数决定,默认为 1000 次。 - 统计用时:记录操作前后的时间差,计算总耗时。
- 输出结果:每次自增操作的结果,以及总耗时。
1.2 异步连接
异步连接允许客户端在发送命令后不等待响应,可以继续执行其他操作,适用于高性能和高并发场景。异步连接通常通过事件驱动机制实现,可以充分利用多核 CPU 和网络资源。
优点
- 非阻塞操作:不会阻塞当前线程,提升系统的并发处理能力。
- 更高的效率:适合高并发和低延迟要求的应用场景。
- 灵活的业务逻辑:可以在等待响应的同时处理其他任务。
缺点
- 实现复杂:代码逻辑较为复杂,需要处理事件循环和回调函数。
- 业务逻辑割裂:异步操作可能导致业务逻辑分散,难以维护。
- 依赖事件库:通常需要结合事件驱动库(如 libevent、libuv)使用。
1.2.1 Redis 驱动
实现异步连接需要一个 Redis 驱动,该驱动需要将 Redis 连接融入项目中的 Reactor 模式进行管理。具体步骤如下:
-
实现Redis驱动:
- 服务端使用异步连接,需要自己实现 Redis 驱动,即将 Redis 连接与项目中的 Reactor 进行融合管理。
-
设计Redis适配器:
- 构建Redis事件对象:包括 hiredis 事件对象和 Reactor 事件对象。
- 适配事件控制:复用项目中的 Reactor 事件循环。
-
hiredis 的封装规则:
- Reactor 的实现:所有的 IO 由用户实现。
- 适配器的实现:hiredis 提供事件操作接口,用户需要适配这些接口。不同的网络库和平台对事件操作接口的实现可能不同。
用户需要适配的 hiredis 事件接口包括:
-
addRead
:添加读事件 -
delRead
:删除读事件 -
addWrite
:添加写事件 -
delWrite
:删除写事件 -
cleanup
:事件对象释放 scheduleTimer
1.2.2 示例
以下示例展示了如何实现 Redis 异步连接,包括 Reactor 的实现、Redis 适配器以及主体代码。
第1步:实现 Reactor
文件:reactor.h
#ifndef _MARK_REACTOR_
#define _MARK_REACTOR_
#include <sys/epoll.h>
#include <stdio.h>
#include <unistd.h> // read write
#include <fcntl.h> // fcntl
#include <sys/types.h> // listen
#include <sys/socket.h> // socket
#include <errno.h> // errno
#include <arpa/inet.h> // inet_addr htons
#include <assert.h> // assert
#include <stdlib.h> // malloc
#include <string.h> // memcpy memmove
#include "chainbuffer/buffer.h" // 自定义缓冲区实现
#define MAX_EVENT_NUM 512 // 每次用户拷贝事件的最大数目
#define MAX_CONN ((1<<16)-1) // 事件对象的最大数目:65535
typedef struct event_s event_t;
typedef void (*event_callback_fn)(int fd, int events, void *privdata);
typedef void (*error_callback_fn)(int fd, char * err);
// Reactor对象,管理IO的全局变量
typedef struct {
int epfd; // epoll文件描述符
int listenfd; // 监听的文件描述符
int stop; // 停止循环标记
event_t *events; // 存储所有事件对象,存储在堆上,记得释放
int iter; // 用于遍历events,获取未被使用的位置
struct epoll_event fire[MAX_EVENT_NUM]; // 用户态数组,用于拷贝IO事件到用户态
} reactor_t;
// 事件对象,sockitem,保存每个fd对应的IO状态
struct event_s {
int fd; // 事件对应的文件描述符
reactor_t *r; // 指向Reactor全局对象
buffer_t in; // 读缓冲,待读取
buffer_t out; // 写缓冲,待发送
event_callback_fn read_fn; // 读回调函数
event_callback_fn write_fn; // 写回调函数
error_callback_fn error_fn; // 错误回调函数
};
// 函数声明
int event_buffer_read(event_t *e);
int event_buffer_write(event_t *e, void * buf, int sz);
reactor_t * create_reactor();
void release_reactor(reactor_t * r);
event_t * new_event(reactor_t *R, int fd,
event_callback_fn rd,
event_callback_fn wt,
error_callback_fn err);
void free_event(event_t *e);
int set_nonblock(int fd);
int add_event(reactor_t *R, int events, event_t *e);
int del_event(reactor_t *R, event_t *e);
int enable_event(reactor_t *R, event_t *e, int readable, int writeable);
void eventloop_once(reactor_t * r, int timeout);
void stop_eventloop(reactor_t * r);
void eventloop(reactor_t * r);
int create_server(reactor_t *R, short port, event_callback_fn func);
int event_buffer_read(event_t *e);
int event_buffer_write(event_t *e, void * buf, int sz);
#endif
文件:reactor.c
#include "reactor.h"
// 创建Reactor对象
reactor_t *
create_reactor() {
reactor_t *r = (reactor_t *)malloc(sizeof(*r));
r->epfd = epoll_create(1);
r->listenfd = 0;
r->stop = 0;
r->iter = 0;
r->events = (event_t*)malloc(sizeof(event_t)*MAX_CONN);
memset(r->events, 0, sizeof(event_t)*MAX_CONN);
memset(r->fire, 0, sizeof(struct epoll_event) * MAX_EVENT_NUM);
return r;
}
// 释放Reactor对象
void
release_reactor(reactor_t * r) {
free(r->events);
close(r->epfd);
free(r);
}
// 获取Reactor的事件堆event上的空闲事件对象
static event_t *
_get_event_t(reactor_t *r) {
r->iter++;
while (r->events[r->iter & MAX_CONN].fd > 0) {
r->iter++;
}
return &r->events[r->iter];
}
// 创建事件对象
event_t *
new_event(reactor_t *R, int fd,
event_callback_fn rd,
event_callback_fn wt,
error_callback_fn err) {
assert(rd != 0 || wt != 0 || err != 0);
// 获取空闲事件对象
event_t *e = _get_event_t(R);
// 初始化事件对象
e->r = R;
e->fd = fd;
buffer_init(&e->in, 1024*16);
buffer_init(&e->out, 1024*16);
e->read_fn = rd;
e->write_fn = wt;
e->error_fn = err;
return e;
}
// 释放事件对象分配的buffer空间
void
free_event(event_t *e) {
buffer_free(&e->in);
buffer_free(&e->out);
}
// 设置非阻塞的fd
int
set_nonblock(int fd) {
int flag = fcntl(fd, F_GETFL, 0);
return fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
// 添加事件
int
add_event(reactor_t *R, int events, event_t *e) {
struct epoll_event ev;
ev.events = events;
ev.data.ptr = e;
if (epoll_ctl(R->epfd, EPOLL_CTL_ADD, e->fd, &ev) == -1) {
printf("Add event error, fd = %d\n", e->fd);
return 1;
}
return 0;
}
// 删除事件
int
del_event(reactor_t *R, event_t *e) {
epoll_ctl(R->epfd, EPOLL_CTL_DEL, e->fd, NULL);
free_event(e);
return 0;
}
// 修改事件(读事件 or 写事件)
int
enable_event(reactor_t *R, event_t *e, int readable, int writeable) {
struct epoll_event ev;
ev.events = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0);
ev.data.ptr = e;
if (epoll_ctl(R->epfd, EPOLL_CTL_MOD, e->fd, &ev) == -1) {
return 1;
}
return 0;
}
// 一次事件循环
void
eventloop_once(reactor_t * r, int timeout) {
int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout);
for (int i = 0; i < n; i++) {
struct epoll_event *e = &r->fire[i];
int mask = e->events;
if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT;
if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT;
event_t *et = (event_t*) e->data.ptr;
// 处理读事件
if (mask & EPOLLIN) {
if (et->read_fn)
et->read_fn(et->fd, EPOLLIN, et);
}
// 处理写事件
if (mask & EPOLLOUT) {
if (et->write_fn)
et->write_fn(et->fd, EPOLLOUT, et);
else {
uint8_t * buf = buffer_write_atmost(&et->out);
event_buffer_write(et, buf, buffer_len(&et->out));
}
}
}
}
// 停止事件循环
void
stop_eventloop(reactor_t * r) {
r->stop = 1;
}
// 事件循环
void
eventloop(reactor_t * r) {
while (!r->stop) {
eventloop_once(r, -1); // 阻塞等待事件
}
}
// 创建服务器
int
create_server(reactor_t *R, short port, event_callback_fn func) {
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0) {
printf("Create listenfd error!\n");
return -1;
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
// 设置地址重用,允许快速重启服务器
int reuse = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(int)) == -1) {
printf("Reuse address error: %s\n", strerror(errno));
return -1;
}
if (bind(listenfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
printf("Bind error: %s\n", strerror(errno));
return -1;
}
if (listen(listenfd, 5) < 0) {
printf("Listen error: %s\n", strerror(errno));
return -1;
}
if (set_nonblock(listenfd) < 0) {
printf("Set nonblock error: %s\n", strerror(errno));
return -1;
}
R->listenfd = listenfd;
event_t *e = new_event(R, listenfd, func, 0, 0);
add_event(R, EPOLLIN, e);