/*
* Copyright (C) Igor Sysoev
*/
#ifndef _NGX_EVENT_POSTED_H_INCLUDED_
#define _NGX_EVENT_POSTED_H_INCLUDED_
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
#if (NGX_THREADS)
extern ngx_mutex_t *ngx_posted_events_mutex;
#endif
//将ev插入到queue指向的队列的队头位置,用宏会更快,不使用函数
#define ngx_locked_post_event(ev, queue) \
\
if (ev->prev == NULL) { \
ev->next = (ngx_event_t *) *queue; \
ev->prev = (ngx_event_t **) queue; \
*queue = ev; \
\
if (ev->next) { \
ev->next->prev = &ev->next; \
} \
\
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "post event %p", ev); \
\
} else { \
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, \
"update posted event %p", ev); \
}
//加锁,然后将ev插入到queue的队头位置
#define ngx_post_event(ev, queue) \
\
ngx_mutex_lock(ngx_posted_events_mutex); \
ngx_locked_post_event(ev, queue); \
ngx_mutex_unlock(ngx_posted_events_mutex);
//将ev从所在的queue中删除
#define ngx_delete_posted_event(ev) \
\
*(ev->prev) = ev->next; \
\
if (ev->next) { \
ev->next->prev = ev->prev; \
} \
\
ev->prev = NULL; \
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, \
"delete posted event %p", ev);
void ngx_event_process_posted( ngx_cycle_t *cycle,
ngx_thread_volatile ngx_event_t **posted );
void ngx_wakeup_worker_thread( ngx_cycle_t *cycle );
#if (NGX_THREADS)
ngx_int_t ngx_event_thread_process_posted( ngx_cycle_t *cycle );
#endif
extern ngx_thread_volatile ngx_event_t *ngx_posted_accept_events;
extern ngx_thread_volatile ngx_event_t *ngx_posted_events;
#endif /* _NGX_EVENT_POSTED_H_INCLUDED_ */
/*
* Copyright (C) Igor Sysoev
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
ngx_thread_volatile ngx_event_t *ngx_posted_accept_events;
ngx_thread_volatile ngx_event_t *ngx_posted_events;
#if (NGX_THREADS)
ngx_mutex_t *ngx_posted_events_mutex;
#endif
//将posted链表中的所有项,删除掉,然后调用ev的处理函数处理之
void
ngx_event_process_posted( ngx_cycle_t *cycle,
ngx_thread_volatile ngx_event_t **posted )
{
ngx_event_t *ev;
for ( ;; )
{
ev = (ngx_event_t *) *posted;
ngx_log_debug1( NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted event %p", ev );
if ( ev == NULL )
{
return;
}
ngx_delete_posted_event( ev );
ev->handler( ev );
}
}
#if (NGX_THREADS) && !(NGX_WIN32)
void
ngx_wakeup_worker_thread( ngx_cycle_t *cycle )
{
ngx_int_t i;
#if 0
ngx_uint_t busy;
ngx_event_t *ev;
busy = 1;
if ( ngx_mutex_lock( ngx_posted_events_mutex ) == NGX_ERROR )
{
return;
}
for ( ev = (ngx_event_t *) ngx_posted_events; ev; ev = ev->next )
{
if ( *(ev->lock) == 0 )
{
busy = 0;
break;
}
}
ngx_mutex_unlock(ngx_posted_events_mutex);
if (busy)
{
return;
}
#endif
//只唤醒一个空闲的线程
for ( i = 0; i < ngx_threads_n; i++ )
{
if ( ngx_threads[ i ].state == NGX_THREAD_FREE )
{
ngx_cond_signal( ngx_threads[i].cv );
return;
}
}
}
ngx_int_t
ngx_event_thread_process_posted( ngx_cycle_t *cycle )
{
ngx_event_t *ev;
for ( ; ; )
{
ev = (ngx_event_t *) ngx_posted_events;
for ( ; ; )
{
ngx_log_debug1( NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted event %p", ev );
if ( ev == NULL ) // 到最后了,直接返回OK
{
return NGX_OK;
}
if ( ngx_trylock( ev->lock ) == 0 ) // 没有get到ev的锁,则看下一个
{
ngx_log_debug1( NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted event %p is busy", ev );
ev = ev->next;
continue;
}
//获得了 ev->lock锁,set为1了,则看下ev->own_lock,如果ev->own_lock > 1 表示已经被占据了,则解锁,看下一个ev
// ev->own_lock为0表示还没有被占据,设置为1表示被当前线程占据了
// 我觉得这里的own_lock是多此一举,如果event被除了queue之外的多个线程同时修改的话,用ev->lock就够了
//搞什么own_lock呢?
if ( ev->lock != ev->own_lock )
{
if ( *(ev->own_lock) )
{
ngx_log_error( NGX_LOG_ALERT, cycle->log, 0,
"the own lock of the posted event %p is busy", ev );
ngx_unlock( ev->lock );
ev = ev->next;
continue;
}
//设置为1 是原子操作
*(ev->own_lock) = 1;
}
//将ev 从queue中删除
ngx_delete_posted_event( ev );
//调整ev的各种状态,防止其他线程扫描到ev后同时处理
ev->locked = 1;
ev->ready |= ev->posted_ready;
ev->timedout |= ev->posted_timedout;
ev->pending_eof |= ev->posted_eof;
#if (NGX_HAVE_KQUEUE)
ev->kq_errno |= ev->posted_errno;
#endif
if ( ev->posted_available )
{
ev->available = ev->posted_available;
}
ev->posted_ready = 0;
ev->posted_timedout = 0;
ev->posted_eof = 0;
#if (NGX_HAVE_KQUEUE)
ev->posted_errno = 0;
#endif
ev->posted_available = 0;
//解锁,其他线程可以继续扫描处理了
ngx_mutex_unlock( ngx_posted_events_mutex );
ev->handler( ev ); // ev 在锁外静悄悄的自己单独处理
ngx_mutex_lock( ngx_posted_events_mutex );
if ( ev->locked ) // ev 已经被锁住了,肯定是自己锁的,上面自己加的锁,其余线程抢不到的
{
ngx_unlock( ev->lock ); // 解锁
if ( ev->lock != ev->own_lock ) // 解锁own_lock
{
ngx_unlock( ev->own_lock );
}
}
ngx_log_debug1( NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted event %p is done", ev );
break; // 跳出里层循环,跳到外层循环,接着从头开始处理,直至处理到ev为NULL为止
}
}
}
#else
void
ngx_wakeup_worker_thread(ngx_cycle_t *cycle)
{
}
#endif