1 概述
线程间通信是多线程十分重要的一个知识点,Java多线程是用基于wait/notify的等待/通知模式实现的。其一个经典的案例就是“生产者消费者模式”。其中,生产者负责生产商品,消费者负责消费商品。在没有商品时,消费者必须等待生产者生产;而在已经有商品时,生产者必须等待消费者消费完才能继续消费。当然,还有“变种”模式,就是生产者可以生产商品堆积,但是堆积的数目不能超过一定的数目。可是,原理还是类似的,下面就生产商品不堆积的情况介绍一下Java多线程的实现方式。
2 最简单的模型:一生产者一消费者
生产者消费者最简单的模型就是只有一个生产者和一个消费者,Java的实现方式也很简单,参考下面的代码。其中,抽象了一个商品类即Resource类,为了方便起见,为其定义了一个属性即它的id,在生产的时候这个id随机产生(小于1000)。Resource类拥有生产和消费两个方法,这两个方法都是加synchronized关键词同步的,持有的锁都是this锁。另外,在Resource类中定义了一个标志位,用于标识商品是不是已经生产了。在调用生产方法时,判断这个标志位,如果为真就表示已经有商品,生产方法需要等待,当其被消费者消费以后唤醒,执行生产操作,将标志位置真,唤醒消费完以后执行等待的消费者。就这样,生产者和消费者“你来我往”,执行本身操作以后,切换标志位,唤醒对方,本身今日等待状态。
import java.util.Random; /** * Created by fubinhe on 16/9/30. */ public class SingleProducerConsumer { public static void main(String[] args) { Resource res = new Resource(); Producer producer = new Producer(res); Consumer consumer = new Consumer(res); Thread proThread = new Thread(producer); Thread conThread = new Thread(consumer); proThread.start(); conThread.start(); } } class Producer implements Runnable { private Resource res; public Producer(Resource res) { this.res = res; } @Override public void run() { while (true) { res.produce(new Random().nextInt(1000)); } } } class Consumer implements Runnable { private Resource res; public Consumer(Resource res) { this.res = res; } @Override public void run() { while (true) { res.consume(); } } } class Resource { private int id; private boolean hasProduced = false; public synchronized void produce(int id) { if (hasProduced) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.id = id; System.out.println("produced..." + id); hasProduced = true; this.notify(); } public synchronized void consume() { if (!hasProduced) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("consumed......" + this.id); hasProduced = false; this.notify(); } }
这个程序的执行结果如下图所示,生产状态和消费状态不断的切换。
3 多生产者和消费者
当有多个生产者和消费者的数目不再单一时,上面的模型就会出现一些问题。下面以两个生产者和两个消费者为例(因为具体数目有多少个的原理都是类似的,方便起见只选择两个),说明当出现多个生产者和消费者时,会遇到的一些问题和我们的解决方式。
3.1 执行权一直被某一方占有
我们将上面的代码稍微改动一下,分别开辟两个生产者和消费者线程。
import java.util.Random; /** * Created by fubinhe on 16/9/30. */ public class MultipleProducerConsumer { public static void main(String[] args) { Resource res = new Resource(); Producer producer = new Producer(res); Consumer consumer = new Consumer(res); Thread proThread1 = new Thread(producer); Thread proThread2 = new Thread(producer); Thread conThread1 = new Thread(consumer); Thread conThread2 = new Thread(consumer); proThread1.start(); proThread2.start(); conThread1.start(); conThread2.start(); } } class Producer implements Runnable { private Resource res; public Producer(Resource res) { this.res = res; } @Override public void run() { while (true) { res.produce(new Random().nextInt(1000)); } } } class Consumer implements Runnable { private Resource res; public Consumer(Resource res) { this.res = res; } @Override public void run() { while (true) { res.consume(); } } } class Resource { private int id; private boolean hasProduced = false; public synchronized void produce(int id) { if (hasProduced) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.id = id; System.out.println("produced..." + id); hasProduced = true; this.notify(); } public synchronized void consume() { if (!hasProduced) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("consumed......" + this.id); hasProduced = false; this.notify(); } }
然后其他代码都不需要改动,执行以后可能会出现类似下图的结果,就是某一方会一直占有执行权,而对方没有机会执行。出现这种情况,是因为始终只有一个线程会抢到main函数中的res对象的this锁而得到执行权,例如生产者proThread1抢到,而其他三个线程都在等待状态(就是另外一个生产者线程和两个消费者线程),当前生产完了以后,就会唤醒等待的线程。而这个时候有三个线程在等待,不幸的是,执行权可能被另外一个生产者线程抢到,更不幸的是,执行权一直被这两个生产者轮流抢到,而消费者线程却一直“眼巴巴”的在旁边看着得不得执行的权力。就这样,会出现下图这样的情形。当然,也可能出现一直是消费者线程在执行的情况。
3.2 假死
为了解决上面的那个问题,可以将上面的代码中,produce方法和consume方法最开始的地方将if判断改为while。这样的话,当某个线程被唤醒时,它会回头判断是谁唤醒它的,如果是自己一方的,就接着等待,否则才得到执行权进行下面的操作。
import java.util.Random; /** * Created by fubinhe on 16/9/30. */ public class MultipleProducerConsumer { public static void main(String[] args) { Resource res = new Resource(); Producer producer = new Producer(res); Consumer consumer = new Consumer(res); Thread proThread1 = new Thread(producer); Thread proThread2 = new Thread(producer); Thread conThread1 = new Thread(consumer); Thread conThread2 = new Thread(consumer); proThread1.start(); proThread2.start(); conThread1.start(); conThread2.start(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } Thread[] threads = new Thread[Thread.currentThread().getThreadGroup().activeCount()]; Thread.currentThread().getThreadGroup().enumerate(threads); for (Thread thread : threads) { System.out.println(thread.getName() + ": " + thread.getState()); } } } class Producer implements Runnable { private Resource res; public Producer(Resource res) { this.res = res; } @Override public void run() { while (true) { res.produce(new Random().nextInt(1000)); } } } class Consumer implements Runnable { private Resource res; public Consumer(Resource res) { this.res = res; } @Override public void run() { while (true) { res.consume(); } } } class Resource { private int id; private boolean hasProduced = false; public synchronized void produce(int id) { while (hasProduced) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.id = id; System.out.println("produced..." + id); hasProduced = true; this.notify(); } public synchronized void consume() { while (!hasProduced) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("consumed......" + this.id); hasProduced = false; this.notify(); } }
当将if改为while时,就可以消除某一方一直执行的问题,但是会出现“假死”的危险,如下图所示。在上面代码中的main函数中,添加了对所有线程的状态判断。发现生成者和消费者现在全部是处在等待的状态。出现这样的情况,是因为,当某个线程被自己方唤醒后重新判断后继续等待,而由于它没有出synchronized修饰的方法,其持有的this锁一直不会释放,这样就出现“假死”的情况。
3.3 终极解决方法
最终的解决方式,就是不能一次只唤醒一个,因为这个被唤醒的有可能是自己方的。那么,将notify方法换成notifyAll方法,就是唤醒时唤醒所有在等待的线程,也不分哪一方了。这样就能解决“假死”的问题。最终的代码和执行结果如下。
import java.util.Random; /** * Created by fubinhe on 16/9/30. */ public class MultipleProducerConsumer { public static void main(String[] args) { Resource res = new Resource(); Producer producer = new Producer(res); Consumer consumer = new Consumer(res); Thread proThread1 = new Thread(producer); Thread proThread2 = new Thread(producer); Thread conThread1 = new Thread(consumer); Thread conThread2 = new Thread(consumer); proThread1.start(); proThread2.start(); conThread1.start(); conThread2.start(); } } class Producer implements Runnable { private Resource res; public Producer(Resource res) { this.res = res; } @Override public void run() { while (true) { res.produce(new Random().nextInt(1000)); } } } class Consumer implements Runnable { private Resource res; public Consumer(Resource res) { this.res = res; } @Override public void run() { while (true) { res.consume(); } } } class Resource { private int id; private boolean hasProduced = false; public synchronized void produce(int id) { while (hasProduced) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.id = id; System.out.println("produced..." + id); hasProduced = true; this.notifyAll(); } public synchronized void consume() { while (!hasProduced) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("consumed......" + this.id); hasProduced = false; this.notifyAll(); } }
4 传统方法的缺点
通过上面的一步步剖析,可以发现传统方式实现生产者消费者模式时,唤醒线程的方式比较“费劲”。而且这种“不分你我”的唤醒方式的一个明显的缺点是,比较消耗资源,因为有些线程并唤醒后也不能执行重新进入等待状态。Java 5引入了Condition对象,通过不同的Condition对象更有针对性的唤醒等待的线程,具体可以参考下一篇blog:Chapter 6 生产者消费者之Condition实现。