
本文是关于libevent库第一篇博文,主要由例子来说明如何利用该库。后续博文再深入研究该库原理。
libevent库简介
就如libevent官网上所写的“libevent - an event notification library”,libevent就是一个基于事件通知机制的库,支持/dev/poll、kqueue、event ports、select、poll和epoll事件机制,也因此它是一个跨操作系统的库(支持Linux、*BSD、Mac OS X、Solaris、Windows等)。目前应用该库的有Chromium、Memcached、NTP、tmux等应用。
libevent 库实际上没有更换select()、poll()或其他机制的基础,而是使用对于每个平台最高效的高性能解决方案,在其实现外加上一个包装器。
为了实际处理每个请求,libevent 库提供一种事件机制,它作为底层网络后端的包装器。事件系统让为连接添加处理函数变得非常简便,同时降低了底层 I/O 复杂性。这是 libevent 系统的核心。
libevent 库的其他组件提供其他功能,包括缓冲的事件系统(用于缓冲发送到客户端/从客户端接收的数据)以及 HTTP、DNS 和 RPC 系统的核心实现。
另外,libevent库非常轻量级,这让我们学习它的源码难度低了不少。关于源码分析具体可参考:
如果要生成libevent库的文档,可参考博文使用Doxygen生成libevent document(2.0.15)-- CHM格式。
回显服务端示例
简易流程
创建 libevent 服务器的基本方法是,注册当发生某一操作(比如接受来自客户端的连接)时应该执行的函数,然后调用主事件循环event_base_dispatch()
。执行过程的控制由 libevent系统处理。注册事件和将调用的函数之后,事件系统开始自治;在应用程序运行时,可以在事件队列中添加(注册)或删除(取消注册)事件。事件注册非常方便,可以通过它添加新事件以处理新打开的连接,从而构建灵活的网络处理系统。
例如,可以打开一个监听套接字,然后注册一个回调函数,每当需要调用accept()函数以打开新连接时调用这个回调函数,这样就创建了一个网络服务器。下边所示的代码片段说明了这个基本过程:
int main(int argc, char **argv)
{
/* Declare a socket file descriptor. */
evutil_socket_t listenfd; /* Setup listening socket */ /* Make the listen socket reuseable and non-blocking. */
evutil_make_listen_socket_reuseable(listenfd);
evutil_make_socket_nonblocking(listenfd); /* Declare an event_base to host events. */
struct event_base *base = event_base_new(); /* Register listen event. */
struct event *listen_event;
listen_event = event_new(base, listenfd, EV_READ | EV_PERSIST, do_accetp, (void *)base);
event_add(listen_event, NULL); /* Start the event loop. */
event_base_dispatch(base); /* End. */
close(listenfd);
return ;
}
下边详细介绍上边程序中用到的libevent中的API:
1)evutil_socket_t 定义于Util.h头文件中,用于跨平台表示socket的ID(在Linux下表示的是其文件描述符),如下所示:
/**
* A type wide enough to hold the output of "socket()" or "accept()". On
* Windows, this is an intptr_t; elsewhere, it is an int. */
#ifdef WIN32
#define evutil_socket_t intptr_t
#else
#define evutil_socket_t int
#endif
2)evutil_make_listen_socket_reuseable 函数声明于Util.h,实现于Evutil.c,用于跨平台将socket设置为可重用(实际上是将端口设为可重用,具体可参照博文Linux 套接字编程中的 5 个隐患中的第3个隐患),具体定义如下:
int
evutil_make_listen_socket_reuseable(evutil_socket_t sock)
{
#ifndef WIN32
int one = ;
/* REUSEADDR on Unix means, "don't hang on to this address after the
* listener is closed." On Windows, though, it means "don't keep other
* processes from binding to this address while we're using it. */
return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,
(ev_socklen_t)sizeof(one));
#else
return ;
#endif
}
同样,evutil_make_socket_nonblocking函数也声明于Util.h,实现于Evutil.c,用于跨平台将socket设置为非阻塞,具体定义如下:
int
evutil_make_socket_nonblocking(evutil_socket_t fd)
{
#ifdef WIN32
{
u_long nonblocking = ;
if (ioctlsocket(fd, FIONBIO, &nonblocking) == SOCKET_ERROR) {
event_sock_warn(fd, "fcntl(%d, F_GETFL)", (int)fd);
return -;
}
}
#else
{
int flags;
if ((flags = fcntl(fd, F_GETFL, NULL)) < ) {
event_warn("fcntl(%d, F_GETFL)", fd);
return -;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -) {
event_warn("fcntl(%d, F_SETFL)", fd);
return -;
}
}
#endif
return ;
}
3)event_base结构体定义在event_internal.h中,它记录了所有的等待和已激活的事件,并当有事件被激活时通知调用者。默认地,我们用event_base_new函数就可以新建一个event_base对象。event_base_new函数的定义如下:
struct event_base *
event_base_new(void)
{
struct event_base *base = NULL;
struct event_config *cfg = event_config_new();
if (cfg) {
base = event_base_new_with_config(cfg);
event_config_free(cfg);
}
return base;
}
也就是说实际上该函数调用了event_base_new_with_config来创建event_base对象,所以我们也可以利用event_config_new和event_base_new_with_config定制event_base对象。
4)event结构体定义在event_struct.h文件中,主要记录事件的相关属性。event_new函数用于创建一个event对象,具体定义如下:
struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
{
struct event *ev;
ev = mm_malloc(sizeof(struct event));
if (ev == NULL)
return (NULL);
if (event_assign(ev, base, fd, events, cb, arg) < ) {
mm_free(ev);
return (NULL);
} return (ev);
}
// Parameters:
// base the event base to which the event should be attached.
// fd the file descriptor or signal to be monitored, or -1.
// events desired events to monitor: bitfield of EV_READ, EV_WRITE, EV_SIGNAL, EV_PERSIST, EV_ET.
// callback callback function to be invoked when the event occurs
// callback_arg an argument to be passed to the callback function
// Returns:
// a newly allocated struct event that must later be freed with event_free().
在上边程序中,cb是回调函数,其原型如下:
/**
A callback function for an event. It receives three arguments: @param fd An fd or signal
@param events One or more EV_* flags
@param arg A user-supplied argument. @see event_new()
*/
typedef void (*event_callback_fn)(evutil_socket_t, short, void *);
5)event_base_dispatch函数开启事件轮询(event_base_loop提供同样功能,不过更为灵活,实际event_base_dispatch只是event_base_loop的特例),定义如下:
int
event_base_dispatch(struct event_base *event_base)
{
return (event_base_loop(event_base, ));
}
实际例子
一个完整的服务器端的程序如下:
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/types.h> #include <event2/event.h>
#include <event2/bufferevent.h> #define SERV_PORT 9877
#define LISTEN_BACKLOG 32
#define MAX_LINE 1024 void do_accetp(evutil_socket_t listenfd, short event, void *arg);
void read_cb(struct bufferevent *bev, void *arg);
void error_cb(struct bufferevent *bev, short event, void *arg);
void write_cb(struct bufferevent *bev, void *arg); int main(int argc, int **argv)
{
evutil_socket_t listenfd;
if((listenfd = socket(AF_INET, SOCK_STREAM, )) < )
{
perror("socket\n");
return ;
} struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT); if(bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < )
{
perror("bind\n");
return ;
}
if(listen(listenfd, LISTEN_BACKLOG) < )
{
perror("listen\n");
return ;
} printf("Listening...\n"); evutil_make_listen_socket_reuseable(listenfd);
evutil_make_socket_nonblocking(listenfd); struct event_base *base = event_base_new();
if(base == NULL)
{
perror("event_base\n");
return ;
}
const char *eventMechanism = event_base_get_method(base);
printf("Event mechanism used is %s\n", eventMechanism); struct event *listen_event;
listen_event = event_new(base, listenfd, EV_READ | EV_PERSIST, do_accetp, (void *)base);
event_add(listen_event, NULL);
event_base_dispatch(base); if(close(listenfd) < )
{
perror("close\n");
return ;
}
printf("The End\n");
return ;
} void do_accetp(evutil_socket_t listenfd, short event, void *arg)
{
struct event_base *base = (struct event_base *)arg;
evutil_socket_t fd;
struct sockaddr_in cliaddr;
socklen_t clilen;
fd = accept(listenfd, (struct sockaddr *) &cliaddr, &clilen);
if(fd < )
{
perror("accept\n");
return;
}
if(fd > FD_SETSIZE)
{
perror("fd > FD_SETSIZE");
if(close(fd) < )
{
perror("close\n");
return;
}
return;
} printf("Accept: fd = %u\n", fd); struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, read_cb, NULL, error_cb, arg);
bufferevent_enable(bev, EV_READ | EV_WRITE | EV_PERSIST);
} void read_cb(struct bufferevent *bev, void *arg)
{
char line[MAX_LINE + ];
int n;
evutil_socket_t fd = bufferevent_getfd(bev); while((n = bufferevent_read(bev, line, MAX_LINE)) > )
{
line[n] = '\0';
printf("fd = %u, read line: %s", fd, line);
bufferevent_write(bev, line, n);
}
} void error_cb(struct bufferevent *bev, short event, void *arg)
{
evutil_socket_t fd = bufferevent_getfd(bev);
printf("fd = %u, ", fd);
if(event & BEV_EVENT_TIMEOUT)
printf("Time out.\n"); // if bufferevent_set_timeouts() is called
else if(event & BEV_EVENT_EOF)
printf("Connection closed.\n");
else if(event & BEV_EVENT_ERROR)
printf("Some other error.\n");
bufferevent_free(bev);
} void write_cb(struct bufferevent *bev, void *arg)
{
// leave blank
}
注意:在Linux下编译时需要加libevent静态库event,即gcc ... -levent。
上边程序中用到的bufferevent值得再说明一下。bufferevent由一个底层的传输端口(如套接字)、一个读取缓冲区和一个写入缓冲区组成。与通常的事件在底层传输端口已经就绪,可以读取或者写入的时候执行回调不同的是,bufferevent在读取或者写入了足够量的数据之后调用用户提供的回调。详细可参考博文libevent参考手册第六章:bufferevent:概念和入门。
利用bufferevent的简易流程如下:
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, read_cb, NULL, error_cb, arg);
bufferevent_enable(bev, EV_READ | EV_WRITE | EV_PERSIST);
bufferevent_setcb使得我们可以定制我们自己的回调函数,这里我们只用到了读和错误回调函数。最后,我们要调用bufferevent_enable来使得bufferevent启动。
客户端示例
客户端用到的libevent的API跟服务端的基本一样。具体程序如下:
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/types.h> #include <event2/event.h>
#include <event2/bufferevent.h> #define SERV_PORT 9877
#define MAX_LINE 1024 void cmd_msg_cb(int fd, short event, void *arg);
void read_cb(struct bufferevent *bev, void *arg);
void error_cb(struct bufferevent *bev, short event, void *arg); int main(int argc, char *argv[])
{
if(argc < )
{
perror("usage: echocli <IPadress>");
return ;
} evutil_socket_t sockfd;
if((sockfd = socket(AF_INET, SOCK_STREAM, )) < )
{
perror("socket\n");
return ;
} struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
if(inet_pton(AF_INET, argv[], &servaddr.sin_addr) < )
{
perror("inet_ntop\n");
return ;
}
if(connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < )
{
perror("connect\n");
return ;
}
evutil_make_socket_nonblocking(sockfd); printf("Connect to server sucessfully!\n"); struct event_base *base = event_base_new();
if(base == NULL)
{
perror("event_base\n");
return ;
}
const char *eventMechanism = event_base_get_method(base);
printf("Event mechanism used is %s\n", eventMechanism);
printf("sockfd = %d\n", sockfd); struct bufferevent *bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE); struct event *ev_cmd;
ev_cmd = event_new(base, STDIN_FILENO, EV_READ | EV_PERSIST, cmd_msg_cb, (void *)bev);
event_add(ev_cmd, NULL); bufferevent_setcb(bev, read_cb, NULL, error_cb, (void *)ev_cmd);
bufferevent_enable(bev, EV_READ | EV_PERSIST); event_base_dispatch(base); printf("The End.");
return ;
} void cmd_msg_cb(int fd, short event, void *arg)
{
char msg[MAX_LINE];
int nread = read(fd, msg, sizeof(msg));
if(nread < )
{
perror("stdio read fail\n");
return;
} struct bufferevent *bev = (struct bufferevent *)arg;
bufferevent_write(bev, msg, nread);
} void read_cb(struct bufferevent *bev, void *arg)
{
char line[MAX_LINE + ];
int n;
evutil_socket_t fd = bufferevent_getfd(bev); while((n = bufferevent_read(bev, line, MAX_LINE)) > )
{
line[n] = '\0';
printf("fd = %u, read from server: %s", fd, line);
}
} void error_cb(struct bufferevent *bev, short event, void *arg)
{
evutil_socket_t fd = bufferevent_getfd(bev);
printf("fd = %u, ", fd);
if(event & BEV_EVENT_TIMEOUT)
printf("Time out.\n"); // if bufferevent_set_timeouts() is called
else if(event & BEV_EVENT_EOF)
printf("Connection closed.\n");
else if(event & BEV_EVENT_ERROR)
printf("Some other error.\n");
bufferevent_free(bev); struct event *ev = (struct event *)arg;
event_free(ev);
}