Java高并发程序设计笔记(六)之CopyOnWriteArrayList与BlockingQuene

时间:2021-10-04 23:50:54

高效读取:CopyOnWriteArrayList

JDK提供了CopyOnWriteArrayList,对于它来说读取是完全不用加锁的,写入也不会阻塞读取操作,只有写入和写入之间需要同步等待。这样一来度的性能就会大幅度提升。

CopyOnWrite就是在写入操作时,进行一次自我复制,换句话说。当这个List需要修改时,并不修改原有内容(这对于保证当前读线程的数据一致性非常重要),而是对原有数据进行一次复制,将修改的内容保存到副本中去。写完之后再将修改完的副本替换原来的数据。这样就可以保证读操作不受影响了。

下面代码展示了有关读取的实现

private volatile transient Object[] array;
public E get(int index){
return get(get Array(),index);
}
final Object getArray(){
return array;
}
private E get(Object[] a, int index){
return (E) a[index];
}

读代码没有任何同步控制和锁控制,因为内部array不会发生修改,只会被另一个Array替换,因此可以保证数据安全。

写操作代码如下

public boolean add E{
final ReentrantLock lock=this.lock;
lock.lock;
try{
Object[] elements=getArray();
int len=elements.length;
Object[] newElements=Array.copyOf(elements,lenth+1);
newElements[len]=e;
setArray(newElements);
return true;
}
finanlly{
lock.unlock();
}
}

写入操作使用锁,这个锁仅用于控制写写的情况。修改完成后,读线程可以立即察觉到这个修改,因为array变量是volatile类型。

数据共享通道:BlockingQuene

Blocking是阻塞的意思,当服务线程处理完队列中的所有消息之后,他如何知道下一条消息的到来的呢?它会让服务线程在队列为空时,进行等待,当有新消息进入队列时,自动将线程唤醒,以ArraBlockingQuene为例,内部元素都放置在一个对象数组中:

final Object[] items;

put( ) take( )方法是Blocking的关键
put( )方法是将元素压入队列末尾,如果队列满了他会一直等待,直到队列中有空闲位置。
take( )方法是弹出元素,而队列为空时,take( )方法会等待,直到队列内有可用元素。

为了做好等待和通知两件事,在BlockingQuene内部定义了以下字段:

final ReentrantLock lock ;
private final Condition notEmpty;
private final COndition notFull;

当执行take( )操作时,如果队列为空,则让线程等待在notEmpty上

public E take() throws InterruptException{
final ReentrantLock lock=this.lock();
lock.lockInterruptibly();
try{
while(count==0)
{
notEmpty.await();
}
return extract();
}finally{
lock.unlock();
}
}

当队列中有新元素时,线程会得到一个通知,下面是元素入队时的一段代码

private void insert(E x){
items[putIndex]=x;
putIndex=inc(putIndex);
++count;
notEmpty.signal();
}

同理,对于put( )操作也是一样的,当队列满时,需要让压入线程等待

public void put(E e) throws InterruptException{
checkNotNull(e);
final ReentrantLock lock=this.lock();
lock.lockInterruptibly();
try{
while(count==items.length)
{
notFull.await();
}
insert(e);
finally{
lock.unlock();
}
}
}

当元素从队列中被挪走,队列中出现空位时,也需要等待入队的线程

private E extract(){
final Object[] items=this.items;
E x=this.<E>cast(items[takeIndex]);
items[takeIndex]=null;
takeIndex=inc(takeIndex);
--count;
notFull.signal();
return x;
}