Source Code Analysis of libevent-1.1a

Date: 2022-02-22 16:54:04

Original: http://blog.163.com/ecy_fu/blog/static/444512620094291011129/

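       To understand the memcached source code in depth, it is essential to understand how libevent works. In the multi-threaded version of memcached every thread uses its own event_base, so its use of libevent is more involved than in a single-threaded program; analyzing libevent-1.1a therefore also lays the groundwork for analyzing multi-threaded memcached. One good way to study the code: add the libevent-1.1a project to the solution of memcached-1.21 for Windows, then point the dependency path in memcached's linker options at the static library built by the libevent project. The two projects can then be debugged together.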

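       Because memcached uses only libevent's timers and ordinary socket events, the signal-related code is not analyzed here. The event-handling code lives mainly in event.c, and there are not many core functions.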

       Let us first look at a few important data structures.

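event_base is a structure with one global instance: event_init allocates and initializes an event_base and makes current_base point to it. As its members show, it manages all of libevent's data structures and holds the registered set of backend operations.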

struct event_base {
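 const struct eventop *evsel; // points to the set of backend operations
 void *evbase; // backend-private state returned by evsel->init()
 int event_count;  /* counts number of total events */ // total number of events: timers, ordinary events and signals
 int event_count_active; /* counts number of active events */ // number of activated events

 int event_gotterm;  /* set to terminate loop */

 /* active event management */
 struct event_list **activequeues; // active event lists. Note that this is a pointer to pointers:
                                   // events are split into one queue per priority. By default libevent
                                   // uses a single priority, so there is only one active queue.
 int nactivequeues; // number of active queues, i.e. number of priorities

 struct event_list eventqueue; // queue of registered events
 struct timeval event_tv; // time recorded when libevent was initialized; refreshed on every pass of event_base_loop

 rb_head(event_tree, event) timetree; // all timers are kept in a red-black tree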
};

The structure that represents each event is as follows:

struct event {
 tailq_entry (event) ev_next;
 tailq_entry (event) ev_active_next;
 tailq_entry (event) ev_signal_next;
 rb_entry (event) ev_timeout_node; // the four entries above link this event into the structures managed by event_base

 struct event_base *ev_base; // points to the global event_base; event_set sets it to current_base
 int ev_fd; // descriptor for this event; -1 for a timer
 short ev_events; // event type flags: ev_timeout, ev_read, ev_write, ..., ev_persist
 short ev_ncalls;
 short *ev_pncalls; /* allows deletes in callback */

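 struct timeval ev_timeout; // expiry time (event_add stores an absolute time here)

 int ev_pri;  /* smaller numbers are higher priority */ // priority of the event

 void (*ev_callback)(int, short, void *arg); // callback invoked once the event is activated
 void *ev_arg; // extra data passed to the callback

 int ev_res;  /* result passed to event callback */
 int ev_flags; // evlist_timeout, evlist_inserted, ..., evlist_init: records which lists the event is currently on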
};
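
The reason struct event carries several separate link fields is that one event can sit on several lists at the same time. The following is a minimal sketch of that idea, assuming the BSD <sys/queue.h> header is available (as it is on Linux with glibc and on the BSDs); struct item, all_link and active_link are hypothetical names invented for the illustration, not libevent's.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/queue.h>

struct item {
    int id;
    TAILQ_ENTRY(item) all_link;     /* membership in the "all items" list */
    TAILQ_ENTRY(item) active_link;  /* membership in the "active" list */
};

TAILQ_HEAD(item_list, item);

/* Count the entries of a list that is threaded through all_link. */
int count_all(struct item_list *head)
{
    struct item *it;
    int n = 0;

    TAILQ_FOREACH(it, head, all_link)
        n++;
    return n;
}

/* Put two items on the "all" list and only the second one on the "active"
 * list; return the id found at the head of the active list. */
int demo_two_lists(void)
{
    struct item_list all, active;
    struct item a, b;

    a.id = 1;
    b.id = 2;
    TAILQ_INIT(&all);
    TAILQ_INIT(&active);

    TAILQ_INSERT_TAIL(&all, &a, all_link);
    TAILQ_INSERT_TAIL(&all, &b, all_link);
    TAILQ_INSERT_TAIL(&active, &b, active_link);   /* b is now on both lists */

    assert(count_all(&all) == 2);
    return TAILQ_FIRST(&active)->id;
}
```

Because each list uses its own link field, inserting b into the active list does not disturb its position in the other list.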

Next, let us see what evsel in the event_base structure actually points to.

 #ifdef have_select
extern const struct eventop selectops;
#endif
#ifdef have_poll
extern const struct eventop pollops;
#endif
#ifdef have_rtsig
extern const struct eventop rtsigops;
#endif
#ifdef have_epoll
extern const struct eventop epollops;
#endif
#ifdef have_working_kqueue
extern const struct eventop kqops;
#endif
#ifdef have_devpoll
extern const struct eventop devpollops;
#endif
#ifdef win32
extern const struct eventop win32ops;
#endif

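Here the preprocessor macros clearly decide which eventop variables are declared. I originally assumed that most operating systems support select and poll, and that on Windows the win32 macro would certainly be defined as well, giving three eventop variables (selectops, pollops and win32ops); so I expected the array below to have four entries. That is not the case. Debugging shows clearly that the array has only two entries: of the macros above only win32 is in effect, so the two entries are &win32ops and null.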

const struct eventop *eventops[] = {
#ifdef have_working_kqueue
 &kqops,
#endif
#ifdef have_epoll
 &epollops,
#endif
#ifdef have_devpoll
 &devpollops,
#endif
#ifdef have_rtsig
 &rtsigops,
#endif
#ifdef have_poll
 &pollops,
#endif
#ifdef have_select
 &selectops,
#endif
#ifdef win32
 &win32ops,
#endif
 null
};

This establishes that on Windows libevent-1.1a uses the functions in win32.c.
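
The selection mechanism itself is small enough to sketch in isolation. The following is only an illustration under stated assumptions: struct backend_ops is a cut-down, hypothetical stand-in for struct eventop, and the "fast" and "slow" backends are invented so that the first one can fail. It shows how a null-terminated table ordered by preference is walked until one init hook succeeds, which is what the loop in event_init does.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, cut-down version of struct eventop: a name and an init hook. */
struct backend_ops {
    const char *name;
    void *(*init)(void);   /* returns backend state, or NULL if unusable */
};

static int slow_state;

/* Pretend the preferred backend is unavailable at run time. */
static void *fast_init(void) { return NULL; }
static void *slow_init(void) { return &slow_state; }

static const struct backend_ops fast_ops = { "fast", fast_init };
static const struct backend_ops slow_ops = { "slow", slow_init };

/* Ordered by preference and terminated by NULL, like eventops[]. */
static const struct backend_ops *backends[] = { &fast_ops, &slow_ops, NULL };

/* Walk the table until one backend initializes; return the winner and
 * hand its state back through state_out. Returns NULL if none worked. */
const struct backend_ops *pick_backend(void **state_out)
{
    const struct backend_ops *chosen = NULL;
    void *state = NULL;
    size_t i;

    assert(state_out != NULL);
    for (i = 0; backends[i] != NULL && state == NULL; i++) {
        chosen = backends[i];
        state = chosen->init();
    }
    if (state == NULL)
        return NULL;
    *state_out = state;
    return chosen;
}
```

With this table the first backend refuses to initialize, so pick_backend falls through to the second one. On the Windows build described above the table has a single usable entry, so the loop body runs once.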

struct eventop {
 char *name;
 void *(*init)(void);
 int (*add)(void *, struct event *);
 int (*del)(void *, struct event *);
 int (*recalc)(struct event_base *, void *, int);
 int (*dispatch)(struct event_base *, void *, struct timeval *);
};

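// win32ops is defined as a variable of type struct eventop whose members are functions defined in win32.c. From here on, calls made through the function pointers above (init, add, del, recalc, dispatch) end up in the corresponding win32_* functions below.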

struct eventop win32ops = {
 "win32",
 win32_init,
 win32_insert,
 win32_del,
 win32_recalc,
 win32_dispatch
};

A typical use of libevent's event mechanism follows the sequence event_init -> event_set -> event_add -> event_dispatch. The functions involved are analyzed next.

void *
event_init(void)
{
 int i;

// allocate an event_base and make current_base, which is a global pointer, point to it

 if ((current_base = calloc(1, sizeof(struct event_base))) == null)
  event_err(1, "%s: calloc");

 event_sigcb = null;
 event_gotsig = 0;


 gettimeofday(&current_base->event_tv, null);
 

// initialize the main data structures managed by event_base
 rb_init(&current_base->timetree);
 tailq_init(&current_base->eventqueue);
 tailq_init(&signalqueue);
 
 current_base->evbase = null;

// on Windows this loop body runs only once, because eventops has a single non-null entry
 for (i = 0; eventops[i] && !current_base->evbase; i++) {

  printf("hi!\n");
  // point evsel at the set of operations
  current_base->evsel = eventops[i];

// call win32_init to set up a win32op structure, which holds the file descriptor bookkeeping;
// evbase in event_base then points to this initialized win32op structure

  current_base->evbase = current_base->evsel->init();
 }

 if (current_base->evbase == null)
  event_errx(1, "%s: no event mechanism available", __func__);

 if (getenv("event_show_method")) 
  event_msgx("libevent using: %s\n",
      current_base->evsel->name);

 /* allocate a single active event queue */

// initialize the active queues; only one priority is used here, and event_priority_init can be called to change the number of priorities
 event_base_priority_init(current_base, 1);

 return (current_base);
}

If the following lines are added at the end of event_init, just before the return statement:

    printf("===============================\n");
    printf("use %s\n", current_base->evsel->name);
    printf("===============================\n");
the program prints which I/O multiplexing model libevent uses on the current platform. On Linux it does indeed use epoll.

Next, let us look at win32_init. The win32op structure it uses is declared as follows:

struct win32op {
 int fd_setsz;
 struct win_fd_set *readset_in;
 struct win_fd_set *writeset_in;
 struct win_fd_set *readset_out;
 struct win_fd_set *writeset_out;
 struct win_fd_set *exset_out;
 int n_events;
 int n_events_alloc;
 struct event **events;
};

win_fd_set is declared as follows:

struct win_fd_set {
 u_int fd_count;
 socket fd_array[1];
};

The fd_set in Windows' winsock2.h, by contrast, is declared as follows:

#ifndef fd_setsize
#define fd_setsize      ******
#endif /* fd_setsize */

typedef struct fd_set {
        u_int fd_count;               /* how many are set? */
        socket  fd_array[fd_setsize];   /* an array of sockets */
} fd_set;
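The two forms differ slightly. libevent declares a one-element array and then allocates more memory than sizeof(struct win_fd_set), so indexing past the first element stays inside the allocated block. This makes the declaration more flexible: the array size can be chosen freely at initialization time. Now for the code of win32_init.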

#define nevent ******
void *
win32_init(void)
{
 struct win32op *winop;
 size_t size;
 
 if (!(winop = calloc(1, sizeof(struct win32op))))
  return null;
 
 winop->fd_setsz = nevent;
 

//#define fd_set_alloc_size(n) ((sizeof(struct win_fd_set) + ((n)-1)*sizeof(socket)))

// besides the space for one struct win_fd_set, allocate room for 63 more descriptors (nevent - 1)
 size = fd_set_alloc_size(nevent);
 
 if (!(winop->readset_in = malloc(size)))
  goto err;
 if (!(winop->writeset_in = malloc(size)))
  goto err;
 if (!(winop->readset_out = malloc(size)))
  goto err;
 if (!(winop->writeset_out = malloc(size)))
  goto err;
 if (!(winop->exset_out = malloc(size)))
  goto err;
 
 winop->n_events = 0;
 winop->n_events_alloc = nevent;
 
 if (!(winop->events = malloc(nevent*sizeof(struct event*))))
  goto err;
 
 winop->readset_in->fd_count = winop->writeset_in->fd_count = 0;
 winop->readset_out->fd_count = winop->writeset_out->fd_count
  = winop->exset_out->fd_count = 0;

 return (winop);
 err:
        xfree(winop->readset_in);
        xfree(winop->writeset_in);
        xfree(winop->readset_out);
        xfree(winop->writeset_out);
        xfree(winop->exset_out);
        xfree(winop->events);
        xfree(winop);
        return (null);
}
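
The over-allocation that fd_set_alloc_size performs for win_fd_set can be sketched on its own. Note the swap: the sketch below uses a C99 flexible array member, which is the standard-sanctioned form of the same trick, rather than the one-element array that libevent declares. struct fd_vec and its functions are hypothetical names, and int stands in for the Windows socket type.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for win_fd_set, with int in place of the socket type. */
struct fd_vec {
    unsigned int count;
    int fds[];              /* C99 flexible array member */
};

/* Allocate room for the header plus `capacity` descriptors. */
struct fd_vec *fd_vec_new(size_t capacity)
{
    struct fd_vec *v = malloc(sizeof(*v) + capacity * sizeof(v->fds[0]));

    if (v != NULL)
        v->count = 0;
    return v;
}

/* Append fd if there is room; the caller keeps track of the capacity,
 * much as win32op keeps fd_setsz next to its sets. */
int fd_vec_push(struct fd_vec *v, size_t capacity, int fd)
{
    assert(v != NULL);
    if (v->count >= capacity)
        return -1;
    v->fds[v->count++] = fd;
    return 0;
}
```

With a flexible array member the size formula is header plus n elements; the macro quoted above subtracts one because its struct already contains the first element.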

// event_set initializes an event structure

void
event_set(struct event *ev, int fd, short events,
   void (*callback)(int, short, void *), void *arg)
{
 /* take the current base - caller needs to set the real base later */
 ev->ev_base = current_base;

 ev->ev_callback = callback;
 ev->ev_arg = arg;
 ev->ev_fd = fd;
 ev->ev_events = events;
 ev->ev_flags = evlist_init; // a freshly initialized event has ev_flags equal to evlist_init
 ev->ev_ncalls = 0;
 ev->ev_pncalls = null;

 /* by default, we put new events into the middle priority */
 ev->ev_pri = current_base->nactivequeues/2;
}

// event_add places the event under the management of event_base

int
event_add(struct event *ev, struct timeval *tv)
{
 struct event_base *base = ev->ev_base;
 const struct eventop *evsel = base->evsel;
 void *evbase = base->evbase;

 event_debug((
   "event_add: event: %p, %s%s%scall %p",
   ev,
   ev->ev_events & ev_read ? "ev_read " : " ",
   ev->ev_events & ev_write ? "ev_write " : " ",
   tv ? "ev_timeout " : " ",
   ev->ev_callback));

 assert(!(ev->ev_flags & ~evlist_all));

 if (tv != null) {
  struct timeval now;

  if (ev->ev_flags & evlist_timeout)
   event_queue_remove(base, ev, evlist_timeout);

  /* check if it is active due to a timeout.  rescheduling
   * this timeout before the callback can be executed
   * removes it from the active list. */
  if ((ev->ev_flags & evlist_active) &&
      (ev->ev_res & ev_timeout)) {
   /* see if we are just active executing this
    * event in a loop
    */
   if (ev->ev_ncalls && ev->ev_pncalls) {
    /* abort loop */
    *ev->ev_pncalls = 0;
   }
   
   event_queue_remove(base, ev, evlist_active);
  }

  gettimeofday(&now, null);

// set the event's timeout to now + tv, which is an absolute time
  timeradd(&now, tv, &ev->ev_timeout);

  event_debug((
    "event_add: timeout in %d seconds, call %p",
    tv->tv_sec, ev->ev_callback));

// insert the timer into the red-black tree; if tv is null, the event has no timeout

  event_queue_insert(base, ev, evlist_timeout);
 }

 if ((ev->ev_events & (ev_read|ev_write)) &&
     !(ev->ev_flags & (evlist_inserted|evlist_active))) {

// insert the event into the list; for timers, I/O events and signals alike,
// the last argument of event_queue_insert identifies the type
  event_queue_insert(base, ev, evlist_inserted);

  return (evsel->add(evbase, ev));
 } else if ((ev->ev_events & ev_signal) &&
     !(ev->ev_flags & evlist_signal)) {
  event_queue_insert(base, ev, evlist_signal);

  return (evsel->add(evbase, ev));
 }

 return (0);
}
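
The conversion from a relative delay to an absolute deadline is the core of the timer branch in event_add. Here is a minimal sketch of that arithmetic, assuming only POSIX struct timeval; tv_add and deadline_from are hypothetical helper names, and tv_add is written out by hand instead of relying on the timeradd macro, which not every platform's headers expose by default.

```c
#include <assert.h>
#include <sys/time.h>

/* Hand-written equivalent of timeradd(): out = a + b, with the
 * microsecond field normalized to the range [0, 1000000). */
void tv_add(const struct timeval *a, const struct timeval *b,
            struct timeval *out)
{
    assert(a->tv_usec < 1000000 && b->tv_usec < 1000000);
    out->tv_sec = a->tv_sec + b->tv_sec;
    out->tv_usec = a->tv_usec + b->tv_usec;
    if (out->tv_usec >= 1000000) {
        out->tv_sec++;
        out->tv_usec -= 1000000;
    }
}

/* Turn a relative delay into an absolute deadline, given "now". */
struct timeval deadline_from(const struct timeval *now,
                             const struct timeval *delay)
{
    struct timeval d;

    tv_add(now, delay, &d);
    return d;
}
```

Storing the absolute deadline is what lets the timers be ordered in a tree: the earliest deadline is always the minimum node, whatever the moment at which each timer was added.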

Let us analyze the event_queue_insert function used above. Its code is as follows:

void
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
 int docount = 1;

 if (ev->ev_flags & queue) {
  /* double insertion is possible for active events */
  if (queue & evlist_active)
   return;

  event_errx(1, "%s: %p(fd %d) already on queue %x", __func__,
      ev, ev->ev_fd, queue);
 }

 if (ev->ev_flags & evlist_internal)
  docount = 0;

 if (docount)
  base->event_count++;

 ev->ev_flags |= queue; // for an I/O event, ev_flags becomes evlist_init | evlist_inserted
 switch (queue) {

 case evlist_active:
  if (docount)
   base->event_count_active++;
  tailq_insert_tail(base->activequeues[ev->ev_pri],
      ev,ev_active_next);
  break;

 case evlist_signal:
  tailq_insert_tail(&signalqueue, ev, ev_signal_next);
  break;

 case evlist_timeout: {
  struct event *tmp = rb_insert(event_tree, &base->timetree, ev); // timers go into the red-black tree
  assert(tmp == null);
  break;
 }

 case evlist_inserted:
  tailq_insert_tail(&base->eventqueue, ev, ev_next);
  break; // appends the event to the doubly linked event list base->eventqueue

 default:
  event_errx(1, "%s: unknown queue %x", __func__, queue);
 }
}
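
The bookkeeping in event_queue_insert boils down to a bitmask that records which structures an event is on. The sketch below isolates that logic under some assumptions: the flag names and values are hypothetical, and where the real function aborts through event_errx on a forbidden double insertion, the sketch returns -1 so that the behavior can be observed.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical flag values; the real ones are defined in libevent's event.h. */
enum {
    ON_TIMEOUT  = 0x01,
    ON_INSERTED = 0x02,
    ON_ACTIVE   = 0x08
};

struct ev_state {
    int flags;        /* which structures the event currently sits on */
};

/* Returns 0 on success, 1 if the insert was a harmless repeat (allowed
 * only for the active queue), and -1 if the event is already on a queue
 * that forbids double insertion. */
int mark_inserted(struct ev_state *ev, int queue, int *event_count)
{
    assert(ev != NULL && event_count != NULL);
    if (ev->flags & queue)
        return (queue & ON_ACTIVE) ? 1 : -1;
    (*event_count)++;          /* counted once per queue the event joins */
    ev->flags |= queue;
    return 0;
}
```

One consequence visible in the sketch: the counter goes up once per queue, so an I/O event that also has a timeout contributes two to the total.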

After event_add calls event_queue_insert, the next statement executed is "evsel->add(evbase, ev)" (this does not happen when only a timer is added), so we need to look at the code of win32_insert.

int
win32_insert(struct win32op *win32op, struct event *ev)
{
 int i;

// I do not understand what signals mean on Windows

 if (ev->ev_events & ev_signal) {
  if (ev->ev_events & (ev_read|ev_write))
   event_errx(1, "%s: ev_signal incompatible use",
              __func__);
  if((int)signal(event_signal(ev), signal_handler) == -1)
   return (-1);

  return (0);
 }

// past this check the event is a read and/or write event; anything else returns immediately
 if (!(ev->ev_events & (ev_read|ev_write)))
  return (0);

 for (i=0;i<win32op->n_events;++i) {
  if(win32op->events[i] == ev) {
   event_debug(("%s: event for %d already inserted.",
         __func__, (int)ev->ev_fd));
   return (0);
  }
 }
 event_debug(("%s: adding event for %d", __func__, (int)ev->ev_fd));

// depending on the event type, add its fd to the corresponding watched set
 if (ev->ev_events & ev_read) {
  if (do_fd_set(win32op, ev->ev_fd, 1)<0)
   return (-1);
 }
 if (ev->ev_events & ev_write) {
  if (do_fd_set(win32op, ev->ev_fd, 0)<0)
   return (-1);
 }

 if (win32op->n_events_alloc == win32op->n_events) {
  size_t sz;
  win32op->n_events_alloc *= 2;
  sz = sizeof(struct event*)*win32op->n_events_alloc;
  if (!(win32op->events = realloc(win32op->events, sz)))
   return (-1);
 }

// store the event and increment the event count
 win32op->events[win32op->n_events++] = ev;

 return (0);
}
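
The growth step at the end of win32_insert is the usual doubling array. In the listing above the result of realloc is assigned straight back to win32op->events, so if realloc fails the old pointer is overwritten with null. A hedged sketch of the same doubling with the result checked first follows; struct ptr_vec and ptr_vec_push are hypothetical names, and the initial capacity of 4 is an arbitrary choice for the illustration.

```c
#include <assert.h>
#include <stdlib.h>

struct ptr_vec {
    void **items;
    size_t len;     /* slots in use */
    size_t cap;     /* slots allocated */
};

/* Append p, doubling the capacity when the array is full. On allocation
 * failure the old array is left untouched and -1 is returned. */
int ptr_vec_push(struct ptr_vec *v, void *p)
{
    assert(v != NULL);
    if (v->len == v->cap) {
        size_t new_cap = v->cap ? v->cap * 2 : 4;
        void **grown = realloc(v->items, new_cap * sizeof(*grown));

        if (grown == NULL)
            return -1;          /* v->items is still valid here */
        v->items = grown;
        v->cap = new_cap;
    }
    v->items[v->len++] = p;
    return 0;
}
```

Doubling keeps the amortized cost of an append constant, which matters little for a few dozen sockets but costs nothing to get right.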

Now for the key function, event_base_loop. The call chain is event_dispatch -> event_loop -> event_base_loop, and all of the real work is done in event_base_loop.

int
event_base_loop(struct event_base *base, int flags)
{
 const struct eventop *evsel = base->evsel;
 void *evbase = base->evbase;
 struct timeval tv;
 int res, done;

 /* calculate the initial events that we are waiting for */
 if (evsel->recalc(base, evbase, 0) == -1)
  return (-1);

 done = 0;
 while (!done) {
  /* terminate the loop if we have been asked to */
  if (base->event_gotterm) {
   base->event_gotterm = 0;
   break;
  }

  /* you cannot use this interface for multi-threaded apps */
  while (event_gotsig) {
   event_gotsig = 0;
   if (event_sigcb) {
    res = (*event_sigcb)();
    if (res == -1) {
     errno = eintr;
     return (-1);
    }
   }
  }

  /* check if time is running backwards */
  gettimeofday(&tv, null);

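// Normally the current time is later than event_tv. If someone has changed the system time,
// the current time may be earlier than event_tv. In that case the expiry time of every
// timer is corrected by the size of the jump before event_tv is updated.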

  if (timercmp(&tv, &base->event_tv, <)) {
   struct timeval off;
   event_debug(("%s: time is running backwards, corrected",
        __func__));
   timersub(&base->event_tv, &tv, &off);
   timeout_correct(base, &off);
  }

// event_tv is updated here

  base->event_tv = tv;

  if (!base->event_count_active && !(flags & evloop_nonblock))
   timeout_next(base, &tv);
  else
   timerclear(&tv);
  
  /* if we have no events, we just exit */
  if (!event_haveevents(base)) {
   event_debug(("%s: no events registered.", __func__));
   return (1);
  }

  res = evsel->dispatch(base, evbase, &tv);

  if (res == -1)
   return (-1);

  timeout_process(base);

  if (base->event_count_active) {
   event_process_active(base);
   if (!base->event_count_active && (flags & evloop_once))
    done = 1;
  } else if (flags & evloop_nonblock)
   done = 1;

  if (evsel->recalc(base, evbase, 0) == -1)
   return (-1);
 }

 event_debug(("%s: asked to terminate loop.", __func__));
 return (0);
}
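
The backwards-clock correction in the loop can be illustrated with plain integers. This is a sketch under assumptions, not libevent's timeout_correct: times are microsecond counts instead of struct timeval, the timers are a flat array instead of a red-black tree, and usec_t and correct_deadlines are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>

/* Times as plain microsecond counts to keep the arithmetic obvious. */
typedef long long usec_t;

/* If `now` is earlier than `last_seen`, shift every deadline back by the
 * size of the jump so that the remaining delays are preserved.
 * Returns the correction applied (0 when the clock did not go backwards). */
usec_t correct_deadlines(usec_t last_seen, usec_t now,
                         usec_t *deadlines, size_t n)
{
    usec_t off;
    size_t i;

    assert(deadlines != NULL || n == 0);
    if (now >= last_seen)
        return 0;
    off = last_seen - now;
    for (i = 0; i < n; i++)
        deadlines[i] -= off;
    return off;
}
```

For example, a timer due at 1500 when the clock last read 1000 has 500 left; if the clock jumps back to 400, the deadline becomes 900 and 500 is still left. Since every key shifts by the same amount, the relative order of the timers does not change.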

Three functions here are particularly important: evsel->dispatch (that is, win32_dispatch), timeout_process and event_process_active.

int
win32_dispatch(struct event_base *base, struct win32op *win32op,
        struct timeval *tv)
{
 int res = 0;
 int i;
 int fd_count;

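// The sets are copied on every call so that the original descriptor sets are not disturbed.
// If the xxx_in sets were passed to select directly, after it returns they would contain
// only the ready descriptors, and the descriptors that were not ready would be lost.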

 fd_set_copy(win32op->readset_out, win32op->readset_in);
 fd_set_copy(win32op->exset_out, win32op->readset_in);
 fd_set_copy(win32op->writeset_out, win32op->writeset_in);

 fd_count =
           (win32op->readset_out->fd_count > win32op->writeset_out->fd_count) ?
     win32op->readset_out->fd_count : win32op->writeset_out->fd_count;

 if (!fd_count) {
  /* windows doesn't like you to call select() with no sockets */
  sleep(timeval_to_ms(tv));
  signal_process();
  return (0);
 }

// call select to find the ready descriptors among the three sets readset_out, writeset_out and exset_out

 res = select(fd_count,
       (struct fd_set*)win32op->readset_out,
       (struct fd_set*)win32op->writeset_out,
       (struct fd_set*)win32op->exset_out, tv);

 event_debug(("%s: select returned %d", __func__, res));

 if(res <= 0) {
  signal_process();
  return res;
 }

// walk all events and determine whether each one was activated by read or by write

 for (i=0;i<win32op->n_events;++i) {
  struct event *ev;
  int got = 0; // got starts at 0
  ev = win32op->events[i];
  if ((ev->ev_events & ev_read)) {
   if (fd_isset(ev->ev_fd, win32op->readset_out) ||
       fd_isset(ev->ev_fd, win32op->exset_out)) {
    got |= ev_read;
   }
  }
  if ((ev->ev_events & ev_write)) {
   if (fd_isset(ev->ev_fd, win32op->writeset_out)) {
    got |= ev_write;
   }
  }
  if (!got)
   continue;

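// If the event is not persistent, delete it first. This is why conn_new in memcached's
// memcached.c is called as, for example,
// conn_new(sfd, conn_read, ev_read | ev_persist, data_buffer_size, 0):
// with ev_persist the event is not deleted, so the callback does not have to call event_add again.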
  if (!(ev->ev_events & ev_persist)) {
   event_del(ev);
  }
  event_active(ev,got,1); // adds the event to the active queue
 }

 if (signal_recalc() == -1)
  return (-1);

 return (0);
}
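
The in/out split of the descriptor sets can be tried out on any POSIX system. The sketch below is an illustration under assumptions, not the win32.c code: it uses the standard fd_set with a zero timeout, and poll_readable is a hypothetical helper name. It keeps a master set untouched and hands select a scratch copy.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/* Poll `master` for readability without modifying it: select() is given a
 * scratch copy, which it overwrites with the ready descriptors.
 * Returns select()'s result; on success *ready holds the ready set. */
int poll_readable(const fd_set *master, int maxfd, fd_set *ready)
{
    struct timeval zero;

    assert(master != NULL && ready != NULL);
    zero.tv_sec = 0;
    zero.tv_usec = 0;
    *ready = *master;           /* plays the role of fd_set_copy() */
    return select(maxfd + 1, ready, NULL, NULL, &zero);
}
```

A caller would register descriptors in the master set once and call poll_readable on every pass; only the copy is overwritten, so nothing has to be registered again. On POSIX the first argument of select must be the highest descriptor plus one, whereas Winsock ignores that argument.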

timeout_process walks the red-black tree in order of expiry, finds the timers that are due, and adds them to the active queue.
void
timeout_process(struct event_base *base)
{
 struct timeval now;
 struct event *ev, *next;

 gettimeofday(&now, null);

 for (ev = rb_min(event_tree, &base->timetree); ev; ev = next) {
  if (timercmp(&ev->ev_timeout, &now, >))
   break;
  next = rb_next(event_tree, &base->timetree, ev);

// remove the timer from the red-black tree

  event_queue_remove(base, ev, evlist_timeout);

  /* delete this event from the i/o queues */

// then event_del removes the event from any other queue it is still on (the I/O event queue in particular)
  event_del(ev);

  event_debug(("timeout_process: call %p",
    ev->ev_callback));
  event_active(ev, ev_timeout, 1);
 }
}
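
The early exit in timeout_process depends on visiting timers in ascending order of deadline. A minimal sketch of that scan follows, assuming a sorted array in place of the in-order walk of the red-black tree; struct timer and fire_expired are hypothetical names, and setting a flag stands in for moving the event to the active queue.

```c
#include <assert.h>
#include <stddef.h>

struct timer {
    long long deadline;   /* absolute time, in microseconds */
    int fired;            /* set once the timer has been activated */
};

/* `timers` must be sorted by ascending deadline. Marks every timer whose
 * deadline is not after `now` and returns how many were marked. */
size_t fire_expired(struct timer *timers, size_t n, long long now)
{
    size_t i, fired = 0;

    assert(timers != NULL || n == 0);
    for (i = 0; i < n; i++) {
        if (timers[i].deadline > now)
            break;            /* everything after this one is later still */
        timers[i].fired = 1;
        fired++;
    }
    return fired;
}
```

A timer whose deadline equals the current time counts as expired, matching the strict greater-than comparison in the listing above.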

event_process_active walks the active queue and invokes the callback of every activated event.

static void
event_process_active(struct event_base *base)
{
 struct event *ev;
 struct event_list *activeq = null;
 int i;
 short ncalls;

 if (!base->event_count_active)
  return;

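// Not all active queues are processed in one call. The queues are scanned in priority order and
// the scan stops at the first non-empty one; only the events in that queue are handled.
// Because event_base_loop is a loop, the remaining events are handled on later passes.
// There is a hidden problem: if high-priority events keep arriving, low-priority events
// are never processed. An event's priority can be set with event_priority_set.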

 for (i = 0; i < base->nactivequeues; ++i) {
  if (tailq_first(base->activequeues[i]) != null) {
   activeq = base->activequeues[i];
   break;
  }
 }

// each event in the chosen priority queue is first removed from the active list and only then processed

 for (ev = tailq_first(activeq); ev; ev = tailq_first(activeq)) {
  event_queue_remove(base, ev, evlist_active);
  
  /* allows deletes to work */
  ncalls = ev->ev_ncalls;
  ev->ev_pncalls = &ncalls;
  while (ncalls) { // the callback can be invoked several times, though this does not seem to be used
   ncalls--;
   ev->ev_ncalls = ncalls;
   (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg); // the event callback is invoked here
  }
 }
}
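
The scheduling policy, and the starvation it allows, can be shown with a toy model. The sketch below makes simplifying assumptions: each queue is reduced to a count of pending events, three priorities are chosen arbitrarily, and struct active_queues and process_one_pass are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>

#define NPRIO 3

/* pending[i] is the number of active events waiting at priority i;
 * a smaller index means a higher priority. */
struct active_queues {
    int pending[NPRIO];
};

/* One pass of the dispatcher: drain only the highest-priority non-empty
 * queue. Returns its index, or -1 if nothing was active. */
int process_one_pass(struct active_queues *q)
{
    int i;

    assert(q != NULL);
    for (i = 0; i < NPRIO; i++) {
        if (q->pending[i] > 0) {
            q->pending[i] = 0;   /* "run" every callback in this queue */
            return i;
        }
    }
    return -1;
}
```

If new work keeps arriving at priority 0 between passes, the queue at priority 2 is never reached. That is the starvation risk noted in the comments of event_process_active.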

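        The analysis above gives a reasonably clear picture of libevent's overall structure. Because C has no templates, the heavy use of macros can be confusing, but these data structures are worth studying. On Linux, the use of epoll is also worth analyzing. Once libevent-1.1a is fully understood, one can move on to the newer 1.4.9 release, which adds quite a lot; I do not yet understand what the http, rpc and other components in libevent are for.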