生产者与消费者模型

时间:2022-06-06 17:38:35

====================================================================================

生产者消费者模型

  • 模型: 
    简单来说就是有一个缓冲区,生产者往缓冲区里写数据,消费者从缓冲区拿走数据。当然这只是狭义上的对计算机而言的生产者消费者模型,其实这种模型适用于多种场景,生活中非常常见。
  • 其遵循的原则 
    3种关系即:“消费者——消费者,生产者——生产者,消费者——生产者”关系,前两者为互斥关系,后一个为同步与互斥关系。 
    2个角色即:“生产者,消费者”。 
    1个交易场所即:“缓冲区”。

基于链表:

需要用到的工具

  • 互斥锁 
    在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为” 互斥锁” 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。
  • 互斥锁的创建和销毁 
    有两种方法创建互斥锁,分别为静态方式和动态方式。 
    1> POSIX定义了一个宏PTHREAD_MUTEX_INITIALIZER来静态初始化互斥锁,方法如下: 
    pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER 
    在Linux threads实现中,pthread_mutex_t是一个结构,而PTHREAD_MUTEX_INITIALIZER则是一个结构常量。 
    2>动态方式是采用pthread_mutex_init()函数来初始化互斥锁,API定义如下:
    int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr) 其中mutexattr用于指定互斥锁属性(见下),如果为NULL则使用缺省属性。 
    3> pthread_mutex_destroy ()用于注销一个互斥锁,API定义如下: int pthread_mutex_destroy(pthread_mutex_t *mutex) 销毁一个互斥锁即意味着释放它所占用的资源,且要求锁当前处于开放状态。由于在Linux中,互斥锁并不占用任何资源,因此LinuxThreads中的 pthread_mutex_destroy()除了检查锁状态以外(锁定状态则返回EBUSY)没有其他动作。

条件变量的应用

条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使”条件成立”(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。条件变量类型为pthread_cond_t

  • 条件变量的创建 
    条件变量和互斥锁一样,都有静态动态两种创建方式, 
    1,静态方式使PTHREAD_COND_INITIALIZER常量,如下:pthread_cond_t cond=PTHREAD_COND_INITIALIZER 
    2,动态方式调用pthread_cond_init()函数,API定义如下: 
    int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr) 
    3,尽管POSIX标准中为条件变量定义了属性,但在LinuxThreads中没有实现,因此cond_attr值通常为NULL,且被忽略。 
    4,注销一个条件变量需要调用pthread_cond_destroy(),只有在没有线程在该条件变量上等待的时候才能注销这个条件变量,否则返回EBUSY。API定义如下: 
    int pthread_cond_destroy(pthread_cond_t *cond)。 
    其实现代码:

    #include<stdio.h>
    #include<stdlib.h>
    #include<pthread.h>

    typedef int DataType;
    typedef struct ListNode
    {
    DataType data;
    struct ListNode* next;
    }ListNode;

    ListNode* mylist=NULL;

    static int IsEmpty(ListNode* list)
    {
    if(list==NULL)
    return 0;
    else
    return 1;
    }

    ListNode* BuyNode(DataType x)
    {
    ListNode* node=(ListNode*)malloc(sizeof(ListNode));
    node->data=x;
    node->next=NULL;
    return node;
    }

    void PushFront(ListNode** list, DataType x)
    {
    if(IsEmpty(*list)){
    ListNode* newNode = BuyNode(x);
    newNode->next = *list;
    *list=newNode;
    }else{
    *list=BuyNode(x);
    }
    return;
    }

    ListNode* PopFront(ListNode** list)
    {
    ListNode*next = NULL;
    if(IsEmpty(*list)){
    next=*list;
    *list=next->next;
    }
    return next;
    }

    void PrintList(ListNode* list)
    {
    if(IsEmpty(list)){
    while(list){
    printf("%d ", list->data);
    list=list->next;
    }
    printf("\n");
    }
    return;
    }


    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

    void *consume()
    {
    while(1){
    pthread_mutex_lock(&lock);
    ListNode *tmp = PopFront(&mylist);
    while(tmp ==NULL)
    {
    pthread_cond_wait(&cond,&lock);
    tmp = PopFront(&mylist);
    }
    printf("consume done...!%d\n", tmp->data);
    sleep(2);
    pthread_mutex_unlock(&lock);
    pthread_cond_signal(&cond);

    }
    }

    void* product()
    {
    int i=0;
    while(1){
    pthread_mutex_lock(&lock);
    PushFront(&mylist, i);
    printf("product done...!%d\n", i++);
    pthread_cond_wait(&cond,&lock);
    pthread_mutex_unlock(&lock);
    pthread_cond_signal(&cond);
    }
    }

    int main()
    {

    pthread_t c, p;
    pthread_create(&c, NULL, consume, NULL);
    pthread_create(&p, NULL, product, NULL);

    pthread_join(c, NULL);
    pthread_join(p, NULL);
    return 0;
    }

基于环形队列

用到的工具

  • 多元信号量

    #include <semaphore.h>
    int sem_init(sem_t *sem, int pshared, unsigned int value);//初始化信号量成功返回0,失败返回-1
    Link with -lrt or -pthread.//编译时需要链接pthread库
    int sem_wait(sem_t *sem);//阻塞式等待,相当于P操作,申请资源
    int sem_trywait(sem_t *sem);//非阻塞式等待,申请资源
    int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);//定时等待,每隔一段时间询问操作系统能不能申请到资源
    Link with -lrt or -pthread.
    Feature Test Macro Requirements for glibc (see feature_test_macros(7)):
    sem_timedwait(): _POSIX_C_SOURCE >
    = 200112L || _XOPEN_SOURCE >= 600

模型如图:

生产者与消费者模型

其实现代码

#include<stdio.h>
#include<pthread.h>
#include <semaphore.h>

#define SIZE 32

int RingBuf[SIZE]={0};
int step=0;
sem_t semblank;
sem_t semdata;


void *product()
{
int i=0;
while(1){
sem_wait(&semblank);
RingBuf[step++]=i;
sem_post(&semdata);
printf("product done! %d\n", i++);
step%=32;
}
}

void *consume()
{
int i=0;
while(1){
sem_wait(&semdata);
printf("consume done !%d\n", RingBuf[step]);
sem_post(&semblank);
sleep(1);
}
}

void Destory()
{
sem_destroy(&semdata);
sem_destroy(&semblank);
printf("destory sem success!\n");
}
int main()
{
pthread_t c, p;
sem_init(&semblank, 0, SIZE);
sem_init(&semdata, 0 , 0);

pthread_create(&c, NULL, consume, NULL);
pthread_create(&p, NULL, product, NULL);

pthread_join(c, NULL);
pthread_join(p, NULL);
Destory();
return 0;
}

基于环形队列的多生产多消费这者模型

这里需要创建多个进程来模拟消费者与生产者,消费者与消费者之间,生产者与生产者是互斥关系,需要创建互斥锁来实现。基于单生产但消费代码的改动。

其代码如下

#include<stdio.h>
#include<pthread.h>
#include <semaphore.h>

#define SIZE 32

int RingBuf[SIZE]={0};
int step=0;
sem_t semblank;
sem_t semdata;

pthread_mutex_t lock1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock2 = PTHREAD_MUTEX_INITIALIZER;

void *product2()
{
int i=0;
while(1){
pthread_mutex_lock(&lock1);
sem_wait(&semblank);
RingBuf[step++]=i;
sem_post(&semdata);

pthread_mutex_unlock(&lock1);
printf("product2 done! %d\n", i++);
step%=32;
}
}

void *product1()
{
int i=0;
while(1){
pthread_mutex_lock(&lock1);
sem_wait(&semblank);
RingBuf[step++]=i;
sem_post(&semdata);
pthread_mutex_unlock(&lock1);
printf("product1 done! %d\n", i++);
step%=32;
}

}

void *consume1()
{
int i=0;
while(1){
pthread_mutex_lock(&lock2);
sem_wait(&semdata);
printf("consume1 done !%d\n", RingBuf[step]);
sem_post(&semblank);
pthread_mutex_unlock(&lock2);
//sleep(2);
}
}

void *consume2()
{
while(1){
pthread_mutex_lock(&lock2);
sem_wait(&semdata);
printf("consume2 done !%d\n", RingBuf[step]);
sem_post(&semblank);
pthread_mutex_unlock(&lock2);
sleep(1);
}
}

void Destory()
{
sem_destroy(&semdata);
sem_destroy(&semblank);
printf("destory sem success!\n");
}

int main()
{
pthread_t c1, c2, p1, p2;
sem_init(&semblank, 0, SIZE);
sem_init(&semdata, 0, 0);

pthread_create(&c1, NULL, consume1, NULL);
pthread_create(&c2, NULL, consume2, NULL);
pthread_create(&p1, NULL, product1, NULL);

pthread_create(&p2, NULL, product2, NULL);
pthread_join(c1, NULL);
pthread_join(c2, NULL);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
Destory();
return 0;
}

总结

生产消费模型的关键在于对321原则的实践,其使用的工具有两种,信号量和互斥锁。这两种工具要搞清楚,之后就是分析生产者与消费者之间的关系,一一列举出来然后看哪些地方需要加锁。