一个关于生产者-消费者的问题

时间:2021-10-31 19:35:54
这是一个关于生产者-消费者的问题。程序要实现的目的描述如下:

1、有一个生产者生产产品
2、有一个或多个消费者消费产品
3、生产者只有等到所有消费者都准备好后才开始生产
4、保持生产能力与消费能力的平衡

第3点的是由WaitAllThreads函数实现的。
在运行过程中发现,有时是生产者一直获得互斥,有时是消费者一直获得互斥。我觉得第4点的实现有点问题。大家一起讨论一下吧。
代码如下(本来想上传整个工程的,发现上传不了),在VC6.0里编译:

#include <windows.h>
#include <process.h> 
#include <iostream>
#include <conio.h>
using namespace std;

#define MAXNUMBER 4294967695
#define GOODSLIM  5

/* 存储系统信息 被所有线程共享 [hongqun 2008-05-10] */
typedef struct system_msg
{
volatile DWORD nThread;           /* 线程总数 */
volatile DWORD nCount;  /* 当前线程数 */
volatile bool bReady;  /* 标识所有线程是否已经准备好了 */
volatile DWORD nGoods;             /* 剩余产品总数 */
    
volatile DWORD nAllGoods;        /* 生产的能力 */
volatile DWORD nAllConsume;      /* 消费的能力 */

    volatile DWORD Psleep;           /* 生产者等待的时间 */
volatile DWORD Csleep;           /* 消费者等待的时间 */


HANDLE mguard;  /* 互斥标记 */
HANDLE mReady;                   /* 全部线程准备好 */
}SYSTEM_MSG,*pSYSTEM_MSG;

typedef struct consumer_msg
{
SYSTEM_MSG *psys;
DWORD iThread;                 /* 消费者线程的编号 */  
}CONSUMER_MSG;


UINT WINAPI producer(void *psys)
{
SYSTEM_MSG * p = (SYSTEM_MSG*)psys;
    long v;
while(1)
{       
if(p->nAllGoods == MAXNUMBER)
   {
   p->nAllGoods = 0;
   p->nAllConsume = 0;
   }
v = p->nAllGoods - p->nAllConsume;
if(v > GOODSLIM)                       /* 起平衡的作用 */
{                                      /* 过剩 */
p->Psleep++;                      
p->Csleep--;
    Sleep(p->Psleep);
}
else if(v < -GOODSLIM)
{
p->Psleep--;
p->Csleep++;
if(p->Psleep > 0)
 Sleep(p->Psleep);
else
     ;
}
        else
{
Sleep(p->Psleep);
}
        
WaitForSingleObject(p->mguard,INFINITE);

cout << "Psleep:" << p->Psleep << endl;
p->nAllGoods++;            /* 生产能力加一 */
     

if(p->bReady)              /* 消费者线程是否都准备好 */
{
p->nGoods++;           /* 生产一个产品 */
cout << "Now There are " << p->nGoods << " goods." << endl;
}
else
{
cout << "Create consumers haven't completed, Wait..." << endl;
}   
ReleaseMutex(p->mguard);
}
return 0;
}


UINT WaitAllThreads(SYSTEM_MSG *p)
{
WaitForSingleObject(p->mguard,INFINITE);
   p->nCount++;          /* 当前线程总数加一 */
   
   while(p->nCount < p->nThread)
   {
   ReleaseMutex(p->mguard);
   WaitForSingleObject(p->mReady,INFINITE);
   WaitForSingleObject(p->mguard,INFINITE);
   }
        
   if(!p->bReady)
   {
   p->bReady = TRUE;     /* 标记全部线程建立完成 */
   }

       SetEvent(p->mReady);

    ReleaseMutex(p->mguard);

return 0;
}



UINT WINAPI consumer(void *pcon)
{
CONSUMER_MSG *consumer = (CONSUMER_MSG *)pcon;
SYSTEM_MSG *p = consumer->psys;
    const DWORD iThread = consumer->iThread;
    long v;
WaitAllThreads(p);         /* 等待所有线程建立 */


while(1)
{  
if(p->nAllConsume == MAXNUMBER)
  {
 p->nAllGoods = 0;
 p->nAllConsume = 0;
  }

v =  p->nAllConsume - p->nAllGoods;
if(v > GOODSLIM)                   /* 起平衡的作用 */
{
p->Csleep++;
p->Psleep--;
Sleep(p->Csleep);
}
else if(v < -GOODSLIM)
{
p->Csleep--;
p->Psleep++;                    /* 过剩 */
if(p->Csleep > 0)
 Sleep(p->Csleep);
else
  ;
}
else
{
Sleep(p->Csleep);
}


WaitForSingleObject(p->mguard,INFINITE);

cout << "Csleep : "<<p->Csleep << endl;

p->nAllConsume++;               /* 消费能力加一 */

  if(p->nGoods > 0)
  {
  p->nGoods--;                /* 消费 */
  cout << "Thread[" << iThread << "] Consume a Goods." << endl;
  }
  else
  {
  cout << "These is not any goods, wait..." << endl;
  }
  
  //getch();
ReleaseMutex(p->mguard);
}
return 0;
}

int main(int argc, char *argv[])
{

HANDLE producer_h, *consumer_h;
DWORD nThread;
SYSTEM_MSG *psys;
    CONSUMER_MSG *pcon;

cout << "input the thread count:";
cin >> nThread;

/* 初始化 */
psys =(SYSTEM_MSG *)malloc(sizeof(SYSTEM_MSG));
pcon =(CONSUMER_MSG*)malloc(sizeof(CONSUMER_MSG));

psys->bReady = FALSE;
psys->nCount = 0;
psys->nThread = nThread;
psys->nGoods  = 0;
psys->nAllGoods = 0;
psys->nAllConsume = 0;
psys->Psleep = 0;
psys->Csleep = 0;
psys->mguard = CreateMutex(NULL,FALSE,NULL);
psys->mReady = CreateEvent(NULL,FALSE,FALSE,NULL);

pcon->psys = psys;

consumer_h = (HANDLE*)malloc(nThread*sizeof(HANDLE));


/* 建立线程 */
producer_h = (HANDLE)_beginthreadex(NULL,0,producer,(void *)psys,0,NULL);
if(producer_h)
{
cout << "Producer has been created! " << endl; 
}

for(int i=0; i<nThread; i++)
{
pcon->iThread = i + 1;
consumer_h[i] = (HANDLE)_beginthreadex(NULL,0,consumer,(void *)pcon,0,NULL);
if(consumer_h[i])
{
cout << "The Thread [" << i+1 << "] has been created!" << endl;
}
else
{
cout << "Create thread [" << i+1 << "] fail!" << endl;
}
}
    
while(1)
{
Sleep(10);
}

return 0;
}

3 个解决方案

#1


至于你说的平衡问题,首先要给出平衡的定义,否则没法说
说你程序另外一个问题,这样的实现最大的问题是:锁获得顺序可能发生颠倒,从而导致死锁。如果你要获得多个锁,必须使用同时获得的方法(如WaitForMultipleObjects),而不能一一获得。

举个例子,假如我们有个对象,支持Lock和Unlock方法,那么下面的代码是不是有问题?


void Swap(TYPE& a, TYPE & b)
{
   TYPE tmp;
   a.Lock();
   b.Lock();
   tmp =a;
   a=b;
   b= tmp;
   b.Unlock();
   a.Unlock();
}


这个例子就有显然的问题,假如我们在两个线程中分别执行下面语句:
Swap(a,b)            Swap(b,a)

则执行过程可能是:
 a.Lock()                            线程1首先执行
                        b.Lock()     线程2此时获得控制权,执行b.Lock
此后,线程1要求获得b.Lock的控制权,而线程2要获得a.Lock控制权,就是一个死锁

所以只要你获得多个锁,就不能分别获得,而必须一次获得,要么成功,要么都失败
        

#2


我这里的“平衡”的目的是让生产能力和消费能力的差别控制在一定的范围内,我这里假定每一次获得互斥就拥有生产能力或消费能力。

至于你说的获得多个锁的所带来的问题,我觉得在这里并不存在,
UINT WaitAllThreads(SYSTEM_MSG *p) 

WaitForSingleObject(p->mguard,INFINITE); 
   p->nCount++;          /* 当前线程总数加一 */ 
    
   while(p->nCount  < p->nThread) 
   { 
   ReleaseMutex(p->mguard); 
   WaitForSingleObject(p->mReady,INFINITE); 
   WaitForSingleObject(p->mguard,INFINITE); 
   } 
         
   if(!p->bReady) 
   { 
   p->bReady = TRUE;     /* 标记全部线程建立完成 */ 
   } 

       SetEvent(p->mReady); 

    ReleaseMutex(p->mguard); 

return 0; 

第9和第10行里这个程序中仅有的申请多个锁的地方。但是这个函数是等待其它消费者线程的建立,只要全部消费者线程建立完成后,就通过       
SetEvent(p->mReady); 让其它(某一个)消费者线程执行下面的语句,而那一个线程又通过SetEvent(p->mReady); 让另外一个消费者
线程执行下面的语句... ...
不知我说的是否足够明白?

#3


   WaitForSingleObject(p->mReady,INFINITE); 
   WaitForSingleObject(p->mguard,INFINITE); 
用一个WaitForMultipleObjects代替比较好,否则容易死锁。

#1


至于你说的平衡问题,首先要给出平衡的定义,否则没法说
说你程序另外一个问题,这样的实现最大的问题是:锁获得顺序可能发生颠倒,从而导致死锁。如果你要获得多个锁,必须使用同时获得的方法(如WaitForMultipleObjects),而不能一一获得。

举个例子,假如我们有个对象,支持Lock和Unlock方法,那么下面的代码是不是有问题?


void Swap(TYPE& a, TYPE & b)
{
   TYPE tmp;
   a.Lock();
   b.Lock();
   tmp =a;
   a=b;
   b= tmp;
   b.Unlock();
   a.Unlock();
}


这个例子就有显然的问题,假如我们在两个线程中分别执行下面语句:
Swap(a,b)            Swap(b,a)

则执行过程可能是:
 a.Lock()                            线程1首先执行
                        b.Lock()     线程2此时获得控制权,执行b.Lock
此后,线程1要求获得b.Lock的控制权,而线程2要获得a.Lock控制权,就是一个死锁

所以只要你获得多个锁,就不能分别获得,而必须一次获得,要么成功,要么都失败
        

#2


我这里的“平衡”的目的是让生产能力和消费能力的差别控制在一定的范围内,我这里假定每一次获得互斥就拥有生产能力或消费能力。

至于你说的获得多个锁的所带来的问题,我觉得在这里并不存在,
UINT WaitAllThreads(SYSTEM_MSG *p) 

WaitForSingleObject(p->mguard,INFINITE); 
   p->nCount++;          /* 当前线程总数加一 */ 
    
   while(p->nCount  < p->nThread) 
   { 
   ReleaseMutex(p->mguard); 
   WaitForSingleObject(p->mReady,INFINITE); 
   WaitForSingleObject(p->mguard,INFINITE); 
   } 
         
   if(!p->bReady) 
   { 
   p->bReady = TRUE;     /* 标记全部线程建立完成 */ 
   } 

       SetEvent(p->mReady); 

    ReleaseMutex(p->mguard); 

return 0; 

第9和第10行里这个程序中仅有的申请多个锁的地方。但是这个函数是等待其它消费者线程的建立,只要全部消费者线程建立完成后,就通过       
SetEvent(p->mReady); 让其它(某一个)消费者线程执行下面的语句,而那一个线程又通过SetEvent(p->mReady); 让另外一个消费者
线程执行下面的语句... ...
不知我说的是否足够明白?

#3


   WaitForSingleObject(p->mReady,INFINITE); 
   WaitForSingleObject(p->mguard,INFINITE); 
用一个WaitForMultipleObjects代替比较好,否则容易死锁。