Libevent源码分析(四)--- libevent事件机制

时间:2022-04-09 00:15:44

之前几个章节都是分析libevent的辅助功能,这一节将要详细分析libevent处理事件的流程和机制,在分析之前先看一下libevent的使用方法,本文也将以libevent的使用方式入手来分析libevent的工作机制。

void cb_func(evutil_socket_t fd, short what, void *arg)
{
const char *data = arg;
printf("Got an event on socket %d:%s%s%s%s [%s]",
(int) fd,
(what&EV_TIMEOUT) ? " timeout" : "",
(what&EV_READ) ? " read" : "",
(what&EV_WRITE) ? " write" : "",
(what&EV_SIGNAL) ? " signal" : "",
data);
}

void main_loop(evutil_socket_t fd1, evutil_socket_t fd2)
{
struct event *ev1, *ev2;
struct timeval five_seconds = {5,0};
struct event_base *base = event_base_new();

/* The caller has already set up fd1, fd2 somehow, and make them
nonblocking. */


ev1 = event_new(base, fd1, EV_TIMEOUT|EV_READ|EV_PERSIST, cb_func,
(char*)"Reading event");
ev2 = event_new(base, fd2, EV_WRITE|EV_PERSIST, cb_func,
(char*)"Writing event");

event_add(ev1, &five_seconds);
event_add(ev2, NULL);
event_base_dispatch(base);
}

使用libevent,必须先初始化一个event_base结构体,event_base结构体之前分析过,下面是它的初始化代码:

struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{
int i;
struct event_base *base;
int should_check_environment;

#ifndef _EVENT_DISABLE_DEBUG_MODE
event_debug_mode_too_late = 1;
#endif

if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
event_warn("%s: calloc", __func__);
return NULL;
}
detect_monotonic();
gettime(base, &base->event_tv);

min_heap_ctor(&base->timeheap);
TAILQ_INIT(&base->eventqueue);
base->sig.ev_signal_pair[0] = -1;
base->sig.ev_signal_pair[1] = -1;
base->th_notify_fd[0] = -1;
base->th_notify_fd[1] = -1;

event_deferred_cb_queue_init(&base->defer_queue);
base->defer_queue.notify_fn = notify_base_cbq_callback;
base->defer_queue.notify_arg = base;
if (cfg)
base->flags = cfg->flags;

evmap_io_initmap(&base->io);
evmap_signal_initmap(&base->sigmap);
event_changelist_init(&base->changelist);

base->evbase = NULL;

should_check_environment =
!(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));

for (i = 0; eventops[i] && !base->evbase; i++) {
if (cfg != NULL) {
/* determine if this backend should be avoided */
if (event_config_is_avoided_method(cfg,
eventops[i]->name))
continue;
if ((eventops[i]->features & cfg->require_features)
!= cfg->require_features)
continue;
}

/* also obey the environment variables */
if (should_check_environment &&
event_is_method_disabled(eventops[i]->name))
continue;

base->evsel = eventops[i];

base->evbase = base->evsel->init(base);
}

if (base->evbase == NULL) {
event_warnx("%s: no event mechanism available",
__func__);
base->evsel = NULL;
event_base_free(base);
return NULL;
}

if (evutil_getenv("EVENT_SHOW_METHOD"))
event_msgx("libevent using: %s", base->evsel->name);

/* allocate a single active event queue */
if (event_base_priority_init(base, 1) < 0) {
event_base_free(base);
return NULL;
}

/* prepare for threading */

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
if (EVTHREAD_LOCKING_ENABLED() &&
(!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
int r;
EVTHREAD_ALLOC_LOCK(base->th_base_lock,
EVTHREAD_LOCKTYPE_RECURSIVE);
base->defer_queue.lock = base->th_base_lock;
EVTHREAD_ALLOC_COND(base->current_event_cond);
r = evthread_make_base_notifiable(base);
if (r<0) {
event_warnx("%s: Unable to make base notifiable.", __func__);
event_base_free(base);
return NULL;
}
}
#endif

#ifdef WIN32
if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP))
event_base_start_iocp(base, cfg->n_cpus_hint);
#endif

return (base);
}

该初始化函数除了初始化event_base之外还要决定使用哪个io多路复用模型,所有的io多路复用模型都定义在eventops结构体中:

static const struct eventop *eventops[] = {
#ifdef _EVENT_HAVE_EVENT_PORTS
&evportops,
#endif
#ifdef _EVENT_HAVE_WORKING_KQUEUE
&kqops,
#endif
#ifdef _EVENT_HAVE_EPOLL
&epollops,
#endif
#ifdef _EVENT_HAVE_DEVPOLL
&devpollops,
#endif
#ifdef _EVENT_HAVE_POLL
&pollops,
#endif
#ifdef _EVENT_HAVE_SELECT
&selectops,
#endif
#ifdef WIN32
&win32ops,
#endif
NULL
};

event_base_new_with_config会从上到下进行选择,尽量选用系统支持的最高效率的模型,event_config也可以配置选择哪种模型。
初始化完event_base之后就可以开始添加事件了。

struct event * event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
{
struct event *ev;
ev = mm_malloc(sizeof(struct event));
if (ev == NULL)
return (NULL);
if (event_assign(ev, base, fd, events, cb, arg) < 0) {
mm_free(ev);
return (NULL);
}

return (ev);
}
int event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
if (!base)
base = current_base;

_event_debug_assert_not_added(ev);

ev->ev_base = base;

ev->ev_callback = callback;
ev->ev_arg = arg;
ev->ev_fd = fd;
ev->ev_events = events;
ev->ev_res = 0;
ev->ev_flags = EVLIST_INIT;
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;

if (events & EV_SIGNAL) {
if ((events & (EV_READ|EV_WRITE)) != 0) {
event_warnx("%s: EV_SIGNAL is not compatible with "
"EV_READ or EV_WRITE", __func__);
return -1;
}
ev->ev_closure = EV_CLOSURE_SIGNAL;
} else {
if (events & EV_PERSIST) {
evutil_timerclear(&ev->ev_io_timeout);
ev->ev_closure = EV_CLOSURE_PERSIST;
} else {
ev->ev_closure = EV_CLOSURE_NONE;
}
}

min_heap_elem_init(ev);

if (base != NULL) {
/* by default, we put new events into the middle priority */
ev->ev_pri = base->nactivequeues / 2;
}

_event_debug_note_setup(ev);

return 0;
}

event_new用来创建一个event,event_assign用来初始化一个event。这里比较重要的是ev_flags的取值,他用来标记ev_flags的状态,EVLIST_INIT表明这是一个新创建的事件,只完成了初始化,还没有加入到任何队列中。下面是ev_flags的所有状态:

#define EVLIST_TIMEOUT 0x01
#define EVLIST_INSERTED 0x02
#define EVLIST_SIGNAL 0x04
#define EVLIST_ACTIVE 0x08
#define EVLIST_INTERNAL 0x10
#define EVLIST_INIT 0x80

初始化event之后就可以添加到队列中了:

int
event_add(struct event *ev, const struct timeval *tv)
{
int res;

if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
event_warnx("%s: event has no event_base set.", __func__);
return -1;
}

EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

res = event_add_internal(ev, tv, 0);

EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

return
(res);
}

static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;

EVENT_BASE_ASSERT_LOCKED(base);
_event_debug_assert_is_setup(ev);

event_debug((
"event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%scall %p",
ev,
EV_SOCK_ARG(ev->ev_fd),
ev->ev_events & EV_READ ? "EV_READ " : " ",
ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
tv ? "EV_TIMEOUT " : " ",
ev->ev_callback));

EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));

/*
* prepare for timeout insertion further below, if we get a
* failure on any step, we should not change any state.
*/
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve(&base->timeheap,
1 + min_heap_size(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}

/* If the main thread is currently executing a signal event's
* callback, and we are not the main thread, then we want to wait
* until the callback is done before we mess with the event, or else
* we can race on ev_ncalls and ev_pncalls below. */
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
&& !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
#endif

if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
if (ev->ev_events & (EV_READ|EV_WRITE))
res = evmap_io_add(base, ev->ev_fd, ev);
else if (ev->ev_events & EV_SIGNAL)
res = evmap_signal_add(base, (int)ev->ev_fd, ev);
if (res != -1)
event_queue_insert(base, ev, EVLIST_INSERTED);
if (res == 1) {
/* evmap says we need to notify the main thread. */
notify = 1;
res = 0;
}
}

/*
* we should change the timeout state only if the previous event
* addition succeeded.
*/
if (res != -1 && tv != NULL) {
struct timeval now;
int common_timeout;

/*
* for persistent timeout events, we remember the
* timeout value and re-add the event.
*
* If tv_is_absolute, this was already set.
*/
if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;

/*
* we already reserved memory above for the case where we
* are not replacing an existing timeout.
*/
if (ev->ev_flags & EVLIST_TIMEOUT) {
/* XXX I believe this is needless. */
if (min_heap_elt_is_top(ev))
notify = 1;
event_queue_remove(base, ev, EVLIST_TIMEOUT);
}

/* Check if it is active due to a timeout. Rescheduling
* this timeout before the callback can be executed
* removes it from the active list. */
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {
if (ev->ev_events & EV_SIGNAL) {
/* See if we are just active executing
* this event in a loop
*/
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
}

event_queue_remove(base, ev, EVLIST_ACTIVE);
}

gettime(base, &now);

common_timeout = is_common_timeout(tv, base);
if (tv_is_absolute) {
ev->ev_timeout = *tv;
} else if (common_timeout) {
struct timeval tmp = *tv;
tmp.tv_usec &= MICROSECONDS_MASK;
evutil_timeradd(&now, &tmp, &ev->ev_timeout);
ev->ev_timeout.tv_usec |=
(tv->tv_usec & ~MICROSECONDS_MASK);
} else {
evutil_timeradd(&now, tv, &ev->ev_timeout);
}

event_debug((
"event_add: timeout in %d seconds, call %p",
(int)tv->tv_sec, ev->ev_callback));

event_queue_insert(base, ev, EVLIST_TIMEOUT);
if (common_timeout) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
if (ev == TAILQ_FIRST(&ctl->events)) {
common_timeout_schedule(ctl, &now, ev);
}
} else {
/* See if the earliest timeout is now earlier than it
* was before: if so, we will need to tell the main
* thread to wake up earlier than it would
* otherwise. */
if (min_heap_elt_is_top(ev))
notify = 1;
}
}

/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);

_event_debug_note_add(ev);

return
(res);
}

事件的添加和删除需要添加锁,因为操作可能是通过其他线程调用的。event_add_internal的第二个参数代表这是一个和时间相关的值,第三个参数则用来表示第二个参数是绝对值还是相对值,即是具体的某一个时间点还是一个时间间隔。如果时间不为空并且EVLIST_TIMEOUT没有被设置,则需要在小根堆中预留一个位置。base->current_event代表主线程正在处理该event的callback,此时的添加和删除操作都需要等待处理完成的通知。之后就要把事件插入到对应的数据结构中了,如果是EV_READ或者EV_WRITE事件,则插入到evmap_io中,如果是EV_SIGNAL事件则插入到evmap_signal中。EV_SIGNAL事件和其他两个事件是互斥的。之后调用event_queue_insert方法把事件插入到event_base中的双向链表中,event_queue_insert根据传入的queue标记把event添加相应的状态并且插入到对应的数据结构中。此时改event的ev_flags标记变为了EVLIST_INIT|EVLIST_INSERTED,下面是event_queue_insert函数:

static void
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
EVENT_BASE_ASSERT_LOCKED(base);

if (ev->ev_flags & queue) {
/* Double insertion is possible for active events */
if (queue & EVLIST_ACTIVE)
return;

event_errx(1, "%s: %p(fd "EV_SOCK_FMT") already on queue %x", __func__,
ev, EV_SOCK_ARG(ev->ev_fd), queue);
return;
}

if (~ev->ev_flags & EVLIST_INTERNAL)
base->event_count++;

ev->ev_flags |= queue;
switch (queue) {
case EVLIST_INSERTED:
TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
break;
case EVLIST_ACTIVE:
base->event_count_active++;
TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
ev,ev_active_next);
break;
case EVLIST_TIMEOUT: {
if (is_common_timeout(&ev->ev_timeout, base)) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
insert_common_timeout_inorder(ctl, ev);
} else
min_heap_push(&base->timeheap, ev);
break;
}
default:
event_errx(1, "%s: unknown queue %x", __func__, queue);
}
}

继续看event_add_internal,当调用event_queue_insert之后需要设置notify标记,该标记用于在非主线程操作时唤醒主线乘。因为可能新的event设置的timeout时间小于当前io模型的timeout时间,唤醒的方式依旧是通过socketpair,因为socketpari中的其中一个套接字已经作为一个内部event添加到event_base中,只要有写事件会马上返回,停止睡眠。
接下来的代码都是处理时间相关的,首先如果这是一个persist事件并且时间设置的是相对时间,则需要保存这个相对时间,ev_io_timeout用于存储该时间,persist和signal事件是互斥的。ev_timeout会存储一个相对值,之后再次调用event_queue_insert将event存储到小跟堆或者是common list列表中。这里需要注意的是因为新加入到的时间时间如果是common时间并且新加入的event在commonlist 列表的第一个则需要调整common_timeout_list的timeout_event,timeout_event可能之前在小根堆中(队列之前不为空,并且新加入的event的timeout时间小于timeout_event的过期时间,在前面的章节中分析过这在理论上是不可能的,但是libevent还是做了一次检查),也可能不在小根队中。

static void
common_timeout_schedule(struct common_timeout_list *ctl,
const struct timeval *now, struct event *head)
{
struct timeval timeout = head->ev_timeout;
timeout.tv_usec &= MICROSECONDS_MASK;
event_add_internal(&ctl->timeout_event, &timeout, 1);
}

event_add_internal方法中有判断,如果ev_flags有标记EVLIST_TIMEOUT,则会调用 event_queue_remove(base, ev, EVLIST_TIMEOUT)先从小根堆(common_timeout_list的timeout_event在小根堆中)中移除,然后重新添加到小根堆中。
最后event_add_internal会根据情况判断是否唤醒主线程。

event_add分析完成之后就是event_base_dispatch函数了,该函数是event_base的主循环。内部实际运行的是event_base_loop方法:

int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;

/* Grab the lock. We will release it inside evsel.dispatch, and again
* as we invoke user callbacks. */

EVBASE_ACQUIRE_LOCK(base, th_base_lock);

if (base->running_loop) {
event_warnx("%s: reentrant invocation. Only one event_base_loop"
" can run on each event_base at once.", __func__);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return -1;
}

base->running_loop = 1;

clear_time_cache(base);

if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
evsig_set_base(base);

done = 0;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->th_owner_id = EVTHREAD_GET_ID();
#endif

base->event_gotterm = base->event_break = 0;

while (!done) {
base->event_continue = 0;

/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
break;
}

if (base->event_break) {
break;
}

timeout_correct(base, &tv);

tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
/*
* if we have active events, we just poll new events
* without waiting.
*/

evutil_timerclear(&tv);
}

/* If we have no events, we just exit */
if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
event_debug(("%s: no events registered.", __func__));
retval = 1;
goto done;
}

/* update last old time */
gettime(base, &base->event_tv);

clear_time_cache(base);

res = evsel->dispatch(base, tv_p);

if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}

update_time_cache(base);

timeout_process(base);

if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base);
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
event_debug(("%s: asked to terminate loop.", __func__));

done:
clear_time_cache(base);
base->running_loop = 0;

EVBASE_RELEASE_LOCK(base, th_base_lock);

return (retval);
}

evsig_set_base方法使得libevent中只有最后调用event_base_dispatch的event_base才能支持信号量事件。在进入到while循环之前,event_base_loop会获取全局锁:EVBASE_ACQUIRE_LOCK(base, th_base_lock);进入while循环之后会进行时间的校正,这在之前的博客中分析过。之后是一些状态的判断和时间的设置,接着就调用了evsel->dispatch(base, tv_p),如果当前有激活事件,tv_p则为空,如果没有tv_p则设置为小根堆中的最小时间。 该方法会调用对应的io模型的dispach方法用于检测io事件,如果有事件则调用 evmap_io_active(base, i, res),该方法定义如下:

void
evmap_io_active(struct event_base *base, evutil_socket_t fd, short events)
{
struct event_io_map *io = &base->io;
struct evmap_io *ctx;
struct event *ev;

#ifndef EVMAP_USE_HT
EVUTIL_ASSERT(fd < io->nentries);
#endif
GET_IO_SLOT(ctx, io, fd, evmap_io);

EVUTIL_ASSERT(ctx);
TAILQ_FOREACH(ev, &ctx->events, ev_io_next) {
if (ev->ev_events & events)
event_active_nolock(ev, ev->ev_events & events, 1);
}
}

evmap_io_active会遍历所有与该fd相关的event,如果fd上的事件是event监听的事件,则调用event_active_nolock方法:

void
event_active_nolock(struct event *ev, int res, short ncalls)
{
struct event_base *base;

event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p",
ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback));


/* We get different kinds of events, add them together */
if (ev->ev_flags & EVLIST_ACTIVE) {
ev->ev_res |= res;
return;
}

base = ev->ev_base;

EVENT_BASE_ASSERT_LOCKED(base);

ev->ev_res = res;

if (ev->ev_pri < base->event_running_priority)
base->event_continue = 1;

if (ev->ev_events & EV_SIGNAL) {
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
#endif
ev->ev_ncalls = ncalls;
ev->ev_pncalls = NULL;
}

event_queue_insert(base, ev, EVLIST_ACTIVE);

if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
}

该函数会调用event_queue_insert(base, ev, EVLIST_ACTIVE),用于把event添加到激活列表中,event_queue_insert内部会判断是否已经有了EVLIST_ACTIVE标记,如果有则不会重复添加。
继续看event_base_loop函数,处理完IO事件之后会接着处理小根堆定时器事件。

/* Activate every event whose timeout has elapsed. */
static void
timeout_process(struct event_base *base)
{
/* Caller must hold lock. */
struct timeval now;
struct event *ev;

if (min_heap_empty(&base->timeheap)) {
return;
}

gettime(base, &now);

while ((ev = min_heap_top(&base->timeheap))) {
if (evutil_timercmp(&ev->ev_timeout, &now, >))
break;

/* delete this event from the I/O queues */
event_del_internal(ev);

event_debug(("timeout_process: call %p",
ev->ev_callback));
event_active_nolock(ev, EV_TIMEOUT, 1);
}
}

timeout_process会把小根堆中所有超时事件都调用event_del_internal,如果这是一个EV_PERSIST事件,之后在event_persist_closure还会添加回来,最后调用event_active_nolock把事件加入到激活事件链表。最后当激活链表中有事件时会调用event_process_active(base)来处理所有的激活事件:

static int
event_process_active(struct event_base *base)
{
/* Caller must hold th_base_lock */
struct event_list *activeq = NULL;
int i, c = 0;

for (i = 0; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
base->event_running_priority = i;
activeq = &base->activequeues[i];
c = event_process_active_single_queue(base, activeq);
if (c < 0) {
base->event_running_priority = -1;
return -1;
} else if (c > 0)
break; /* Processed a real event; do not
* consider lower-priority events */

/* If we get here, all of the events we processed
* were internal. Continue. */

}
}

event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
base->event_running_priority = -1;
return c;
}

event_process_active会根据优先级顺序调用event_process_active_single_queue处理已激活状态的事件,deferred_callbacks将在后面event_buffer中详细分析:

static int
event_process_active_single_queue(struct event_base *base, struct event_list *activeq)
{
struct event *ev;
int count = 0;

EVUTIL_ASSERT(activeq != NULL);

for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
if (ev->ev_events & EV_PERSIST)
event_queue_remove(base, ev, EVLIST_ACTIVE);
else
event_del_internal(ev);
if (!(ev->ev_flags & EVLIST_INTERNAL))
++count;

event_debug((
"event_process_active: event: %p, %s%scall %p",
ev,
ev->ev_res & EV_READ ? "EV_READ " : " ",
ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_callback));

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->current_event = ev;
base->current_event_waiters = 0;
#endif
switch (ev->ev_closure) {
case EV_CLOSURE_SIGNAL:
event_signal_closure(base, ev);
break;
case EV_CLOSURE_PERSIST:
event_persist_closure(base, ev);
break;
default:
case EV_CLOSURE_NONE:
EVBASE_RELEASE_LOCK(base, th_base_lock);
(*ev->ev_callback)(
ev->ev_fd, ev->ev_res, ev->ev_arg);
break;
}
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->current_event = NULL;
if (base->current_event_waiters) {
base->current_event_waiters = 0;
EVTHREAD_COND_BROADCAST(base->current_event_cond);
}
#endif

if (base->event_break)
return -1;
if (base->event_continue)
break;
}
return count;
}

如果事件是EV_PERSIST类型只需要从激活队列中移除即可,否则就会执行event_del_internal。如果事件是定时器事件,那么该事件在timeout_process中已经移除过一次了,但是当时事件不是激活状态的。所以此时的event_queue_remove和event_del_internal作用相同,都是从激活列表中移除。接下来需要特殊处理的就是信号量事件和EV_PERSIST事件,信号量事件需要使用event_signal_closure关闭,EV_PERSIST需要调用event_persist_closure进行清理,普通事件直接调用回调即可:

static inline void
event_persist_closure(struct event_base *base, struct event *ev)
{
/* reschedule the persistent event if we have a timeout. */
if (ev->ev_io_timeout.tv_sec || ev->ev_io_timeout.tv_usec) {
/* If there was a timeout, we want it to run at an interval of
* ev_io_timeout after the last time it was _scheduled_ for,
* not ev_io_timeout after _now_. If it fired for another
* reason, though, the timeout ought to start ticking _now_. */
struct timeval run_at, relative_to, delay, now;
ev_uint32_t usec_mask = 0;
EVUTIL_ASSERT(is_same_common_timeout(&ev->ev_timeout,
&ev->ev_io_timeout));
gettime(base, &now);
if (is_common_timeout(&ev->ev_timeout, base)) {
delay = ev->ev_io_timeout;
usec_mask = delay.tv_usec & ~MICROSECONDS_MASK;
delay.tv_usec &= MICROSECONDS_MASK;
if (ev->ev_res & EV_TIMEOUT) {
relative_to = ev->ev_timeout;
relative_to.tv_usec &= MICROSECONDS_MASK;
} else {
relative_to = now;
}
} else {
delay = ev->ev_io_timeout;
if (ev->ev_res & EV_TIMEOUT) {
relative_to = ev->ev_timeout;
} else {
relative_to = now;
}
}
evutil_timeradd(&relative_to, &delay, &run_at);
if (evutil_timercmp(&run_at, &now, <)) {
/* Looks like we missed at least one invocation due to
* a clock jump, not running the event loop for a
* while, really slow callbacks, or
* something. Reschedule relative to now.
*/
evutil_timeradd(&now, &delay, &run_at);
}
run_at.tv_usec |= usec_mask;
event_add_internal(ev, &run_at, 1);
}
EVBASE_RELEASE_LOCK(base, th_base_lock);
(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
}

该函数主要是重置时间,不管激活事件是不是因为timeout引起的都需要重置时间然后重新添加事件。event_add_internal会进行判断如果事件已经在evmap_io中或者evmap_signal中则不处理,但是如果在小根堆活着commonlist中则需要移除在添加。

以上就是libevent处理事件的流程,下面是作者在网上找的一幅流程图:
Libevent源码分析(四)--- libevent事件机制