先上书上伪代码:
用java代码实现如下:
生产者(Producer)
import java.util.Random; public class Producer implements Runnable{ private String producerName; private Buffer buffer; private Semaphore semaphore; public Producer(String producerName, Buffer buffer, Semaphore semaphore){ this.producerName = producerName; this.buffer = buffer; this.semaphore = semaphore; } @Override public void run() { int time; Random random = new Random(); while (true){ System.out.println(producerName+"生产了一个产品"); Item item = new Item(producerName+"的产品"); time = random.nextInt(5000)+5000; try { semaphore.wait("empty"); semaphore.wait("mutex"); System.out.println(producerName+"放进一个产品"); buffer.pushItem(item); semaphore.signal("mutex"); semaphore.signal("full"); System.out.println("让"+producerName+"睡"+time+"ms"); Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } } }
生产者(Consumer)
import java.util.Random; public class Consumer implements Runnable{ private String consumerName; private Buffer buffer; private Semaphore semaphore; public Consumer(String consumerName, Buffer buffer, Semaphore semaphore){ this.consumerName = consumerName; this.buffer = buffer; this.semaphore = semaphore; } @Override public void run() { int time; Random random = new Random(); while (true){ time = random.nextInt(5000)+5000; try { semaphore.wait("full"); semaphore.wait("mutex"); System.out.println(consumerName+"拿走一个产品"); buffer.popItem(); semaphore.signal("mutex"); semaphore.signal("empty"); System.out.println("让"+consumerName+"睡"+time+"ms"); Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } } }
缓冲区(Buffer)
import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class Buffer { private int index; private int maxSize; private List<Item> items; public Buffer(int maxSize) { this.index = 0; this.maxSize = maxSize; items = new ArrayList<Item>(); } public synchronized void pushItem(Item item) { if (index==maxSize){ System.out.println("产品已满,程序错误"); return; } items.add(index, item); index++; System.out.println("缓冲区进入一个产品,还有"+items.size()+"个"); } public synchronized void popItem(){ if(index==0){ System.out.println("没有产品无法取出,程序错误"); return; } index--; items.remove(index); System.out.println("缓冲区减少一个产品,还有"+items.size()+"个"); } public synchronized int getCount(){ return items.size(); } }
产品(Item)
public class Item { private String itemName; public Item(String itemName){ this.itemName = itemName; } }
信号量(Semaphore)
public class Semaphore { private int mutex; private int empty; private int full; private Buffer buffer; public Semaphore(Buffer buffer, int empty, int mutex) { this.buffer = buffer; this.empty = empty; this.mutex = mutex; this.full = 0; } public synchronized void wait(String type) throws InterruptedException { if ("empty".equals(type)) { empty--; if (empty < 0) { this.wait(); } } else if ("full".equals(type)) { full--; if (full < 0) { this.wait(); } } else if ("mutex".equals(type)) { mutex--; if (mutex < 0) { this.wait(); } } } public synchronized void signal(String type) throws InterruptedException { if ("empty".equals(type)) { empty++; if (empty <= 0) { this.notifyAll(); } } else if ("full".equals(type)) { full++; if (full <= 0) { this.notifyAll(); } } else if ("mutex".equals(type)) { mutex++; if (mutex <= 0) { this.notifyAll(); } } } }
启动类(Start)
public class Start { public static void main(String args[]){ int maxsize = 10; Buffer buffer = new Buffer(maxsize); Semaphore semaphore = new Semaphore(buffer, maxsize, 1); Producer[] producers = new Producer[5]; Consumer[] consumers = new Consumer[5]; for (int i = 0; i < producers.length; i++) { producers[i] = new Producer("生产者" + i + "号", buffer, semaphore); consumers[i] = new Consumer("消费者" + i + "号", buffer, semaphore); new Thread(producers[i]).start(); new Thread(consumers[i]).start(); } } }
运行结果: