libevent中bufferevent的使用

时间:2022-12-30 00:17:26

以下面的bufferevent-test.c为例,在windows下单步调试,分析bufferevent的基本使用。

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <assert.h>
#include <winsock2.h>

#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/event_compat.h>
#include <event2/bufferevent.h>
#include <event2/bufferevent_struct.h>
#include <event2/buffer.h>
#include <event2/Buffer_compat.h>
#include <event2/bufferevent_compat.h>

#define SERVER_PORT 5150
struct client {
int fd;
struct bufferevent *buf_ev;
};

void setnonblock(int fd)
{
unsigned long ul=1;
ioctlsocket(fd,FIONBIO,&ul);
}

void buf_read_callback(struct bufferevent *incoming ,void *arg)
{
struct evbuffer *evreturn;
char *req;
req= evbuffer_readln(incoming->input,NULL,EVBUFFER_EOL_ANY);

//构造待发送数据,并将其写入到输出缓冲区output
evreturn = evbuffer_new();
evbuffer_add_printf(evreturn,"You said %s\n",req);
bufferevent_write_buffer(incoming,evreturn);
evbuffer_free(evreturn);
free(req);
}

void buf_write_callback(struct bufferevent *bev,void *arg)
{
}

void buf_error_callback(struct bufferevent *bev,short what,void *arg)
{
struct client *client = (struct client *)arg;
bufferevent_free(client->buf_ev);
closesocket(client->fd);
free(client);
}

void accept_callback(int fd,short ev,void *arg)
{
int client_fd;
struct sockaddr_in client_addr;
int client_len = sizeof(client_addr);
struct client *client;

client_fd = accept(fd,(struct sockaddr *)&client_addr,&client_len);
setnonblock(client_fd);

client = (struct client *)malloc(sizeof(*client));
client->fd = client_fd;

//创建一个bufferevent socket,设置回调函数,并将读事件加入到监听列表
client->buf_ev = bufferevent_socket_new(NULL,client_fd,0);
bufferevent_setcb(client->buf_ev, buf_read_callback, buf_write_callback, buf_error_callback, client);
bufferevent_enable(client->buf_ev, EV_READ);
}

int main(int argc,
char **argv)
{
int socketlisten;
struct sockaddr_in addresslisten;
struct event accept_event;
char reuse = 1;
struct event_base *base;

WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);

base=event_init();
printf("%d\n",sizeof(struct event));

socketlisten = socket(AF_INET, SOCK_STREAM, 0);

memset(&addresslisten, 0, sizeof(addresslisten));
addresslisten.sin_family = AF_INET;
addresslisten.sin_addr.s_addr = INADDR_ANY;
addresslisten.sin_port = htons(SERVER_PORT);

bind(socketlisten,(struct sockaddr *)&addresslisten,sizeof(addresslisten));
listen(socketlisten, 5);

setsockopt(socketlisten,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));
setnonblock(socketlisten);

event_set(&accept_event,socketlisten,EV_READ|EV_PERSIST,accept_callback,NULL);
event_add(&accept_event,NULL);

event_dispatch();

closesocket(socketlisten);
return 0;
}

首先通过监听socketlisten等待连接,当有连接请求到达时,调用该事件的回调函数accept_callback()。在该函数中,为每个成功连接的socket创建一个bufferevent事件,监听该fd上的数据读写。当读、写事件触发时,调用其对应的回调函数。

bufferevent的基本结构如下所示:
libevent中bufferevent的使用
bufferevent结构体中主要有:
1)两个事件(读、写):基本的event,等待被触发然后加入到base的活动队列,然后调用事件对应的回调函数(bufferevent_readcb()或bufferevent_writecb()完成数据的发送和接收)
2)两个缓冲区(input、output):存储读取到和待发送的数据
3)三个回调函数(read、write、error):用户自定义
4)两个超时时间(读、写超时)

1、bufferevent事件的创建、设置

当有连接请求到来时,会触发accept_callback(),在该函数中,为每个连接socket创建一个bufferevent事件:

  client->buf_ev = bufferevent_socket_new(NULL,client_fd,0);  //创建一个bufferevent事件
bufferevent_setcb(client->buf_ev, buf_read_callback, buf_write_callback, buf_error_callback, client); //设置回调函数
bufferevent_enable(client->buf_ev, EV_READ);//将读事件加入到监听列表

首先调用bufferevent_socket_new(NULL,fd,0);创建一个bufferevent事件

struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,int options)
{
struct bufferevent_private *bufev_p;
struct bufferevent *bufev;

if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
return NULL;
//初始化bufferevent结构体的成员变量
if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,options) < 0) {
mm_free(bufev_p);
return NULL;
}
bufev = &bufev_p->bev;
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
//分别创建读、写事件,且事件为永久事件,并设置其回调函数。当事件被触发时,调用对应的回调函数,在该回调函数中,系统会将读到和要发送的数据写到对应缓冲区,此时只是初始化事件,但是还没有调用event_add将事件加入到监听列表中
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
//为output缓冲区添加一个回调函数,当该缓冲区发生变化时,可调用该回调函数,默认监听输出缓冲区(当输出缓冲区发生变化时,调用此函数将写事件加入到监听列表,等待事件被触发然后发送数据)
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

return bufev;
}

然后调用bufferevent_setcb()设置bufferevent的回调函数

    bufev->readcb = readcb;
bufev->writecb = writecb;
bufev->errorcb = eventcb;

bufev->cbarg = cbarg;

最后调用bufferevent_enable()使能读事件EV_READ,该函数会调用bufferevent_ops中的be_socket_enable(),最终会调用到event_add()将读事件加入到监听列表,等待数据的到来

2、有数据到来时触发ev_read

当有数据到来时,会触发ev_read事件,并调用该事件对应的回调函数bufferevent_readcb(),该函数最终调用evbuffer_read()完成数据的读取,并调用_bufferevent_run_readcb()来调用回调函数(本例中为buf_read_callback())

int evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
n = get_n_bytes_readable_on_socket(fd);//获得要读取的数据总大小
if (howmuch < 0 || howmuch > n)
howmuch = n;

/* 为数据的读取做准备,初始化缓冲区的数据链input->chain */
if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) {
result = -1;
goto done;
} else {
IOV_TYPE vecs[NUM_READ_IOVEC];
//初始化适用于WSA的接收缓冲区ev_vecs
struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs, 2,
&chainp, 1);

for (i=0; i < nvecs; ++i)
WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
{
DWORD bytesRead;
DWORD flags=0;
//接收数据
if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) {
/* read failed */
} else
n = bytesRead;
}
}
/*根据读取到的数据大小,更改缓冲区的变量*/
......

evbuffer_invoke_callbacks(buf);//调用该缓冲区对应的回调函数,本例中input缓冲区未设置回调函数
result = n;
}

然后调用回调函数buf_read_callback(),这个函数由用户自定义,本例中其主要操作为:

  req= evbuffer_readln(incoming->input,NULL,EVBUFFER_EOL_ANY);//从接收缓冲区中读取出一行数据,同时会清除缓冲区中的已读数据
evreturn = evbuffer_new();//初始化一个缓冲区
evbuffer_add_printf(evreturn,"You said %s\n",req);//构造要发送的数据
bufferevent_write_buffer(incoming,evreturn);//将待发送数据写入发送缓冲区,即bufferevent->output

在bufferevent_write_buffer()中最终会调用evbuffer_add_buffer()复制缓冲区,并调用缓冲区对应的回调函数

int
evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
{
//更新缓冲区变量
in_total_len = inbuf->total_len;
out_total_len = outbuf->total_len;

if (out_total_len == 0) {
evbuffer_free_all_chains(outbuf->first);
COPY_CHAIN(outbuf, inbuf); //复制缓冲区
} else {
APPEND_CHAIN(outbuf, inbuf);
}

inbuf->n_del_for_cb += in_total_len;
outbuf->n_add_for_cb += in_total_len;
//调用缓冲区对应的回调函数,在output缓冲区上设置了回调函数bufferevent_socket_outbuf_cb(见bufferevent_socket_new()),因此最终会调用这个函数
evbuffer_invoke_callbacks(inbuf);
evbuffer_invoke_callbacks(outbuf);
}

bufferevent_socket_outbuf_cb()函数最终调用event_add()将写事件加入到监听列表中,当客户端调用recv函数要接收数据时,就会触发bufferevent的写事件

3、ev_write写事件触发

写事件触发时会调用其对应的回调函数bufferevent_writecb(),在该函数中,最终会调用evbuffer_write_atmost()完成数据的发送

int evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,
ev_ssize_t howmuch)
{
if (howmuch < 0 || (size_t)howmuch > buffer->total_len)
howmuch = buffer->total_len;

if (howmuch > 0) {
n = evbuffer_write_iovec(buffer, fd, howmuch); //最终调用WSASend完成数据的发送
}
if (n > 0)
evbuffer_drain(buffer, n);//该函数会调用evbuffer_chain_free释放发送缓冲区中的数据链,并调用缓冲区上对应的回调函数
}

总结

1、首先创建一个事件event监听端口上的连接请求,当成功建立连接后,为每个连接创建一个bufferevent,监听该连接上数据的发送和接收
2、当读事件ev_read被触发时(客户端发送数据),调用回调函数bufferevent_readcb()完成数据的接收,并调用用户的读回调函数,构造待发送数据并写入发送缓冲区
3、当写事件ev_write被触发时(客户端等待接收数据),调用回调函数bufferevent_writecb()完成数据的发送

相关文章