对libevent中bufferevent的深入理解

时间:2021-05-17 00:18:34

对libevent中bufferevent的深入理解

在libevent 2.x.x中,新增了bufferevent来封装了socket操作,用来简化编码操作。

当前项目有个点实在是有点儿绕,我自己在libevent基础上做了一个网络引擎的wrapper,未来打算替换,在实现的过程中,发现了一些问题,于是这些日子在空闲的时候阅读了下libevent的实现以及从前没有接触过的epoll模型,心里也算是有点数了。

在bufferevent中,封装了两个event,分别是read和write,同时也有对应的read buffer和write buffer,这也是常用的设计。然而bufferevent对于read和write事件的处理却有很大的差异。

读事件

首先我们谈谈比较容易理解的read事件,通常的步骤就是新建一个bufferevent

bufferevent_socket_new

然后我们设置对应的写回调函数,同时要记住必须调用

bufferevent_enable(pEv, EV_READ)

这个函数是将对应的event通过event_add添加到eventbase中的,我们知道在libevent的设计中,所有的事件必须加入eventbase中才会获得事件通知。

bufferevent_enable 函数中,会设置enabled变量的对应的标志位,并且将对应的event添加到eventbase中。

不然我们的读事件都会被libevent忽略,然后我们只要静静的等待着读回调即可。在libevent层面,它属于proactor模式,我们只需要从缓冲区中把socket数据取出来即可,不用调用recv/read。

从libevent的内部实现来讲,在调用bufferevent_socket_new的时候已经初始化了读事件,并且设置了读回调,这个回调是属于libevent内部的回调,用于实现proactor的机制。

static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
struct evbuffer *input;
int res = 0;
short what = BEV_EVENT_READING;
ev_ssize_t howmuch = -1, readmax=-1;

_bufferevent_incref_and_lock(bufev);

if (event == EV_TIMEOUT) {
/* Note that we only check for event==EV_TIMEOUT. If
* event==EV_TIMEOUT|EV_READ, we can safely ignore the
* timeout, since a read has occurred */
what |= BEV_EVENT_TIMEOUT;
goto error;
}

input = bufev->input;

/*
* If we have a high watermark configured then we don't want to
* read more data than would make us reach the watermark.
*/
if (bufev->wm_read.high != 0) {
howmuch = bufev->wm_read.high - evbuffer_get_length(input);
/* we somehow lowered the watermark, stop reading */
if (howmuch <= 0) {
bufferevent_wm_suspend_read(bufev);
goto done;
}
}
readmax = _bufferevent_get_read_max(bufev_p);
if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
* uglifies this code. XXXX */
howmuch = readmax;
if (bufev_p->read_suspended)
goto done;

evbuffer_unfreeze(input, 0);
res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
evbuffer_freeze(input, 0);

if (res == -1) {
int err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err))
goto reschedule;
/* error case */
what |= BEV_EVENT_ERROR;
} else if (res == 0) {
/* eof case */
what |= BEV_EVENT_EOF;
}

if (res <= 0)
goto error;

_bufferevent_decrement_read_buckets(bufev_p, res);

/* Invoke the user callback - must always be called last */
if (evbuffer_get_length(input) >= bufev->wm_read.low)
_bufferevent_run_readcb(bufev);

goto done;

reschedule:
goto done;

error:
bufferevent_disable(bufev, EV_READ);
_bufferevent_run_eventcb(bufev, what);

done:
_bufferevent_decref_and_unlock(bufev);
}

可以看到最终会调用用户的回调函数,而对于这些函数来说,libevent已经封装了read/recv等细节,只需要从evbuffer中获取数据就可以了。

写事件

读事件非常好理解,就是有数据可读了,而读事件有点儿不一样。读事件是属于缓冲区可写事件,在socket的缓冲区未写满的时候会一直通知,所以bufferevent的写处理有点儿不一样。

在初始化bufferevent的时候,bufferevent会默认自动设置enabled的EV_WRITE标志位,这是很重要的,而EV_READ的标志位必须自己设置。

为何只是设置EV_READ标志位而不是调用 bufferevent_enable 呢?上面已经谈到了这个问题,即当socket缓冲区可写的时候,会不断的回调,而这是不需要的,我们的目的只是有用户数据还未写入socket缓冲区的时候,我们才需要这个通知,于是在正常的情况下, bufferevent_enable(pEv, EV_WRITE) 是不会设置的,这是有悖于bufferevent的设计思路的。

为了解决这个问题,我们就不处理socket的可写事件,当用户调用了 bufferevent_write 的时候才进行处理。当用户请求发送数据的时候,libevent是写入写缓冲的,大概的设计逻辑如下:

// 添加到写缓冲 ...省略

evbuffer_invoke_callbacks(buf);

查看了下libevent的实现,仅仅只是把数据append到写缓冲内,没有任何的发送机制,可实际上这段数据还是被发送出去了,这是怎么实现的呢?所以这个工作只能是 evbuffer_invoke_callbacks(buf); 这个来实现的,这个是注册到evbuffer的一个事件回调,当有数据写入的时候会被调用。于是我们继续追溯这个调用链,我们从头追溯,我们在初始化bufferevent对象的时候,有这么一句调用:

evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

我们对写缓冲设置了这么一个回调,那么在我们把数据写入了写缓冲,这个回调就会被调用,我们来看下这个的实现:

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo,
void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

if (cbinfo->n_added &&
(bufev->enabled & EV_WRITE) &&
!event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
!bufev_p->write_suspended) {
/* Somebody added data to the buffer, and we would like to
* write, and we were not writing. So, start writing. */
if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
/* Should we log this? */
}
}
}

没错,我们又看到了 be_socket_add 这个熟悉的身影,而这个函数就是添加对应的事件到eventbase中,我们来看下上述的判断条件,看到了enabled这个变量了,没错在 bufferevent_socket_new 中已经默认设置了EV_WRITE的标志位,所以当我们使用bufferevent_write写入写缓冲的时候,其实libevent会将可写事件监听,当有可写事件来临的时候,就执行到了之前在写event上注册的回调 bufferevent_writecb,在这个函数中,会尝试将写缓冲的数据写入socket缓冲区,假设写入完毕,则会删除读事件的监听,假设缓冲区已满,则等待下次缓冲区可写的通知。

这样写事件的逻辑也整理清楚了,写流程是属于有点儿复杂的,读相对而言会简单很多。

有感

我们目前项目中对于libevent的发送数据包的用法,有点儿绕,不知道是有意为之还是怎么的,我们现在的做法是主动的 bufferevent_enable EV_WRITE事件,而当缓冲区可用的时候,我们还没有把我们的数据包写入写缓冲,于是直接调用用户的回调函数,于是在用户的回调函数中再把数据给发送出去,当然还是调用了 bufferevent_write ,而在这个函数中,当缓冲区发完再把写事件从eventbase中删除,这个逻辑其实是多走了一点儿东西,其实不必等缓冲区可写才发送数据包,直接调用 bufferevent_write 就可以了,上述的逻辑会把发包的逻辑打乱,在回调的时候恢复当前的现场也是多写了很多代码。