记录型信号量解决生产者消费者问题

时间:2024-04-14 14:04:45

先上书上伪代码:

记录型信号量解决生产者消费者问题

             记录型信号量解决生产者消费者问题

用java代码实现如下:

生产者(Producer)

import java.util.Random;

public class Producer implements Runnable{
    private String producerName;
    private Buffer buffer;
    private Semaphore semaphore;
    public Producer(String producerName, Buffer buffer, Semaphore semaphore){
        this.producerName = producerName;
        this.buffer = buffer;
        this.semaphore = semaphore;
    }
    @Override
    public void run() {
        int time;
        Random random = new Random();
        while (true){
            System.out.println(producerName+"生产了一个产品");
            Item item = new Item(producerName+"的产品");
            time = random.nextInt(5000)+5000;
            try {
                semaphore.wait("empty");
                semaphore.wait("mutex");
                System.out.println(producerName+"放进一个产品");
                buffer.pushItem(item);
                semaphore.signal("mutex");
                semaphore.signal("full");
                System.out.println("让"+producerName+"睡"+time+"ms");
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

生产者(Consumer)

import java.util.Random;

public class Consumer implements Runnable{
    private String consumerName;
    private Buffer buffer;
    private Semaphore semaphore;
    public Consumer(String consumerName, Buffer buffer, Semaphore semaphore){
        this.consumerName = consumerName;
        this.buffer = buffer;
        this.semaphore = semaphore;
    }

    @Override

    public void run() {
        int time;
        Random random = new Random();
        while (true){
            time = random.nextInt(5000)+5000;
            try {
                semaphore.wait("full");
                semaphore.wait("mutex");

                System.out.println(consumerName+"拿走一个产品");
                buffer.popItem();
                semaphore.signal("mutex");
                semaphore.signal("empty");
                System.out.println("让"+consumerName+"睡"+time+"ms");
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

缓冲区(Buffer)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Buffer {
    private int index;
    private int maxSize;
    private List<Item> items;

    public Buffer(int maxSize) {
        this.index = 0;
        this.maxSize = maxSize;
        items = new ArrayList<Item>();
    }

    public synchronized void pushItem(Item item) {
        if (index==maxSize){
            System.out.println("产品已满,程序错误");
            return;
        }
        items.add(index, item);
        index++;
        System.out.println("缓冲区进入一个产品,还有"+items.size()+"个");
    }
    public synchronized void popItem(){
        if(index==0){
            System.out.println("没有产品无法取出,程序错误");
            return;
        }
        index--;
        items.remove(index);
        System.out.println("缓冲区减少一个产品,还有"+items.size()+"个");
    }

    public synchronized int getCount(){
        return items.size();
    }
}

产品(Item)

public class Item {
    private String itemName;
    public Item(String itemName){
        this.itemName = itemName;
    }
}

信号量(Semaphore)

public class Semaphore {
    private int mutex;
    private int empty;
    private int full;
    private Buffer buffer;

    public Semaphore(Buffer buffer, int empty, int mutex) {
        this.buffer = buffer;
        this.empty = empty;
        this.mutex = mutex;
        this.full = 0;

    }

    public synchronized void wait(String type) throws InterruptedException {
        if ("empty".equals(type)) {
            empty--;
            if (empty < 0) {
                this.wait();
            }
        } else if ("full".equals(type)) {
            full--;
            if (full < 0) {
                this.wait();
            }
        } else if ("mutex".equals(type)) {
            mutex--;
            if (mutex < 0) {
                this.wait();
            }
        }
    }

    public synchronized void signal(String type) throws InterruptedException {
        if ("empty".equals(type)) {
            empty++;
            if (empty <= 0) {
                this.notifyAll();
            }
        } else if ("full".equals(type)) {
            full++;
            if (full <= 0) {
                this.notifyAll();
            }
        } else if ("mutex".equals(type)) {
            mutex++;
            if (mutex <= 0) {
                this.notifyAll();
            }
        }
    }
}

启动类(Start)

public class Start {
    public static void main(String args[]){
        int maxsize = 10;
        Buffer buffer = new Buffer(maxsize);
        Semaphore semaphore = new Semaphore(buffer, maxsize, 1);
        Producer[] producers = new Producer[5];
        Consumer[] consumers = new Consumer[5];
        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Producer("生产者" + i + "号", buffer, semaphore);
            consumers[i] = new Consumer("消费者" + i + "号", buffer, semaphore);
            new Thread(producers[i]).start();
            new Thread(consumers[i]).start();
        }
    }
}

运行结果:

记录型信号量解决生产者消费者问题