聊聊高并发(十四)理解Java中的管程,条件队列,Condition以及实现一个阻塞队列

时间:2021-01-17 18:02:45

这篇里面有一些基本的概念,理解概念是件有意义的事情,只有理解概念才能在面对具体问题的时候找到正确的解决思路。先看一下管程的概念

第一次在书上看到管程这个中文名称觉得很迷糊,管程到底是个什么东东,于是去找了英文原本对照一看,英文是Monitor,这不是监视器吗,更加迷糊了,为啥翻译成管程?去百科上搜了下管程,管程的定义如下:“一个管程定义了一个数据结构和能够并发进程所执行的一组操作,这组操作能同步进程和改变管程中的数据”。

从这个定义中可以看到管程其实和类的概念很相似,类是广义的封装了数据和方法,而管程不仅包含了数据和方法,并且它的方法能够同步并发进程的操作。

1. 数据

2. 方法

3. 它的方法能够同步并发进程的操作


说白了,管程就是一个专门为并发编程提出的概念,它表示一个对象自己维护自己的状态,并且能够根据自身状态来同步并发的线程操作,而不是把这种同步的手段交给调用者来处理。举个例子来说,有一个有界队列,它提供了put和take方法,由于是有界,那么问题来了:

1. 队列满时,不能加入队列,线程得等待

2. 队列空时,不能从队列取元素,线程得等待

如果让调用者自己来控制这种状态,那么代码可能如下,通过不断轮询状态,直到退出轮询

                while(true){
if(array.isFull()){
Thread.sleep(100);
}
}

这种方式是非常低效并且存在问题的,因为在并发情况下,如果不加锁的话,状态是难以控制的。

所以一种更好的方法是使用管程这种结构,由并发对象自己控制自己的状态并来同步线程操作。


接下来看下条件谓词的概念,谓词就是动词,表示一种动作。条件谓词指的是检查管程状态的动作,比如

1. isFull 是否满

2. isEmpty 是否空

条件谓词是状态改变操作的前提条件,需要不断的轮询条件谓词直到满足才能进行状态改变操作。


再看条件队列这个概率,条件队列指的是一组在等待某个条件变成真的线程,队列中的元素是线程


一个条件队列肯定和一个锁相关联。比较每个Java对象都有一个内置锁,用synchronized操作可以获得内置锁,同样,每个Java对象都有一个条件队列,当需要获得内置锁时,并发的线程就进入了条件队列, Object的wait(), notify(), notifyAll()操作可以操作条件队列。

1. wait()方法将会让当前线程进入条件队列等待,并且释放锁。 这点和Thread.sleep不一样,Thread.sleep会让线程睡眠,但是不释放锁。

    需要注意的是wait()方法的退出条件是它被notify或者notifyAll方法唤醒了,并且在又一次的锁竞争中获得了锁,也就说,当wait方法退出时,当前线程还是是持有锁的。

2. notify()方法,从条件队列的线程中随即唤醒一个线程,并让它去参与锁竞争

3. notifyAll()方法,唤醒条件队列中所有的等待线程,让它们参与锁竞争


Java 1.5之后新增了显式锁的接口java.util.concurrent.locks.Lock接口,同样提供了显式的条件接口Condition,并对条件队列进行了增强。

一个内置锁只能对应一个条件队列,这有个缺陷,就是当一个锁对应多个条件谓词时,多个条件谓词只能公用一个条件队列,这时候唤醒等待线程时有可能出现唤醒丢失的情况。比如上面有界队列的情况,有两个条件谓词 isFull 和 isEmpty,当对两个条件谓词都进行wait()时,如果使用notify()方法来唤醒的话,只是会从条件队列中选取一个线程,并不知道这个线程是在哪个条件谓词上等待,这就出现了所谓的唤醒丢失的情况。所以使用内置条件队列时,最好使用notifyAll()方法来唤醒所有的线程,避免出现唤醒丢失这个活跃性问题。但是notifyAll是一个重的方法,它会带来大量的上下文切换和锁竞争。


显式锁和显式条件队列避免了这个问题,一个显示锁可以对应多个条件Condition,一个Condition维护一个条件队列,这样对于多个条件谓词,比如isFull和isEmpty,可以使用两个Condition,对每个条件谓词单独await,唤醒时可以单独signal,效率更高。

public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
// 创建一个条件
Condition newCondition();
}

// Condition接口封装了条件队列的方法
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}



后面会有具体的例子来比较使用内次锁和内置条件队列以及使用显式锁和显式条件队列的区别。


可以看到管程中能够根据状态同步线程操作(主要是让线程等待)的方法的写法有固定的流程,是个三元组: 锁,条件谓词,wait()方法

1. 先获得锁

2. 轮询条件谓词直到满足条件

3. 一个wait要对应一个notify或notifyAll。值得注意的是,使用wait()方法必须要先获取锁

对于内置锁,写法如下

                public synchronized void put(T item) throws InterruptedException {
while(isFull()){
wait();
}
......
}

对应显式锁,写法如下

                public void put(T item) throws InterruptedException {
lock.lock();
try {
while (count == array.length) {
isFull.await();
}
}finally{
lock.unlock();
}
}

下面我们使内置锁和内置条件队列实现一个阻塞队列:

1. 有两个条件谓词 isFull()和 isEmpty()来判断队列是否满和是否空

2. 当put方法时,先获取内置锁,然后轮询isFull()状态,如果满就使用内置条件队列的wait()方法让线程等待。

    当不满时,wait()方法会被notify唤醒,然后竞争锁,直到获得锁,进入下面的流程

    修改完状态后,需要调用notifyAll()方法做一次唤醒操作,需要注意的时,put方法里面的notifyAll是为了唤醒在isEmpty条件谓词等待的线程。但是由于一个内置锁只能有一个条件队列,所以notifyAll也会唤醒在isFull条件谓词等待的线程,这样会带来性能的消耗。

     如果这里使用notify()方法,就会发生唤醒丢失,因为notify()方法只负责唤醒条件队列的一个线程,不知道它在哪个条件谓词等待。如果唤醒的是在isFull条件谓词等待的线程时,就发生了唤醒丢失。

3. take方法同put方法一样,只是take在isEmpty条件谓词等待,修改完状态后,同样需要notifyAll所有的线程来竞争锁。


package com.zc.lock;

public class BlockingArray<T> {
private final T[] array;

private int head;

private int tail;

private int count;

public BlockingArray(int size){
array = (T[])new Object[size];
}

public synchronized void put(T item) throws InterruptedException {
while(isFull()){
wait();
}

array[tail] = item;
if(++ tail == array.length){
tail = 0;
}
count ++;
System.out.println("Add item: " + item);
// 通知条件队列有元素进入
notifyAll();
}

public synchronized T take() throws InterruptedException {
while(isEmpty()){
wait();
}

T item = array[head];
if(++ head == array.length){
head = 0;
}
count --;
System.out.println("Take item: " + item);
// 通知条件队列有元素出去
notifyAll();
return item;
}

public synchronized boolean isFull(){
return count == array.length;
}

public synchronized boolean isEmpty(){
return count == 0;
}
}

下面有显式锁Lock和显式条件Condition来实现一个阻塞队列

1. 定义了一个ReentrantLock显式锁

2. 由这个显式锁创建两个条件对应isFull条件谓词和isEmpty条件谓词,这两个条件都是绑定的同一个Lock对象

3. put方法时,先获得显式锁,然后轮询队列是否满,如果满了就用Condition的await()来让线程等待。当队列不满时,await()方法被signal()方法唤醒,竞争锁直到退出await()方法。修改完状态会,单独对isEmpty的条件谓词唤醒,使用isEmpty条件的signal方法单独对在isEmpty等待的线程唤醒,这样效率比notifyAll高很多

4. take方法和put原理一样

package com.zc.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingArrayWithCondition<T> {
private final T[] array;

private int head;

private int tail;

private int count;

private java.util.concurrent.locks.Lock lock = new ReentrantLock();

private Condition isFull = lock.newCondition();

private Condition isEmpty = lock.newCondition();

public BlockingArrayWithCondition(int size) {
array = (T[]) new Object[size];
}

public void put(T item) throws InterruptedException {
lock.lock();
try {
while (count == array.length) {
isFull.await();
}

array[tail] = item;
if (++tail == array.length) {
tail = 0;
}
count++;
System.out.println("Add item: " + item);
// 通知isEmpty条件队列有元素进入
isEmpty.signal();
} finally {
lock.unlock();
}
}

public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
isEmpty.await();
}

T item = array[head];
if (++head == array.length) {
head = 0;
}
count--;
System.out.println("Take item: " + item);
// 通知isFull条件队列有元素出去
isFull.signal();
return item;
} finally {
lock.unlock();
}
}
}

下面我们写一个测试用例对这个阻塞队列进行测试

1. 使用100个线程往阻塞队列里面put() 1到100的数字

2. 使用100个线程从阻塞队列take一个数

3. 最后的结果应该是放入了1到100个数字,取出了1到100个数字,不会有重复数字,也不会有数字丢失

4. 一个数肯定是先put后take

package com.zc.lock;

import java.util.concurrent.atomic.AtomicInteger;

public class BlockingArrayTest {
public static void main(String[] args){
//final BlockingArray<Integer> blockingArray = new BlockingArray<Integer>(10);

final BlockingArrayWithCondition<Integer> blockingArray = new BlockingArrayWithCondition<Integer>(10);


final AtomicInteger count = new AtomicInteger(0);

for(int i = 0; i < 100; i ++){
Thread t = new Thread(new Runnable(){

@Override
public void run() {
try {
blockingArray.put(count.incrementAndGet());
} catch (InterruptedException e) {
e.printStackTrace();
}
}

});
t.start();
}

for(int i = 0; i < 100; i ++){
Thread t = new Thread(new Runnable(){

@Override
public void run() {
try {
blockingArray.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

});
t.start();
}

}
}

测试结果如下,证明阻塞队列的实现是正确的:

1. 放入了100个数,取出了100个数,没有重复的数字,也有没有数字丢失

2. 数字先放入后取出

Add item: 1
Add item: 2
Add item: 3
Add item: 4
Add item: 5
Add item: 6
Add item: 7
Add item: 8
Add item: 9
Add item: 10
Take item: 1
Take item: 2
Add item: 11
Add item: 12
Take item: 3
Take item: 4
Add item: 13
Take item: 5
Take item: 6
Take item: 7
Take item: 8
Take item: 9
Add item: 14
Take item: 10
Take item: 11
Take item: 12
Add item: 15
Take item: 13
Take item: 14
Take item: 15
Add item: 16
Add item: 17
Add item: 18
Take item: 16
Take item: 17
Take item: 18
Add item: 19
Take item: 19
Add item: 20
Take item: 20
Add item: 21
Take item: 21
Add item: 22
Take item: 22
Add item: 23
Add item: 24
Take item: 23
Take item: 24
Add item: 25
Take item: 25
Add item: 26
Take item: 26
Add item: 27
Take item: 27
Add item: 28
Take item: 28
Add item: 29
Take item: 29
Add item: 30
Take item: 30
Add item: 31
Take item: 31
Add item: 32
Take item: 32
Add item: 33
Take item: 33
Add item: 34
Take item: 34
Add item: 35
Take item: 35
Add item: 36
Take item: 36
Add item: 37
Take item: 37
Add item: 38
Take item: 38
Add item: 39
Take item: 39
Add item: 40
Take item: 40
Add item: 41
Take item: 41
Add item: 42
Take item: 42
Add item: 43
Take item: 43
Add item: 44
Take item: 44
Add item: 45
Take item: 45
Add item: 46
Take item: 46
Add item: 47
Take item: 47
Add item: 48
Take item: 48
Add item: 49
Take item: 49
Add item: 50
Take item: 50
Add item: 51
Take item: 51
Add item: 52
Take item: 52
Add item: 53
Take item: 53
Add item: 54
Take item: 54
Add item: 55
Take item: 55
Add item: 56
Take item: 56
Add item: 57
Take item: 57
Add item: 58
Take item: 58
Add item: 59
Take item: 59
Add item: 60
Take item: 60
Add item: 61
Take item: 61
Add item: 62
Take item: 62
Add item: 63
Take item: 63
Add item: 64
Take item: 64
Add item: 65
Take item: 65
Add item: 66
Take item: 66
Add item: 67
Take item: 67
Add item: 68
Take item: 68
Add item: 69
Take item: 69
Add item: 70
Take item: 70
Add item: 71
Take item: 71
Add item: 72
Take item: 72
Add item: 73
Take item: 73
Add item: 74
Take item: 74
Add item: 75
Take item: 75
Add item: 76
Take item: 76
Add item: 77
Take item: 77
Add item: 78
Take item: 78
Add item: 79
Take item: 79
Add item: 80
Take item: 80
Add item: 81
Take item: 81
Add item: 82
Take item: 82
Add item: 83
Take item: 83
Add item: 84
Take item: 84
Add item: 85
Take item: 85
Add item: 86
Take item: 86
Add item: 87
Take item: 87
Add item: 88
Take item: 88
Add item: 89
Take item: 89
Add item: 90
Take item: 90
Add item: 91
Take item: 91
Add item: 92
Take item: 92
Add item: 93
Take item: 93
Add item: 94
Take item: 94
Add item: 95
Take item: 95
Add item: 96
Take item: 96
Add item: 97
Take item: 97
Add item: 98
Take item: 98
Add item: 99
Take item: 99
Add item: 100
Take item: 100




转载请注明来源: http://blog.csdn.net/iter_zc