libevent源码学习_event_test

时间:2021-08-16 15:02:40

对应的sample文件中提供了event_test.c,里面就是关于事件的简单示例,具体如下:

 /*
* Compile with:
* cc -I/usr/local/include -o event-test event-test.c -L/usr/local/lib -levent
*/ #ifdef HAVE_CONFIG_H
#include "config.h"
#endif #include <sys/types.h>
#include <sys/stat.h>
#include <sys/queue.h>
#include <unistd.h>
#include <sys/time.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h> #include <event.h> static void
fifo_read(int fd, short event, void *arg)
{
char buf[];
int len;
struct event *ev = arg; /* Reschedule this event */
event_add(ev, NULL); fprintf(stderr, "fifo_read called with fd: %d, event: %d, arg: %p\n",
fd, event, arg); len = read(fd, buf, sizeof(buf) - ); if (len == -) {
perror("read");
return;
} else if (len == ) {
fprintf(stderr, "Connection closed\n");
return;
} buf[len] = '\0'; fprintf(stdout, "Read: %s\n", buf);
} int
main (int argc, char **argv)
{
struct event evfifo; struct stat st;
const char *fifo = "event.fifo";
int socket; if (lstat (fifo, &st) == ) {
if ((st.st_mode & S_IFMT) == S_IFREG) {
errno = EEXIST;
perror("lstat");
exit ();
}
} unlink (fifo);
if (mkfifo (fifo, ) == -) {
perror("mkfifo");
exit ();
} socket = open (fifo, O_RDWR | O_NONBLOCK, ); if (socket == -) {
perror("open");
exit ();
} fprintf(stderr, "Write data to %s\n", fifo); /* Initalize the event library */
event_init(); /* Initalize one event */
event_set(&evfifo, socket, EV_READ, fifo_read, &evfifo); /* Add it to the active events, without a timeout */
event_add(&evfifo, NULL); event_dispatch(); return ();
}

从这个例子中,我们可以看到使用libevent的基本步骤:

event_init --> event_set --> event_add --> event_dispatch

下面分步来解析这些函数。

1、函数event_init()

位于:event.c

 struct event_base *
event_init(void)
{
struct event_base *base = event_base_new(); if (base != NULL)
current_base = base; return (base);
}

比较简单,就是调用了函数event_base_new初始化一个event_base类型的变量,并且将这个变量赋值给一个全局变量current_base。

2、函数event_base_new()

位于:event.c

 struct event_base *
event_base_new(void)
{
int i;
struct event_base *base; /*
* 在堆上分配内存存储event_base,所有字段初始化为0
*/
if ((base = calloc(, sizeof(struct event_base))) == NULL)
event_err(, "%s: calloc", __func__); /*
* 设置use_monotonic变量
*/
detect_monotonic(); /*
* 得到当前时间
*/
gettime(base, &base->event_tv); /*
* 初始化小根堆
*/
min_heap_ctor(&base->timeheap); /*
* 初始化注册时间队列
*/
TAILQ_INIT(&base->eventqueue); /*
* 初始化socketpair
*/
base->sig.ev_signal_pair[] = -;
base->sig.ev_signal_pair[] = -; /*
* C语言实现多态
* 根据所支持的系统调用进行对应的初始化
*/
base->evbase = NULL;
for (i = ; eventops[i] && !base->evbase; i++) {
base->evsel = eventops[i]; base->evbase = base->evsel->init(base);
} if (base->evbase == NULL)
event_errx(, "%s: no event mechanism available", __func__); if (evutil_getenv("EVENT_SHOW_METHOD"))
event_msgx("libevent using: %s\n",
base->evsel->name); /*
* 设置优先级base->nactivequeues,分配数组base->activequeues
* 数组大小和优先级相同
*/
/* allocate a single active event queue */
event_base_priority_init(base, ); return (base);
}

主要做了以下事情:

(1)给event_base类型变量分配空间

(2)初始化小根堆【struct min_heap timeheap,位于位于结构体event_base】

(3)初始化注册事件队列【struct event_list eventqueue,位于位于结构体event_base】

(4)根据系统支持的系统调用初始化后面真正干活的eventop实例对象【void *evbase,位于结构体event_base】

(5)调用函数event_base_priority_init() 初始化优先队列,确认的说应该是活跃事件的队列,它是带优先级的,因为这里是最开始的初始化,所以就初始化一个队列,并且它的优先级为1,这个优先级就作为初始化队列的数量。具体见下面函数event_base_priority_init() 的分析

上面的init以epoll为例,位于:epoll.c

 static void *
epoll_init(struct event_base *base)
{
int epfd;
struct epollop *epollop; /* Disable epollueue when this environment variable is set */
if (evutil_getenv("EVENT_NOEPOLL"))
return (NULL); /* Initalize the kernel queue */
if ((epfd = epoll_create()) == -) {
if (errno != ENOSYS)
event_warn("epoll_create");
return (NULL);
} FD_CLOSEONEXEC(epfd); if (!(epollop = calloc(, sizeof(struct epollop))))
return (NULL); epollop->epfd = epfd; /* Initalize fields */
epollop->events = malloc(INITIAL_NEVENTS * sizeof(struct epoll_event));
if (epollop->events == NULL) {
free(epollop);
return (NULL);
}
epollop->nevents = INITIAL_NEVENTS; epollop->fds = calloc(INITIAL_NFILES, sizeof(struct evepoll));
if (epollop->fds == NULL) {
free(epollop->events);
free(epollop);
return (NULL);
}
epollop->nfds = INITIAL_NFILES; evsignal_init(base); return (epollop);
}

调用了系统调用epoll_create,创建出ep_fd,然后初始化了结构体epollop的成员变量。

 struct epollop {
struct evepoll *fds;
int nfds;
struct epoll_event *events;
int nevents;
int epfd;
};
 struct evepoll {
struct event *evread;
struct event *evwrite;
};

3、函数event_base_priority_init()

位于:event.c

 int
event_base_priority_init(struct event_base *base, int npriorities)
{
int i; /*
* 当前base上有活跃的events则不能设置优先级,返回
*/
if (base->event_count_active)
return (-); /*
* 不同,则先释放原先的activequeues数组
*/
if (base->nactivequeues && npriorities != base->nactivequeues) {
for (i = ; i < base->nactivequeues; ++i) {
free(base->activequeues[i]);
}
free(base->activequeues);
} /*
* 设置新的优先级
* 设置和优先级值相同大小的event_list数组
*/
/* Allocate our priority queues */
base->nactivequeues = npriorities;
base->activequeues = (struct event_list **)
calloc(base->nactivequeues, sizeof(struct event_list *)); if (base->activequeues == NULL)
event_err(, "%s: calloc", __func__); /*
* 初始化activequeues数组中每个元素
*/
for (i = ; i < base->nactivequeues; ++i) {
base->activequeues[i] = malloc(sizeof(struct event_list));
if (base->activequeues[i] == NULL)
event_err(, "%s: malloc", __func__);
TAILQ_INIT(base->activequeues[i]);
} return ();
}

初始化结构体event_base里面的struct event_list **activequeues成员,这是个2维数组,其中的元素activequeues[priority]是一个链表,这个链表里面对应的是相同优先级的事件。

为了更清晰上面的初始化,附上UML图如下:

libevent源码学习_event_test

4、函数event_set()

位于:event.c

 void
event_set(struct event *ev, int fd, short events,
void (*callback)(int, short, void *), void *arg)
{
/* Take the current base - caller needs to set the real base later */
ev->ev_base = current_base; ev->ev_callback = callback;
ev->ev_arg = arg;
ev->ev_fd = fd;
ev->ev_events = events;
ev->ev_res = ;
ev->ev_flags = EVLIST_INIT;
ev->ev_ncalls = ;
ev->ev_pncalls = NULL; min_heap_elem_init(ev); /* by default, we put new events into the middle priority */
if(current_base)
ev->ev_pri = current_base->nactivequeues/;
}

5、函数event_add()

位于:event.c

 int
event_add(struct event *ev, const struct timeval *tv)
{
/*
* 要注册到的event_base
* 得到ev对应的反应堆实例event_base
*/
struct event_base *base = ev->ev_base;
const struct eventop *evsel = base->evsel;
void *evbase = base->evbase;
int res = ; event_debug((
"event_add: event: %p, %s%s%scall %p",
ev,
ev->ev_events & EV_READ ? "EV_READ " : " ",
ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
tv ? "EV_TIMEOUT " : " ",
ev->ev_callback)); assert(!(ev->ev_flags & ~EVLIST_ALL)); /*
* ev->ev_events表示事件类型
* 如果ev->ev_events是读/写/信号事件,而且ev不在已注册队列或已就绪队列
* 那么调用evbase注册ev事件
*/
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
res = evsel->add(evbase, ev);
if (res != -)
/*
* 注册成功,插入event到已注册链表中
*/
event_queue_insert(base, ev, EVLIST_INSERTED);
} return (res);
}

只留下了关键逻辑,精简了和定时器相关的代码。

上面的add以epoll为例,位于:epoll.c

 static int
epoll_add(void *arg, struct event *ev)
{
struct epollop *epollop = arg;
struct epoll_event epev = {, {}};
struct evepoll *evep;
int fd, op, events; if (ev->ev_events & EV_SIGNAL)
return (evsignal_add(ev)); fd = ev->ev_fd;
if (fd >= epollop->nfds) {
/* Extent the file descriptor array as necessary */
if (epoll_recalc(ev->ev_base, epollop, fd) == -)
return (-);
} /*
* 获得地址,后面给它赋值
*/
evep = &epollop->fds[fd];
op = EPOLL_CTL_ADD;
events = ; /*
* 如果原先存在就EPOLL_CTL_MOD而不是EPOLL_CTL_ADD
*/
if (evep->evread != NULL) {
events |= EPOLLIN;
op = EPOLL_CTL_MOD;
}
if (evep->evwrite != NULL) {
events |= EPOLLOUT;
op = EPOLL_CTL_MOD;
} /*
* 设置关注的事件
*/
if (ev->ev_events & EV_READ)
events |= EPOLLIN;
if (ev->ev_events & EV_WRITE)
events |= EPOLLOUT; epev.data.fd = fd;
epev.events = events;
if (epoll_ctl(epollop->epfd, op, ev->ev_fd, &epev) == -)
return (-); /* Update events responsible */
if (ev->ev_events & EV_READ)
evep->evread = ev;
if (ev->ev_events & EV_WRITE)
evep->evwrite = ev; return ();
}

如果新加入的fd大小大于了之前分配的fd最大个数,则需要调用函数epoll_recalc()重新分配空间,否则就是更新相关的结构体变量,并调用系统调用epoll_ctl来EPOLL_CTL_ADD或EPOLL_CTL_MOD对应的事件。

函数epoll_recalc()解析如下:

 static int
epoll_recalc(struct event_base *base, void *arg, int max)
{
struct epollop *epollop = arg; /*
* 当前的fd大于了之前根据最大fd分配的结构体evepoll个数,重新分配,否则直接返回
*/
if (max >= epollop->nfds) {
struct evepoll *fds;
int nfds; /*
* 每次以2倍大小扩充
*/
nfds = epollop->nfds;
while (nfds <= max)
nfds <<= ; /*
* 扩充
*/
fds = realloc(epollop->fds, nfds * sizeof(struct evepoll));
if (fds == NULL) {
event_warn("realloc");
return (-);
} /*
* 更新成员变量的值,并且把新扩充的内存清空
*/
epollop->fds = fds;
memset(fds + epollop->nfds, ,
(nfds - epollop->nfds) * sizeof(struct evepoll));
epollop->nfds = nfds;
} return ();
}

在上面的add完毕后,要把对应的事件放到已注册事件的链表里面。

 void
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
/*
* ev可能已经在激活列表中了,避免重复插入
*/
if (ev->ev_flags & queue) {
/* Double insertion is possible for active events */
if (queue & EVLIST_ACTIVE)
return; event_errx(, "%s: %p(fd %d) already on queue %x", __func__,
ev, ev->ev_fd, queue);
} if (~ev->ev_flags & EVLIST_INTERNAL)
base->event_count++; /*
* 记录queue标记
*/
ev->ev_flags |= queue;
switch (queue) {
/*
* I/O或Signal事件,加入已注册事件链表
*/
case EVLIST_INSERTED:
TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
break;
/*
* 就绪事件,加入激活链表
*/
case EVLIST_ACTIVE:
base->event_count_active++;
TAILQ_INSERT_TAIL(base->activequeues[ev->ev_pri],
ev,ev_active_next);
break;
/*
* 定时事件,加入堆
*/
case EVLIST_TIMEOUT: {
min_heap_push(&base->timeheap, ev);
break;
}
default:
event_errx(, "%s: unknown queue %x", __func__, queue);
}
}

6、函数event_dispatch()

位于:event.c

 int
event_dispatch(void)
{
return (event_loop());
}
 /* not thread safe */

 int
event_loop(int flags)
{
return event_base_loop(current_base, flags);
}

可以看到,用了全局变量current_base,所以它并不是线程安全的。

 int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
void *evbase = base->evbase;
struct timeval tv;
struct timeval *tv_p;
int res, done; /* clear time cache */
base->tv_cache.tv_sec = ; done = ;
while (!done) {
/*
* 校正系统时间,如果系统使用的是非MONOTONIC时间,用户可能会向后调整了系统时间
* 在timeout_correct函数里,比较last wait time和当前时间
* 如果当前时间< last wait time 表明时间有问题
* 这时需要更新timer_heap中所有定时事件的超时时间。
*/
timeout_correct(base, &tv); /*
* 根据timer heap中事件的最小超时时间,计算系统I/O demultiplexer的最大等待时间
*/
tv_p = &tv;
if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
/*
* 依然有未处理的就绪事件,就让I/O demultiplexer立即返回,不必等待
* 下面会提到,在libevent中,低优先级的就绪事件可能不能立即被处理
*/
/*
* if we have active events, we just poll new events
* without waiting.
*/
evutil_timerclear(&tv);
} /* If we have no events, we just exit */
if (!event_haveevents(base)) {
event_debug(("%s: no events registered.", __func__));
return ();
} /* update last old time */
gettime(base, &base->event_tv); /* clear time cache */
base->tv_cache.tv_sec = ; /*
* 调用系统I/O demultiplexer等待就绪I/O events,可能是epoll_wait,或者select等;
* 在evsel->dispatch()中,会把就绪signal event、I/O event插入到激活链表中
*/
res = evsel->dispatch(base, evbase, tv_p); if (res == -)
return (-); /*
* 将time cache赋值为当前系统时间
*/
gettime(base, &base->tv_cache); /*
* 检查heap中的timer events
* 将就绪的timer event从heap上删除,并插入到激活链表中
*/
timeout_process(base); /*
* 调用event_process_active()处理激活链表中的就绪event,调用其回调函数执行事件处理
* 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表,
* 然后处理该链表中的所有就绪事件;
* 因此低优先级的就绪事件可能得不到及时处理
*
* 见函数event_process_active()的注释:
* Active events are stored in priority queues. Lower priorities are always
* process before higher priorities. Low priority events can starve high
* priority ones.
*
* 在函数event_process_active()里面就是寻找priority值最小的已就绪事件队列
* 找到一个就开始处理里面所有的事件回调了,其他的队列根本就不管了......
* 所以原作者用的Low priority events can starve high priority ones非常贴切
*/
if (base->event_count_active) {
/*
* 处理event_base的活跃链表中的事件
* 调用event的回调函数,优先级高的event先处理
*/
event_process_active(base); if (!base->event_count_active && (flags & EVLOOP_ONCE)) {
done = ;
}
} else if (flags & EVLOOP_NONBLOCK) {
done = ;
}
} /* clear time cache */
base->tv_cache.tv_sec = ; event_debug(("%s: asked to terminate loop.", __func__));
return ();
}

1、去掉了一些无用的业务逻辑代码。

2、调用epoll_dispatch来进行事件的分发,激活就绪的事件都弄到激活的链表里面去。

3、调用timeout_process把超时的事件也弄到激活的链表里面去。

4、调用event_process_active开始处理,调用对应事件的回调函数;需要注意的是:就是寻找priority值最小的已就绪事件队列,找到一个就开始处理里面所有的事件回调了,其他的队列根本就不管了......所以原作者用的Low priority events can starve high priority ones非常贴切。

5、关于定时器、时间相关的函数未仔细看。

函数epoll_dispatch(),位于epoll.c

 static int
epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv)
{
struct epollop *epollop = arg;
struct epoll_event *events = epollop->events;
struct evepoll *evep;
int i, res, timeout = -; if (tv != NULL)
timeout = tv->tv_sec * + (tv->tv_usec + ) / ; if (timeout > MAX_EPOLL_TIMEOUT_MSEC) {
/* Linux kernels can wait forever if the timeout is too big;
* see comment on MAX_EPOLL_TIMEOUT_MSEC. */
timeout = MAX_EPOLL_TIMEOUT_MSEC;
} res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); if (res == -) {
if (errno != EINTR) {
event_warn("epoll_wait");
return (-);
} evsignal_process(base);
return ();
} else if (base->sig.evsignal_caught) {
evsignal_process(base);
} event_debug(("%s: epoll_wait reports %d", __func__, res)); for (i = ; i < res; i++) {
int what = events[i].events;
struct event *evread = NULL, *evwrite = NULL;
int fd = events[i].data.fd; if (fd < || fd >= epollop->nfds)
continue;
evep = &epollop->fds[fd]; if (what & (EPOLLHUP|EPOLLERR)) {
evread = evep->evread;
evwrite = evep->evwrite;
} else {
if (what & EPOLLIN) {
evread = evep->evread;
} if (what & EPOLLOUT) {
evwrite = evep->evwrite;
}
} if (!(evread||evwrite))
continue; if (evread != NULL)
event_active(evread, EV_READ, );
if (evwrite != NULL)
event_active(evwrite, EV_WRITE, );
} if (res == epollop->nevents && epollop->nevents < MAX_NEVENTS) {
/* We used all of the event space this time. We should
be ready for more events next time. */
int new_nevents = epollop->nevents * ;
struct epoll_event *new_events; new_events = realloc(epollop->events,
new_nevents * sizeof(struct epoll_event));
if (new_events) {
epollop->events = new_events;
epollop->nevents = new_nevents;
}
} return ();
}

函数event_process_active()

 static void
event_process_active(struct event_base *base)
{
struct event *ev;
struct event_list *activeq = NULL;
int i;
short ncalls; /*
* 寻找最高优先级(priority值越小优先级越高)的已就绪事件队列
*/
for (i = ; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
activeq = base->activequeues[i];
break;
}
} assert(activeq != NULL); for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
/*
* 如果有persist标志,则只从激活队列中移除此事件
*/
if (ev->ev_events & EV_PERSIST) {
event_queue_remove(base, ev, EVLIST_ACTIVE);
}
/*
* 否则则从激活事件列表、已注册事件、监听事件的兴趣列表中全部干掉此事件
*/
else {
event_del(ev);
} /* Allows deletes to work */
ncalls = ev->ev_ncalls;
/*
* 每个事件的回调函数的调用次数
*/
ev->ev_pncalls = &ncalls;
while (ncalls) {
ncalls--;
ev->ev_ncalls = ncalls;
/*
* 回调
*/
(*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);
if (base->event_break)
return;
}
}
}

至此,看了2个下午的libevent事件处理流程收官!

event-test.c例子中使用一个命名管道(也被称为FIFO文件),它通过读的方式打开一个命名管道,并且监听这个命名管道是否有数据可读,当有数据可读时会执行fifo_read函数,把读取的内容打印出来。

可以搞一个往这个命名管道写内容的简单的程序,进行测试:

 #include <sys/types.h>
#include <sys/stat.h>
#include <sys/queue.h>
#include <sys/time.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h> int main(int argc, char **argv)
{
char *input = argv[];
if (argc != )
{
input = "hello";
}
int fd ;
fd = open("event.fifo",O_WRONLY);
if(fd == -){
perror("open error");
exit(EXIT_FAILURE);
} write(fd, input, strlen(input));
close(fd);
printf("write success\n");
return ;
}

本文参考自:

http://blog.csdn.net/lyh66/article/details/46328531

http://www.cnblogs.com/zxiner/category/1010504.html

http://blog.csdn.net/sparkliang/article/category/660506