深入java并发Lock一

时间:2021-04-22 03:24:15

java有像syncronized这种内置锁,但为什么还须要lock这种外置锁?



性能并非选择syncronized或者lock的原因,jdk6中syncronized的性能已经与lock相差不大。



假设要选择lock的话,会基于lock拥有的几个长处(内置锁所不具备):

 1.假设希望当获取锁时,有一个等待时间,不会无限期等待下去。

  2.希望当获取不到锁时,可以响应中断

  3.当读多,写少的应用时,希望提高性能

 4.获取不到锁时,马上返回false。获取到锁时返回true。



lock接口定义下面方法:

public interface Lock {



    void lockInterruptibly() throws InterruptedException;



    boolean tryLock();



    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;



    void unlock();



    Condition newCondition();

}



当中lockInterruptibly(),表明加锁时,当前拥有这个锁的线程可被中断。

tryLock()则用于尝试获取锁,能获取返回true,否则返回false。

tryLock(long time, TimeUnit unit),与tryLock类似,仅仅是会尝试一定的时间后再依据是否可以获取锁返回对应的true或false。

unlock()用于拥有锁的线程释放锁。



newCondition()方法之后介绍。



有些操作须要满足一些前提条件才干进行,这就涉及状态并发的控制。

如一个有界缓存,对于存放操作须要推断当前缓存是否满了,满了的话须要堵塞等待。不满则放入数据,并唤醒等待取数据的线程。

对于取操作,须要推断当前缓存是否非空,为空则堵塞等待。不为空则取出数据,并唤醒堵塞的进行存放操作的线程。



考虑,这种一个有界缓存怎样设计?



首先对于存放数据的数据结构能够是数组或者是一个链表。

这里我们如果选择数组。

然后定义两个操作方法,一个是存放数据到缓存的方法put,一个是取数据的方法take。

还须要一个int型的count代表当前已有元素数量,int型的header用于指向当前要取元素的位置,一个tail用于指向当前存放元素的位置。

接着关键是要保证put与take在并发的情况下,保证数据操作完整性,不出现异常行为。

这就须要保证并发调用put操作时是加锁相互排斥的,否则会发生下面情况:

当前缓存数组大小为3,当前已经在缓存的数据有两个。

这时线程一进行下面存放步骤操作:

  1.线程一首先推断当前数组是否未满

  2.这时未满接着线程一往缓存存数据

但当线程一进行第二步操作:往缓存存数据时,线程二提前将数据放入缓存,这时数组大小为3

这样线程一再往数组放数据时,就超出数组长度了。



所以put操作必需同步控制。

其次,因为须要保存count当前元素数量,因此也须要保证存取操作put及take方法相互排斥。

简单实现上述buffer代码例如以下:

public class BoundedBuffer{

	private static final BoundedBuffer bufferInstance = new BoundedBuffer();

	private static final int DEFAULT_BUFFER_SIZE = 1;

	private final Object[] buffer = new Object[DEFAULT_BUFFER_SIZE];

	private static final int EMPTY = 0;

	private int header;

	private int tail;

	private int count;

	private BoundedBuffer(){

	}

	public static BoundedBuffer getInstanceOfBuffer(){
return bufferInstance;
} public synchronized void put(Object obj) throws InterruptedException {
while (count >= DEFAULT_BUFFER_SIZE) {
System.out.println("the buffer is full,wait for a moment,thread:"
+ Thread.currentThread().getId());
wait();
}
if (tail >= DEFAULT_BUFFER_SIZE) {
tail = 0;
}
System.out.println("success to put the data:"+obj+" into the buffer,thread:"+Thread.currentThread().getId());
buffer[tail++] = obj; count++; // then we invoke the thread in the notEmptyCondition wait queue
notifyAll();
} /**
* take the data from header of the queue
*
* @return
* @throws InterruptedException
*/
public synchronized Object take() throws InterruptedException {
Object res;
while (count <= EMPTY) {
System.out.println("the buffer is empty,just wait a moment,thread:"
+ Thread.currentThread().getId());
wait();
}
res = buffer[header];
if (++header >= DEFAULT_BUFFER_SIZE) {
header = 0;
}
count--;
if(count<0){
count=0;
} //notify the thread which wait to put the data to buffer when the buffer is null
notifyAll(); return res;
} private static class BufferProducor implements Runnable { private Object target; public void run() {
try {
BoundedBuffer.getInstanceOfBuffer().put(target);
} catch (InterruptedException e) {
e.printStackTrace();
System.out
.println("client interrupt the task added the data to the buffer");
}
} public void setTarget(Object target) {
this.target = target;
} } private static class BufferConsumer implements Runnable {
public void run() {
try {
Object res = BoundedBuffer.getInstanceOfBuffer().take();
System.out.println("we get the result from buffer:" + res);
} catch (InterruptedException e) {
e.printStackTrace();
System.out
.println("client interrupt the task take the data from the buffer");
}
} } public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(5); BufferProducor bufferProducor1 = new BufferProducor();
bufferProducor1.setTarget("a"); BufferProducor bufferProducor2 = new BufferProducor();
bufferProducor2.setTarget("b"); BufferConsumer bufferConsumer1 = new BufferConsumer();
BufferConsumer bufferConsumer2 = new BufferConsumer(); service.submit(bufferProducor1);
service.submit(bufferProducor2);
service.submit(bufferConsumer1);
service.submit(bufferConsumer2);
}
}

分析这个程序,有什么问题?

首先程序想实现通过wait方法来堵塞存取线程,通过notifyAll来唤醒存取线程。

这里说明下,因为用的是内置锁syncronized,而且当前锁对象是bufferInstance单例实例。所以当调用wait时,当前线程被挂起放入当前bufferInstance相关的内置条件队列其中。兴许调用notifyAll则是将这个条件队列中全部堵塞的线程唤醒。

这样因为仅仅有一个条件队列用于存放堵塞的线程,所以存数据线程及取数据线程都是放在一个堵塞条件队列其中。



notifyAll会唤醒全部堵塞的线程,比方,当前在堵塞队列中有10个等待存数据到buffer的线程。

然后有一个消费线程从元素满的buffer中取出数据,并通过notifyAll唤醒全部在堵塞队列中的线程,然后在堵塞队列中的三个线程都醒了,当中一个线程能够将数据放入buffer,其他9个线程因为buffer空间已满,又被挂起进入到堵塞队列。





假设须要优化这段代码性能的话,一种是仅仅在引起存取线程堵塞的状态变化上才进行唤醒操作,即假设取操作线程要唤醒被堵塞的存操作线程,条件是:取操作线程进入take方法时,buffer元素是满的,然后取线程取出一个元素,使得buffer有空暇空间让存线程存数据。



进一步优化的话,能不能每次仅仅唤醒一个线程?

对于如今一个条件队列存放两种类型的堵塞线程来讲,这样是不同意的。

考虑假设当前buffer能够容纳一个元素,这时先有三个存线程往buffer放数据,这样当中两个线程被堵塞到条件队列。

然后这时一个取数据线程,从buffer取走一个数据并调用notify方法唤醒条件队列中一个存线程。

这样条件队列中另一个存线程。

接着存线程要存数据到buffer,但有一个取线程先来到take方法然后发现buffer还是空的,然后这个取线程被放入到了条件队列。

这样条件队列中就有一个存线程及一个取线程。

然后刚才被唤醒的存线程继续做存操作,然后调用notify唤醒条件队列中的一个线程,因为内置锁的条件队列取操作是非公平的因此非常有可能这时唤醒的是条件队列中的

存线程。其实是没有意义的。



全部对于以上一个条件队列中有两种等待不同条件被堵塞的线程的情况时,不能用单个notify。



假设想用单个notify就要想办法将之前堵塞的存线程与取线程分别放在两个队列。

这就要用到Lock的newCondition方法。



重构代码例如以下:

public class ConditionBoundedBuffer {

	private static final ConditionBoundedBuffer bufferInstance = new ConditionBoundedBuffer();

	private static final int DEFAULT_BUFFER_SIZE = 1;

	private final Object[] buffer = new Object[DEFAULT_BUFFER_SIZE];

	private static final int EMPTY = 0;

	private final Lock lock = new ReentrantLock();

	private int header;

	private int tail;

	private int count;

	private final Condition notFullCondition = lock.newCondition();

	private final Condition notEmptyCondition = lock.newCondition();

	private ConditionBoundedBuffer(){

	}

	public static ConditionBoundedBuffer getInstanceOfConditionBoundedBuffer(){
return bufferInstance;
} public void put(Object obj) throws InterruptedException {
lock.lock();
try {
while (count == DEFAULT_BUFFER_SIZE) {
System.out.println("the buffer is full,wait for a moment for putting ["+obj+"] to the buffer"+",thread:"+Thread.currentThread().getId());
notFullCondition.await();
}
if (tail >= DEFAULT_BUFFER_SIZE) {
tail = 0;
}
buffer[tail++] = obj; count++; System.out.println("success put the data ["+obj+"] to buffer,thread:"+Thread.currentThread().getId()); // then we invoke the thread in the notEmptyCondition wait queue
notEmptyCondition.signal(); } finally {
lock.unlock();
}
} /**
* take the data from header of the queue
*
* @return
* @throws InterruptedException
*/
public Object take() throws InterruptedException {
lock.lock();
Object res;
try {
while (count == EMPTY) {
System.out.println("the buffer is empty,just wait a moment,thread:"+Thread.currentThread().getId());
notEmptyCondition.await();
}
res = buffer[header];
if (++header >= DEFAULT_BUFFER_SIZE) {
header = 0;
} count--;
if(count<EMPTY){
count=0;
} notFullCondition.signal(); } finally {
lock.unlock();
}
return res;
} private static class BufferProducor implements Runnable { private Object target; public void run() {
try {
ConditionBoundedBuffer.getInstanceOfConditionBoundedBuffer().put(target);
} catch (InterruptedException e) {
e.printStackTrace();
System.out
.println("client interrupt the task added the data to the buffer");
}
} public void setTarget(Object target) {
this.target = target;
} } private static class BufferConsumer implements Runnable {
public void run() {
try {
Object res = ConditionBoundedBuffer.getInstanceOfConditionBoundedBuffer().take();
System.out.println("we get the result from buffer:" + res);
} catch (InterruptedException e) {
e.printStackTrace();
System.out
.println("client interrupt the task take the data from the buffer");
}
} } public static void main(String[] args) { ExecutorService service=Executors.newFixedThreadPool(5); BufferProducor bufferProducor1=new BufferProducor();
bufferProducor1.setTarget("a"); BufferProducor bufferProducor2=new BufferProducor();
bufferProducor2.setTarget("b"); BufferConsumer bufferConsumer1=new BufferConsumer();
BufferConsumer bufferConsumer2=new BufferConsumer(); service.submit(bufferProducor1);
service.submit(bufferProducor2);
service.submit(bufferConsumer1);
service.submit(bufferConsumer2);
}
}

接下去的任务是搞清楚,lock内部实现原理,整个实现主要组件组成,学习当中的一些优秀想法,设计。