生产者-消费者(producer-consumer)问题是一个著名的线程同步问题。它描述的是:有一群生产者线程在生产产品,并将这些产品提供给消费者线程去消费。
为使生产者与消费者之间能够并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池,生产者将它所生产的产品放入一个缓冲区中;消费者可以从一个缓冲区中取走产品产生消费。
尽管所有的生产者线程和消费者线程都是以异步方式运行的,但他们之间必须保持同步,即不允许消费者到一个空缓冲区去消费,也不允许生产者向一个已经被占用的缓冲区投放产品。
我把这个问题复杂化,设立m个缓冲池,每个缓冲池都有各自固定的容量,每个生产者或消费者在进行生产消费活动之前,先选择一个缓冲池。由此会引发一个线程死锁的问题:所有的生产者都在满的缓冲池等待,直到某个消费者取走一个产品,释放出一块缓冲区;同时所有的消费者都在空的缓冲池等待,直到某个生产者放进一个产品。
解决方法:记录每一个线程的等待状态,如果当前线程会产生等待,则检测是否会产生死锁(所有线程都在等待),如果会产生死锁,拒绝此次生产消费活动(换一个缓冲池)
Java Code:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 | public class ProducerAndConsumer { //作者:xelz,博客http://www.cnblogs.com/mingcn private static final int PRODUCTION_LINE_COUNT = 20 ; //生产线(缓冲池)条数 private static final int PRODUCTION_LINE_SIZE = 100 ; //生产线最大容量 private static final int PRODUCER_COUNT = 50 ; //生产者个数 private static final int CONSUMER_COUNT = 50 ; //消费者个数 public static void main(String[] args) { ProductionLine[] productionLine = new ProductionLine[PRODUCTION_LINE_COUNT]; Producer[] producer = new Producer[PRODUCER_COUNT]; Consumer[] consumer = new Consumer[CONSUMER_COUNT]; SyncLock.initLock(PRODUCER_COUNT, CONSUMER_COUNT); //初始化每个线程的等待状态 for ( int i = 0 ; i < PRODUCTION_LINE_COUNT; i++) { //初始化每条生产线,随机容量 productionLine[i] = new ProductionLine(( int ) (Math.random() * PRODUCTION_LINE_SIZE) + 1 , i); } for ( int i = 0 ; i < PRODUCER_COUNT; i++) { //初始化生产者线程 producer[i] = new Producer(i, productionLine); new Thread(producer[i]).start(); } for ( int i = 0 ; i < CONSUMER_COUNT; i++) { //初始化消费者线程 consumer[i] = new Consumer(i, productionLine); new Thread(consumer[i]).start(); } } } class Product { //产品(缓冲区)类 int productID; //制造者编号 int productionLineID; //生产线编号 int producerID; //产品编号(产品在该生产线上的位置) Product( int producerID) { this .producerID = producerID; } } class ProductionLine { //生产线(缓冲池)类 Product[] product; //缓冲池采用循环队列模式 int size; //缓冲池大小 int count; //缓冲池当前产品计数 int productionLineID; //生产线编号 int produceID; //队头指针,下一个被放入的产品的编号 int consumeID; //队尾指针,下一个被取走的产品的编号 ProductionLine( int size, int productionLineID) { //初始化生产线(缓冲池) this .size = size; this .productionLineID = productionLineID; product = new Product[size]; for ( int i = 0 ; i < size / 2 ; i++) { //为防止刚开始产销不平衡,预先放置一半产品 putProduct( new Product(- 1 )); //产品生产者编号-1,系统制造 } } boolean isFull() { //判断缓冲池是否满 return count == size; } boolean isEmpty() { //判断缓冲池是否空 return 0 == count; } void putProduct(Product product) { //放入一个产品,并将队头指针前移 this .product[produceID] = product; product.productID = produceID; //给产品贴上标签,产品编号,生产线编号 product.productionLineID = productionLineID; produceID = ++produceID % size; //下一个产品放置位置 count++; } Product getProduct() { //取出一个产品,并将队尾指针前移 Product product = this .product[consumeID]; this .product[consumeID] = null ; consumeID = ++consumeID % size; //下一个产品取出位置 count--; return product; } } class Producer implements Runnable { //生产者线程 int producerID; //自己的生产者编号 ProductionLine[] productionLine; //共享的生产线 ProductionLine currentProductionLine; Producer( int producerID, ProductionLine[] productionLine) { this .producerID = producerID; this .productionLine = productionLine; } private void produce() { //生产活动 Product product; int productionLineID; while ( true ) { do { //选择一条生产线,若会产生死锁,则重选一条生产线 productionLineID = ( int ) (Math.random() * productionLine.length); currentProductionLine = productionLine[productionLineID]; } while ((SyncLock.waitOnFull[producerID] = currentProductionLine.isFull()) && SyncLock.deadLock()); //synchronized(SyncLock.printSyncLock) { // System.out.print("Producer " + producerID +" wants to "); // System.out.println("produce at ProductionLine " + productionLineID); //} synchronized (currentProductionLine) { //同步对象:当前生产线,不能同时有两个及以上线程操作同一生产线 while (currentProductionLine.isFull()) { //缓冲池满,无法生产,等待 try { currentProductionLine.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } product = new Product(producerID); //生产新的产品 currentProductionLine.putProduct(product); //加入缓冲池 synchronized (SyncLock.printSyncLock) { //打印生产线、生产者、产品ID及缓冲池是否变满 System.out.print( "ProductionLine: " + productionLineID); System.out.print( "\tProducer: " + producerID); System.out.print( "\tProduct: " + product.productID); System.out.println(currentProductionLine.isFull() ? "\t(" + productionLineID + ")Full!" : "" ); } //释放当前生产线同步锁 currentProductionLine.notifyAll(); //生产活动结束,唤醒当前生产线上等待的其他生产者/消费者线程 } try { Thread.sleep(( int ) (Math.random() * 1000 )); } catch (InterruptedException e) { e.printStackTrace(); } } } public void run() { produce(); } } class Consumer implements Runnable { int consumerID; //自己的消费者编号 ProductionLine[] productionLine; //共享的生产线 ProductionLine currentProductionLine; Consumer( int consumerID, ProductionLine[] productionLine) { this .consumerID = consumerID; this .productionLine = productionLine; } private void consume() { //消费活动 Product product; int productionLineID; while ( true ) { do { //选择一条生产线,若会产生死锁,则重选一条生产线 productionLineID = ( int ) (Math.random() * productionLine.length); currentProductionLine = productionLine[productionLineID]; } while ((SyncLock.waitOnEmpty[consumerID] = currentProductionLine.isEmpty()) && SyncLock.deadLock()); //synchronized(SyncLock.printSyncLock) { // System.out.print("Consumer " + consumerID +" wants to "); // System.out.println("consume at ProductionLine " + productionLineID); //} synchronized (currentProductionLine) { //同步对象:当前生产线,不能同时有两个及以上线程操作同一生产线 while (currentProductionLine.isEmpty()) { //缓冲池空,无法消费,等待 try { currentProductionLine.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } product = currentProductionLine.getProduct(); //消费产品 synchronized (SyncLock.printSyncLock) { //打印生产线、消费者、产品及其制造者ID,以及生产线是否变空 System.out.print( "ProductionLine: " + productionLineID); System.out.print( "\tConsumer: " + consumerID); System.out.print( "\tProduct: " + product.productID); System.out.print( "(" + product.producerID + ")" ); System.out.println(currentProductionLine.isEmpty() ? "\t(" + productionLineID + ")Empty!" : "" ); } currentProductionLine.notifyAll(); //生产活动结束,唤醒当前生产线上等待的其他生产者/消费者线程 } try { Thread.sleep(( int ) (Math.random() * 1000 )); } catch (InterruptedException e) { e.printStackTrace(); } } } public void run() { consume(); } } class SyncLock { //提供死锁检测的静态方法,及打印同步 static final Object printSyncLock = new Object(); //用于同步输出,防止不同线程输出交叉 static boolean [] waitOnEmpty; //指示某一消费者是否在空缓冲池等待 static boolean [] waitOnFull; //指示某一生产者是否在满缓冲池等待 static void initLock( int ProducerCount, int ConsumerCount) { //初始化等待指示器 waitOnEmpty = new boolean [ConsumerCount]; waitOnFull = new boolean [ProducerCount]; } static boolean deadLock() { //判断是否产生死锁 for ( boolean b : waitOnEmpty) { if (!b) return false ; } for ( boolean b : waitOnFull) { if (!b) return false ; } return true ; //若所有线程都在等待状态,则产生死锁 } } |