Condition线程通信(七)

时间:2021-03-17 16:09:32

前言:对于线程通信,使用synchronized时使用wait、notify和notifyAll来实行线程通信。而使用Lock如何处理线程通信呢?答案就是本片的主角:Condition.

一、Condition

Condition,Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

1,Codition接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用Object.wait访问的隐式监视器类似。

但提供了更强大的功能,需要指出的是,单个lock可能与多个condition对象关联。为了避免兼容性问题,condition方法的名称与对应的object版本中不一样。

2,在condition对象中,与wait,notify,notifyAll方法对应的分别是await,signal,和signalAll。

3,condition实例实质上是被绑定到了一个锁上,要为特定Lock实例获得condition实例,请使用newCondition()方法。

生产者与消费者程序中,这里通过Lock和condition来联合处理,代替synchronized和wait方法:

/*
* 生产者消费者案例:
*/
public class TestProductorAndConsumerForLock {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor pro = new Productor(clerk);
Consumer con = new Consumer(clerk); new Thread(pro, "生产者 A").start();
new Thread(con, "消费者 B").start(); new Thread(pro, "生产者 C").start();
new Thread(con, "消费者 D").start();
}
}
class Clerk {
private int product = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
// 进货
public void get() {
// 锁住
lock.lock();
try {
// 为了避免虚假唤醒,应该总是使用在while循环中。
while (product >= 1) {
System.out.println("产品已满!");
try {
condition.await();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + " : "
+ ++product);
condition.signalAll();
} finally {
// unlock放在finally中,可确保一定会锁被释放
lock.unlock();
}
}
// 卖货
public void sale() {
lock.lock();
try {
// 为了避免虚假唤醒,应该总是使用在while循环中。
while (product <= 0) {
System.out.println("缺货!");
try {
condition.await();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + " : "
+ --product);
condition.signalAll();
} finally {
lock.unlock();
}
}
}
// 生产者
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}
// 消费者
class Consumer implements Runnable { private Clerk clerk; public Consumer(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}

​ 在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。

二、Conditon的常用方法

Condition底层模型:

Condition线程通信(七)

(1)await()

​ 这个方法由当前获得锁的线程调用,意思是释放锁,挂起自己并且唤醒等待当前线程持有的锁的其他线程(在aqs的等待队列中的其他节点),类似于synchronized同步代码块的wait方法的意思。

源码分析:

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 这个方法是将当前线程放到当前condition的队列中
Node node = addConditionWaiter();
// 释放当前线程站有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//isOnSyncQueue的意思是判断当前的线程是否在sync的队列(sync的队列就是aqs的队列)
//while循环的意思是只要封装当前线程的node没有在aqs的队列中,就一直循环。
while (!isOnSyncQueue(node)) {
// 将当前的线程挂起,也就是说等当前的线程获得锁之后,还是从这里开始运行
LockSupport.park(this);
// 这里是判断在线程挂起的时间内,有没有别的线程调用这个线程的iterrupt方法,如果没有的话返回0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//挂起结束,也就是重新运行之后,调用acquireQueued方法,即进入aqs的队列中,acquireQueued方法就是获取锁的代码,如果获取失败则被挂起。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

Condition线程通信(七)

await方法,其实就是先释放锁(说白了底层就是更改了AQS的state),然后调用LockSupport的park方法park自己,再把自己丢进Condition的等待队列中。这里不要忘记了,释放锁的同时还通知了同步队列中的线程去拿锁哦。

(2)signal()

思想:将在condition的wait queue中等待的线程转移到aqs的队列中。

public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//firstwaiter就是condition的队列中的head
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&//transferForSignal的意思就是转移first节点到aqs的队列中。
(first = firstWaiter) != null);//直到转移成功first或者队列中没有节点
} final boolean transferForSignal(Node node) {
/* If cannot change waitStatus, the node has been cancelled. */
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//将node的waitStatus设置为0,如果设置失败,返回false,继续进行操作,设置成功向下走。
return false; Node p = enq(node);//enq之前说过了,是将其加入到aqs的队列中
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//将node的waitStatus设置为signal,这样在aqs的队列中就和普通的等待着一样了。
LockSupport.unpark(node.thread);
return true;
}

signal()底层模型:

Condition线程通信(七)

通过signla方法,之前在condition的等待队列中的线程的head已经转移到aqs的队列中去等待获得锁了。

举个例子更好的理解(多个生产者和多个消费者的例子):

​ 生产者生产商品,消费者消费商品,生产出的商品放到容器中,容器中只能放一个商品。刚上来容器中为空,假设消费者获得锁,则所有的消费者在叫做empty的condition上等待,同时释放锁,这时,可能有一个或者多个消费者在empty上等待。此时生产者获得锁(因为只要是消费者获得锁就会进入empty的condition的队列中),将生产的商品放入到容器中,此时容器中已经有商品,其他的生产者发现不能放了,则在full这个condition上等待,也就是进入full这个condition的队列中,生产者放入后,会调用empty.signal,意思是将在empty中等待的线程转移到aqs的队列中,准备获得锁,然后生产者释放所, 消费者获得锁(因为现在容器中有商品了,虽然生产者有机会获得锁,但是只要是生产者持有锁,就会调用full.await,也就是现在条件不满足,虽然我获得了锁,但是必须要等待消费者消费,于是进入full的队列中,所以最终是消费者获得锁)。消费者获得锁后将商品消费,然后调用full.signal,也就是通知生产者生产,这样就回到了最开始的时候.....

​ condition基本原理就是内部维持了一个队列,在调用await的时候将线程阻塞,释放锁,将其加入到队列中等待,在调用signal的时候将自己队列中的head转移到aqs的队列中,等待获得锁。

参考链接

本文主要整理自互联网,主要是便于自己复习知识所用,侵权联系删除,以下为原文参考链接!

【1】Java并发编程序列之JUC中Condition线程通讯

【2】JUC-Condition线程通信

【3】juc - Condition源码解读