定时器事件的创建
Libevent 一般调用evtimer_new来定义一个定时器事件
#define evtimer_new(b, cb, arg) event_new((b), -1, 0, (cb), (arg))
从宏定义来看,这个事件和io、signal事件的区别在于fd项为-1,表示并不关注, 并且events项为0, 并不是想象中的EV_TIMEOUT.
evtimer_new只是初始化了一个一般的超时事件,而真正将一个事件进行超时处理是在
event_add函数的第二个参数必须指定一个超时时间。
总结来说,不管event_new创建了一个什么类型的event,如果在event_add的第二个参数添加了一个超时时间,则该事件就会进行超时处理了。
定时器事件实例
gwwu@hz-dev2.aerohive.com:~/test/libevent/my_libevent_test>more libevent_test_timeout.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <event.h>
#include <time.h>
void do_timeout(evutil_socket_t fd, short event, void *arg)
{
printf("do timeout (time: %ld)!\n", time(NULL));
}
void create_timeout_event(struct event_base *base)
{
struct event *ev;
struct timeval timeout;
//ev = evtimer_new(base, do_timeout, NULL);
ev = event_new(base, -1, EV_PERSIST, do_timeout, NULL);
if (ev) {
timeout.tv_sec = 5;
timeout.tv_usec = 0;
event_add(ev, &timeout);
}
}
int main(int argc, char *argv[])
{
struct event_base *base;
base = event_base_new();
if (!base) {
printf("Error: Create an event base error!\n");
return -1;
}
create_timeout_event(base);
event_base_dispatch(base);
return 0;
}
event_add 对定时器事件的处理
event_add ——>event_add_internal
/*如果 tv_is_absolute不为0, 则tv表示绝对时间, 而不是间隔差值*/
static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;
......
......
/*
* prepare for timeout insertion further below, if we get a
* failure on any step, we should not change any state.
*/
/*如果新添加的事件处理器是定时器,且它尚未被添加到通用定时器队列或时间堆中,则为该定时器
* 在时间堆上预留一个位置,如果当前时间堆数组大小够了,min_heap_reserve直接返回,如果不够,resize数组大小,
* 保证可以插入新的堆节点
* ev->ev_flags 为EVLIST_TIMEOUT, 在本函数中通过event_queue_insert(base, ev, EVLIST_TIMEOUT);
* 如果eve->ev_flags 为EVLIST_TIMEOUT 说明该事件已经在time堆中了*/
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve(&base->timeheap,
1 + min_heap_size(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}
......
......
/*处理没有激活的READ WRITE SIGNAL 事件*/
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
.......
}
/*
* we should change the timeout state only if the previous event
* addition succeeded.
*/
/*将事件处理器添加到通用定时器队列或者事件堆中。
* res != -1 表示对I/O事件和信号事件的添加成功,没有出错
* tv != NULl, 表示设置了超时事件*/
if (res != -1 && tv != NULL) {
struct timeval now;
int common_timeout;
/*
* for persistent timeout events, we remember the
* timeout value and re-add the event.
*
* If tv_is_absolute, this was already set.
*/
/*对于persist的时间事件,如果是相对时间参数,用ev_io_timeout记录这个相对值,
* 因为每一次的起始时间是不一样的,需要在不同的起始时间加上相对时间值*/
if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
/*
* we already reserved memory above for the case where we
* are not replacing an existing timeout.
*/
/*如果该事件处理器已经被插入通用定时器队列或时间堆中,则先删除它*/
if (ev->ev_flags & EVLIST_TIMEOUT) {
/* XXX I believe this is needless. */
if (min_heap_elt_is_top(ev))
notify = 1;
event_queue_remove(base, ev, EVLIST_TIMEOUT);
}
/* Check if it is active due to a timeout. Rescheduling
* this timeout before the callback can be executed
* removes it from the active list. */
/*如果待处理的事件已经被激活,且原因是超时, 则从活动事件队列中删除它,
* 以避免其回调函数被执行。
* 对于信号事件处理器,必要时还需将其ncalls成为设置为0,使得干净地终止信号
* 事件的处理*/
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {
if (ev->ev_events & EV_SIGNAL) {
/* See if we are just active executing
* this event in a loop
*/
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
}
event_queue_remove(base, ev, EVLIST_ACTIVE);
}
/*获取当前事件now*/
gettime(base, &now);
common_timeout = is_common_timeout(tv, base);
/*计算绝对事件,当前时间加上时间间隔*/
if (tv_is_absolute) {
ev->ev_timeout = *tv;
} else if (common_timeout) {
struct timeval tmp = *tv;
tmp.tv_usec &= MICROSECONDS_MASK;
evutil_timeradd(&now, &tmp, &ev->ev_timeout);
ev->ev_timeout.tv_usec |=
(tv->tv_usec & ~MICROSECONDS_MASK);
} else {
evutil_timeradd(&now, tv, &ev->ev_timeout);
}
event_debug((
"event_add: timeout in %d seconds, call %p",
(int)tv->tv_sec, ev->ev_callback));
/*将定时器事件添加到通用事件队列或者最小堆中*/
event_queue_insert(base, ev, EVLIST_TIMEOUT);
if (common_timeout) {
/*如果是通用定时器,并且是尾队列头节点时,将ctl结构中的timeout_event事件放入最小堆中*/
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
if (ev == TAILQ_FIRST(&ctl->events)) {
common_timeout_schedule(ctl, &now, ev);
}
} else {
/* See if the earliest timeout is now earlier than it
* was before: if so, we will need to tell the main
* thread to wake up earlier than it would
* otherwise. */
if (min_heap_elt_is_top(ev))
notify = 1;
}
}
/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
_event_debug_note_add(ev);
return (res);
}
最小堆 min_heap_reserve/min_heap_size
最小堆实现是用一个数组实现,类似c++ 里面的vector
一般添加的定时器事件都是最小堆的形式存储。
struct event_base {
.....
truct min_heap timeheap;
.....
};
typedef struct min_heap
{
struct event** p; -------------struct event*结构的数组
unsigned n, a; -------------a:当前分配的总个数;n---当前已经使用的数组个数
} min_heap_t;
unsigned min_heap_size(min_heap_t* s) { return s->n; } -----返回当前已使用的堆个数,即数组个数
int min_heap_reserve(min_heap_t* s, unsigned n)
{
/* 如果要插入的个数大于堆的总大小, 重新分配堆数组的个数
* 如果要插入的个数小于堆的总大小,表示堆数组有空间可以新的节点*/
if (s->a < n)
{
struct event** p;
unsigned a = s->a ? s->a * 2 : 8; /*第一次分配为8, 后续每次重分配的大小为上一次的2倍*/
/*如果2倍的新空间大小还比n小,设置数据大小为n*/
if (a < n)
a = n;
if (!(p = (struct event**)mm_realloc(s->p, a * sizeof *p))) -----分配大小为a的struct event *数组内存
return -1;
s->p = p;
s->a = a;
}
return 0;
}
通用时间
由于tv_usec是32比特长度, 而表示微秒数只需要20位,(因为微秒数不能超过999999, 2的20次~= 1048576), 所以用32bits的最后20bits表示微秒数,用最前面的4bits表示magic号,中间8bits表示event_base结构中的通用定时器common_timeout_queues的数组索引,所以数组元素最多256个
判断是否为通用时间的方法是:
最高8bit的值为5.
通用时间一般是人为手动添加的。
#define COMMON_TIMEOUT_MAGIC 0x50000000
#define COMMON_TIMEOUT_MASK 0xf0000000
/** Return true iff if 'tv' is a common timeout in 'base' */
static inline int
is_common_timeout(const struct timeval *tv, const struct event_base *base)
{
int idx;
if ((tv->tv_usec & COMMON_TIMEOUT_MASK) != COMMON_TIMEOUT_MAGIC)
return 0;
idx = COMMON_TIMEOUT_IDX(tv);
return idx < base->n_common_timeouts;
}
event_base_dispatch/event_base_loop
int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;
/* Grab the lock. We will release it inside evsel.dispatch, and again
* as we invoke user callbacks. */
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
/*一个event_base只允许运行一个事件循环*/
if (base->running_loop) {
event_warnx("%s: reentrant invocation. Only one event_base_loop"
" can run on each event_base at once.", __func__);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return -1;
}
base->running_loop = 1; /*标记该event_base已经开始运行*/
clear_time_cache(base); /*清除event_base的系统时间缓存*/
if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
evsig_set_base(base);
done = 0;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->th_owner_id = EVTHREAD_GET_ID();
#endif
base->event_gotterm = base->event_break = 0;
while (!done) {
base->event_continue = 0;
/*查看是否需要跳出循环, 程序可以调用event_loopexit_cb() 设置event_gotterm标记
* 调用event_base_loopbreak()设置event_break 标记*/
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
break;
}
if (base->event_break) {
break;
}
/*校准系统时间*/
timeout_correct(base, &tv);
tv_p = &tv;
/*base里面目前激活的事件数目为0,并且为阻塞性的I/O复用, 则取时间堆上面的最小堆节点的超时时间作为epoll的超时时间*/
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p); /*获取时间堆上堆顶元素的超时值, 即I/O复用系统调用本次应该设置的超时值*/
} else {
/*
* if we have active events, we just poll new events
* without waiting.
*/
/*如果有就绪事件尚未处理, 则将I/O复用系统调用的超时时间"置0"。
* 这样I/O复用系统调用直接返回, 程序也就可以立即处理就绪事件了*/
evutil_timerclear(&tv);
}
/*如果event_base 中没有注册任何事件, 则直接退出事件循环*/
/* If we have no events, we just exit */
if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
event_debug(("%s: no events registered.", __func__));
retval = 1;
goto done;
}
/* update last old time */
gettime(base, &base->event_tv); /*更新系统事件*/
/*之所以要在进入dispatch之前清零,是因为进入
*dispatch后,可能会等待一段时间。cache就没有意义了。
*如果第二个线程此时想add一个event到这个event_base里面,在
*event_add_internal函数中会调用gettime。如果cache不清零,
*那么将会取这个cache时间。这将取一个不准确的时间.*/
clear_time_cache(base); /*清除event_base的系统时间缓存*/
/*调用事件多路分发器的dispatch方法等待事件, 将就绪事件插入活动事件队列*/
res = evsel->dispatch(base, tv_p);
if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}
/*将时间缓存更新为当前系统事件*/
update_time_cache(base);
/* 检查时间堆上的到期事件并依次执行之 */
timeout_process(base);
if (N_ACTIVE_CALLBACKS(base)) {
/*调用event_process_active 函数依次处理就绪的信号事件和I/O事件*/
int n = event_process_active(base);
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
event_debug(("%s: asked to terminate loop.", __func__));
done:
/*事件循环结束, 清空事件缓存, 并设置停止循环标志*/
clear_time_cache(base);
base->running_loop = 0;
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (retval);
}
timeout_next——根据Timer事件计算evsel->dispatch的最大等待时间
static int
timeout_next(struct event_base *base, struct timeval **tv_p)
{
/* Caller must hold th_base_lock */
struct timeval now;
struct event *ev;
struct timeval *tv = *tv_p;
int res = 0;
/*取出最小堆中的最小节点*/
ev = min_heap_top(&base->timeheap);
/*节点为空,直接返回 tv_p为NULL, epoll永远阻塞*/
if (ev == NULL) {
/* if no time-based events are active wait for I/O */
*tv_p = NULL;
goto out;
}
/*获取当前时间*/
if (gettime(base, &now) == -1) {
res = -1;
goto out;
}
/*如果定时器的时间值小于当前时间,表明该定时器值已经过期了,不能使用, epoll永远阻塞*/
if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {
evutil_timerclear(tv);
goto out;
}
/*计算时间的差值, 该差值作为epoll的超时时间*/
evutil_timersub(&ev->ev_timeout, &now, tv);
EVUTIL_ASSERT(tv->tv_sec >= 0);
EVUTIL_ASSERT(tv->tv_usec >= 0);
event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
out:
return (res);
}
timeout_process—处理超时事件,将超时事件插入到激活链表中
把最小堆中的最小节点的时间作为epoll的超时时间,如果超时了或者有事件发生,都循环判断一下最小堆中的事件是否超时了,如果是,则处理timeout事件
/* Activate every event whose timeout has elapsed. */
static void
timeout_process(struct event_base *base)
{
/* Caller must hold lock. */
struct timeval now;
struct event *ev;
/*如果时间堆为空,则退出*/
if (min_heap_empty(&base->timeheap)) {
return;
}
/*获取当前时间*/
gettime(base, &now);
/*循环最小堆中的元素, 如果时间已经达到,则将event添加到active队列中, 并置标记EV_TIMEOUT*/
while ((ev = min_heap_top(&base->timeheap))) {
if (evutil_timercmp(&ev->ev_timeout, &now, >))
break;
/* delete this event from the I/O queues */
event_del_internal(ev);
event_debug(("timeout_process: call %p",
ev->ev_callback));
event_active_nolock(ev, EV_TIMEOUT, 1);
}
}