设置完需要监听的事件之后,就开始event loop了。在Libev中,该工作由ev_run函数完成。它的大致流程如下:
int
ev_run (EV_P_ int flags)
{
do
{
/* 执行EV_FORK类型事件 */ /* 执行EV_PREPARE类型事件 */ /* 遍历fdchanges数组,使用户关注的事件和epoll关注的事件保持一致 */
fd_reify (EV_A); /* 计算epoll阻塞时间 */ /* 事件驱动机制阻塞等待事件发生 */
backend_poll (EV_A_ waittime); /* 调整管理时间的堆 */
timers_reify (EV_A); /* relative timers called last */ /* 将EV_CHECK事件放入pendings数组中 */
if (expect_false (checkcnt))
queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK); /* 调用pendings数组中watcher的回调函数 */
EV_INVOKE_PENDING;
} return activecnt;
}
上一篇文章中说到fd监听的内容被修改后需要放入fdchanges数组中,将来的某个时刻再在事件驱动机制中更新,这个时刻就是调用fd_reify函数的时候。fd_reify函数代码如下:
inline_size void
fd_reify (EV_P)
{
int i; /* 1.遍历fdchanges数组中所有的fd,找到对应的ANFD
* 2.取出一个ANFD中所有watcher关注的事件
* 3.epoll_ctl更新
* 4.fdchanges数组清空
*/
for (i = ; i < fdchangecnt; ++i)
{
int fd = fdchanges [i];
ANFD *anfd = anfds + fd;
ev_io *w; unsigned char o_events = anfd->events;
unsigned char o_reify = anfd->reify; anfd->reify = ; /*if (expect_true (o_reify & EV_ANFD_REIFY)) probably a deoptimisation */
{
anfd->events = ; for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)
anfd->events |= (unsigned char)w->events; if (o_events != anfd->events)
o_reify = EV__IOFDSET; /* actually |= */
} if (o_reify & EV__IOFDSET)
backend_modify (EV_A_ fd, o_events, anfd->events); /* epoll_modify */
} fdchangecnt = ; /* fdchanges数组清空 */
}
前面说过,一个fd对应一个ANFD,一个ANFD可能包含多个watcher,上面函数就是遍历fdchanges数组,将数组中每个fd对应的所有watcher所关注的事件全部添加到事件驱动中,backend_modify宏内部最终会调用事件驱动机制对应的修改函数,例如epoll的epoll_modify函数。
回到ev_run函数,接下来调用backend_poll函数,这个函数最终会调用事件驱动机制,阻塞等待事件的发生。当监控的事件发生时,Libev提取这些事件,将它们放入一个pendings数组中,这个数组存储所有发生了但未处理的事件,放入过程如下:
/* 待运行的事件放入pendings数组中 */
void noinline
ev_feed_event (EV_P_ void *w, int revents) EV_THROW
{
W w_ = (W)w;
int pri = ABSPRI (w_); if (expect_false (w_->pending))
pendings [pri][w_->pending - ].events |= revents;
else
{
w_->pending = ++pendingcnt [pri];
array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);
pendings [pri][w_->pending - ].w = w_;
pendings [pri][w_->pending - ].events = revents;
} pendingpri = NUMPRI - ;
}
pendings是个二维数组,第一维是watcher的优先级,第二维是同等优先级的watcher的不同下标。pendings中的元素类型为ANPENDING,定义如下:
typedef struct
{
W w; /* 关联的watcher */
int events; /* 发生的事件集合 */
} ANPENDING;
回到ev_run函数。这时,事件驱动机制检测到有事件发生后返回,并将事件对应的watcher保存在了pendings数组中。接下来就要执行事件对应的回调函数,也就是对“消费”该事件,EV_INVOKE_PENDING宏负责该动作。EV_INVOKE_PENDING宏最终会调用ev_invoke_pending函数,该函数定义如下:
/* 遍历pendings二维数组,执行事件触发回调函数 */
void noinline
ev_invoke_pending (EV_P)
{
pendingpri = NUMPRI; while (pendingpri) /* pendingpri possibly gets modified in the inner loop */
{
--pendingpri; while (pendingcnt [pendingpri])
{
ANPENDING *p = pendings [pendingpri] + --pendingcnt [pendingpri]; p->w->pending = ;
EV_CB_INVOKE (p->w, p->events);
EV_FREQUENT_CHECK;
}
}
}
外层while循环是按优先级递减,内层while循环同一优先级的ANPENDING按数组顺序进行遍历,遍历到的每一个ANPENDING都对应一个watcher,得到watcher之后就执行该watcher对应的回调函数。
至此,event loop完成一轮循环。