信号量的实现和应用

时间:2022-12-12 15:16:49

第四次实验是一道坎啊,话说当年差点没做出来,各种纠结的问题都出现了。这一次的实验涉及到前几次实验的小综合,所以要求有点高。建议一定要认真仔细多阅读几遍指导书。 

实验的内容简单来说就是用信号量来实现生产者和消费者问题。

  1. 建立一个生产者进程,N个消费者进程(N>1);
  2. 文件建立一个共享缓冲区;
  3. 生产者进程依次向缓冲区写入整数0,1,2,...,M,M>=500;
  4. 消费者进程从缓冲区读数,每次读一个,并将读出的数字从缓冲区删除,然后将本进程ID和数字输出到标准输出;
  5. 缓冲区同时最多只能保存10个数。
完成上述的五个步骤,最好是将pc.c在 Ubuntu下测试成功后,在改成适合0.11的版本。记得在Ubuntu下编译的时候一定不要忘了加上-lpthread,在编译的时候要连接上Thread库才可以使用,在0.11下则不需要。在Ubuntu用gcc -o pc pc.c -lpthread -Wall就可以了。

实验的步骤如下:

(1)在ubuntu下,用系统提供的sem_open()、sem_close()、sem_wait()和sem_post()等信号量相关的系统调用编写pc.c程序。

(2)在ubuntu上编译并运行pc.c,检查运行结果。

后面为linux0.11编写信号量相关的系统调用:

(3)参考高级版本的linux内核,实现linux0.11下的山寨版信号量(4个系统调用):

sem_t *sem_open(const char *name, unsigned int value);

int sem_wait(sem_t *sem);

int sem_post(sem_t *sem);

int sem_unlink(const char *name);

(4)、参照实验2,修改相关文件和添加这些系统调用相关的代码

(5)、编译linux0.11内核

(6)、运行虚拟机

(7)、将pc.c复制到虚拟机,并编译运行,查看运行结果。

接下来,我们先来点预备知识。

生产者消费者问题的模型

Producer()
{
    生产一个产品item;
    P(Empty);  //空闲缓存资源
    P(Mutex);  //互斥信号量
    将item放到空闲缓存中;
    V(Mutex);
    V(Full);  //产品资源
}

Consumer()
{
    P(Full);  
    P(Mutex);  
    从缓存区取出一个赋值给item;
    V(Mutex);
    V(Empty);
    消费产品item;
} 
其中PV操作的含义如下:

执行P操作P(S)时信号量S的值减1,若结果不为负则P(S)执行完毕,否则执行P操作的进程暂停以等待释放。

执行V操作V(S)时,S的值加1,若结果不大于0则释放一个因执行P(S)而等待的进程

生产者消费者问题中需要用到三个信号量,其中一个互斥信号量,两个同步信号量。在进行互斥操作的时候一定要注意,P、V操作是成对出现的,先进行P操作,进入临界区,访问临界资源,在进行V操作,出临界区。互斥信号量的初值一般是1.

进行同步操作的时候,要分析清进程间的制约关系。同一信号量的P、V操作也要成对出现,但是其P、V操作分别在不同的进程中。同步信号量的初值一般与资源的数目有关。

信号量值的含义:大于零时,表示可用的临界资源数目;等于零时,表示资源正好用完了;小于零时,表示系统中因请求该资源而被阻塞的进程数目。

在有了上述的预备知识之后,我们可以开始这个实验了。

第一步,首先编写pc.c。pc.c要完成的功能在文章开始的时候就已经说明了。其中要注意的一点是,我对于缓冲区的处理是一次性将缓冲区中的十个数都读出来,然后消费一个,再将其余的九个数写回文件。感觉这种方法要简单一点,至少不需要用到那些文件指针。下面是pc.c文件:

#define __LIBRARY__
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

#define PMAX 500
#define NUMBER 10
#define CMAX 100

_syscall2(int,sem_open,const char*, name,unsigned int,value);
_syscall1(int,sem_wait,sem_t*,sem);
_syscall1(int,sem_post,sem_t*,sem);
_syscall1(int,sem_unlink,sem_t*,sem);

    static int produce = 0;
    static int consume = 0;

int main(int argc,char* argv[])
{
    sem_t *Mutex;
    sem_t *number;
    sem_t *location;
    FILE *f;
    int i,j,k,m;
    int x,y;
    int read=-1;
    int b[10];
    pid_t pid[5];

    Mutex = (sem_t*)sem_open("Mutex",1);
    number = (sem_t*)sem_open("number",0);
    location = (sem_t*)sem_open("location",10);

    for(i=0; i<5; i++)
    {
        pid[i] = fork();
        if(pid[i]<0)
        {
            printf("error in fork!\n");
            exit(0);
        }
        else if(pid[i]==0)
        {
            printf("process%d\n",getpid());
            for(k=0; k<CMAX; k++)
            {
                sem_wait(number);
                sem_wait(Mutex);
                f = fopen("test.txt","rb");
                for(m=0; m<NUMBER; m++)
                {
                    read =-1;
                    fread(&read,sizeof(int),1,f);
                    b[m]=read;
                }
                fclose(f);
                printf("%d: %d\n",getpid(),b[0]);
                fflush(stdout);
                f = fopen("test.txt","wb");
                for(m=1; m<NUMBER; m++)
                {
                    if(b[m] >= 0)
                        fwrite(&(b[m]),sizeof(int),1,f);
                }
                consume++;
                fflush(f);
                /*for(x=0;x<1000;x++)
                    {
                        for(y=0;y<1000;y++)
                        ;
                    }*/
                fclose(f);
                sem_post(Mutex);
                sem_post(location);
            }
            exit(0);

        }
    }

    printf("father process\n");
    for(j=0; j<PMAX; j++)
    {
        sem_wait(location);
        sem_wait(Mutex);
        f= fopen("test.txt","ab+");

        read++;
        fwrite(&read,sizeof(int),1,f);
        fflush(f);
        produce++;
        /*for(x=0;x<1000;x++)
            {
                for(y=0;y<1000;y++)
                ;
            }*/
        fclose(f);
        sem_post(Mutex);
        sem_post(number);
    }
    /*for(x=0;x<1000;x++)
      {
          for(y=0;y<1000;y++)
          ;
      }*/
    wait(NULL);
    wait(NULL);
    wait(NULL);
    wait(NULL);
    wait(NULL);
    sem_unlink(Mutex);
    sem_unlink(number);
    sem_unlink(location);


    return 0;
}

将pc.c完成,只是完成了这个实验很小的一部分。较难的部分是sem.c的编写。

sem_t是信号量类型,需要自定义。下面是unistd.h文件中对于信号量类型的定义。在实现信号量的时候需要用到一个队列,用来保存当前等待的进程。

struct queue
{
    int front;
    int rear;
    struct task_struct *task[30];
};
typedef struct queue queue;
   
struct semaphore
{
    int value;
    int flag;
    int lock;
    /*char *name;*/
    queue wait;
};
typedef struct semaphore sem_t;
接着是sem.c。这部分的难度较大,仅供参考。
#include <errno.h>
#include <linux/kernel.h>
#include <asm/segment.h>
#include <asm/system.h>
#include <linux/sched.h>
#include <unistd.h>
/*#include <signal.h>*/
#define MAX 30

char names[MAX][MAX] = {'\0'};
sem_t arr[30]= {NULL};

void MakeNull(queue *q)
{
    q->front = 0;
    q->rear = MAX - 1;
}

int add(int x)
{
    int y;
    y = (x+1)%MAX;
    return y;
}

void EnQueue(struct task_struct *curr, queue *q)
{
    if(add(add(q->rear))==q->front)
        printk("queue is full");
    else
    {
        q->rear = add(q->rear);
        q->task[q->rear]=curr;
    }
}

void DeQueue(queue *q)
{
    if(add(q->rear)==q->front)
    {
        printk("queue is empty");
    }
    else
        q->front = add(q->front);
}

struct task_struct *Front( queue* q )
{
    if(add(q->rear)==q->front)
        return NULL;
    else
        return (q->task[q->front]);
}

int sys_sem_open(const char *name, unsigned int value)
{
    char n;
    int i = 0;
    int tag;
    char MyName[MAX];

    while ((n = get_fs_byte(&name [i])) != '\0')
    {
        MyName[i] = n;
        i++;
    }
    MyName[i] = '\0';

    for (i = 0; i < MAX; i++)
    {
        tag = strcmp(MyName,names[i]);
        if (tag==0)
            return &arr[i];
    }

    for (i = 0; i < MAX; i++)
    {
        if (arr[i].flag == 0)
        {
            strcpy(names[i],MyName);
            arr[i].flag = 1;
            arr[i].value = value;
            /*printk("%d",value);*/
            MakeNull( &(arr[i].wait));
            return &arr[i];
        }
    }
    return NULL;
}

int sys_sem_wait(sem_t *sem)
{
    int value;
    cli();
    (sem->value) = (sem->value)-1;
    value = sem->value;
    if (value < 0)
    {
        EnQueue(current, &(sem->wait));
        /*sleep();*/
        current->state = TASK_UNINTERRUPTIBLE;
        schedule();
    }
    sti();
    return 0;
}

int sys_sem_post(sem_t *sem)
{
    struct task_struct* tmp;
    int value;
    cli();
    (sem->value) = (sem->value)+1;
    value = sem->value;
    /*printk("post  %d\t",sem->value);*/
    if (value <= 0)
    {
        tmp = Front(&(sem->wait));
        /*wake(tmp);*/
         if (tmp != NULL)
        (*tmp).state = TASK_RUNNING;
        DeQueue(&(sem->wait));
    }

    sti();
    return 0;
}
int sys_sem_unlink(const char *name)
{
    char n;
    int i=0,j=0,k=0;
    char aName[MAX];
    int tag;
    while ((n = get_fs_byte(name + i)) != '\0')
    {
        aName[i] = n;
        i++;
    }
    aName[i] = '\0';
    for(j=0; j<MAX; j++)
    {
        tag = strcmp(names[i],aName);
        if (tag==0)
        {
            arr[i].flag = 0;
            arr[i].value = 0;
            for(k=0;k<MAX;k++)
            {
               names[i][k] = '\0';
            }
            return 0;
        }
    }
    return -1;
}

剩下的工作就是增加系统调用在第二次实验中已经做过了,可以参考 哈工大软件学院操作系统实验2----系统调用这篇文章。

这次实验可以说的就这么多了,还需要大家耐心研究研究sem.c的实现那块。

偶然发现了实验报告(仅供参考啊):

1.在pc.c中去掉所有与信号量有关的代码,再运行程序,执行效果有变化吗?为什么会这样?

答:有,消费者消费的数据将不在按照递增顺序输出,并且会出现read error。因为没有了p(empty)操作,所以当生产者已经写满 了缓冲区仍然要进入生产时,就会覆盖掉还没有被消费者取走的数据,这样当消费者进入缓冲区取出数据时,就不是正常的递增顺序了,假设生产者在缓冲区中写了400,401,402,403,404,405,406,407,408,409共10个数据,那么还继续写,就会把400变为410,这样当消费者进入取数时,就会出现399,410,401,这样的输出顺序,同样的,因为没有了p(full)操作,当缓冲区的数据都被取走了,而消费者依然进入取数,读出的数据就是无效的。而没有p(mutex)v(mutex)这个加锁和解锁的操作,使得临界区代码可能出现多进程并发访问的情况,因此会有read error的情况出现。

2.实验的设计者在第一次编写生产者——消费者程序的时候,是这么做的:

Producer()
{
    P(Mutex);  //互斥信号量
    生产一个产品item;
    P(Empty);  //空闲缓存资源
    将item放到空闲缓存中;
    V(Full);  //产品资源
    V(Mutex);
}

Consumer()
{
    P(Mutex);  
    P(Full);  
    从缓存区取出一个赋值给item;
    V(Empty);
    消费产品item;
    V(Mutex);
} 

这样可行吗?如果可行,那么它和标准解法在执行效果上会有什么不同?如果不可行,那么它有什么问题使它不可行?

答:不可行。会有死锁情况的发生,当mutex = 1,并且生产者要进入生产一个产品,假设此时empty.value为0,那么会有mutex.value = 0,p(empty)后得到的value小于0,生产者进程进入等待在信号量empty的等待队列上面,并调用schedule(),可是此时并未解锁,即mutex.value值仍然为0。它们都等待在信号量mutex上面。同理,消费者进程也是如此,若mutex.value = 1,full.value = 0,在执行完p(mutex)p(full)之后,mutex.value = 0,并且将消费者进程放入等待在信号量full的等待队列上面,而此时也并未释放mutex,因此消费者和生产者进程都等待在mutex信号量上面。