经典线程同步问题(生产者&消费者)

时间:2022-04-21 16:41:47

生产者-消费者(producer-consumer)问题是一个著名的线程同步问题。它描述的是:有一群生产者线程在生产产品,并将这些产品提供给消费者线程去消费。

为使生产者与消费者之间能够并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池,生产者将它所生产的产品放入一个缓冲区中;消费者可以从一个缓冲区中取走产品产生消费。

尽管所有的生产者线程和消费者线程都是以异步方式运行的,但他们之间必须保持同步,即不允许消费者到一个空缓冲区去消费,也不允许生产者向一个已经被占用的缓冲区投放产品。

我把这个问题复杂化,设立m个缓冲池,每个缓冲池都有各自固定的容量,每个生产者或消费者在进行生产消费活动之前,先选择一个缓冲池。由此会引发一个线程死锁的问题:所有的生产者都在满的缓冲池等待,直到某个消费者取走一个产品,释放出一块缓冲区;同时所有的消费者都在空的缓冲池等待,直到某个生产者放进一个产品。

解决方法:记录每一个线程的等待状态,如果当前线程会产生等待,则检测是否会产生死锁(所有线程都在等待),如果会产生死锁,拒绝此次生产消费活动(换一个缓冲池)

 

Java Code

 

?
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 publicclassProducerAndConsumer {//作者:xelz,博客http://www.cnblogs.com/mingcn          privatestaticfinalintPRODUCTION_LINE_COUNT =20;//生产线(缓冲池)条数    privatestaticfinalintPRODUCTION_LINE_SIZE =100;//生产线最大容量    privatestaticfinalintPRODUCER_COUNT =50;//生产者个数    privatestaticfinalintCONSUMER_COUNT =50;//消费者个数          publicstaticvoidmain(String[] args) {        ProductionLine[] productionLine =newProductionLine[PRODUCTION_LINE_COUNT];        Producer[] producer =newProducer[PRODUCER_COUNT];        Consumer[] consumer =newConsumer[CONSUMER_COUNT];        SyncLock.initLock(PRODUCER_COUNT, CONSUMER_COUNT);//初始化每个线程的等待状态        for(inti = 0; i < PRODUCTION_LINE_COUNT; i++) {//初始化每条生产线,随机容量            productionLine[i] =newProductionLine((int) (Math.random() * PRODUCTION_LINE_SIZE) +1, i);        }        for(inti = 0; i < PRODUCER_COUNT; i++) {//初始化生产者线程            producer[i] =newProducer(i, productionLine);            newThread(producer[i]).start();        }        for(inti = 0; i < CONSUMER_COUNT; i++) {//初始化消费者线程            consumer[i] =newConsumer(i, productionLine);            newThread(consumer[i]).start();        }    }      }   classProduct {//产品(缓冲区)类          intproductID;//制造者编号    intproductionLineID;//生产线编号    intproducerID;//产品编号(产品在该生产线上的位置)          Product(intproducerID) {        this.producerID = producerID;    }      }   classProductionLine {//生产线(缓冲池)类          Product[] product;//缓冲池采用循环队列模式    intsize;//缓冲池大小    intcount;//缓冲池当前产品计数    intproductionLineID;//生产线编号    intproduceID;//队头指针,下一个被放入的产品的编号    intconsumeID;//队尾指针,下一个被取走的产品的编号          ProductionLine(intsize,intproductionLineID) {//初始化生产线(缓冲池)        this.size = size;        this.productionLineID = productionLineID;        product =newProduct[size];        for(inti = 0; i < size /2; i++) {//为防止刚开始产销不平衡,预先放置一半产品            putProduct(newProduct(-1));//产品生产者编号-1,系统制造        }    }          booleanisFull() {//判断缓冲池是否满        returncount == size;    }          booleanisEmpty() {//判断缓冲池是否空        return0== count;    }          voidputProduct(Product product) {//放入一个产品,并将队头指针前移        this.product[produceID] = product;        product.productID = produceID;//给产品贴上标签,产品编号,生产线编号        product.productionLineID = productionLineID;        produceID = ++produceID % size;//下一个产品放置位置        count++;    }          Product getProduct() {//取出一个产品,并将队尾指针前移        Product product = this.product[consumeID];        this.product[consumeID] =null;        consumeID = ++consumeID % size;//下一个产品取出位置        count--;        returnproduct;    }      }   classProducerimplementsRunnable {//生产者线程          intproducerID;//自己的生产者编号    ProductionLine[] productionLine;//共享的生产线    ProductionLine currentProductionLine;          Producer(intproducerID, ProductionLine[] productionLine) {         this.producerID = producerID;        this.productionLine = productionLine;    }          privatevoidproduce() {//生产活动        Product product;        intproductionLineID;        while(true) {             do{//选择一条生产线,若会产生死锁,则重选一条生产线                productionLineID = (int) (Math.random() * productionLine.length);                 currentProductionLine = productionLine[productionLineID];            }while((SyncLock.waitOnFull[producerID] = currentProductionLine.isFull()) && SyncLock.deadLock());            //synchronized(SyncLock.printSyncLock) {            //  System.out.print("Producer " + producerID +" wants to ");            //  System.out.println("produce at ProductionLine  " + productionLineID);            //}            synchronized(currentProductionLine) {//同步对象:当前生产线,不能同时有两个及以上线程操作同一生产线                while(currentProductionLine.isFull()) {//缓冲池满,无法生产,等待                    try{                        currentProductionLine.wait();                    }catch(InterruptedException e) {                        e.printStackTrace();                    }                }                product =newProduct(producerID);//生产新的产品                currentProductionLine.putProduct(product);//加入缓冲池                synchronized(SyncLock.printSyncLock) {//打印生产线、生产者、产品ID及缓冲池是否变满                    System.out.print("ProductionLine: "+ productionLineID);                     System.out.print("\tProducer: "+ producerID);                    System.out.print("\tProduct: "+ product.productID);                    System.out.println(currentProductionLine.isFull() ?"\t("+ productionLineID + ")Full!":"");                }//释放当前生产线同步锁                currentProductionLine.notifyAll();//生产活动结束,唤醒当前生产线上等待的其他生产者/消费者线程            }            try{                Thread.sleep((int) (Math.random() *1000));            }catch(InterruptedException e) {                e.printStackTrace();            }        }    }          publicvoidrun() {         produce();    }      }   classConsumerimplementsRunnable {          intconsumerID;//自己的消费者编号    ProductionLine[] productionLine;//共享的生产线    ProductionLine currentProductionLine;          Consumer(intconsumerID, ProductionLine[] productionLine) {         this.consumerID = consumerID;        this.productionLine = productionLine;    }          privatevoidconsume() {//消费活动        Product product;        intproductionLineID;        while(true) {             do{//选择一条生产线,若会产生死锁,则重选一条生产线                productionLineID = (int) (Math.random() * productionLine.length);                 currentProductionLine = productionLine[productionLineID];            }while((SyncLock.waitOnEmpty[consumerID] = currentProductionLine.isEmpty()) && SyncLock.deadLock());            //synchronized(SyncLock.printSyncLock) {            //  System.out.print("Consumer " + consumerID +" wants to ");            //  System.out.println("consume at ProductionLine  " + productionLineID);            //}            synchronized(currentProductionLine) {//同步对象:当前生产线,不能同时有两个及以上线程操作同一生产线                while(currentProductionLine.isEmpty()) {//缓冲池空,无法消费,等待                    try{                        currentProductionLine.wait();                    }catch(InterruptedException e) {                        e.printStackTrace();                    }                }                product = currentProductionLine.getProduct();//消费产品                synchronized(SyncLock.printSyncLock) {//打印生产线、消费者、产品及其制造者ID,以及生产线是否变空                    System.out.print("ProductionLine: "+ productionLineID);                     System.out.print("\tConsumer: "+ consumerID);                    System.out.print("\tProduct: "+ product.productID);                    System.out.print("("+ product.producerID + ")");                    System.out.println(currentProductionLine.isEmpty() ?"\t("+ productionLineID + ")Empty!":"");                }            currentProductionLine.notifyAll();//生产活动结束,唤醒当前生产线上等待的其他生产者/消费者线程            }            try{                Thread.sleep((int) (Math.random() *1000));            }catch(InterruptedException e) {                e.printStackTrace();            }        }    }          publicvoidrun() {         consume();    }      }   classSyncLock {//提供死锁检测的静态方法,及打印同步          staticfinalObject printSyncLock = newObject();//用于同步输出,防止不同线程输出交叉    staticboolean[] waitOnEmpty;//指示某一消费者是否在空缓冲池等待    staticboolean[] waitOnFull;//指示某一生产者是否在满缓冲池等待          staticvoidinitLock(intProducerCount,intConsumerCount) {//初始化等待指示器        waitOnEmpty =newboolean[ConsumerCount];        waitOnFull =newboolean[ProducerCount];    }          staticbooleandeadLock() {//判断是否产生死锁        for(booleanb : waitOnEmpty) {             if(!b)returnfalse;        }        for(booleanb : waitOnFull) {             if(!b)returnfalse;        }        returntrue;//若所有线程都在等待状态,则产生死锁    }      }