实验的内容简单来说就是用信号量来实现生产者和消费者问题。
- 建立一个生产者进程,N个消费者进程(N>1);
- 用文件建立一个共享缓冲区;
- 生产者进程依次向缓冲区写入整数0,1,2,...,M,M>=500;
- 消费者进程从缓冲区读数,每次读一个,并将读出的数字从缓冲区删除,然后将本进程ID和数字输出到标准输出;
- 缓冲区同时最多只能保存10个数。
实验的步骤如下:
(1)在ubuntu下,用系统提供的sem_open()、sem_close()、sem_wait()和sem_post()等信号量相关的系统调用编写pc.c程序。
(2)在ubuntu上编译并运行pc.c,检查运行结果。
后面为linux0.11编写信号量相关的系统调用:
(3)参考高级版本的linux内核,实现linux0.11下的山寨版信号量(4个系统调用):
sem_t *sem_open(const char *name, unsigned int value);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_unlink(const char *name);
(4)、参照实验2,修改相关文件和添加这些系统调用相关的代码
(5)、编译linux0.11内核
(6)、运行虚拟机
(7)、将pc.c复制到虚拟机,并编译运行,查看运行结果。
接下来,我们先来点预备知识。
生产者消费者问题的模型
Producer() { 生产一个产品item; P(Empty); //空闲缓存资源 P(Mutex); //互斥信号量 将item放到空闲缓存中; V(Mutex); V(Full); //产品资源 } Consumer() { P(Full); P(Mutex); 从缓存区取出一个赋值给item; V(Mutex); V(Empty); 消费产品item; }其中PV操作的含义如下:
执行P操作P(S)时信号量S的值减1,若结果不为负则P(S)执行完毕,否则执行P操作的进程暂停以等待释放。
执行V操作V(S)时,S的值加1,若结果不大于0则释放一个因执行P(S)而等待的进程。
生产者消费者问题中需要用到三个信号量,其中一个互斥信号量,两个同步信号量。在进行互斥操作的时候一定要注意,P、V操作是成对出现的,先进行P操作,进入临界区,访问临界资源,在进行V操作,出临界区。互斥信号量的初值一般是1.
进行同步操作的时候,要分析清进程间的制约关系。同一信号量的P、V操作也要成对出现,但是其P、V操作分别在不同的进程中。同步信号量的初值一般与资源的数目有关。
信号量值的含义:大于零时,表示可用的临界资源数目;等于零时,表示资源正好用完了;小于零时,表示系统中因请求该资源而被阻塞的进程数目。
在有了上述的预备知识之后,我们可以开始这个实验了。
第一步,首先编写pc.c。pc.c要完成的功能在文章开始的时候就已经说明了。其中要注意的一点是,我对于缓冲区的处理是一次性将缓冲区中的十个数都读出来,然后消费一个,再将其余的九个数写回文件。感觉这种方法要简单一点,至少不需要用到那些文件指针。下面是pc.c文件:
#define __LIBRARY__ #include <unistd.h> #include <stdio.h> #include <stdlib.h> #define PMAX 500 #define NUMBER 10 #define CMAX 100 _syscall2(int,sem_open,const char*, name,unsigned int,value); _syscall1(int,sem_wait,sem_t*,sem); _syscall1(int,sem_post,sem_t*,sem); _syscall1(int,sem_unlink,sem_t*,sem); static int produce = 0; static int consume = 0; int main(int argc,char* argv[]) { sem_t *Mutex; sem_t *number; sem_t *location; FILE *f; int i,j,k,m; int x,y; int read=-1; int b[10]; pid_t pid[5]; Mutex = (sem_t*)sem_open("Mutex",1); number = (sem_t*)sem_open("number",0); location = (sem_t*)sem_open("location",10); for(i=0; i<5; i++) { pid[i] = fork(); if(pid[i]<0) { printf("error in fork!\n"); exit(0); } else if(pid[i]==0) { printf("process%d\n",getpid()); for(k=0; k<CMAX; k++) { sem_wait(number); sem_wait(Mutex); f = fopen("test.txt","rb"); for(m=0; m<NUMBER; m++) { read =-1; fread(&read,sizeof(int),1,f); b[m]=read; } fclose(f); printf("%d: %d\n",getpid(),b[0]); fflush(stdout); f = fopen("test.txt","wb"); for(m=1; m<NUMBER; m++) { if(b[m] >= 0) fwrite(&(b[m]),sizeof(int),1,f); } consume++; fflush(f); /*for(x=0;x<1000;x++) { for(y=0;y<1000;y++) ; }*/ fclose(f); sem_post(Mutex); sem_post(location); } exit(0); } } printf("father process\n"); for(j=0; j<PMAX; j++) { sem_wait(location); sem_wait(Mutex); f= fopen("test.txt","ab+"); read++; fwrite(&read,sizeof(int),1,f); fflush(f); produce++; /*for(x=0;x<1000;x++) { for(y=0;y<1000;y++) ; }*/ fclose(f); sem_post(Mutex); sem_post(number); } /*for(x=0;x<1000;x++) { for(y=0;y<1000;y++) ; }*/ wait(NULL); wait(NULL); wait(NULL); wait(NULL); wait(NULL); sem_unlink(Mutex); sem_unlink(number); sem_unlink(location); return 0; }
将pc.c完成,只是完成了这个实验很小的一部分。较难的部分是sem.c的编写。
sem_t是信号量类型,需要自定义。下面是unistd.h文件中对于信号量类型的定义。在实现信号量的时候需要用到一个队列,用来保存当前等待的进程。
struct queue { int front; int rear; struct task_struct *task[30]; }; typedef struct queue queue; struct semaphore { int value; int flag; int lock; /*char *name;*/ queue wait; }; typedef struct semaphore sem_t;接着是sem.c。这部分的难度较大,仅供参考。
#include <errno.h> #include <linux/kernel.h> #include <asm/segment.h> #include <asm/system.h> #include <linux/sched.h> #include <unistd.h> /*#include <signal.h>*/ #define MAX 30 char names[MAX][MAX] = {'\0'}; sem_t arr[30]= {NULL}; void MakeNull(queue *q) { q->front = 0; q->rear = MAX - 1; } int add(int x) { int y; y = (x+1)%MAX; return y; } void EnQueue(struct task_struct *curr, queue *q) { if(add(add(q->rear))==q->front) printk("queue is full"); else { q->rear = add(q->rear); q->task[q->rear]=curr; } } void DeQueue(queue *q) { if(add(q->rear)==q->front) { printk("queue is empty"); } else q->front = add(q->front); } struct task_struct *Front( queue* q ) { if(add(q->rear)==q->front) return NULL; else return (q->task[q->front]); } int sys_sem_open(const char *name, unsigned int value) { char n; int i = 0; int tag; char MyName[MAX]; while ((n = get_fs_byte(&name [i])) != '\0') { MyName[i] = n; i++; } MyName[i] = '\0'; for (i = 0; i < MAX; i++) { tag = strcmp(MyName,names[i]); if (tag==0) return &arr[i]; } for (i = 0; i < MAX; i++) { if (arr[i].flag == 0) { strcpy(names[i],MyName); arr[i].flag = 1; arr[i].value = value; /*printk("%d",value);*/ MakeNull( &(arr[i].wait)); return &arr[i]; } } return NULL; } int sys_sem_wait(sem_t *sem) { int value; cli(); (sem->value) = (sem->value)-1; value = sem->value; if (value < 0) { EnQueue(current, &(sem->wait)); /*sleep();*/ current->state = TASK_UNINTERRUPTIBLE; schedule(); } sti(); return 0; } int sys_sem_post(sem_t *sem) { struct task_struct* tmp; int value; cli(); (sem->value) = (sem->value)+1; value = sem->value; /*printk("post %d\t",sem->value);*/ if (value <= 0) { tmp = Front(&(sem->wait)); /*wake(tmp);*/ if (tmp != NULL) (*tmp).state = TASK_RUNNING; DeQueue(&(sem->wait)); } sti(); return 0; } int sys_sem_unlink(const char *name) { char n; int i=0,j=0,k=0; char aName[MAX]; int tag; while ((n = get_fs_byte(name + i)) != '\0') { aName[i] = n; i++; } aName[i] = '\0'; for(j=0; j<MAX; j++) { tag = strcmp(names[i],aName); if (tag==0) { arr[i].flag = 0; arr[i].value = 0; for(k=0;k<MAX;k++) { names[i][k] = '\0'; } return 0; } } return -1; }
剩下的工作就是增加系统调用在第二次实验中已经做过了,可以参考 哈工大软件学院操作系统实验2----系统调用这篇文章。
这次实验可以说的就这么多了,还需要大家耐心研究研究sem.c的实现那块。
偶然发现了实验报告(仅供参考啊):
1.在pc.c中去掉所有与信号量有关的代码,再运行程序,执行效果有变化吗?为什么会这样?
答:有,消费者消费的数据将不在按照递增顺序输出,并且会出现read error。因为没有了p(empty)操作,所以当生产者已经写满 了缓冲区仍然要进入生产时,就会覆盖掉还没有被消费者取走的数据,这样当消费者进入缓冲区取出数据时,就不是正常的递增顺序了,假设生产者在缓冲区中写了400,401,402,403,404,405,406,407,408,409共10个数据,那么还继续写,就会把400变为410,这样当消费者进入取数时,就会出现399,410,401,这样的输出顺序,同样的,因为没有了p(full)操作,当缓冲区的数据都被取走了,而消费者依然进入取数,读出的数据就是无效的。而没有p(mutex)v(mutex)这个加锁和解锁的操作,使得临界区代码可能出现多进程并发访问的情况,因此会有read error的情况出现。
2.实验的设计者在第一次编写生产者——消费者程序的时候,是这么做的:
Producer() { P(Mutex); //互斥信号量 生产一个产品item; P(Empty); //空闲缓存资源 将item放到空闲缓存中; V(Full); //产品资源 V(Mutex); } Consumer() { P(Mutex); P(Full); 从缓存区取出一个赋值给item; V(Empty); 消费产品item; V(Mutex); }
这样可行吗?如果可行,那么它和标准解法在执行效果上会有什么不同?如果不可行,那么它有什么问题使它不可行?
答:不可行。会有死锁情况的发生,当mutex = 1,并且生产者要进入生产一个产品,假设此时empty.value为0,那么会有mutex.value = 0,p(empty)后得到的value小于0,生产者进程进入等待在信号量empty的等待队列上面,并调用schedule(),可是此时并未解锁,即mutex.value值仍然为0。它们都等待在信号量mutex上面。同理,消费者进程也是如此,若mutex.value = 1,full.value = 0,在执行完p(mutex)p(full)之后,mutex.value = 0,并且将消费者进程放入等待在信号量full的等待队列上面,而此时也并未释放mutex,因此消费者和生产者进程都等待在mutex信号量上面。