Libevent源码分析-event处理流程

时间:2022-04-08 00:03:49

event处理流程

Libevent处理时间的大概流程为
1、设置event_base(即初始化Reactor)
2、设置event事件(初始化event)
3、将event和event_base关联(将event注册到event_base)
4、进入循环,等待事件
5、事件发生,处理事件。
用UML序列图可以表示为:
Libevent源码分析-event处理流程

#include <iostream>
#include <sys/time.h>
#include <event.h>
struct event ev;
struct timeval tv;

void time_cb(int fd, short event, void* argc)
{
std::cout<<"Timer wakeup"<<std::endl;
event_add(&ev, &tv);
}

int main()
{

struct event_base* base=event_base_new();


tv.tv_sec=10;
tv.tv_usec=0;
evtimer_set(&ev, time_cb, NULL);//设置定时器事件
event_base_set(base, &ev);//将event和event_base关联
event_add(&ev, &tv);//注册事件
event_base_dispatch(base);//进入loop循环,等待事件

return 0;
}

首先创建了event_base作为Reactor,设置了定时器事件作为event。随后将ev和base关联,注册ev到事件分发器(evsel,在event_base中)。最后进入loop循环,等待事件发生。

看一下event_base_set(base, &ev)

int event_base_set(struct event_base *base, struct event *ev)
{
/* Only innocent events may be assigned to a different base */
if (ev->ev_flags != EVLIST_INIT)//确保event已经初始化
return (-1);

_event_debug_assert_is_setup(ev);

ev->ev_base = base;//设置event_base为event的Reactor
ev->ev_pri = base->nactivequeues/2;//event的优先级

return (0);
}

可以看出这一步只是将event和event_base关联,即在event中保存它所在的Reactor。

再来看看event_add(&ev, &tv)

int event_add(struct event *ev, const struct timeval *tv)
{
int res;

if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
event_warnx("%s: event has no event_base set.", __func__);
return -1;
}

EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

res = event_add_internal(ev, tv, 0);//这里才是主体部分

EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

return (res);
}

再来看一下event_add_internal(ev, tv, 0),略去了部分代码

static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;


if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {//tv不为空,且要添加到time堆中,则为time堆扩容
if (min_heap_reserve(&base->timeheap,
1 + min_heap_size(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}

/*如果事件是IO或信号事件,且事件已经添加或激活,则插入到相应队列
*/

if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
if (ev->ev_events & (EV_READ|EV_WRITE))//IO事件
res = evmap_io_add(base, ev->ev_fd, ev);//添加到IO事件队列
else if (ev->ev_events & EV_SIGNAL)//信号事件
res = evmap_signal_add(base, (int)ev->ev_fd, ev);//添加到信号事件队列
if (res != -1)
event_queue_insert(base, ev, EVLIST_INSERTED);//将事件插入到队列base->eventqueue
if (res == 1) {
/* evmap says we need to notify the main thread. */
notify = 1;
res = 0;
}
}

gettime(base, &now);//更新base中的time

event_queue_insert(base, ev, EVLIST_TIMEOUT);//添加到定时器小根堆


/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))//如果当前线程不是loop所在线程,唤醒loop线程。
evthread_notify_base(base);

return (res);
}

最后看一下event_base_dispatch(base);

int event_base_dispatch(struct event_base *event_base)
{
return (event_base_loop(event_base, 0));
}

看一下event_base_loop(event_base, 0)

int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;//事件分发器 event demultiplexer
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;


if (base->running_loop) {//loop循环只能在event_base所在线程
event_warnx("%s: reentrant invocation. Only one event_base_loop"
" can run on each event_base at once.", __func__);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return -1;
}

base->running_loop = 1;//表明even_base在运行loop,防止其他线程运行

clear_time_cache(base);//置零tv_cache

if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)//如果信号事件设置
evsig_set_base(base);//设置signal_pair[0]位信号源

done = 0;

base->event_gotterm = base->event_break = 0;

while (!done) {//进入loop循环主体
base->event_continue = 0;

/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
break;
}
if (base->event_break) {
break;
}

timeout_correct(base, &tv);//更新时间

tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
/*
* if we have active events, we just poll new events
* without waiting.
*/
evutil_timerclear(&tv);
}

/* If we have no events, we just exit */
if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
event_debug(("%s: no events registered.", __func__));
retval = 1;
goto done;//用了goto?
}

/* update last old time */
gettime(base, &base->event_tv);

clear_time_cache(base);

res = evsel->dispatch(base, tv_p);//epoll_wait等,阻塞等待事件

if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}

update_time_cache(base);

timeout_process(base);//处理到时事件

if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base);//处理事件,处理过程中有优先级
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}

done://goto用的label
clear_time_cache(base);
base->running_loop = 0;

EVBASE_RELEASE_LOCK(base, th_base_lock);

return (retval);
}