多个线程同时访问共享数据时可能会冲突,比如两个线程都要把某个全局变量增加1,这个操作在某平台需要三条指令完成:
-
从内存读变量值到寄存器
-
寄存器的值加1
-
将寄存器的值写回内存
假设两个线程在多处理器平台上同时执行这三条指令,则可能导致下图所示的结果,最后变量只加了一次而非两次。
如下例子就演示了这一过程:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int counter; /* incremented by threads */
void *doit(void *vptr)
{
int i, val;
for (i = 0; i < 100; i++) {
val = counter;
usleep(1000);
counter = val + 1;
}
return NULL;
}
int main(int argc, char **argv)
{
pthread_t tidA, tidB;
pthread_create(&tidA, NULL, &doit, NULL);
pthread_create(&tidB, NULL, &doit, NULL);
pthread_join(tidA, NULL);
pthread_join(tidB, NULL);
printf("counter = %d \n", counter);
return 0;
}
在这个例子中,虽然每个线程都给counter加了100,但由于结果的互相覆盖,最终输出值不是200,而是100。
tianfang@linux-k36c:/mnt/share/test> run
counter = 100
tianfang@linux-k36c:/mnt/share/test>
互斥锁mutex
对于多线程的程序,访问冲突的问题是很普遍的,解决的办法是引入互斥锁(Mutex,Mutual Exclusive Lock),获得锁的线程可以完成"读-修改-写"的操作,然后释放锁给其它线程,没有获得锁的线程只能等待而不能访问共享数据,这样"读-修改-写"三步操作组成一个原子操作,要么都执行,要么都不执行,不会执行到中间被打断,也不会在其它处理器上并行做这个操作。
和mutext相关的函数有如下几个:
#include <pthread.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
从名字中基本上就能看出来该如何使用,这里就只是以上面的例子为例,用mutex将其改造一下:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int counter; /* incremented by threads */
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
void *doit(void *vptr)
{
int i, val;
for (i = 0; i < 100; i++) {
pthread_mutex_lock(&counter_mutex);
val = counter;
usleep(1000);
counter = val + 1;
pthread_mutex_unlock(&counter_mutex);
}
return NULL;
}
int main(int argc, char **argv)
{
pthread_mutex_init(&counter_mutex, NULL);
pthread_t tidA, tidB;
pthread_create(&tidA, NULL, &doit, NULL);
pthread_create(&tidB, NULL, &doit, NULL);
pthread_join(tidA, NULL);
pthread_join(tidB, NULL);
pthread_mutex_destroy(&counter_mutex);
printf("counter = %d \n", counter);
return 0;
}
这次的运行结果就是正确的了:
tianfang@linux-k36c:/mnt/share/test> run
counter = 200
tianfang@linux-k36c:/mnt/share/test>
条件变量condtion
前面介绍的Mutext主要用于实现互斥,在多线程的场景中往往还有同步的需求。例如,在生产者和消费者的例子中,消费者需要等待生产者产生数据后才能消费。
单用mutex不能解决同步的问题:假如消费者先获取到锁,但此时并没有资源可用,如果把锁占着不放,生产者无法获取锁,此时就成了死锁;如果立即释放,并重新进入,则会成为轮询而消耗cpu资源。
在pthread库中通过可条件变量(Condition Variable)来阻塞等待一个条件,或者唤醒等待这个条件的线程。Condition Variable用pthread_cond_t类型的变量表示,其相关函数如下:
#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
从它的wait函数可以看到,condtion是和mutext一起使用的,基本流程如下:
-
消费者获取资源锁,如果当前无可用资源则调用cond_wait函数释放锁,并等待condtion通知。
-
生产者产生资源后,获取资源锁,放置资源后嗲用cond_signal函数通知。并释放资源锁。
-
消费者的cond_wait函数等到condtion通知后,重新获取资源锁,消费资源后再次释放资源锁。
从中可以看到,mutex用于保护资源,wait函数用于等待信号,signal和broadcast函数用于通知信号。其中wait函数中有一次对mutex的释放和重新获取操作,因此生产者和消费者并不会出现死锁。
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#include <iostream>
using namespace std;
queue<int> buffer;
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void* consumer(void *p)
{
while(true)
{
sleep(rand() % 5);
pthread_mutex_lock(&lock);
if (buffer.empty())
pthread_cond_wait(&has_product, &lock);
cout << "### Consume " << buffer.front() << endl;
buffer.pop();
pthread_mutex_unlock(&lock);
}
}
void* producer(void *p)
{
while(true)
{
sleep(rand() % 5);
pthread_mutex_lock(&lock);
buffer.push(rand() % 1000);
cout << ">>> Produce " << buffer.back() << endl;
pthread_cond_signal(&has_product);
pthread_mutex_unlock(&lock);
}
}
int main(int argc, char *argv[])
{
pthread_t pid, cid;
srand(time(NULL));
pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);
pthread_join(pid, NULL);
pthread_join(cid, NULL);
return 0;
}
信号量Semaphore
Mutex变量是非0即1的,可看作一种资源的可用数量,初始化时Mutex是1,表示有一个可用资源,加锁时获得该资源,将Mutex减到0,表示不再有可用资源,解锁时释放该资源,将Mutex重新加到1,表示又有了一个可用资源。
信号量(Semaphore)和Mutex类似,表示可用资源的数量,和Mutex不同的是这个数量可以大于1。其相关操作函数是:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t * sem);
int sem_destroy(sem_t * sem);
这里以一个有限资源池的生产者和消费者的例子演示一下信号量的用法:
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>
#define NUM 5
int queue[NUM];
sem_t blank_number, product_number;
void *producer(void *arg)
{
int p = 0;
while (true)
{
sem_wait(&blank_number);
queue[p] = rand() % 1000 + 1;
printf(">>> Produce %d\n", queue[p]);
sem_post(&product_number);
p = (p+1) % NUM;
sleep(rand()%5);
}
}
void *consumer(void *arg)
{
int c = 0;
while (true)
{
sem_wait(&product_number);
printf("### Consume %d\n", queue[c]);
queue[c] = 0;
sem_post(&blank_number);
c = (c+1) % NUM;
sleep(rand() % 5);
}
}
int main(int argc, char *argv[])
{
pthread_t pid, cid;
sem_init(&blank_number, 0, NUM);
sem_init(&product_number, 0, 0);
pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);
pthread_join(pid, NULL);
pthread_join(cid, NULL);
sem_destroy(&blank_number);
sem_destroy(&product_number);
return 0;
}
读写锁(Reader-Writer Lock)
读写锁也是一种线程间的互斥操作,但和互斥锁不同的是,它分为读和写两种模式,其中写模式是独占资源的(和互斥锁一样),而读模式则是共享资源的,此时允许多个读者,从而提高并发性。
由于一时找不到合适的小例子来介绍它,并且读写锁的模型也是比较容易理解的,这里就不举例了。