Libevent源码分析(一)--- 基本数据结构

时间:2022-08-25 10:22:13

event结构体

libevent是一个基于事件触发的网络库,它的特色之一是把I/O事件,定时器事件和信号量事件统一处理,对上层用户基本是透明的。在libevent内部这三种事件都用event结构体来表示,event是libevent中最基本的数据结构:

struct event {
TAILQ_ENTRY(event) ev_active_next;
TAILQ_ENTRY(event) ev_next;
/* for managing timeouts */
union {
TAILQ_ENTRY(event) ev_next_with_common_timeout;
int min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;

struct event_base *ev_base;

union {
/* used for io events */
struct {
TAILQ_ENTRY(event) ev_io_next;
struct timeval ev_timeout;
} ev_io;

/* used by signal events */
struct {
TAILQ_ENTRY(event) ev_signal_next;
short ev_ncalls;
/* Allows deletes in callback */
short *ev_pncalls;
} ev_signal;
} _ev;

short ev_events;
short ev_res; /* result passed to event callback */
short ev_flags;
ev_uint8_t ev_pri; /* smaller numbers are higher priority */
ev_uint8_t ev_closure;
struct timeval ev_timeout;

/* allows us to adopt for different types of events */
void (*ev_callback)(evutil_socket_t, short, void *arg);
void *ev_arg;
};

libevent中有三种比较常用的数据结构,分别是链表,小根堆和哈希表。event中的前几个变量都是用于在这些结构体中存储event结构的。ev_active_next用于存储事件激活之后在激活链表中的位置,ev_next用于event_base存储所有的event事件。ev_timeout_pos主要用于时间事件,libevent中时间的存储有两种方式,分别是小根堆和链表,具体实现后续会详细分析。ev_timeout_pos是一个union类型,ev_next_with_common_timeout和min_heap_idx分别存储链表和小根堆中对应的位置。_ev主要用于信号量事件和io事件,由于这两个事件互斥,所以同样用一个union类型表示。ev_fd存储event对应的套接字,ev_base指定所属的event_base,ev_events表示改事件的类型,主要有以下几种:

/** Indicates that a timeout has occurred. It's not necessary to pass
* this flag to event_for new()/event_assign() to get a timeout. */

#define EV_TIMEOUT 0x01
/** Wait for a socket or FD to become readable */
#define EV_READ 0x02
/** Wait for a socket or FD to become writeable */
#define EV_WRITE 0x04
/** Wait for a POSIX signal to be raised*/
#define EV_SIGNAL 0x08
/**
* Persistent event: won't get removed automatically when activated.
*
* When a persistent event with a timeout becomes activated, its timeout
* is reset to 0.
*/

#define EV_PERSIST 0x10
/** Select edge-triggered behavior, if supported by the backend. */
#define EV_ET 0x20

ev_res代表一个事件的激活的原因,可以设置为上述事件中的前四种。
ev_flags用来标识event当前的不同状态,可以设置为以下几个值之一或者组合:

#define EVLIST_TIMEOUT 0x01
#define EVLIST_INSERTED 0x02
#define EVLIST_SIGNAL 0x04
#define EVLIST_ACTIVE 0x08
#define EVLIST_INTERNAL 0x10
#define EVLIST_INIT 0x80

ev_pri代表该事件的优先级,ev_closure标志一个事件结束时的处理方式,信号量事件,普通事件和EV_PERSIST 事件的处理方式不同:

/* Possible values for ev_closure in struct event. */
#define EV_CLOSURE_NONE 0
#define EV_CLOSURE_SIGNAL 1
#define EV_CLOSURE_PERSIST 2

ev_timeout存储时间的绝对值,这个时间是一个时间戳,如果该事件是EV_PERSIST 一个事件,ev_io中的ev_timeout则用于存储该事件的相对于,比如该事件的timeout值设置为5秒并且是EV_PERSIST 事件,则ev_timeout存储当前值加上五秒之后的值,ev_io中的ev_timeout存储五秒这个值。所以EV_SIGNAL 和EV_PERSIST不能同时使用。最后两个值用于事件触发时的回调。

event_base 结构体

libevent使用event_base管理event,应用程序一般会创建一个event_base或者为每一个线程创建一个event_base,一个event_base可以管理一系列的event事件,下面是event_base结构体的定义:

struct event_base {
/** Function pointers and other data to describe this event_base's
* backend. */

const struct eventop *evsel;
/** Pointer to backend-specific data. */
void *evbase;

/** List of changes to tell backend about at next dispatch. Only used
* by the O(1) backends. */

struct event_changelist changelist;

/** Function pointers used to describe the backend that this event_base
* uses for signals */

const struct eventop *evsigsel;
/** Data to implement the common signal handelr code. */
struct evsig_info sig;

/** Number of virtual events */
int virtual_event_count;
/** Number of total events added to this event_base */
int event_count;
/** Number of total events active in this event_base */
int event_count_active;

/** Set if we should terminate the loop once we're done processing
* events. */

int event_gotterm;
/** Set if we should terminate the loop immediately */
int event_break;
/** Set if we should start a new instance of the loop immediately. */
int event_continue;

/** The currently running priority of events */
int event_running_priority;

/** Set if we're running the event_base_loop function, to prevent
* reentrant invocation. */

int running_loop;

/* Active event management. */
/** An array of nactivequeues queues for active events (ones that
* have triggered, and whose callbacks need to be called). Low
* priority numbers are more important, and stall higher ones.
*/

struct event_list *activequeues;
/** The length of the activequeues array */
int nactivequeues;

/* common timeout logic */

/** An array of common_timeout_list* for all of the common timeout
* values we know. */

struct common_timeout_list **common_timeout_queues;
/** The number of entries used in common_timeout_queues */
int n_common_timeouts;
/** The total size of common_timeout_queues. */
int n_common_timeouts_allocated;

/** List of defered_cb that are active. We run these after the active
* events. */

struct deferred_cb_queue defer_queue;

/** Mapping from file descriptors to enabled (added) events */
struct event_io_map io;

/** Mapping from signal numbers to enabled (added) events. */
struct event_signal_map sigmap;

/** All events that have been enabled (added) in this event_base */
struct event_list eventqueue;

/** Stored timeval; used to detect when time is running backwards. */
struct timeval event_tv;

/** Priority queue of events with timeouts. */
struct min_heap timeheap;

/** Stored timeval: used to avoid calling gettimeofday/clock_gettime
* too often. */

struct timeval tv_cache;

#if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
/** Difference between internal time (maybe from clock_gettime) and
* gettimeofday. */

struct timeval tv_clock_diff;
/** Second in which we last updated tv_clock_diff, in monotonic time. */
time_t last_updated_clock_diff;
#endif

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
/* threading support */
/** The thread currently running the event_loop for this base */
unsigned long th_owner_id;
/** A lock to prevent conflicting accesses to this event_base */
void *th_base_lock;
/** The event whose callback is executing right now */
struct event *current_event;
/** A condition that gets signalled when we're done processing an
* event with waiters on it. */

void *current_event_cond;
/** Number of threads blocking on current_event_cond. */
int current_event_waiters;
#endif

#ifdef WIN32
/** IOCP support structure, if IOCP is enabled. */
struct event_iocp_port *iocp;
#endif

/** Flags that this base was configured with */
enum event_base_config_flag flags;

/* Notify main thread to wake up break, etc. */
/** True if the base already has a pending notify, and we don't need
* to add any more. */

int is_notify_pending;
/** A socketpair used by some th_notify functions to wake up the main
* thread. */

evutil_socket_t th_notify_fd[2];
/** An event used by some th_notify functions to wake up the main
* thread. */

struct event th_notify;
/** A function used to wake up the main thread from another thread. */
int (*th_notify_fn)(struct event_base *base);
};

前两个变量主要用于后端的reactor,libevent支持多种IO多路复用技术,比如poll,epoll,select,kequeu等。libevent同样支持iocp,但是是单独实现的,后面章节会详细分析。changelist用于记录reactor两次循环之间所有状态变更的fd,比如有时程序对一个event执行了添加和删除操作,但是这两个操作间隔分长短,是在一个循环内进行的,那么下次循环就可以忽略这个事件。这样可以提高效率,但是不是所有的IO多路复用技术都支持changelist,目前epoll和kqueue支持这种方式。接下来的两个变量是用于处理信号量事件的。然后是一些程序运行的状态值。common_timeout_queues,n_common_timeouts和n_common_timeouts_allocated用于相同时间间隔的时间事件,defer_queue是一个延迟处理的队列。io和sigmap是两个存储io事件和sigmap事件的结构体。signalmap是一个数组的结构体,event_io_map在一些平台上和signalmap实现方式相同,在另一些平台上则使用哈希表实现,具体实现方式之后会详细分析。eventqueue用于记录所有的event事件。event_tv用于校验系统时间,timeheap是一个时间的小根堆,tv_cache是时间缓存。接下来是两个时间变量,关于libevent的时间之后也会详细分析。然后是一组用于线程间同步的变量。如果系统需要使用iocp则需要使用接下来的iocp变量。flags是初始化event_base时需要用到的变量,可以取以下的值:

enum event_base_config_flag {
/** Do not allocate a lock for the event base, even if we have
locking set up. */
EVENT_BASE_FLAG_NOLOCK = 0x01,
/** Do not check the EVENT_* environment variables when configuring
an event_base */
EVENT_BASE_FLAG_IGNORE_ENV = 0x02,
/** Windows only: enable the IOCP dispatcher at startup

If this flag is set then bufferevent_socket_new() and
evconn_listener_new() will use IOCP-backed implementations
instead of the usual select-based one on Windows.
*/
EVENT_BASE_FLAG_STARTUP_IOCP = 0x04,
/** Instead of checking the current time every time the event loop is
ready to run timeout callbacks, check after each timeout callback.
*/
EVENT_BASE_FLAG_NO_CACHE_TIME = 0x08,

/** If we are using the epoll backend, this flag says that it is
safe to use Libevent's internal change-list code to batch up
adds and deletes in order to try to do as few syscalls as
possible. Setting this flag can make your code run faster, but
it may trigger a Linux bug: it is not safe to use this flag
if you have any fds cloned by dup() or its variants. Doing so
will produce strange and hard-to-diagnose bugs.

This flag can also be activated by settnig the
EVENT_EPOLL_USE_CHANGELIST environment variable.

This flag has no effect if you wind up using a backend other than
epoll.
*/
EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST = 0x10
};

最后几个变量用于多线程环境中唤醒event_base所在的线程,实现方式是使用socketpair,这和之前分析的zeromq的实现方式非常类似。

event_signal_map

libevent使用event_signal_map结构来管理signal类型的event,由于在各个系统上signal都是整数类型,所以libevent用一个动态数组存储所有的信号量类型的事件,并且直接用signal对应的整形作为索引:

struct event_signal_map {
/* An array of evmap_io * or of evmap_signal *; empty entries are
* set to NULL. */
void **entries;
/* The number of entries available in entries */
int nentries;
};

entries的每个节点都是个一个evmap_signal类型,每个evmap_signal都包含一个event_list链表,这样所有监视相同signal的事件都会被存储在一个链表中:
struct evmap_signal {
struct event_list events;
};
在linux等一下平台上,套接字同样是整数类型,所以在这些平台上io事件的管理也是使用动态数组管理event_list链表,但是在windows平台上,套接字类型是指针类型,所以实现方式是哈希表。下面分别分析这两种数据结构:

event_list

event_list采用双向链表方式实现,双向链表是libevent中使用最频繁的数据结构。下面是双向链表的两个定义:

#define TAILQ_HEAD(name, type) \
struct name { \
struct type *tqh_first; /* first element */ \
struct type **tqh_last; /* addr of last next element */ \
}

#define TAILQ_ENTRY(type) \
struct { \
struct type *tqe_next; /* next element */ \
struct type **tqe_prev; /* address of previous next element */ \
}

其中TAILQ_ENTRY是一个匿名结构体,通常作为另一个结构体的成员使用,比如event结构体中前两个变量都是这种类型,链表的结构如下图:
Libevent源码分析(一)--- 基本数据结构
需要注意的是tqe_prev是指针的指针,指向的是前一个节点的tqe_next指针。。一些系统提供了queue头文件支持对双向链表的操作,另外为了兼容,libevent在libevent-2.0.22-stable\compat\sys中包含了一个queue文件用于不提供queue头文件的系统。除了queue文件中对双向链表的操作外,libevent还定义了以下操作:

/* Set the variable 'x' to the field in event_map 'map' with fields of type
'struct type *' corresponding to the fd or signal 'slot'. Set 'x' to NULL
if there are no entries for 'slot'. Does no bounds-checking. */
#define GET_SIGNAL_SLOT(x, map, slot, type) \
(x) = (struct type *)((map)->entries[slot])
/* As GET_SLOT, but construct the entry for 'slot' if it is not present,
by allocating enough memory for a 'struct type', and initializing the new
value by calling the function 'ctor' on it. Makes the function
return -1 on allocation failure.
*/
#define GET_SIGNAL_SLOT_AND_CTOR(x, map, slot, type, ctor, fdinfo_len) \
do { \
if ((map)->entries[slot] == NULL) { \
(map)->
entries[slot] = \
mm_calloc(1,sizeof(struct type)+fdinfo_len); \
if (EVUTIL_UNLIKELY((map)->entries[slot] == NULL)) \
return (-1); \
(ctor)((struct type *)(map)->entries[slot]); \
} \
(x) = (struct type *)((map)->entries[slot]); \
} while (0)
/** Expand 'map' with new entries of width 'msize' until it is big enough
to store a value in 'slot'.
*/

static int evmap_make_space(struct event_signal_map *map, int slot, int msize)
{
if (map->nentries <= slot) {
int nentries = map->
nentries ? map->nentries : 32;
void **tmp;

while (nentries <= slot)
nentries <<= 1;

tmp = (void **)mm_realloc(map->entries, nentries * msize);
if (tmp == NULL)
return (-1);

memset(&tmp[map->nentries], 0,
(nentries - map->nentries) * msize);

map->nentries = nentries;
map->entries = tmp;
}

return (0);
}

前两个宏用于查找一个fd对应的event双向链表,如果是空需要创建一个evmap_signal,双向链表每次扩容都是扩展为之前的二倍。
下面是event_signal_map的插入操作:

int evmap_signal_add(struct event_base *base, int sig, struct event *ev)
{
const struct eventop *evsel = base->evsigsel;
struct event_signal_map *map = &base->sigmap;
struct evmap_signal *ctx = NULL;

if (sig >= map->nentries) {
if (evmap_make_space(
map, sig, sizeof(struct evmap_signal *)) == -1)
return (-1);
}
GET_SIGNAL_SLOT_AND_CTOR(ctx, map, sig, evmap_signal, evmap_signal_init,
base->evsigsel->fdinfo_len);

if (TAILQ_EMPTY(&ctx->events)) {
if (evsel->add(base, ev->ev_fd, 0, EV_SIGNAL, NULL)
== -1)
return (-1);
}

TAILQ_INSERT_TAIL(&ctx->events, ev, ev_signal_next);

return (1);
}

evmap_signal_init是evmap_signal的初始化方法,比较简单,只是初始化一个eventlist双向链表。如果之前sig对应的双向链表是空的话需要使用base->evsigsel添加一下EV_SIGNAL事件。sig和ev->ev_fd是一个值,虽然在win32中ev_fd是一个intptr_t,但是intptr_t的定义如下,它可以和int类型相互转换。

#ifdef _WIN64
typedef __int64 intptr_t;
#else
typedef _W64 int intptr_t;
#endif

event_io_map

在使用整形标识fd的操作系统上,event_io_map的实现和event_signal_map相同,io的相关操作都转换成了信号量的相关操作:

#define GET_IO_SLOT(x,map,slot,type) GET_SIGNAL_SLOT(x,map,slot,type)
#define GET_IO_SLOT_AND_CTOR(x,map,slot,type,ctor,fdinfo_len) \
GET_SIGNAL_SLOT_AND_CTOR(x,map,slot,type,ctor,fdinfo_len)

但是在windows操作系统上,由于套接字不是从零开始的整形,所以libevent使用hash来实现。具体实现方式在ht-internal.h文件中。hash实现方式比链表方式复杂一些,需要多一次转换,因为可能有多个fd的哈希值对应同一个槽。

#define HT_HEAD(name, type) \
struct name { \
/* The hash table itself. */ \
struct type **hth_table; \
/* How long is the hash table? */ \
unsigned hth_table_length; \
/* How many elements does the table contain? */ \
unsigned hth_n_entries; \
/* How many elements will we allow in the table before resizing it? */ \
unsigned hth_load_limit; \
/* Position of hth_table_length in the primes table. */ \
int hth_prime_idx; \
}

// 初始化event_io_map结构体
HT_HEAD(event_io_map, event_map_entry);

struct event_map_entry {
HT_ENTRY(event_map_entry) map_node;
evutil_socket_t fd;
union { /* This is a union in case we need to make more things that can
be in the hashtable. */
struct evmap_io evmap_io;
} ent;
};

#define HT_ENTRY(type) \
struct { \
struct type *hte_next; \
}

struct evmap_io {
struct event_list events;
ev_uint16_t nread;
ev_uint16_t nwrite;
};

event_io_map在使用hash实现的方式中包含一个hth_table的数组指针,每一个指针都指向一个event_map_entry结构体,event_map_entry又包含hte_next指向下一个event_map_entry。这样具有相同的哈希值的fd组成了一个event_map_entry单向链表。hash表的实现在libevent中都是采用宏定义实现的,看起来可能比较麻烦,但是如果万变不离其总,相比信号量的数组方式只是多了一次查找,只要懂得上述代码的关系,理解起来也就比较容易了。

#define GET_IO_SLOT(x, map, slot, type)                 \
do { \
struct event_map_entry _key, *_ent; \
_key.fd = slot; \
_ent = HT_FIND(event_io_map, map, &_key); \
(x) = _ent ? &_ent->ent.type : NULL; \
} while (0);

#define GET_IO_SLOT_AND_CTOR(x, map, slot, type, ctor, fdinfo_len) \
do { \
struct event_map_entry _key, *_ent; \
_key.fd = slot; \
_HT_FIND_OR_INSERT(event_io_map, map_node, hashsocket, map, \
event_map_entry, &_key, ptr, \
{ \
_ent = *ptr; \
}, \
{ \
_ent = mm_calloc(1,sizeof(struct event_map_entry)+fdinfo_len); \
if (EVUTIL_UNLIKELY(_ent == NULL)) \
return (-1); \
_ent->fd = slot; \
(ctor)(&_ent->ent.type); \
_HT_FOI_INSERT(map_node, map, &_key, _ent, ptr) \
}); \
(x) = &_ent->ent.type; \
} while (0)

同使用数组的方式对应,使用哈希表同样有两个类似的宏,实现的功能都相同,只是内部实现略有区别。
这一节主要分析了libevent中最基本的两个结构体以及几种常用的数据结构的实现方式,libevent中用到的数据结构还有很多,比如小根堆,下一节分析libevent中的时间管理时会详细分析。