Java多线程之带你认识不一样的生产者消费者(一)

时间:2021-02-12 17:31:24

Java多线程之带你认识不一样的生产者消费者(一)

生产者/消费者模式是等待/通知最经典的案例。这种模式有不少变形,同时在使用上有不少药注意的细节,但是万变不离其宗,它的原理都是基于wait()和notify();只要掌握了这些基本的原理,了解它的变形式也不在话下。我们首先来了解一下wait()和notify();

1、wait()/notify()

wait()、notify()和notifyAll()都是属于基类Object的一部分,不属于Thread的一部分。实际上只能在同步代码块里面调用这几个方法,如果在非同步代码块调用这些方法,程序可以编译但是会报IllegalMonitorStateException异常。

1.1.1wait()的两种形式

(1). 第一种接受毫秒作为参数,含义与sleep()方法里面的参数相同,都是指”在此期间暂停”,但是与sleep()不同的是,对于wait()而言:

  • 在wait()期间对象锁是释放的
  • 可以通过notify()和notifyAll(),或者令时间到期,让线程不在等待
  • 这里需要注意一点,sleep()和yield()方法不会使锁释放,也就是说,线程无法被挂起。

(2). 这种事更加常用的形式,就是wait()方法不接受任何参数,这种wait()将一直等待下去,直到接收notify()和notifyAll()的消息。

1.1.2 notify()/notifyAll()

  • notify()是唤醒某一个等待的线程,notifyAll()是唤醒所有的线程,前提的多个线程共用的是一把锁

2 生产者消费者模式的实现

2.1单个生产和单个消费 操作值

创建名称为Task的项目,并创建名称为P的类,代码如下

  • 生产者

package valueobjectwait;

public class P {

private String lock;
public P(String lock) {
super();
this.lock = lock;
}
public void setValue(){
try {
synchronized (lock) {
if (!ValueObject.value.equals("")) {
lock.wait();
}
String value = Thread.currentThread().getName()+ System.currentTimeMillis()+"_"+System.nanoTime();
Thread.sleep(500);
System.out.println("set 的值是"+value);
ValueObject.value = value;
lock.notify();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

创建名称为C的类,代码如下

  • 消费者

package valueobject;

public class C {

private String lock;
public C(String lock) {
super();
this.lock = lock;
}
public void getValue(){
try {
synchronized (lock) {

if (ValueObject.value.equals("")) {
lock.wait();
}
//String value = System.currentTimeMillis()+"_"+System.nanoTime();
Thread.sleep(500);
System.out.println("get 的值是"+Thread.currentThread().getName()+ValueObject.value);
ValueObject.value = "";
lock.notify();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

创建存储值的对象ValueObject,代码如下



package valueobject;

public class ValueObject {

public static String value = "";
}

创建两个工作的线程,一个是生产者线程,一个是消费者线程,代码如下:

  • 生产者线程

package valueobject;

public class P_Thread extends Thread{

private P p;
public P_Thread(P p) {
super();
this.p = p;
}
@Override
public void run() {
super.run();
while (true) {
p.setValue();
}
}
}



  • 消费者线程

package valueobject;

public class C_Thread extends Thread{

private C c;
public C_Thread(C p) {
super();
this.c = p;
// TODO Auto-generated constructor stub
}

@Override
public void run() {
// TODO Auto-generated method stub
super.run();
while (true) {
c.getValue();
}
}
}


所有的工作都已经完成了,接下来就是运行看一下效果了,运行的类如下


package valueobject;

public class Run {

public static void main(String[] args) {
String myStack = new String("");

P p = new P(myStack);
C c = new C(myStack);

P_Thread pThread = new P_Thread(p);
C_Thread cThread = new C_Thread(c);


//一個生產者 一個消費者
pThread.start();
cThread.start();



set 的值是Thread-01494127022759_55606521773743

get 的值是Thread-1Thread-01494127022759_55606521773743

set 的值是Thread-01494127023760_55607523019188

get 的值是Thread-1Thread-01494127023760_55607523019188

set 的值是Thread-01494127024760_55608523121778

get 的值是Thread-1Thread-01494127024760_55608523121778

set 的值是Thread-01494127025760_55609523255829

get 的值是Thread-1Thread-01494127025760_55609523255829

set 的值是Thread-01494127026760_55610523329694

get 的值是Thread-1Thread-01494127026760_55610523329694

set 的值是Thread-01494127027761_55611523256513

get 的值是Thread-1Thread-01494127027761_55611523256513

set 的值是Thread-01494127028761_55612523078690

get 的值是Thread-1Thread-01494127028761_55612523078690

set 的值是Thread-01494127029761_55613523735951

get 的值是Thread-1Thread-01494127029761_55613523735951

}
}

这是一个生产者和一个消费者的实现,我们从结果可以看书,两个线程是交替的打印,实现了我们要的效果,两条线程间实现了良好的交互效果。
但是我们如果实现多生产和多消费会出现什么效果呢!让我们拭目以待

2.2 多个生产和多个消费 操作值


package valueobject;

public class Run {

public static void main(String[] args) {
String myStack = new String("");

P p = new P(myStack);
C c = new C(myStack);
P_Thread[] pThread = new P_Thread[3];
C_Thread[] cThread = new C_Thread[3];


for (int i = 0; i < 3; i++) {
pThread[i] = new P_Thread(p);
pThread[i].setName("生產者"+(i+1));
cThread[i] = new C_Thread(c);
cThread[i] .setName("消費者"+(i+1));

pThread[i].start();
cThread[i].start();

}

}
}

get 的值是消費者1-生產者3-1494128470358-57054094666814

get 的值是消費者3-

get 的值是消費者1-

set 的值是生產者3-1494128472358-57056094787870

set 的值是生產者2-1494128472858-57056594779663

get 的值是消費者2-生產者2-1494128472858-57056594779663

set 的值是生產者1-1494128473858-57057595001258

get 的值是消費者2-生產者1-1494128473858-57057595001258

set 的值是生產者2-1494128474858-57058594798129

set 的值是生產者3-1494128475358-57059094783083

get 的值是消費者1-生產者3-1494128475358-57059094783083


get 的值是消費者3-

get 的值是消費者1-


这是截取部分的打印结果,会出现一种很奇怪的现象,就是取出来的值为空字符串。这就让人费解了,我们明明判断了如果为空就等待,为什么还会打印出空字符串呢?
其实也不难理解,因为我们是多个生产者和消费者,消费者线程唤醒的不一定是生产者线程,也有可能是消费者线程,一个为空的消费者正在等待,一个消费完成的消费者就会释放锁,唤醒其他线程,正好唤醒了为空的消费者等待线程,所以就会打印出空字符串。
那么我们如何来解决这个问题呢下面来看解决办法。



package valueobject;

public class C {

private String lock;
public C(String lock) {
super();
this.lock = lock;
}
public void getValue(){

try {
synchronized (lock) {
//这里讲if改成while
while (ValueObject.value.equals("")) {
// System.out.println("线程-------"+Thread.currentThread().getName()+"等待");
lock.wait();
}
//String value = System.currentTimeMillis()+"_"+System.nanoTime();
Thread.sleep(500);
System.out.println("get 的值是"+Thread.currentThread().getName()+"-"+ValueObject.value);
ValueObject.value = "";
lock.notify();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

package valueobject;

public class P {

private String lock;
public P(String lock) {
super();
this.lock = lock;
}
public void setValue(){
try {
synchronized (lock) {

while (!ValueObject.value.equals("")) {
//System.out.println("线程-------"+Thread.currentThread().getName()+"等待");
lock.wait();
}
String value = Thread.currentThread().getName()+"-"+ System.currentTimeMillis()+"-"+System.nanoTime();
Thread.sleep(500);
System.out.println("set 的值是"+value);
ValueObject.value = value;
lock.notify();


}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}


}

}


这样就不会出现为空的现象了,但是我们发现又出现一个问题。就是程序会出现假死的现象。控制台输出如下:

set 的值是生產者1-1494144118506-72700875012310

get 的值是消費者3-生產者1-1494144118506-72700875012310

set 的值是生產者2-1494144119508-72701876916384

只打印了三条信息就停止了,说明所有的程序都在等待。情况和上文分析的一样,生产者唤醒了生产者,消费者唤醒了消费者,导致条件都是满足等待的条件,所以线程都会等待,造成程序的假死现象。那么解决的办法是什么呢?其实很简单。
就是将 notify()改成notifyAll();

这里只贴出生产者的代码,消费者同理。


package valueobject;

public class C {

private String lock;
public C(String lock) {
super();
this.lock = lock;
}
public void getValue(){
try {
synchronized (lock) {
while (ValueObject.value.equals("")) {
// System.out.println("线程-------"+Thread.currentThread().getName()+"等待");
lock.wait();
}
//String value = System.currentTimeMillis()+"_"+System.nanoTime();
Thread.sleep(500);
System.out.println("get 的值是"+Thread.currentThread().getName()+"-"+ValueObject.value);
ValueObject.value = "";
lock.notifyAll();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}


2.3 单个/多个 生产和单个/多个 消费 操作栈

这里的生产者和消费者和上面操作值时候的一致,就不在分开介绍了直接一步到位。


package entity;

import java.util.ArrayList;
import java.util.List;

public class MyStack {

private List list = new ArrayList<>();

synchronized public void push() {
try {
while (list.size() == 1) {
this.wait();
}
list.add("anything =" + Math.random());
// this.notify();
this.notifyAll();

System.out.println("push = " + list.size());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

synchronized public String pop() {

String returnVlaue = "";
try {

// 如果是if 会出现这个错误
/*
* push = 1 pop = 0 pop = anything =0.5275243082532858 pop操作中的 :
* Thread-4线程呈wait状态 pop操作中的 : Thread-3线程呈wait状态 pop操作中的 :
* Thread-2线程呈wait状态 pop操作中的 : Thread-1线程呈wait状态 pop操作中的 :
* Thread-5线程呈wait状态 push = 1 pop = 0 pop = anything
* =0.19530279243959514 pop操作中的 : Thread-4线程呈wait状态 Exception in
* thread "Thread-3" java.lang.IndexOutOfBoundsException: Index: 0,
* Size: 0 at java.util.ArrayList.rangeCheck(Unknown Source) at
* java.util.ArrayList.get(Unknown Source) at
* entity.MyStack.pop(MyStack.java:38) at
* entity.C.popService(C.java:15) at
* entity.C_Thread.run(C_Thread.java:21)
*
*/

/*
* if (list.size() == 0) {
* System.out.println("pop操作中的 : "+Thread.currentThread().getName()
* +"线程呈wait状态"); this.wait(); }
*/


while (list.size() == 0) {
System.out.println("pop操作中的 : " + Thread.currentThread().getName() + "线程呈wait状态");
this.wait();
}

returnVlaue = "" + list.get(0);
list.remove(0);

// list.add("anything ="+Math.random());

// 这里是notify 上面同理
/**
* push = 1 pop = 0 pop = anything =0.08657286040254364
* pop操作中的 :Thread-1线程呈wait状态
* pop操作中的 : Thread-5线程呈wait状态
* pop操作中的 :
* Thread-4线程呈wait状态
* pop操作中的 : Thread-3线程呈wait状态
* pop操作中的 :
* Thread-2线程呈wait状态 push = 1 pop = 0 pop = anything
* =0.39817146563953265
* pop操作中的 : Thread-1线程呈wait状态
* pop操作中的 :
* Thread-5线程呈wait状态
*
*/

//this.notify();
this.notifyAll();

System.out.println("pop = " + list.size());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

return returnVlaue;
}

}



package entity;

public class C {

private MyStack myStack;
public C(MyStack stack){
super();
this.myStack = stack;

}

public void popService(){
System.out.println("pop = "+myStack.pop());
}
}

public class P {

private MyStack myStack;
public P(MyStack stack){
super();
this.myStack = stack;

}
public void pushService(){
myStack.push();
}
}

package entity;

public class C_Thread extends Thread{
private C c;



public C_Thread(C p) {
super();
this.c = p;
// TODO Auto-generated constructor stub
}


@Override
public void run() {
// TODO Auto-generated method stub
super.run();

while (true) {
c.popService();
}
}

}


package entity;

public class P_Thread extends Thread{
private P p;



public P_Thread(P p) {
super();
this.p = p;
// TODO Auto-generated constructor stub
}


@Override
public void run() {
// TODO Auto-generated method stub
super.run();

while (true) {
p.pushService();
}
}

}

package entity;

public class Run {

public static void main(String[] args) {

// 一个生产一个消费
MyStack myStack = new MyStack();
/*
* P p = new P(myStack); C c = new C(myStack);
*
* P_Thread pThread = new P_Thread(p); C_Thread cThread = new
* C_Thread(c);
*
* pThread.start(); cThread.start();
*/


// 一个生产多个消费

/*
* P p = new P(myStack); P_Thread pThread = new P_Thread(p);
*
* pThread.start();
*
* C c = new C(myStack); C c1 = new C(myStack); C c2 = new C(myStack); C
* c3 = new C(myStack); C c4 = new C(myStack); C_Thread cThread = new
* C_Thread(c); C_Thread cThread1 = new C_Thread(c1); C_Thread cThread2
* = new C_Thread(c2); C_Thread cThread3 = new C_Thread(c3); C_Thread
* cThread4 = new C_Thread(c4);
*
* cThread.start(); cThread1.start(); cThread2.start();
* cThread3.start(); cThread4.start();
*/


// 多生产与单个消费
P p = new P(myStack);
P p1 = new P(myStack);
P p2 = new P(myStack);
P p3 = new P(myStack);

P_Thread pThread = new P_Thread(p);
P_Thread pThread1 = new P_Thread(p1);
P_Thread pThread2 = new P_Thread(p2);
P_Thread pThread3 = new P_Thread(p3);
pThread.start();

C c = new C(myStack);
/*
* C c1 = new C(myStack); C c2 = new C(myStack); C c3 = new C(myStack);
* C c4 = new C(myStack);
*/

C_Thread cThread = new C_Thread(c);
/*
* C_Thread cThread1 = new C_Thread(c1); C_Thread cThread2 = new
* C_Thread(c2); C_Thread cThread3 = new C_Thread(c3); C_Thread cThread4
* = new C_Thread(c4);
*/


cThread.start();
/*
* cThread1.start(); cThread2.start(); cThread3.start();
* cThread4.start();
*/

// 多生产与多消费
P p0 = new P(myStack);
P p10 = new P(myStack);
P p20 = new P(myStack);
P p30 = new P(myStack);

P_Thread pThread0 = new P_Thread(p0);
P_Thread pThread10 = new P_Thread(p10);
P_Thread pThread20 = new P_Thread(p20);
P_Thread pThread30 = new P_Thread(p30);
pThread0.start();
pThread10.start();
pThread20.start();
pThread30.start();



C c0 = new C(myStack);
C c1 = new C(myStack);
C c2 = new C(myStack);
C c3 = new C(myStack);
C c4 = new C(myStack);
C_Thread cThread0 = new C_Thread(c0);
C_Thread cThread1 = new C_Thread(c1);
C_Thread cThread2 = new C_Thread(c2);
C_Thread cThread3 = new C_Thread(c3);
C_Thread cThread4 = new C_Thread(c4);

cThread0.start();
cThread1.start();
cThread2.start();
cThread3.start();
cThread4.start();

}
}

这里就是操作栈的所有代码,注释都写在代码中,还有输出的值。有兴趣的同学可以验证一下。

2.4 生产者消费者队列

看到这里你是否觉得上面的生产者消费者太麻烦了呢,每一次都要wait()和nofity();一不留心还要写错。同时wait()和notifyAll()方法以一种非常低级的方式解决了任务互操作的问题,即每一次操作都要握手。在许多种情况下,我们可以瞄向更高的抽象级别,使用同步队列来解决问题,同步队列中在任何时刻都只允许一个任务插入或移除元素。


package Queue;
import java.util.concurrent.BlockingQueue;
public class QueueRunnable implements Runnable {
private BlockingQueue<String> mQueue;
public QueueRunnable(BlockingQueue<String> mQueue) {
super();
this.mQueue = mQueue;
}
@Override
public void run() {
// TODO Auto-generated method stub
while (!Thread.interrupted()) {
try {
mQueue.put("---");
System.out.println("生產者"+mQueue.size());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

//消费者线程
package Queue;
import java.util.concurrent.BlockingQueue;
public class QueueRunnable2 implements Runnable {
private BlockingQueue<String> mQueue;
public QueueRunnable2(BlockingQueue<String> mQueue) {
super();
this.mQueue = mQueue;
}
@Override
public void run() {
// TODO Auto-generated method stub
while (!Thread.interrupted()) {
try {
String string = mQueue.take();

System.out.println("消費者"+mQueue.size()+string);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}



package Queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

public class Run {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new
// LinkedBlockingQueue<String>(10);
// ArrayBlockingQueue<>(10);
SynchronousQueue<>();

ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new QueueRunnable2(queue));
Thread.sleep(1000);
executorService.execute(new QueueRunnable(queue));
}
}

如果消费者任务试图从队列中获取对象,该队列此时为空,那么这些队列可以挂起消费者任务,并且当有更多的元素可用时恢复消费者任务,阻塞队列可以解决非常大量的问题,而其方式与wait()和notifyAll()相比,则简单可靠的多。同时有的队列可以指定大小LinkedBlockingQueue(10),就是长度不超过10,超过了则阻塞,等待消费者消费。