要理解生产消费者问题,首先应弄清PV操作的含义:PV操作是由P操作原语和V操作原语组成(原语是不可中断的过程),对信号量进行操作,具体定义如下:
P(S):①将信号量S的值减1,即S=S-1;
②如果S>0,则该进程继续执行;否则该进程置为等待状态,排入等待队列。
V(S):①将信号量S的值加1,即S=S+1;
②如果S>0,则该进程继续执行;否则释放队列中第一个等待信号量的进程。
P操作相当于申请资源,而V操作相当于释放资源。
生产者-消费者问题是一个有代表性的进程同步问题,生产者-消费者问题,也称作有界缓冲区问题,两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,或者当缓冲区空时,消费者还要从中取出数据项的问题。为了保证这种情况不会发生,我们通常使用信号量和消息传递来解决生产者-消费者问题。
(1)使用信号量解决生产者-消费者问题
一个信号量的取值可以为 0(表示没有保存下来的唤醒操作)或者为正值(表示有一个或多个唤醒操作)。
并且设立了两种操作:down和 up(也是一般教科书上说的 P/V向量)。对一个信号量执行 down操作,表示检查其值是否大于 0,如果该值大于 0,则将其值减 1(即用掉一个保存的唤醒信号)并继续;如果为 0,则进程休眠,而且此时 down操作并未结束。另外,就是检查数值,修改变量值以及可能发生的休眠操作都作为单一的,不可分割的原子操作来完成。
下面开始考虑用信号量来解决生产者-消费者问题了。
#define N 100 // 缓冲区中的槽数目
typedef int semaphore; // 信号量一般被定义为特殊的整型数据
semaphore mutex = 1; // 控制对临界区的访问
semaphore empty = N; // 计数缓冲区中的空槽数目
semaphore full = 0; // 计数缓冲区中的满槽数目
/* 生产者进程 */
void proceducer(void)
{
int item;
while(1)
{
item = procedure_item(); // 生成数据
down(&empty); // 将空槽数目减 1
down(&mutex); // 进入临界区
insert_item(item); // 将新数据放入缓冲区
up(&mutex); // 离开临界区
up(&full); // 将满槽的数目加 1
}
}
/* 消费者进程 */
void consumer(voi)
{
int item;
while(1)
{
down(&full); // 将满槽数目减 1
down(&mutex); // 进入临界区
item = remove_item(); // 从缓冲区中取出数据项
up(&mutex); // 离开临界区
up(&empty); // 将空槽数目加 1
consumer_item(item); // 处理数据项
}
}
该解决方案使用了三个信号量:一个为 full,用来记录充满的缓冲槽的数目,一个为 empty,记录空的缓冲槽总数,一个为 mutex,用来确保生产者和消费者不会同时访问缓冲区。mutex的初始值为 1,供两个或者多个进程使用的信号量,保证同一个时刻只有一个进程可以进入临界区,称为二元信号量(binary semaphore)。如果每一个进程在进入临界区前都执行一个 down(...),在刚刚退出临界区时执行一个 up(...),就能够实现互斥。
另外,通常是将 down和 up操作作为系统调用来实现,而且 OS只需要在执行以下操作时暂时禁止全部中断:测试信号量,更新信号量以及在需要时使某个进程休眠。
这里使用了三个信号量,但是它们的目的却不相同,其中 full和 empty用来同步(synchronization),而 mutex用来实现互斥。
(2)使用消息传递解决生产者-消费者问题
这种 IPC方式使用两条原语 send和 receive,也是系统调用。如:
send(dest, &msg) // 将消息 msg发送到目标(进程)dest中
receive(src, &msg) // 接收由 src过来的 msg,如果没有消息可用,则可能阻塞接收者
消息传递系统会面临位于网络中不同机器上的通信进程的情形,所以会更加的复杂。如:消息可能被网络丢失,一般使用确认(ACK)消息。如果发送方在一定的时间段内没有收到确认消息,则重发消息。
如果消息本身被正确接收,但是返回的 ACK消息丢失,发送方则重发消息,这样接收方就会收到两份同样的消息。一般使用在每条原始消息的头部嵌入一个连续的序号来解决这个问题。
另外,消息传递系统还需要解决进程命名的问题,在 send和 receive系统调用中指定的进程必须没有二义性的。还有其他的一些问题,如性能问题,身份认证等等,不过那个就会扯多了,还是看看如果解决这个生产者-消费者的问题吧:
#define N 100 // 缓冲区中的槽数目
/* 生产者进程 */
void proceducer(void)
{
int item;
messagemsg; // 消息缓冲区
while(1)
{
item = procedure_item(); // 生成数据
receive(consumer, &msg); // 等待消费者发送空的缓冲区
build_msg(&msg, item); // 创建待发送消息
send(consumer, &msg); // 发送数据项给消费者
}
}
/* 消费者进程 */
void consumer(voi)
{
int item,i;
messagemsg;
for(i=0;i<N; i++)
send(producer, &msg); // 发送给生产者 N 个空缓冲区
while(1)
{
receive(producer, &msg); // 接收包含数据项的消息
item = extract_item(&msg); // 解析消息,并组装成数据项
send(proceduer, &msg); // 然后又将空缓冲区发送回生产者
consumer_item(item); // 处理数据项
}
}
在这个解决方案中,共使用了 N条消息,有点类似于上一个的共享内存缓冲区的 N个槽,消费者进程这边首先通过一个 for循环将 N条空消息发送给生产者。当生产者向消费者传递一个数据项时,是通过取走每一条接收到的空消息,然后送回填充了内容的消息给消费者的。通过这种方式,整个消息传递系统中的总的消息数(包括空的消息 + 存了数据项的消息== N)是不变的。
如果运行过程中,生产者进程的速度比消费者快,则所有的消息最终都会塞满,然后生产者进程就会等待消费者(即使调用 procedure也是阻塞在 receive处),直到消费者返回一条空的消息;反之亦然。
下面再来看一下消息传递方式的两种变体。一种是:为每一个进程分配一个唯一的地址,让消息按照这个进程的地址进行编址。也就是 send和 receive调用的第一个参数指定为具体的进程地址。另一种是:引入信箱(mailbox),可以信箱就像一个盒子,里面装了很多的信件,这个信件就是我们要传递的消息,当然信箱是有容量限制的。当使用信箱时,send和 receive系统调用中的地址参数就是信箱的地址,而不是进程的地址。当一个进程尝试向一个容量爆满的信箱发送消息时,它将会被挂起,直到信箱中有消息被取走。