libuv学习笔记(24)

时间:2023-02-26 13:08:55

libuv学习笔记(24)

线程相关数据结构与函数(2)

数据结构

typedef union {//读写锁
struct {
unsigned int num_readers_;
CRITICAL_SECTION num_readers_lock_;
HANDLE write_semaphore_;
} state_;
/* TODO: remove me in v2.x. */
struct {
SRWLOCK unused_;
} unused1_;
/* TODO: remove me in v2.x. */
struct {
uv_mutex_t unused1_;
uv_mutex_t unused2_;
} unused2_;
} uv_rwlock_t;
typedef HANDLE uv_sem_t;//信号量
typedef union {//线程池同步管理
CONDITION_VARIABLE cond_var;//调用系统API实现
struct {//libuv自己实现
unsigned int waiters_count;
CRITICAL_SECTION waiters_count_lock;
HANDLE signal_event;
HANDLE broadcast_event;
} fallback;
} uv_cond_t;
typedef struct {
unsigned int n;
unsigned int count;
uv_mutex_t mutex;
uv_sem_t turnstile1;
uv_sem_t turnstile2;
} uv_barrier_t;

读写锁相关函数

初始化

int uv_rwlock_init(uv_rwlock_t* rwlock) {
//创建信号量,最大资源数和可用资源数都为1
HANDLE handle = CreateSemaphoreW(NULL, 1, 1, NULL);
if (handle == NULL)
return uv_translate_sys_error(GetLastError());
rwlock->state_.write_semaphore_ = handle;
//初始化临界区
InitializeCriticalSection(&rwlock->state_.num_readers_lock_);
//初始化读请求的数量
rwlock->state_.num_readers_ = 0;
return 0;
}

释放读写锁

void uv_rwlock_destroy(uv_rwlock_t* rwlock) {
DeleteCriticalSection(&rwlock->state_.num_readers_lock_);
CloseHandle(rwlock->state_.write_semaphore_);
}

读锁定

void uv_rwlock_rdlock(uv_rwlock_t* rwlock) {
//进入临界区
EnterCriticalSection(&rwlock->state_.num_readers_lock_);
//递增读请求数量
if (++rwlock->state_.num_readers_ == 1) {
//如果为1,说明没有其他的地方读锁定了,等待资源可用
DWORD r = WaitForSingleObject(rwlock->state_.write_semaphore_, INFINITE);
if (r != WAIT_OBJECT_0)
uv_fatal_error(GetLastError(), "WaitForSingleObject");
}
//离开临界区
LeaveCriticalSection(&rwlock->state_.num_readers_lock_);
}

尝试读锁定

int uv_rwlock_tryrdlock(uv_rwlock_t* rwlock) {
int err;
//尝试进入临界区
if (!TryEnterCriticalSection(&rwlock->state_.num_readers_lock_))
return UV_EBUSY;
err = 0;
if (rwlock->state_.num_readers_ == 0) {
//获取资源
DWORD r = WaitForSingleObject(rwlock->state_.write_semaphore_, 0);
if (r == WAIT_OBJECT_0)
rwlock->state_.num_readers_++;
else if (r == WAIT_TIMEOUT)
err = UV_EBUSY;
else if (r == WAIT_FAILED)
uv_fatal_error(GetLastError(), "WaitForSingleObject");
} else {
rwlock->state_.num_readers_++;
}

LeaveCriticalSection(&rwlock->state_.num_readers_lock_);
return err;
}

释放读锁定

void uv_rwlock_rdunlock(uv_rwlock_t* rwlock) {
//进入临界区
EnterCriticalSection(&rwlock->state_.num_readers_lock_);
if (--rwlock->state_.num_readers_ == 0) {
//没有都请求了,释放资源
if (!ReleaseSemaphore(rwlock->state_.write_semaphore_, 1, NULL))
uv_fatal_error(GetLastError(), "ReleaseSemaphore");
}
//离开临界区
LeaveCriticalSection(&rwlock->state_.num_readers_lock_);
}

写锁定

void uv_rwlock_wrlock(uv_rwlock_t* rwlock) {
//等待资源可用
DWORD r = WaitForSingleObject(rwlock->state_.write_semaphore_, INFINITE);
if (r != WAIT_OBJECT_0)
uv_fatal_error(GetLastError(), "WaitForSingleObject");
}

尝试写锁定

int uv_rwlock_trywrlock(uv_rwlock_t* rwlock) {
DWORD r = WaitForSingleObject(rwlock->state_.write_semaphore_, 0);
if (r == WAIT_OBJECT_0)
return 0;
else if (r == WAIT_TIMEOUT)
return UV_EBUSY;
else
uv_fatal_error(GetLastError(), "WaitForSingleObject");
}

解除写锁定

void uv_rwlock_wrunlock(uv_rwlock_t* rwlock) {
//释放信号量
if (!ReleaseSemaphore(rwlock->state_.write_semaphore_, 1, NULL))
uv_fatal_error(GetLastError(), "ReleaseSemaphore");
}

信号量相关API

初始化

int uv_sem_init(uv_sem_t* sem, unsigned int value) {
*sem = CreateSemaphore(NULL, value, INT_MAX, NULL);//创建信号量
if (*sem == NULL)
return uv_translate_sys_error(GetLastError());
else
return 0;
}

释放信号量

void uv_sem_destroy(uv_sem_t* sem) {
if (!CloseHandle(*sem))//关闭句柄
abort();
}

发送

void uv_sem_post(uv_sem_t* sem) {
if (!ReleaseSemaphore(*sem, 1, NULL))//释放一个资源
abort();
}

等待

void uv_sem_wait(uv_sem_t* sem) {
if (WaitForSingleObject(*sem, INFINITE) != WAIT_OBJECT_0)
abort();
}

尝试等待

int uv_sem_trywait(uv_sem_t* sem) {
DWORD r = WaitForSingleObject(*sem, 0);

if (r == WAIT_OBJECT_0)
return 0;

if (r == WAIT_TIMEOUT)
return UV_EAGAIN;

abort();
return -1; /* Satisfy the compiler. */
}

线程池同步相关API

初始化

int uv_cond_init(uv_cond_t* cond) {
uv__once_init();//调用uv_init全局初始化
if (HAVE_CONDVAR_API())//系统支持相关的API
return uv_cond_condvar_init(cond);
else
return uv_cond_fallback_init(cond);
}

系统支持

static int uv_cond_condvar_init(uv_cond_t* cond) {
//调用InitializeConditionVariable初始化
pInitializeConditionVariable(&cond->cond_var);
return 0;
}

系统不支持

static int uv_cond_fallback_init(uv_cond_t* cond) {
int err;
cond->fallback.waiters_count = 0;
//初始化临界区
InitializeCriticalSection(&cond->fallback.waiters_count_lock);
//新建一个自动还原状态的事件,初始化为无信号
cond->fallback.signal_event = CreateEvent(NULL, /* no security */
FALSE, /* auto-reset event */
FALSE, /* non-signaled initially */
NULL); /* unnamed */
if (!cond->fallback.signal_event) {
err = GetLastError();
goto error2;
}
//创建一个需要手动改变状态的事件
cond->fallback.broadcast_event = CreateEvent(NULL, /* no security */
TRUE, /* manual-reset */
FALSE, /* non-signaled */
NULL); /* unnamed */
if (!cond->fallback.broadcast_event) {
err = GetLastError();
goto error;
}
return 0;
error:
CloseHandle(cond->fallback.signal_event);
error2:
DeleteCriticalSection(&cond->fallback.waiters_count_lock);
return uv_translate_sys_error(err);
}

释放

void uv_cond_destroy(uv_cond_t* cond) {
if (HAVE_CONDVAR_API())//根据系统的支持情况释放资源
uv_cond_condvar_destroy(cond);
else
uv_cond_fallback_destroy(cond);
}

设为有信号状态

void uv_cond_signal(uv_cond_t* cond) {
if (HAVE_CONDVAR_API())
uv_cond_condvar_signal(cond);
else
uv_cond_fallback_signal(cond);
}

使用系统API

static void uv_cond_condvar_signal(uv_cond_t* cond) {
pWakeConditionVariable(&cond->cond_var);//唤醒
}

libuv自己实现

static void uv_cond_fallback_signal(uv_cond_t* cond) {
int have_waiters;
EnterCriticalSection(&cond->fallback.waiters_count_lock);
have_waiters = cond->fallback.waiters_count > 0;
LeaveCriticalSection(&cond->fallback.waiters_count_lock);
if (have_waiters)
//将signal_event设为有信号,这样等待该event的线程中的一个将会唤醒
SetEvent(cond->fallback.signal_event);
}

广播通知(所有等待的线程都被唤醒)

void uv_cond_broadcast(uv_cond_t* cond) {
if (HAVE_CONDVAR_API())
uv_cond_condvar_broadcast(cond);
else
uv_cond_fallback_broadcast(cond);
}

使用系统API实现

static void uv_cond_condvar_broadcast(uv_cond_t* cond) {
pWakeAllConditionVariable(&cond->cond_var);//唤醒所有线程
}

libuv自己实现

static void uv_cond_fallback_broadcast(uv_cond_t* cond) {
int have_waiters;
EnterCriticalSection(&cond->fallback.waiters_count_lock);
have_waiters = cond->fallback.waiters_count > 0;
LeaveCriticalSection(&cond->fallback.waiters_count_lock);
if (have_waiters)
//所有等待broadcast_event的线程都被唤醒
SetEvent(cond->fallback.broadcast_event);
}

等待

void uv_cond_wait(uv_cond_t* cond, uv_mutex_t* mutex) {
if (HAVE_CONDVAR_API())
uv_cond_condvar_wait(cond, mutex);
else
uv_cond_fallback_wait(cond, mutex);
}

通过系统API实现

static void uv_cond_condvar_wait(uv_cond_t* cond, uv_mutex_t* mutex) {
//等待cond_var,并离开临界区mutex
if (!pSleepConditionVariableCS(&cond->cond_var, mutex, INFINITE))
abort();
}

libuv自己实现

static int uv_cond_wait_helper(uv_cond_t* cond, uv_mutex_t* mutex,
DWORD dwMilliseconds) {
DWORD result;
int last_waiter;
HANDLE handles[2] = {
cond->fallback.signal_event,
cond->fallback.broadcast_event
};
EnterCriticalSection(&cond->fallback.waiters_count_lock);
cond->fallback.waiters_count++;//等待的线程计数加一
LeaveCriticalSection(&cond->fallback.waiters_count_lock);
//离开临界区
uv_mutex_unlock(mutex);
//等待任意一个event为有信号状态
result = WaitForMultipleObjects(2, handles, FALSE, dwMilliseconds);
EnterCriticalSection(&cond->fallback.waiters_count_lock);
cond->fallback.waiters_count--;
//如果信号是broadcast_event并且没有等待者了,说明这是最后一个等待的线程
last_waiter = result == WAIT_OBJECT_0 + 1
&& cond->fallback.waiters_count == 0;
LeaveCriticalSection(&cond->fallback.waiters_count_lock);
//对于最后一个等待线程,手动将broadcast_event设为无信号
if (last_waiter) {
ResetEvent(cond->fallback.broadcast_event);
}
uv_mutex_lock(mutex);
if (result == WAIT_OBJECT_0 || result == WAIT_OBJECT_0 + 1)
return 0;
if (result == WAIT_TIMEOUT)
return UV_ETIMEDOUT;
abort();
return -1; /* Satisfy the compiler. */
}

等待指定时间,与上面的函数相比,多了一个超时参数

int uv_cond_timedwait(uv_cond_t* cond, uv_mutex_t* mutex,
uint64_t timeout) {
if (HAVE_CONDVAR_API())
return uv_cond_condvar_timedwait(cond, mutex, timeout);
else
return uv_cond_fallback_timedwait(cond, mutex, timeout);
}

barrier相关函数

初始化

int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) {
int err;
barrier->n = count;//任务数量
barrier->count = 0;
err = uv_mutex_init(&barrier->mutex);
if (err)
return err;
err = uv_sem_init(&barrier->turnstile1, 0);
if (err)
goto error2;
err = uv_sem_init(&barrier->turnstile2, 1);
if (err)
goto error;
return 0;
error:
uv_sem_destroy(&barrier->turnstile1);
error2:
uv_mutex_destroy(&barrier->mutex);
return err;
}

释放

void uv_barrier_destroy(uv_barrier_t* barrier) {
uv_sem_destroy(&barrier->turnstile2);
uv_sem_destroy(&barrier->turnstile1);
uv_mutex_destroy(&barrier->mutex);
}

等待

int uv_barrier_wait(uv_barrier_t* barrier) {
int serial_thread;
uv_mutex_lock(&barrier->mutex);//进入临界区
if (++barrier->count == barrier->n) {//最后一个任务
uv_sem_wait(&barrier->turnstile2);//等待第二个信号量
uv_sem_post(&barrier->turnstile1);//释放第一个信号量
}
uv_mutex_unlock(&barrier->mutex);
uv_sem_wait(&barrier->turnstile1);//等待第一个信号量
uv_sem_post(&barrier->turnstile1);//释放第一个信号量
uv_mutex_lock(&barrier->mutex);//进入临界区
serial_thread = (--barrier->count == 0);
if (serial_thread) {//最后一个任务
uv_sem_wait(&barrier->turnstile1);/等待第一个
uv_sem_post(&barrier->turnstile2);//释放第二个
}
uv_mutex_unlock(&barrier->mutex);

uv_sem_wait(&barrier->turnstile2);//等待第二个
uv_sem_post(&barrier->turnstile2);//释放第二个
return serial_thread;
}

当某一线程需要等待其他一些线程任务完成之后才能继续运行时,可以使用barrier。
流程如下:
a.所有的相关线程都调用uv_barrier_wait等待同一个uv_barrier_t,此时除了最后一个,都会在等待第一个信号量的地方阻塞。
b.最后一个调用uv_barrier_wait的线程会等待第二个信号量,此时第二个信号量没有资源,然后释放第一个信号量。
c.之前阻塞在等待第一个信号量的线程中的一个获取信号量,继续运行,接着释放第一个信号量,这导致所有阻塞的进程都会因此一个一个的继续运行
d.接着除了最后一个线程,所有的线程都会阻塞在等待第二个信号量的地方。
e.最后一个线程,等待第一个信号量,此时第一个信号量重新变为没有资源,接着释放第二个信号量,激活其他等待的线程
f.其他阻塞在等待第二个信号量的线程,一个一个的唤醒,最终第二个信号量的资源为一,所有的线程继续运行

注意,最后一个线程是指逻辑上的最后一个,并不是确定的。
如果线程数比初始化时设置的任务数少,那么都会阻塞
如果线程池数比任务数多,那么等待任务数量的线程之后就会继续运行。