c++多线程生产者与消费者问题代码

时间:2021-07-11 00:37:36

1.目录结构

c++多线程生产者与消费者问题代码

2.代码

BufferPool.h

#ifndef BUFFERPOOL
#define BUFFERPOOL
#include<windows.h>
#include<vector>
#include<iostream>
#include<string>
#define SIZE_OF_BUFFER 4
using namespace std;
//控制参数类
class BufferPool
{
public:
	static BufferPool * GetInstance();
	
	void Produce();
	void Consume();
	void Product_Sum();

	//多线程互斥信号量
	HANDLE mutex;
	HANDLE mutex2;

	//多线程同步信号量
	//空缓冲区
	HANDLE m_hSemaphoreBufferEmpty;
	//非空缓冲区
	HANDLE m_hSemaphoreBufferFull;

	unsigned int produce_sum;            //the total produce number
	unsigned int consume_sum;            //the total consume number

private:

	unsigned short in;                   //the mark of position entering the space 
	unsigned short out;                  //the mark of position leaving the space
	unsigned int Product_ID;
	unsigned int Consume_ID;

	static BufferPool *m_Instance;
	int buffer[SIZE_OF_BUFFER];          //缓冲池

	BufferPool()
	{
		int i = 0;
		in = out = 0;
		produce_sum = consume_sum = 0;
		Product_ID = Consume_ID = 0;
		for(;i < SIZE_OF_BUFFER; i++)
			buffer[i] = 0;
	
	};

};
#endif
Consume.h

#ifndef CONSUME
#define CONSUME
#include "BufferPool.h"
#include<windows.h>
#include<vector>
#include<iostream>
#include<string>
class Consume
{
public:
Consume(){};
~Consume(){};
//消费产品
DWORD WINAPI ConsumeProduct(LPVOID lpParam);
};
#endif

Product.h

#ifndef PRODUCT
#define PRODUCT
#include "BufferPool.h"
#include<windows.h>
#include<vector>
#include<iostream>
#include<string>
class Product
{
public:
Product(){};
~Product(){};
//消费产品
DWORD WINAPI producer(LPVOID lpParam);
};
#endif

BufferPool.cpp

#include "BufferPool.h"

//---------------------------------------------------------------------------

//---------------------------------------------------------------------------
//unsigned short in, out;
//in = out = 0;
//获取单例实例
BufferPool * BufferPool::m_Instance = new BufferPool();

BufferPool * BufferPool::GetInstance()
{
if(NULL == m_Instance)
{
m_Instance = new BufferPool();
}
return  m_Instance;
}

void BufferPool::Produce()
{
 int i;
 std::cout<<"produce";
 //if(Product_ID>=10)
 // Product_ID=0;
 Product_ID++;
 produce_sum++;
 buffer[in]=Product_ID;
 printf(" buffer[%d]=%d    ",in,Product_ID);
 in=(in+1)%SIZE_OF_BUFFER;
    Product_Sum();
}

void BufferPool::Consume()
{
 int i;
 std::cout<<"consume";
 consume_sum++;
 Consume_ID=buffer[out];
 printf(" buffer[%d]=%d    ",out,Consume_ID);
 buffer[out]=0;
 out=(out+1)%SIZE_OF_BUFFER;
 Product_Sum();
}

void BufferPool::Product_Sum()
{
 int i,sum=0;
 for(i=0;i<SIZE_OF_BUFFER;i++)
 {
  if(buffer[i]!=0)
   sum++;
 }
// std::cout<<"  "<<sum<<"         ";
 //for(i=0;i<SIZE_OF_BUFFER;i++)
 //{
 // std::cout<<buffer[i]<<" ";
 //}
 printf("   %d\n", sum);
}

consume.cpp

#include "Consume.h"
//---------------------------------------------------------------------------

DWORD WINAPI Consume::ConsumeProduct(LPVOID lpParam)
{
	while (true)
	{
		//等待非空的缓冲区出现,消费者进行消费,直到事件值为0
		WaitForSingleObject(BufferPool::GetInstance()->m_hSemaphoreBufferFull, INFINITE);
		WaitForSingleObject(BufferPool::GetInstance()->mutex2,INFINITE);                //the mutex P operation
		BufferPool::GetInstance()->Consume();
		Sleep(1000);
	    ReleaseMutex(BufferPool::GetInstance()->mutex2);                                //resource semaphore P operation
		ReleaseSemaphore(BufferPool::GetInstance()->m_hSemaphoreBufferEmpty,1,NULL);            //the mutex P operation
	}
	return 0;
}

Main_prg.cpp

#include<stdlib.h>
#include<windows.h>
#include "BufferPool.h"
#include "Product.h"
#include "Consume.h"

typedef DWORD (WINAPI *pTheadFunc)(LPVOID lpParam);

const unsigned short p_count=5;        //the number of produce one time
const unsigned short c_count=5;        //the number of consumer one time
const unsigned short s_count=p_count+c_count;  //the sum number of threads
bool control = true;
HANDLE threads[s_count];               //the handle of every thread
DWORD Producer_threadID[p_count];            //the mark of producer thread
DWORD Consumer_threadID[c_count];            //the mark of consumer thread

void Create_P_Threads();
void Create_C_Threads();

void Create_P_Threads()                                  //create producer thread
{
	for(int i=0;i<p_count;i++)
	{
		 DWORD (WINAPI Product::*pProductPoint)(LPVOID lpParam) = &Product::producer;
		 int* ptemp= (int*)&pProductPoint;
		 DWORD (WINAPI *pProductThread)(LPVOID lpParam) = *(pTheadFunc*)ptemp;
		 //生产者线程
		 threads[i]=CreateThread(NULL,0,pProductThread,NULL,0,&Producer_threadID[i]);
		 if(threads[i]==NULL)
			control=false;
	}
}


void Create_C_Threads()
{
	for(int i=p_count;i<s_count;i++)
	{

		DWORD (WINAPI Consume::*pConsumePoint)(LPVOID lpParam) = &Consume::ConsumeProduct;
		int* ptemp= (int*)&pConsumePoint;
		DWORD (WINAPI *pConsumeThread)(LPVOID lpParam) = *(pTheadFunc*)ptemp;
		//消费者线程
		threads[i]=CreateThread(NULL,0,pConsumeThread,NULL,0,&Consumer_threadID[i-p_count]);
		if(threads[i]==NULL)
			control=false;
	 }
}


void info()
{
	std::cout<<"\n"<<std::endl;
	std::cout<<"produce/consume    used_total  buffer_state(from 0 to 9)"<<std::endl;
}


int main()
{
 info();
 BufferPool::GetInstance()->mutex=CreateMutex(NULL,FALSE,NULL);
 BufferPool::GetInstance()->mutex2=CreateMutex(NULL,FALSE,NULL);
 BufferPool::GetInstance()->m_hSemaphoreBufferFull=CreateSemaphore(NULL,0,SIZE_OF_BUFFER,NULL);
 BufferPool::GetInstance()->m_hSemaphoreBufferEmpty=CreateSemaphore(NULL,SIZE_OF_BUFFER,SIZE_OF_BUFFER,NULL);
 Create_C_Threads();
 Create_P_Threads();
 
	 while(control)
	 {
		  if(getchar())
		  {
			   std::cout<<std::endl;
			   std::cout<<"the total produce product number is "<<BufferPool::GetInstance()->produce_sum<<std::endl;
			   std::cout<<"the total consume product number is "<<BufferPool::GetInstance()->consume_sum<<std::endl;
			   break;
		  }
	 }
	 return 0;
}

Product.cpp

#include "Product.h"
//---------------------------------------------------------------------------
DWORD WINAPI  Product::producer(LPVOID lpParam)
{
	while (true)
	{
		//等待非空的缓冲区出现,消费者进行消费,直到信号量值为0
		WaitForSingleObject(BufferPool::GetInstance()->m_hSemaphoreBufferEmpty, INFINITE);
		WaitForSingleObject(BufferPool::GetInstance()->mutex,INFINITE);                //the mutex P operation
		BufferPool::GetInstance()->Produce();
		Sleep(1000);
		ReleaseMutex(BufferPool::GetInstance()->mutex);                                //resource semaphore P operation
		ReleaseSemaphore(BufferPool::GetInstance()->m_hSemaphoreBufferFull,1,NULL);            //the mutex P operation
	}
	return 0;
}

运行效果

c++多线程生产者与消费者问题代码