OpenMP实现生产者消费者模型

时间:2022-01-24 02:53:07

生产者消费者模型已经很古老了吧,最近写了个OpenMP版的此模型之实现,来分享下。

先说一下模型的大致做法是:

1、生产者需要取任务,生产产品。

2、消费者需要取产品,消费产品。

生产者在生产某个产品之后,要告知消费者此产品已经可以使用了。消费者通过获得可以使用这个信号来取得产品,进一步消费产品。

比如,我们有N个图像需要对每一个图像作滤波或者变换等处理,并且把处理后的结果存到硬盘上。

那么生产者可以将N个图像看成N个任务,每个任务都是独立的,每个任务的计算结果可以看成是产品,消费者就是取这个产品来写入硬盘。

先贴出一个实例代码再作解释。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <time.h>

#define jobs 1000
#define sz 102000

#if defined(_WIN32) && defined(_MSC_VER)
#include <windows.h>
double abtic() {
	__int64 freq;
	__int64 clock;
	QueryPerformanceFrequency( (LARGE_INTEGER *)&freq );
	QueryPerformanceCounter( (LARGE_INTEGER *)&clock );
	return (double)clock/freq*1000*1000;
}
#else
#include <time.h>
#include <sys/time.h>
double abtic() {
	double result = 0.0;
	struct timeval tv;
	gettimeofday( &tv, NULL );
	result = tv.tv_sec*1000*1000 + tv.tv_usec;
	return result;
}
#endif /* _WIN32 */

#if 1
double timer;
#define ABTMS timer=abtic();fprintf(stdout,"%4d  ",__LINE__)
#define ABTME fprintf(stdout,"%4d  %8.8fms\n",__LINE__,(abtic()-timer)/1000.0f)
#else
#define ABTMS
#define ABTME
#endif

int main()
{
  char *jbNotReady;
  double *a;
  double *as;
  double *pa;
  int j, k;
	char jbnr;

  a = (double*)malloc(sz*jobs*sizeof(double));
  as = (double*)malloc(jobs*sizeof(double));
  jbNotReady = (char*)malloc(jobs*sizeof(char));

  for (j = 0; j < jobs; j++)
  {
    jbNotReady[j] = 1;

  }
  memset(a, 0, sz*jobs*sizeof(double));
  memset(as, 0, jobs*sizeof(double));
  ABTMS;
#pragma omp parallel sections private(j,k,pa) shared(jbNotReady,as,a)
  {
    // producer
#pragma omp section
    {
      for (j = 0; j < jobs; j++)
      {
        pa = a+j*sz;
        for (k = 0; k < sz; k++)
        {
          pa[k] = 1.0;
        }
        jbNotReady[j] = 0;
#pragma omp flush
      }
    }
    // consumer
#pragma omp section
    {
      for (j = 0; j < jobs; j++)
      {
#pragma omp flush
        while (jbNotReady[j]){
#pragma omp flush
				}
        as[j] = 0.0;
        pa = a+j*sz;
        for (k = 0; k < sz; k++)
        {
          as[j] += pa[k];
        }
        if ((int)(as[j])!=sz)fprintf(stdout, "job id %3d :%f\n", j, as[j]);
      }
    }
  }
  ABTME;
  free(a);
  free(as);
  free(jbNotReady);
  return 0;
}

源代码中,第一个section创建的线程扮演的就是生产者的角色,第二个section扮演消费者角色。j这个变量模拟的是任务编号,第一个section中的循环模拟产生产品。第二个section每次取一个任务,而且是顺序取,通过验证任务是否已经准备好来获得正确的产品。

使用flush制导语句是为了将每个线程的缓存和内存强制保持一致,注意生产者向jbNotReady里写,而消费者只是读数据,不会出现内存中的数据写后读,读后写的问题,每个线程获得的数据都是安全的。

以上代码支持Windows和Linux,GCC4.4以后的版本都可以执行,Windows下只要支持OpenMP的编译器,都可行。