====================================================================================
生产者消费者模型
-
模型:
简单来说就是有一个缓冲区,生产者往缓冲区里写数据,消费者从缓冲区拿走数据。当然这只是狭义上的对计算机而言的生产者消费者模型,其实这种模型适用于多种场景,生活中非常常见。 -
其遵循的原则
3种关系即:“消费者——消费者,生产者——生产者,消费者——生产者”关系,前两者为互斥关系,后一个为同步与互斥关系。
2个角色即:“生产者,消费者”。
1个交易场所即:“缓冲区”。
基于链表:
需要用到的工具
-
互斥锁
在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为” 互斥锁” 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。 -
互斥锁的创建和销毁
有两种方法创建互斥锁,分别为静态方式和动态方式。
1> POSIX定义了一个宏PTHREAD_MUTEX_INITIALIZER
来静态初始化互斥锁,方法如下:pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER
在Linux threads实现中,pthread_mutex_t
是一个结构,而PTHREAD_MUTEX_INITIALIZER
则是一个结构常量。
2>动态方式是采用pthread_mutex_init()
函数来初始化互斥锁,API定义如下:int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr)
其中mutexattr用于指定互斥锁属性(见下),如果为NULL则使用缺省属性。
3>pthread_mutex_destroy ()
用于注销一个互斥锁,API定义如下:int pthread_mutex_destroy(pthread_mutex_t *mutex)
销毁一个互斥锁即意味着释放它所占用的资源,且要求锁当前处于开放状态。由于在Linux中,互斥锁并不占用任何资源,因此LinuxThreads
中的pthread_mutex_destroy()
除了检查锁状态以外(锁定状态则返回EBUSY)没有其他动作。
条件变量的应用
条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使”条件成立”(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。条件变量类型为pthread_cond_t
。
-
条件变量的创建
条件变量和互斥锁一样,都有静态动态两种创建方式,
1,静态方式使PTHREAD_COND_INITIALIZER
常量,如下:pthread_cond_t cond=PTHREAD_COND_INITIALIZER
2,动态方式调用pthread_cond_init()
函数,API定义如下:int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr)
3,尽管POSIX标准中为条件变量定义了属性,但在LinuxThreads中没有实现,因此cond_attr值通常为NULL,且被忽略。
4,注销一个条件变量需要调用pthread_cond_destroy(),只有在没有线程在该条件变量上等待的时候才能注销这个条件变量,否则返回EBUSY。API定义如下:int pthread_cond_destroy(pthread_cond_t *cond)
。
其实现代码:#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
typedef int DataType;
typedef struct ListNode
{
DataType data;
struct ListNode* next;
}ListNode;
ListNode* mylist=NULL;
static int IsEmpty(ListNode* list)
{
if(list==NULL)
return 0;
else
return 1;
}
ListNode* BuyNode(DataType x)
{
ListNode* node=(ListNode*)malloc(sizeof(ListNode));
node->data=x;
node->next=NULL;
return node;
}
void PushFront(ListNode** list, DataType x)
{
if(IsEmpty(*list)){
ListNode* newNode = BuyNode(x);
newNode->next = *list;
*list=newNode;
}else{
*list=BuyNode(x);
}
return;
}
ListNode* PopFront(ListNode** list)
{
ListNode*next = NULL;
if(IsEmpty(*list)){
next=*list;
*list=next->next;
}
return next;
}
void PrintList(ListNode* list)
{
if(IsEmpty(list)){
while(list){
printf("%d ", list->data);
list=list->next;
}
printf("\n");
}
return;
}
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void *consume()
{
while(1){
pthread_mutex_lock(&lock);
ListNode *tmp = PopFront(&mylist);
while(tmp ==NULL)
{
pthread_cond_wait(&cond,&lock);
tmp = PopFront(&mylist);
}
printf("consume done...!%d\n", tmp->data);
sleep(2);
pthread_mutex_unlock(&lock);
pthread_cond_signal(&cond);
}
}
void* product()
{
int i=0;
while(1){
pthread_mutex_lock(&lock);
PushFront(&mylist, i);
printf("product done...!%d\n", i++);
pthread_cond_wait(&cond,&lock);
pthread_mutex_unlock(&lock);
pthread_cond_signal(&cond);
}
}
int main()
{
pthread_t c, p;
pthread_create(&c, NULL, consume, NULL);
pthread_create(&p, NULL, product, NULL);
pthread_join(c, NULL);
pthread_join(p, NULL);
return 0;
}
基于环形队列
用到的工具
-
多元信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);//初始化信号量成功返回0,失败返回-1
Link with -lrt or -pthread.//编译时需要链接pthread库
int sem_wait(sem_t *sem);//阻塞式等待,相当于P操作,申请资源
int sem_trywait(sem_t *sem);//非阻塞式等待,申请资源
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);//定时等待,每隔一段时间询问操作系统能不能申请到资源
Link with -lrt or -pthread.
Feature Test Macro Requirements for glibc (see feature_test_macros(7)):
sem_timedwait(): _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
模型如图:
其实现代码
#include<stdio.h>
#include<pthread.h>
#include <semaphore.h>
#define SIZE 32
int RingBuf[SIZE]={0};
int step=0;
sem_t semblank;
sem_t semdata;
void *product()
{
int i=0;
while(1){
sem_wait(&semblank);
RingBuf[step++]=i;
sem_post(&semdata);
printf("product done! %d\n", i++);
step%=32;
}
}
void *consume()
{
int i=0;
while(1){
sem_wait(&semdata);
printf("consume done !%d\n", RingBuf[step]);
sem_post(&semblank);
sleep(1);
}
}
void Destory()
{
sem_destroy(&semdata);
sem_destroy(&semblank);
printf("destory sem success!\n");
}
int main()
{
pthread_t c, p;
sem_init(&semblank, 0, SIZE);
sem_init(&semdata, 0 , 0);
pthread_create(&c, NULL, consume, NULL);
pthread_create(&p, NULL, product, NULL);
pthread_join(c, NULL);
pthread_join(p, NULL);
Destory();
return 0;
}
基于环形队列的多生产多消费这者模型
这里需要创建多个进程来模拟消费者与生产者,消费者与消费者之间,生产者与生产者是互斥关系,需要创建互斥锁来实现。基于单生产但消费代码的改动。
其代码如下
#include<stdio.h>
#include<pthread.h>
#include <semaphore.h>
#define SIZE 32
int RingBuf[SIZE]={0};
int step=0;
sem_t semblank;
sem_t semdata;
pthread_mutex_t lock1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock2 = PTHREAD_MUTEX_INITIALIZER;
void *product2()
{
int i=0;
while(1){
pthread_mutex_lock(&lock1);
sem_wait(&semblank);
RingBuf[step++]=i;
sem_post(&semdata);
pthread_mutex_unlock(&lock1);
printf("product2 done! %d\n", i++);
step%=32;
}
}
void *product1()
{
int i=0;
while(1){
pthread_mutex_lock(&lock1);
sem_wait(&semblank);
RingBuf[step++]=i;
sem_post(&semdata);
pthread_mutex_unlock(&lock1);
printf("product1 done! %d\n", i++);
step%=32;
}
}
void *consume1()
{
int i=0;
while(1){
pthread_mutex_lock(&lock2);
sem_wait(&semdata);
printf("consume1 done !%d\n", RingBuf[step]);
sem_post(&semblank);
pthread_mutex_unlock(&lock2);
//sleep(2);
}
}
void *consume2()
{
while(1){
pthread_mutex_lock(&lock2);
sem_wait(&semdata);
printf("consume2 done !%d\n", RingBuf[step]);
sem_post(&semblank);
pthread_mutex_unlock(&lock2);
sleep(1);
}
}
void Destory()
{
sem_destroy(&semdata);
sem_destroy(&semblank);
printf("destory sem success!\n");
}
int main()
{
pthread_t c1, c2, p1, p2;
sem_init(&semblank, 0, SIZE);
sem_init(&semdata, 0, 0);
pthread_create(&c1, NULL, consume1, NULL);
pthread_create(&c2, NULL, consume2, NULL);
pthread_create(&p1, NULL, product1, NULL);
pthread_create(&p2, NULL, product2, NULL);
pthread_join(c1, NULL);
pthread_join(c2, NULL);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
Destory();
return 0;
}
总结
生产消费模型的关键在于对321原则的实践,其使用的工具有两种,信号量和互斥锁。这两种工具要搞清楚,之后就是分析生产者与消费者之间的关系,一一列举出来然后看哪些地方需要加锁。