多线程协同任务之:生产者和消费者模型
介绍
在上一篇中介绍了多线程的等待和唤醒机制,这一篇我们就实践一下,多个生产者和多个消费模型。
假如有一个资源类Resouce,其有一个属性name属性和我们赋予的一个标识为flag属性。现在通过启动多线程,来达到一边赋值一边取值打印的效果。
Resource类定义
public class Resource {
/** 标识位 */
private boolean flag = false;
/** 资源名称 */
private String name;
//setter和getter方法
...
}
ProductThread线程类
public class ProductThread implements Runnable {
/** 资源对象 */
private Resource resource;
public ProductThread(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
int count = 0;
while (true) {
synchronized (resource) {
if (resource.isFlag()) {// 如果有产品这个线程就放弃执行权
try {
resource.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {// 如果没有产品就生产,生产完之后通知消费者
count++;
resource.setName("产品" + count);
System.out.println(Thread.currentThread().getName() +"生产者 product:" + resource.getName());
resource.setFlag(true);
resource.notifyAll();
//备注这个方法唤醒所有其它线程,包括消费者线程和本方的生产者线程。
//按理说不应该唤醒本方的生产者线程,因为这是没有意义的,但是如果使用同步代码块只能这样实现。如果使用Lock就可以更细粒度控制。
}
}
}
}
}
ConsumeThread线程类
public class ConsumeThread implements Runnable {
/** 资源对象*/
private Resource resource;
public ConsumeThread(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
while (true) {
synchronized (resource) {
if (resource.isFlag()) {// 如果有产品,就直接消费。消费完之后通知生产线程
System.out.println(Thread.currentThread().getName() +"消费者consume:" + resource.getName());
resource.setFlag(false);
resource.notifyAll();
//备注这个方法唤醒所有其它线程,包括消费者线程和本方的生产者线程。
//按理说不应该唤醒本方的消费者线程,因为这是没有意义的,但是如果使用同步代码块只能这样实现。如果使用Lock就可以更细粒度控制。
} else {// 如果没有产品,就等待生产者生产产品
try {
resource.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
执行结果
main方法
public static void main(String[] args) {
Resource resource = new Resource();
//生产者
ProductThread productThread1 = new ProductThread(resource);
ProductThread productThread2 = new ProductThread(resource);
//消费者
ConsumeThread consumeThread1 = new ConsumeThread(resource);
ConsumeThread consumeThread2 = new ConsumeThread(resource);
new Thread(productThread1).start();
new Thread(productThread2).start();
new Thread(consumeThread1).start();
new Thread(consumeThread2).start();
}
基于ReentranLock和Condition实现生产者消费者模型
由于使用Synchronize同步代码块来实现生产者消费者模型存在一定的性能消耗,所以使用改进后的ReentranLock和Condition来代替。
Resource资源类
public class Resource {
// 标识位
private boolean flag = false;
public Resource() {
}
// 新建一个ReentranLock对象
private ReentrantLock reentrantLock = new ReentrantLock();
// 定义生产者监视器
private Condition product_condition = reentrantLock.newCondition();
// 定义消费者监视器
private Condition consume_condition = reentrantLock.newCondition();
// 产品名称
private String name;
static int count = 0;
public void consume() {
try {
reentrantLock.lock();
if (isFlag()) {// 如果有产品就消费
System.out.println(Thread.currentThread().getName() + "消费者 consume:" + getName());
setFlag(false);
product_condition.signalAll();
} else {// 没有产品就等待
try {
consume_condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
reentrantLock.unlock();
}
}
public void product() {
try {
reentrantLock.lock();
if (isFlag()) {// 如果已经有产品,等待消费
try {
product_condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
// 如果没有产品那么就生产产品然后再唤醒消费者所有线程。
count++;
setName("产品:" + count);
System.out.println(Thread.currentThread().getName() + "生产者 product:" + getName());
setFlag(true);
consume_condition.signalAll();
}
} finally {
reentrantLock.unlock();
}
}
}
生产者线程
public class ProductThread implements Runnable {
//资源对象
private Resource resource;
public ProductThread(Resource resource) {
this.resource = resource;
}
public void run() {
while (true) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.product();
}
}
}
消费者线程
public class ConsumeThread implements Runnable {
// 资源对象
private Resource resource;
public ConsumeThread(Resource resource) {
this.resource = resource;
}
public void run() {
while (true) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.consume();
}
}
}
总结
多线程协同任务时,虽然可以在线程类run方法中处理,但是并不推荐在run方法中写复杂代码.
最好在共享对象方法中处理业务逻辑和锁处理。
总结
最容易弄错的地方就是,什么时候调用wait方法什么时候调用notifyAll方法,这一点确实不好说。
通常是当这一边的线程不用执行任何代码时,那么久调用wait方法,放弃执行权,如果当这边修改了某标识位之后,
想让对方执行那么久执行notify方法。这么说还是显得比较抽象,暂时还没想到比较简单的说法,还是和应用场景有关,很难总结。
备注:要注意notifyAll唤醒是的所有等待的方法,不管是本方还是对方的(生产者和消费者),
这就导致了并不是很合理,没有充分利用CPU资源,有些场景下可能会导致,CPU频繁切换而执行效率并不高。
备注:这两篇的重点并不是写出java生产者和消费者模型的代码,而是掌握线程通信原理以及等待唤醒机制。
当然生产者消费者模型非常重要,实际应用中通常会在生产者和消费者之间有一个缓存池比如队列容器作缓冲池。
也有比较成熟的方案比如消息中间件,例如RabbitMQ,相信我消息中间件非常的有用。
展望:我们下一篇使用ReentrantLock锁来改进这种方式。