在上一章中,我们介绍了阻塞队列blockingqueue,下面我们介绍它的常用实现类arrayblockingqueue。
一. 用数组来实现队列
因为队列这种数据结构的特殊要求,所以它天然适合用链表的方式来实现,用两个变量分别记录链表头和链表尾,当删除或插入队列时,只要改变链表头或链表尾就可以了,而且链表使用引用的方式链接的,所以它的容量几乎是无限的。
那么怎么使用数组来实现队列,我们需要四个变量:object[] array来存储队列中元素,headindex和tailindex分别记录队列头和队列尾,count记录队列的个数。
- 因为数组的长度是固定,所以当count==array.length时,表示队列已经满了,当count==0的时候,表示队列是空的。
- 当添加元素的时候,将array[tailindex] = e将tailindex位置设置成新元素,之后将tailindex++自增,然后将count++自增。但是有两点需要注意,在添加之前必须先判断队列是否已满,不然会出现覆盖已有元素。当tailindex的值等于数组最后一个位置的时候,需要将tailindex=0,循环利用数组
- 当删除元素的时候,将先记录下array[headindex] 元素,之后将headindex++自增,然后将count--自减。但是有两点需要注意要注意,在删除之前,必须先判断队列是否为空,不然可能会删除已删除的元素。
这里用了一个很巧妙的方式,我们知道当向队列中插入一个元素,那么就占用了数组的一个位置,当删除一个元素的时候,那么其实数组的这个位置就空闲出来了,表示这个位置又可以插入新元素了。
所以我们插入新元素前,必须检查队列是否已满,删除元素之前,必须检查队列是否为空。
二. arrayblockingqueue中重要成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
/** 储存队列的中元素 */
final object[] items;
/** 队列头的位置 */
int takeindex;
/** 队列尾的位置 */
int putindex;
/** 当前队列拥有的元素个数 */
int count;
/** 用来保证多线程操作共享变量的安全问题 */
final reentrantlock lock;
/** 当队列为空时,就会调用notempty的wait方法,让当前线程等待 */
private final condition notempty;
/** 当队列为满时,就会调用notfull的wait方法,让当前线程等待 */
private final condition notfull;
|
就是多了lock、notempty、notfull变量来实现多线程安全和线程等待条件的,它们三个是怎么操作的呢?
2.1 lock的作用
因为arrayblockingqueue是在多线程下操作的,所以修改items、takeindex、putindex和count这些成员变量时,必须要考虑多线程安全问题,所以这里使用lock独占锁,来保证并发操作的安全。
2.2 notempty与notfull的作用
因为阻塞队列必须实现,当队列为空或队列已满的时候,队列的读取或插入操作要等待。所以我们想到了并发框架下的condition对象,使用它来控制。
在aqs中,我们介绍了这个类的作用:
- await系列方法,会释放当前锁,并让当前线程等待。
- signal与signalall方法,会唤醒当前线程。其实它并不是唤醒当前线程,而是将在这个condition条件上等待的线程,添加到lock锁上的等待线程池中,所以当锁被释放时,会唤醒lock锁上的等待线程池中一个线程。具体在aqs中有源码分析。
三. 添加元素方法
3.1 add(e e)与offer(e e)方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// 调用abstractqueue父类中的方法。
public boolean add(e e) {
// 通过调用offer来时实现
if (offer(e))
return true ;
else
throw new illegalstateexception( "queue full" );
}
//向队列末尾新添加元素。返回true表示添加成功,false表示添加失败,不会抛出异常
public boolean offer(e e) {
checknotnull(e);
final reentrantlock lock = this .lock;
// 使用lock来保证,多线程修改成员属性的安全
lock.lock();
try {
// 队列已满,添加元素失败,返回false。
if (count == items.length)
return false ;
else {
// 调用enqueue方法将元素插入队列中
enqueue(e);
return true ;
}
} finally {
lock.unlock();
}
}
|
add方法是调用offer方法实现的。在offer方法中,必须先判断队列是否已满,如果已满就直接返回false,而不会阻塞当前线程。如果不满就调用enqueue方法将元素插入队列中。
注意:这里使用lock.lock()是保证同一时间只有一个线程修改成员变量,防止出现并发操作问题。虽然它也会阻塞当前线程,但是它并不是条件等待,只是因为锁被其他线程持有,而arrayblockingqueue中方法操作时间都不长,这里相当于不阻塞线程。
3.2 put方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// 向队列末尾新添加元素,如果队列已满,当前线程就等待。响应中断异常
public void put(e e) throws interruptedexception {
checknotnull(e);
final reentrantlock lock = this .lock;
// 使用lock来保证,多线程修改成员属性的安全
lock.lockinterruptibly();
try {
// 队列已满,则调用notfull.await()方法,让当前线程等待,直到队列不是满的
while (count == items.length)
notfull.await();
// 调用enqueue方法将元素插入队列中
enqueue(e);
} finally {
lock.unlock();
}
}
|
与offer方法大体流程一样,只是当队列已满的时候,会调用notfull.await()方法,让当前线程阻塞等待,直到队列被别的线程移除了元素,队列不满的时候,会唤醒这个等待线程。
3.3 offer(e e, long timeout, timeunit unit)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
/**
* 向队列末尾新添加元素,如果队列中没有可用空间,当前线程就等待,
* 如果等待时间超过timeout了,那么返回false,表示添加失败
*/
public boolean offer(e e, long timeout, timeunit unit)
throws interruptedexception {
checknotnull(e);
// 计算一共最多等待的时间值nanos
long nanos = unit.tonanos(timeout);
final reentrantlock lock = this .lock;
// 使用lock来保证,多线程修改成员属性的安全
lock.lockinterruptibly();
try {
// 队列已满
while (count == items.length) {
// nanos <= 0表示最大等待时间已到,那么不用再等待了,返回false,表示添加失败。
if (nanos <= 0 )
return false ;
// 调用notfull.awaitnanos(nanos)方法,超时nanos时间会被自动唤醒,
// 如果被提前唤醒,那么返回剩余的时间
nanos = notfull.awaitnanos(nanos);
}
// 调用enqueue方法将元素插入队列中
enqueue(e);
return true ;
} finally {
lock.unlock();
}
}
|
与put的方法大体流程一样,只不过是调用notfull.awaitnanos(nanos)方法,让当前线程等待,并设置等待时间。
四. 删除元素方法
4.1 remove()和poll()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// 调用abstractqueue父类中的方法。
public e remove() {
// 通过调用poll来时实现
e x = poll();
if (x != null )
return x;
else
throw new nosuchelementexception();
}
// 删除队列第一个元素(即队列头),并返回它。如果队列是空的,它不会抛出异常,而是会返回null。
public e poll() {
final reentrantlock lock = this .lock;
// 使用lock来保证,多线程修改成员属性的安全
lock.lock();
try {
// 如果count == 0,列表为空,就返回null,否则调用dequeue方法,返回列表头元素
return (count == 0 ) ? null : dequeue();
} finally {
lock.unlock();
}
}
|
remove方法是调用poll()方法实现的。在 poll()方法中,如果列表为空,就返回null,否则调用dequeue方法,返回列表头元素。
4.2 take()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/**
* 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。响应中断异常
*/
public e take() throws interruptedexception {
final reentrantlock lock = this .lock;
// 使用lock来保证,多线程修改成员属性的安全
lock.lockinterruptibly();
try {
// 如果队列为空,就调用notempty.await()方法,让当前线程等待。
// 直到有别的线程向队列中插入元素,那么这个线程会被唤醒。
while (count == 0 )
notempty.await();
// 调用dequeue方法,返回列表头元素
return dequeue();
} finally {
lock.unlock();
}
}
|
take()方法当队列为空的时候,当前线程必须等待,直到有别的线程向队列中插入新元素,那么这个线程会被唤醒。
4.3 poll(long timeout, timeunit unit)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/**
* 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。
* 如果等待时间超过timeout了,那么返回false,表示获取元素失败
*/
public e poll( long timeout, timeunit unit) throws interruptedexception {
// 计算一共最多等待的时间值nanos
long nanos = unit.tonanos(timeout);
final reentrantlock lock = this .lock;
// 使用lock来保证,多线程修改成员属性的安全
lock.lockinterruptibly();
try {
// 队列为空
while (count == 0 ) {
// nanos <= 0表示最大等待时间已到,那么不用再等待了,返回null。
if (nanos <= 0 )
return null ;
// 调用notempty.awaitnanos(nanos)方法让档期线程等待,并设置超时时间。
nanos = notempty.awaitnanos(nanos);
}
// 调用dequeue方法,返回列表头元素
return dequeue();
} finally {
lock.unlock();
}
}
|
与take()方法流程一样,只不过调用awaitnanos(nanos)方法,让当前线程等待,并设置等待时间。
五. 查看元素的方法
5.1 element()与peek() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// 调用abstractqueue父类中的方法。
public e element() {
e x = peek();
if (x != null )
return x;
else
throw new nosuchelementexception();
}
// 查看队列头元素
public e peek() {
final reentrantlock lock = this .lock;
// 使用lock来保证,多线程修改成员属性的安全
lock.lock();
try {
// 返回当前队列头的元素
return itemat(takeindex); // null when queue is empty
} finally {
lock.unlock();
}
}
|
六. 其他重要方法
6.1 enqueue和dequeue方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// 将元素x插入队列尾
private void enqueue(e x) {
// assert lock.getholdcount() == 1;
// assert items[putindex] == null; //当前putindex位置元素一定是null
final object[] items = this .items;
items[putindex] = x;
// 如果putindex等于items.length,那么将putindex重新设置为0
if (++putindex == items.length)
putindex = 0 ;
// 队列数量加一
count++;
// 因为插入一个元素,那么当前队列肯定不为空,那么唤醒在notempty条件下等待的一个线程
notempty.signal();
}
// 删除队列头的元素,返回它
private e dequeue() {
// assert lock.getholdcount() == 1;
// assert items[takeindex] != null;
final object[] items = this .items;
// 得到当前队列头的元素
@suppresswarnings ( "unchecked" )
e x = (e) items[takeindex];
// 将当前队列头位置设置为null
items[takeindex] = null ;
if (++takeindex == items.length)
takeindex = 0 ;
// 队列数量减一
count--;
if (itrs != null )
itrs.elementdequeued();
// 因为删除了一个元素,那么队列肯定不满了,那么唤醒在notfull条件下等待的一个线程
notfull.signal();
return x;
}
|
这两个方法分别代表,向队列中插入元素和从队列中删除元素。而且它们要唤醒等待的线程。当插入元素后,那么队列一定不为空,那么就要唤醒在notempty条件下等待的线程。当删除元素后,那么队列一定不满,那么就要唤醒在notfull条件下等待的线程。
6.2 remove(object o)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
// 删除队列中对象o元素,最多删除一条
public boolean remove(object o) {
if (o == null ) return false ;
final object[] items = this .items;
// 使用lock来保证,多线程修改成员属性的安全
final reentrantlock lock = this .lock;
lock.lock();
try {
// 当队列中有值的时候,才进行删除。
if (count > 0 ) {
// 队列尾下一个位置
final int putindex = this .putindex;
// 队列头的位置
int i = takeindex;
do {
// 当前位置元素与被删除元素相同
if (o.equals(items[i])) {
// 删除i位置元素
removeat(i);
// 返回true
return true ;
}
if (++i == items.length)
i = 0 ;
// 当i==putindex表示遍历完所有元素
} while (i != putindex);
}
return false ;
} finally {
lock.unlock();
}
}
|
从队列中删除指定对象o,那么就要遍历队列,删除第一个与对象o相同的元素,如果队列中没有对象o元素,那么返回false删除失败。
这里有两点需要注意:
如何遍历队列,就是从队列头遍历到队列尾。就要靠takeindex和putindex两个变量了。
为什么object[] items = this.items;这句代码没有放到同步锁lock代码块内。items是成员变量,那么多线程操作的时候,不会有并发问题么?
这个是因为items是个引用变量,不是基本数据类型,而且我们对队列的插入和删除操作,都是针对这一个items数组,没有改变数组的引用,所以在lock代码中,items会得到其他线程对它最新的修改。但是如果这里将int putindex = this.putindex;方法lock代码块外面,就会产生问题。
removeat(final int removeindex)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
// 删除队列removeindex位置的元素
void removeat( final int removeindex) {
// assert lock.getholdcount() == 1;
// assert items[removeindex] != null;
// assert removeindex >= 0 && removeindex < items.length;
final object[] items = this .items;
// 表示删除元素是列表头,就容易多了,与dequeue方法流程差不多
if (removeindex == takeindex) {
// 移除removeindex位置元素
items[takeindex] = null ;
// 到了数组末尾,就要转到数组头位置
if (++takeindex == items.length)
takeindex = 0 ;
// 队列数量减一
count--;
if (itrs != null )
itrs.elementdequeued();
} else {
// an "interior" remove
final int putindex = this .putindex;
for ( int i = removeindex;;) {
int next = i + 1 ;
if (next == items.length)
next = 0 ;
// 还没有到队列尾,那么就将后一个位置元素覆盖前一个位置的元素
if (next != putindex) {
items[i] = items[next];
i = next;
} else {
// 将队列尾元素置位null
items[i] = null ;
// 重新设置putindex的值
this .putindex = i;
break ;
}
}
// 队列数量减一
count--;
if (itrs != null )
itrs.removedat(removeindex);
}
// 因为删除了一个元素,那么队列肯定不满了,那么唤醒在notfull条件下等待的一个线程
notfull.signal();
}
|
在队列中删除指定位置的元素。需要注意的是删除之后的数组还能保持队列形式,分为两种情况:
- 如果删除位置是队列头,那么简单,只需要将队列头的位置元素设置为null,将将队列头位置加一。
- 如果删除位置不是队列头,那么麻烦了,这个时候,我们就要将从removeindex位置后的元素全部左移一位,覆盖前一个元素。最后将原来队列尾的元素置位null。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.jianshu.com/p/766b97ba9d01