使用条件变量 + 互斥区
http://blogread.cn/it/article/7248?f=catetitle
http://baike.baidu.com/link?url=mFxsi1w7pYQI3p-C175_u14hB0fCbYFr4JqPlNpfEZEbn4l1wZLuHuLgsrc__rvA815BnG99hyUoYgq1SGsw5a
类比: c++ pthread_mutex_[un]lock(obj) 与 java synchronize(obj)c++ pthread_cond_wait(cond, obj) 与 java wait(obj)
#include<pthread.h>#include<unistd.h>#include<stdio.h>#include<string.h>#include<stdlib.h> static pthread_mutex_t mtx=PTHREAD_MUTEX_INITIALIZER;static pthread_cond_t cond=PTHREAD_COND_INITIALIZER; struct node { int n_number; struct node *n_next;} *head=NULL; /*[thread_func]*/ /*释放节点内存*/static void cleanup_handler(void*arg) { printf("Clean up handler of second thread.\n"); free(arg); (void)pthread_mutex_unlock(&mtx);} static void *thread_func(void *arg) { struct node*p=NULL; pthread_cleanup_push(cleanup_handler,p); pthread_mutex_lock(&mtx); //这个mutex_lock主要是用来保护wait等待临界时期的情况, //当在wait为放入队列时,这时,已经存在Head条件等待激活 //的条件,此时可能会漏掉这种处理 //这个while要特别说明一下,单个pthread_cond_wait功能很完善, //为何这里要有一个while(head==NULL)呢?因为pthread_cond_wait //里的线程可能会被意外唤醒,如果这个时候head==NULL, //则不是我们想要的情况。这个时候, //应该让线程继续进入pthread_cond_wait while(1) { while(head==NULL) { pthread_cond_wait(&cond,&mtx); } //pthread_cond_wait会先解除之前的pthread_mutex_lock锁定的mtx, //然后阻塞在等待队列里休眠,直到再次被唤醒 //(大多数情况下是等待的条件成立而被唤醒,唤醒后, //该进程会先锁定先pthread_mutex_lock(&mtx);, //再读取资源用这个流程是比较清楚的 /*block-->unlock-->wait()return-->lock*/ p=head; head=head->n_next; printf("Got%dfromfrontofqueue\n",p->n_number); free(p); } pthread_mutex_unlock(&mtx);//临界区数据操作完毕,释放互斥锁 pthread_cleanup_pop(0); return 0;} int main(void) { pthread_t tid; int i; struct node *p; pthread_create(&tid,NULL,thread_func,NULL); //子线程会一直等待资源,类似生产者和消费者, //但是这里的消费者可以是多个消费者, //而不仅仅支持普通的单个消费者,这个模型虽然简单, //但是很强大 for(i=0;i<10;i++) { p=(struct node*)malloc(sizeof(struct node)); p->n_number=i; pthread_mutex_lock(&mtx);//需要操作head这个临界资源,先加锁, p->n_next=head; head=p; pthread_cond_signal(&cond); pthread_mutex_unlock(&mtx);//解锁 sleep(1); } printf("thread1wannaendthecancelthread2.\n"); pthread_cancel(tid); //关于pthread_cancel,有一点额外的说明,它是从外部终止子线程, //子线程会在最近的取消点,退出线程,而在我们的代码里,最近的 //取消点肯定就是pthread_cond_wait()了。 pthread_join(tid,NULL); printf("Alldone--exiting\n"); return 0;}
附上c++版synchronize/wait/notify条件变量实现(cond)
class NormalCond : public Condhttp://blogread.cn/it/article/7248?f=catetitle
{
public:
NormalCond() {
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond, NULL);
}
~NormalCond() {
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void lock() { pthread_mutex_lock(&_mutex); }
void unlock() { pthread_mutex_unlock(&_mutex); }
void wait(size_t) { pthread_cond_wait(&_cond, &_mutex); }
void wake() { pthread_cond_signal(&_cond); }
private:
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
class LayeredCond : public Cond
{
public:
LayeredCond(size_t layers = 1) : _value(0), _layers(layers) {
pthread_mutex_init(&_mutex, NULL);
if (_layers > sizeof(int)*8) {
printf("FATAL: cannot support such layer %u (max %u)\n",
_layers, sizeof(int)*8);
abort();
}
_waiters = new size_t[_layers];
memset(_waiters, 0, sizeof(size_t)*_layers);
}
~LayeredCond() {
pthread_mutex_destroy(&_mutex);
delete _waiters;
_waiters = NULL;
}
void lock() {
pthread_mutex_lock(&_mutex);
}
void unlock() {
pthread_mutex_unlock(&_mutex);
}
void wait(size_t layer) {//cond wait
if (layer >= _layers) {
printf("FATAL: layer overflow (%u/%u)\n", layer, _layers);
abort();
}
_waiters[layer]++; //record waiter threads on condition "_value"
while (_value == 0) {
int value = _value;
unlock();
syscall(__NR_futex, &_value, FUTEX_WAIT_BITSET, value,//suspend and wait for cond wake
NULL, NULL, layer2mask(layer));
lock(); //when waked, try to get lock again
}
_waiters[layer]--;
_value--;
}
void wake() {
int mask = ~0;
lock();
for (size_t i = 0; i < _layers; i++) {
if (_waiters[i] > 0) {
mask = layer2mask(i);
break;
}
}
_value++;
unlock();
syscall(__NR_futex, &_value, FUTEX_WAKE_BITSET, 1,
NULL, NULL, mask);
}
private:
int layer2mask(size_t layer) {
return 1 << layer;
}
private:
pthread_mutex_t _mutex;
int _value;
size_t* _waiters;
size_t _layers;
};
template<class T>
class Stack
{
public:
Stack(size_t size, size_t cond_layers = 0) : _size(size), _sp(0) {
_buf = new T*[_size];
_cond = (cond_layers > 0) ?
(Cond*)new LayeredCond(cond_layers) : (Cond*)new NormalCond();
}
~Stack() {
delete []_buf;
delete _cond;
}
T* pop(size_t layer = 0) {
T* ret = NULL;
_cond->lock();
do {
if (_sp > 0) {
ret = _buf[--_sp];
}
else {
_cond->wait(layer);
}
} while (ret == NULL);
_cond->unlock();
return ret;
}
void push(T* obj) {
_cond->lock();
if (_sp >= _size) {
printf("FATAL: stack overflow\n");
abort();
}
_buf[_sp++] = obj;
_cond->unlock();
_cond->wake();
}
private:
const size_t _size;
size_t _sp;
T** _buf;
Cond* _cond;
};