8.1线程同步2015/8/5

时间:2021-04-01 17:58:57

线程同步

多个线程同时访问共享数据是可能会冲突。比如两个线程都要把某个全局变量增加一,这个操作在某平台需要三条指令完成:
从内存读变量值到寄存器
寄存器的值加1
将寄存器的值写会内存
假设两个线程在多处理器平台上同时执行这三条指令,则可能导致,线程1和线程2同时从内存中把数据读取到寄存其中,并且各自加一之后又把放回内存中,此时,这个变量只加了一次,而非两次。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NLOOP 500000
int counter;

void *doit(void *);

int main(void)
{
pthread_t tid1, tid2;

pthread_create(&tid1, NULL, doit, NULL);
pthread_create(&tid2, NULL, doit, NULL);

pthread_join(tid1, NULL);
pthread_join(tid2, NULL);

return 0;
}

void *doit(void *arg)
{
int i;
int tmp;
for (i=0; i<NLOOP; i++) {
tmp = counter;
printf("counter = %d\n", tmp+1);
counter = (tmp + 1);
}
}

在上述例子中,可以说明线程对同一数据的读写问题,因为这个问题是是有几率出现的,所以在变量+1的时候又使用了一个变量,使得结果更明显,如果不这样,直接把counter++;这样出现的几率会变小,因为对内存的操作较少了,可是还会出现!

线程同步的原因

1.共享资源,多个线程都可对共享资源操作(全局变量等)
2.线程操作共享资源的先后顺序不确定
3.处理器对存储器的操作一般不是原子操作(处理器处理的最小单位,不会被其他进程打断而导致保存处理器现场的操作)
同步,就是当线程1去操作变量A,在操作A的代码前面加把锁,把当线程1运行这段代码的时候,把这把锁锁上,然后等线程1运行完操作变量A的代码之后,再解锁,这样就不会产生一个内存变量同时被几个线程操作的问题,

互斥量

原语

    #include <pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //定义一个锁,PTHREAD_MUTEX_INITIALIZER是直接赋值初始化
pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);//这个是函数初始化,可以修改锁的属性,如果不修改,则设为NULL
pthread_mutex_destory(pthread_mutex_t *mutex);//释放锁,在锁使用完之后把所释放掉
pthread_mutex_lock(pthread_mutex_t *mutex);//得到这把锁,如果这把锁其他线程占用,那本线程就阻塞等待
pthread_mutex_trylock(pthread_mutex_t *mutex);//申请锁,成功返回0,失败返回错误码,但不会阻塞。
pthread_mutex_unlock(pthread_mutex_t *mutex);//解锁

利用互斥量锁修改之前的代码,把变量上锁

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NLOOP 5000
int counter;
pthread_mutex_t mutex; //定义互斥锁,要是全局变量,因为线程需要调用这个参数,也可以通过线程的传参传到线程内部

void *doit(void *);

int main(void)
{
pthread_t tid1, tid2;
pthread_mutex_init(&mutex, NULL);//初始化锁

pthread_create(&tid1, NULL, doit, NULL);
pthread_create(&tid2, NULL, doit, NULL);

pthread_join(tid1, NULL);
pthread_join(tid2, NULL);

pthread_mutex_destroy(&mutex);//锁用完,把锁释放掉
return 0;
}

void *doit(void *arg)
{
int i;
int tmp;
for (i=0; i<NLOOP; i++) {
pthread_mutex_lock(&mutex);//上锁
tmp = counter;
printf("counter = %d\n", tmp+1);
counter = (tmp + 1);
pthread_mutex_unlock(&mutex);//解锁
}
}

注意:上锁和解锁一定要把修改变量的代码全部包涵起来,不然还是会有问题。所以这个临界点一定要选好。

死锁

情景1:线程1获得锁a再去申请锁a

例子:

#include <stdio.h>
#include <pthread.h>

pthread_mutex_t mu1;

int counter;

void *do_thread(void *arg)
{
pthread_mutex_lock(&mu1);
printf("counter = %d\n", counter++);
pthread_mutex_lock(&mu1);
printf("counter = %d\n", counter--);
pthread_mutex_unlock(&mu1);
pthread_mutex_unlock(&mu1);
}

int main(void)
{
pthread_t tid;

pthread_mutex_init(&mu1, NULL);

pthread_create(&tid, NULL, do_thread, NULL);

pthread_join(tid, NULL);

pthread_mutex_destroy(&mu1);

return 0;
}
情景2:2.线程一拥有A锁,请求获得B锁;线程二拥有B锁,请求获得A锁
线程1和2,全局变量m,n,要是想访问m,n就要得到锁一锁二;线程1访问m,得到锁1,再访问n,得到锁二,线程2访问n,得到锁二,再访问m,得到锁1,但是当线程1访问m,在想访问n的时候这个时候就会阻塞等待,而线程2访问n再想访问m的时候也要阻塞等待,这个时候就产生了死锁

例子:

#include <stdio.h>
#include <pthread.h>

pthread_mutex_t mu1 = PTHREAD_MUTEX_INITIALIZER; //锁一
pthread_mutex_t mu2;//锁二

int counter1; //m
int counter2; //n

void *do_thread1(void *);
void *do_thread2(void *);

int main(void)
{
pthread_t tid1, tid2;

pthread_mutex_init(&mu2, NULL);

pthread_create(&tid1, NULL, do_thread1, NULL);
pthread_create(&tid2, NULL, do_thread2, NULL);

pthread_join(tid1, NULL);
pthread_join(tid2, NULL);

pthread_mutex_destroy(&mu1);
pthread_mutex_destroy(&mu2);
}

void *do_thread1(void *arg)
{
while(1) {
pthread_mutex_lock(&mu1);
printf("counter1 = %d, tid = %x\n", counter1++, (int)pthread_self());

pthread_mutex_lock(&mu2);
printf("counter2 = %d, tid = %x\n", counter2++, (int)pthread_self());
pthread_mutex_unlock(&mu1);
pthread_mutex_unlock(&mu2);
}
}

void *do_thread2(void *arg)
{
while(1) {
pthread_mutex_lock(&mu2);
printf("counter2 = %d\n", counter2++);

pthread_mutex_lock(&mu1);
printf("counter1 = %d\n", counter1++);
pthread_mutex_unlock(&mu2);
pthread_mutex_unlock(&mu1);
}
}

读写锁

为了提高程序的并发性,操作全局变量的时候又分为读写锁,读时锁不阻塞,写的时候阻塞,读写锁不能同时生效。并不是两个锁,而是一个锁有两个属性。
一个线程持有读锁,第二个线程请求写锁,第三个线程请求读锁,执行顺序是读写读
一个线程持有写锁,第二个线程请求读锁,第三个线程请求写锁,执行顺序是写写读
写锁的优先级比读锁高

原语

pthread_rwlock_t    //创建锁
pthread_rwlock_init //初始化锁
pthread_rwlock_destroy //销毁锁
pthread_rwlock_rdlock //申请读锁,阻塞
pthread_rwlock_wrlock //申请写锁,阻塞
pthread_rwlock_tryrdlock //尝试申请读锁,出错返回不阻塞
pthread_rwlock_trywrlock //尝试申请写锁,出错返回不阻塞
pthread_rwlock_unlock //解锁

例子:3个线程不定时写同一全局资源,5个线程不定时读同一全局资源

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

int counter; //全局变量
pthread_rwlock_t rwlock; //定义锁

void *th_write(void *arg) //写线程
{
int t;
while (1) {
pthread_rwlock_wrlock(&rwlock); //申请写锁
t = counter;
usleep(100);
printf("write %x: counter=%d ++counter=%d\n", (int)pthread_self(), t, ++counter);
pthread_rwlock_unlock(&rwlock); //释放写锁
usleep(1000);
}
}

void *th_read(void *arg) //读线程
{
while(1) {
pthread_rwlock_rdlock(&rwlock); //申请读锁
printf("read %x: %d\n", (int)pthread_self(), counter);
pthread_rwlock_unlock(&rwlock); //释放线程
usleep(100);
}
}

int main(void)
{
int i;
pthread_t tid[8];
pthread_rwlock_init(&rwlock, NULL); //初始化锁

for (i=0; i<3; i++) //3个写线程
pthread_create(&tid[i], NULL, th_write, NULL);
for (i=3; i<8; i++) //5个读线程
pthread_create(&tid[i], NULL, th_read, NULL);

pthread_rwlock_destroy(&rwlock); //销毁锁

for(i=0; i<8; i++) //回收线程
pthread_join(tid[i], NULL);

return 0;
}

条件变量

条件变量给多个线程提供了一个汇合的场所,一般和互斥锁结合使用

经典应用场景:生产者消费者模型
8.1线程同步2015/8/5
生产者生产产品,消费者消费产品,汇合点在商场。生产者生产产品放到商场,消费者从商场消费产品
生产者是一个线程,消费者是一个线程,这两个线程在并发情况下执行。当商场没有产品的时候,消费者去消费产品,会被阻塞,知道生产者生产商品放到商场里之后,再去唤醒消费者,让消费者去商场消费产品
如果有两个消费者,都去没有产品的商场里去消费,那都会被阻塞,等待生产者把商品放到商场之后唤醒
那如何去唤醒消费者,就需要条件变量,当消费者去没有商品的商场去消费时会被阻塞,会阻塞在条件变量上,当生产者把产品放到商场之后,回去唤醒条件变量,那阻塞在条件变量上的消费者都会被唤醒,这时候就会往下执行,去消费产品,在唤醒条件变量上又有两种方式,第一种是只唤醒一个阻塞在条件变量上的消费者,其他消费者继续阻塞,唤醒的去消费商品,第二种是唤醒全部阻塞在条件变量上的消费者,都去消费产品。
但是商品具有全局性,不能同时被多个消费者消费,所以在商品上要上把互斥锁,所以消费者在被唤醒之后还有去得到互斥锁,只有得到互斥锁的消费者才可以去消费商品,没有得到的在申请锁的时候阻塞;同理,生产者在去把商品放进商场里的时候也要申请互斥锁才能把商品放到商场中,所以生产者也要去申请互斥锁
流程:消费者去消费产品,商场没有产品,阻塞在条件变量上,生产者申请互斥锁,把生产的产品放到商场里,然后去唤醒阻塞在条件变量上的消费者,然后消费者去申请互斥锁,申请到的去消费产品,没申请到的阻塞,把商品消费完之后在阻塞在条件变量上,在消费者消费产品的时候生产者也会阻塞在申请互斥锁上

条件变量控制原语:
pthread_cond_t //创建一个条件变量
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr); //初始化
int pthread_cond_destroy(pthread_cond_t *cond); //销毁
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex); //等待,当消费者发现没有商品,会阻塞在这上面
//阻塞在变量上,释放互斥锁,等待变量唤醒
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime); //这个是可以设置等待时间,如果超时返回错误号
int pthread_cond_signal(pthread_cond_t *cond); //唤醒一个阻塞在wait上的线程
int pthread_cond_broadcast(pthread_cond_t *cond); //唤醒所有阻塞在wait上的线程

例子:把链表的元素作为一个产品,充分利用了链表元素增加的便利以及长度的不可预测性,

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>

struct msg {
struct msg *next;
int num;
};

struct msg *head; //定义一个全局的结构体指针
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER; //变量
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; //互斥锁

void *consumer(void *p)
{
struct msg *mp;

for(;;) {
pthread_mutex_lock(&lock); //首先获得商品的锁,
while (head == NULL) //看商场中是否有产品,因为链表,如果头元节点为NULL,就是没有产品,当不为空是就说明有产品,就开始消费产品。
pthread_cond_wait(&has_product, &lock);//没有产品,那就在阻塞在变量上,并且把商品的锁释放掉,一遍生产者往商场中生产产品
mp = head;
head = mp->next;//得到产品,并且把商场中的对应的产品删除
pthread_mutex_unlock(&lock); //释放互斥锁
printf("Consume %d\n", mp->num);
free(mp);//使用完产品,把产品释放掉。
sleep(rand()%5);
}
}

void *producer(void *p)
{
struct msg *mp;
for (;;) {
mp = malloc(sizeof(struct msg)); //创建一个产品空间
mp->num = rand()%1000+1; //生产产品
printf("Produce %d\n", mp->num);
pthread_mutex_lock(&lock); //把产品放到商店里面,因为涉及到商店里产品的更改,所以要上锁
mp->next = head; //利用头插法把产品放到全局链表节点中
head = mp;
pthread_mutex_unlock(&lock);//操作完就解锁
pthread_cond_signal(&has_product);//然后唤醒一个阻塞在变量上的消费者
sleep(rand()%5);//挂起
}
}

int main(void)
{
pthread_t pid, cid;

srand(time(NULL));
pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);
pthread_join(pid, NULL);
pthread_join(cid, NULL);

pthread_cond_destroy(&has_product);
return 0;
}

信号量

互斥量的锁只有上锁可解锁,并且只有一把锁,而信号量有好几把,可以自己设定,并且也不是通过释放锁来更改阻塞,而是通过把信号量加1来唤醒则塞。
可以简单的理解为互斥锁是一把锁,一把钥匙,而信号量可以有一把锁,多把钥匙,并且要钥匙的个数增加

原语

include <semaphore.h>
sem_t
int sem_init(sem_t *sem, int pshared, unsigned int value); //初始化,可以设定数量
int sem_wait(sem_t *sem); //如果sem中的信号量大于0,就减一,向下执行,如果等于0就阻塞
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
int sem_post(sem_t *sem); //把信号量加1,没有上限
int sem_destroy(sem_t *sem); //销毁信号量

例子:

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <semaphore.h>

#define NUM 5
int queue[NUM];
sem_t blank_number, product_number; //信号量

void *producer(void *arg)
{
int p = 0;
while (1) {
sem_wait(&blank_number); //如果blank_number大于0,就减一,如果等于0就阻塞
queue[p] = rand() % 1000 + 1; //生产商品,并放入商场
printf("Profuce %d\n", queue[p]);
sem_post(&product_number); //把producr_number的信号量加1
p = (p+1)%NUM; //向下以一个格子,继续生产
//sleep(rand()%5);
}
}

void *consumer(void *arg)
{
int c = 0;
while (1) {
sem_wait(&product_number); //生产者生产一个产品,并把这个信号量加1
printf("Consume %d\n", queue[c]);
queue[c] = 0; //消费
sem_post(&blank_number); //把这个格子的产品消费完,就解锁一个信号量
c = (c+1)%NUM;
sleep(rand()%5);
}
}

int main(void)
{
pthread_t pid, cid;
srand(time(NULL));

sem_init(&blank_number, 0, NUM); //初始化信号量,并设置信号量的个数
sem_init(&product_number, 0, 0);

pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);

pthread_join(pid, NULL);
pthread_join(cid, NULL);

sem_destroy(&blank_number); //销毁
sem_destroy(&product_number);

return 0;
}

进程间锁

用的是互斥锁,通过修改互斥锁的属性来实现进程间的锁

include <pthread.h>
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
pshared:
线程锁:PTHREAD_PROCESS_PRIVATE
进程锁:PTHREAD_PROCESS_SHARED
默认情况是线程锁

例子:是把互斥量和互斥量的属性都放到mmap中去,这样其他的进程才有机会访问到,不然进程之间是读是共享,写时复制,只要锁的状态以改变,就会出问题

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <string.h>

struct mt {
int num;
pthread_mutex_t mutex;
pthread_mutexattr_t mutexattr;
};

int main(void)
{
int fd, i;
struct mt *mm;
pid_t pid;
fd = open("mt_text", O_CREAT | O_RDWR, 0777);
ftruncate(fd, sizeof(*mm)); //把文件拓展sizeof(*mm)这么大的空间
mm = mmap(NULL, sizeof(*mm), PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); //把文件映射到内存中
close(fd);

pthread_mutexattr_init(&mm->mutexattr); //初始化互斥量的属性

pthread_mutexattr_setpshared(&mm->mutexattr, PTHREAD_PROCESS_SHARED); //把互斥量属性设置成进程间共享

pthread_mutex_init(&mm->mutex, &mm->mutexattr); //初始化互斥量

pid = fork();
if (pid == 0) {
for (i=0; i<10; i++) {
pthread_mutex_lock(&mm->mutex); //申请锁
(mm->num)++;
printf("mun++:%d\n", mm->num);
pthread_mutex_unlock(&mm->mutex);//释放
sleep(1);
}
}
else if (pid > 0) {
for (i=0; i<10; i++) {
pthread_mutex_lock(&mm->mutex);//申请锁
mm->num += 2;
printf("mm->num+=:%d\n", mm->num);
pthread_mutex_unlock(&mm->mutex);//释放
sleep(1);
}
wait(NULL);
}

pthread_mutex_destroy(&mm->mutex);
pthread_mutexattr_destroy(&mm->mutexattr);

munmap(mm, sizeof(*mm));
unlink("mt_test");

return 0;
}

文件锁

使用fcntl提供文件锁
读共享,写独占

struct flock {
....
short l_type; /* Type of lock: F_RDLOCK, F_WRLOCK, F_UNLOCK */锁的类型
short l_whence; /* How to interpret l_start: SEEK_SET, SEEK_CUR, SEEK_END */基准位置
off_t l_start; /* starting offset for lock */偏置
pid_t l_pid; /* PID of process blocking our lock (F_GETLK only) */
};

例子:

#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>

void sys_err(char *str)
{
perror(str);
exit(1);
}

int main(int argc, char *argv[])
{
int fd;
struct flock f_lock;
if (argc < 2) {
printf("./a.out filename\n");
exit(1);
}

if ((fd = open(argv[1], O_RDWR)) < 0)
sys_err("open");

f_lock.l_type = F_WRLCK;
//f_lock.l_type = F_RDLCK;
f_lock.l_whence = SEEK_SET;
f_lock.l_start = 0;
f_lock.l_len = 0; //0代表整个文件加锁

fcntl(fd, F_SETLKW, &f_lock);
printf("get flock\n");
sleep(10);
f_lock.l_type = F_UNLCK;
fcntl(fd, F_SETLKW, &f_lock);
printf("un flock\n");

close(fd);
return 0;
}