nginx evnent_posted 线程处理函数

时间:2021-03-24 17:32:18


/*
 * Copyright (C) Igor Sysoev
 */




#ifndef _NGX_EVENT_POSTED_H_INCLUDED_
#define _NGX_EVENT_POSTED_H_INCLUDED_




#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>




#if (NGX_THREADS)
extern ngx_mutex_t  *ngx_posted_events_mutex;
#endif




//将ev插入到queue指向的队列的队头位置,用宏会更快,不使用函数


#define ngx_locked_post_event(ev, queue)                                      \
                                                                              \
    if (ev->prev == NULL) {                                                   \
        ev->next = (ngx_event_t *) *queue;                                    \
        ev->prev = (ngx_event_t **) queue;                                    \
        *queue = ev;                                                          \
                                                                              \
        if (ev->next) {                                                       \
            ev->next->prev = &ev->next;                                       \
        }                                                                     \
                                                                              \
        ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "post event %p", ev);  \
                                                                              \
    } else  {                                                                 \
        ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,                        \
                       "update posted event %p", ev);                         \
    }




//加锁,然后将ev插入到queue的队头位置


#define ngx_post_event(ev, queue)                                             \
                                                                              \
    ngx_mutex_lock(ngx_posted_events_mutex);                                  \
    ngx_locked_post_event(ev, queue);                                         \
    ngx_mutex_unlock(ngx_posted_events_mutex);




//将ev从所在的queue中删除


#define ngx_delete_posted_event(ev)                                           \
                                                                              \
    *(ev->prev) = ev->next;                                                   \
                                                                              \
    if (ev->next) {                                                           \
        ev->next->prev = ev->prev;                                            \
    }                                                                         \
                                                                              \
    ev->prev = NULL;                                                          \
    ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,                            \
                   "delete posted event %p", ev);






void ngx_event_process_posted( ngx_cycle_t *cycle,
    ngx_thread_volatile ngx_event_t **posted );


void ngx_wakeup_worker_thread( ngx_cycle_t *cycle );


#if (NGX_THREADS)
ngx_int_t ngx_event_thread_process_posted( ngx_cycle_t *cycle );
#endif




extern ngx_thread_volatile ngx_event_t  *ngx_posted_accept_events;


extern ngx_thread_volatile ngx_event_t  *ngx_posted_events;




#endif /* _NGX_EVENT_POSTED_H_INCLUDED_ */





/*
 * Copyright (C) Igor Sysoev
 */




#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>




ngx_thread_volatile ngx_event_t  *ngx_posted_accept_events;
ngx_thread_volatile ngx_event_t  *ngx_posted_events;


#if (NGX_THREADS)
ngx_mutex_t                      *ngx_posted_events_mutex;
#endif




//将posted链表中的所有项,删除掉,然后调用ev的处理函数处理之


void
ngx_event_process_posted( ngx_cycle_t *cycle,
    ngx_thread_volatile ngx_event_t **posted )
{
    ngx_event_t  *ev;


    for ( ;; ) 
{


        ev = (ngx_event_t *) *posted;


        ngx_log_debug1( NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                      "posted event %p", ev );


        if ( ev == NULL )
{
            return;
        }


        ngx_delete_posted_event( ev );


        ev->handler( ev );
    }
}




#if (NGX_THREADS) && !(NGX_WIN32)


void
ngx_wakeup_worker_thread( ngx_cycle_t *cycle )
{
    ngx_int_t     i;


#if 0
    ngx_uint_t    busy;
    ngx_event_t  *ev;


    busy = 1;


    if ( ngx_mutex_lock( ngx_posted_events_mutex ) == NGX_ERROR )
{
        return;
    }


    for ( ev = (ngx_event_t *) ngx_posted_events; ev; ev = ev->next ) 
{
        if ( *(ev->lock) == 0 ) 
{
            busy = 0;
            break;
        }
    }


    ngx_mutex_unlock(ngx_posted_events_mutex);


    if (busy)
{
        return;
    }


#endif


//只唤醒一个空闲的线程


    for ( i = 0; i < ngx_threads_n; i++ )
{
        if ( ngx_threads[ i ].state == NGX_THREAD_FREE )
{
            ngx_cond_signal( ngx_threads[i].cv ); 


            return;
        }
    }
}




ngx_int_t
ngx_event_thread_process_posted( ngx_cycle_t *cycle )
{
    ngx_event_t  *ev;


    for ( ; ; )
{


        ev = (ngx_event_t *) ngx_posted_events;


        for (  ; ; ) 
{


            ngx_log_debug1( NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                          "posted event %p", ev );


            if ( ev == NULL ) // 到最后了,直接返回OK
{
                return NGX_OK;
            }


            if ( ngx_trylock( ev->lock ) == 0 ) // 没有get到ev的锁,则看下一个
{


                ngx_log_debug1( NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "posted event %p is busy", ev );


                ev = ev->next;


                continue;
            }


//获得了 ev->lock锁,set为1了,则看下ev->own_lock,如果ev->own_lock > 1 表示已经被占据了,则解锁,看下一个ev


// ev->own_lock为0表示还没有被占据,设置为1表示被当前线程占据了


// 我觉得这里的own_lock是多此一举,如果event被除了queue之外的多个线程同时修改的话,用ev->lock就够了


//搞什么own_lock呢? 


            if ( ev->lock != ev->own_lock )
{
                if ( *(ev->own_lock) )
{
                    ngx_log_error( NGX_LOG_ALERT, cycle->log, 0,
                             "the own lock of the posted event %p is busy", ev );


                    ngx_unlock( ev->lock );


                    ev = ev->next;


                    continue;
                }


//设置为1 是原子操作


                *(ev->own_lock) = 1;
            }


//将ev 从queue中删除


            ngx_delete_posted_event( ev );


//调整ev的各种状态,防止其他线程扫描到ev后同时处理


            ev->locked = 1;


            ev->ready |= ev->posted_ready;
            ev->timedout |= ev->posted_timedout;
            ev->pending_eof |= ev->posted_eof;


#if (NGX_HAVE_KQUEUE)
            ev->kq_errno |= ev->posted_errno;
#endif


            if ( ev->posted_available ) 
{
                ev->available = ev->posted_available;
            }


            ev->posted_ready = 0;
            ev->posted_timedout = 0;
            ev->posted_eof = 0;


#if (NGX_HAVE_KQUEUE)
            ev->posted_errno = 0;
#endif


            ev->posted_available = 0;

//解锁,其他线程可以继续扫描处理了


            ngx_mutex_unlock( ngx_posted_events_mutex );


            ev->handler( ev );  // ev 在锁外静悄悄的自己单独处理


            ngx_mutex_lock( ngx_posted_events_mutex );


            if ( ev->locked )  // ev 已经被锁住了,肯定是自己锁的,上面自己加的锁,其余线程抢不到的
{
                ngx_unlock( ev->lock ); // 解锁


                if ( ev->lock != ev->own_lock ) // 解锁own_lock
{
                    ngx_unlock( ev->own_lock );
                }
            }


            ngx_log_debug1( NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "posted event %p is done", ev );


            break; // 跳出里层循环,跳到外层循环,接着从头开始处理,直至处理到ev为NULL为止
        }
    }
}


#else


void
ngx_wakeup_worker_thread(ngx_cycle_t *cycle)
{
}


#endif