Java并发——使用Condition线程间通信

时间:2022-12-20 17:58:04

线程间通信

线程之间除了同步互斥,还要考虑通信。在Java5之前我们的通信方式为:wait 和 notify。Condition的优势是支持多路等待,即可以定义多个Condition,每个condition控制线程的一条执行通路。传统方式只能是一路等待

Condition提供不同于Object 监视器方法的行为和语义,如受保证的通知排序,或者在执行通知时不需要保持一个锁。

 

Condition接口

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

public interface Condition {
    void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }

说明:

await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。

awaitUninterruptibly()方法与await()方法基本相同,但awaitUninterruptibly()方法不会在等待过程中响应中断。

singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。这和Obejct.notify()方法类似。

condition.await()方法必须在lock.lock()与lock.unlock()方法之间调用。

 

获取Condition

Condition实例实质上被绑定到一个锁上。一个锁内部可以有多个Condition,即有多路等待和通知。要为特定 Lock 实例获得 Condition 实例,请使用其 newCondition() 方法。

Condition newCondition() 返回用来与当前Lock实例一起使用的Condition 实例。

类似于 object.wait()和object.notify()的功能。object.wait()与object.notify()需要结合synchronized使用。Condition需要结合ReentrantLock使用。

 

"虚假唤醒"

所谓"虚假唤醒",即其他地方的代码触发了condition.signal(),唤醒condition上等待的线程。但被唤醒的线程仍然不满足执行条件。

condition通常与条件语句一起使用:

if(!条件){
    condition.await(); //不满足条件,当前线程等待;
}

更好的方法是使用while:

while(!条件){
    condition.await(); //不满足条件,当前线程等待;
}

在等待Condition时,允许发生"虚假唤醒",这通常作为对基础平台语义的让步。若使用"if(!条件)"则被"虚假唤醒"的线程可能继续执行。所以"while(!条件)"可以防止"虚假唤醒"。建议总是假定这些"虚假唤醒"可能发生,因此总是在一个循环中等待。

 

例:缓冲队列的实现。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();// 锁对象
    final Condition notFull = lock.newCondition(); //写线程条件
    final Condition notEmpty = lock.newCondition();//读线程条件
    final Object[] items = new Object[100];// 初始化一个长度为100的队列
    int putptr/* 写索引 */, takeptr/* 读索引 */, count/* 队列中存在的数据个数 */;

    public void put(Object x) throws InterruptedException {
        lock.lock(); //获取锁
        try {
            while (count == items.length)
                notFull.await();// 当计数器count等于队列的长度时,不能再插入,因此等待。阻塞写线程。
            items[putptr] = x;//赋值
            putptr++;

            if (putptr == items.length)
                putptr = 0;// 若写索引写到队列的最后一个位置了,将putptr置为0。
            count++; // 每放入一个对象就将计数器加1。
            notEmpty.signal(); // 一旦插入就唤醒取数据线程。
        } finally {
            lock.unlock(); // 最后释放锁
        }
    }

    public Object take() throws InterruptedException {
        lock.lock(); // 获取锁
        try {
            while (count == 0)
                notEmpty.await(); // 如果计数器等于0则等待,即阻塞读线程。
            Object x = items[takeptr]; // 取值
            takeptr++;
            if (takeptr == items.length)
                takeptr = 0; //若读锁应读到了队列的最后一个位置了,则读锁应置为0;即当takeptr达到队列长度时,从零开始取
            count++; // 每取一个将计数器减1。
            notFull.signal(); //枚取走一个就唤醒存线程。
            return x;
        } finally {
            lock.unlock();// 释放锁
        }
    }

}

此即Condition的强大之处,假设缓存队列中已经存满,那么阻塞的肯定是写线程,唤醒的肯定是读线程,相反,阻塞的肯定是读线程,唤醒的肯定是写线程,那么假设只有一个Condition会有什么效果呢,缓存队列中已经存满,这个Lock不知道唤醒的是读线程还是写线程了,如果唤醒的是读线程,皆大欢喜,如果唤醒的是写线程,那么线程刚被唤醒,又被阻塞了,这时又去唤醒,这样就浪费了很多时间。

 

 

例:经典问题:三个线程依次打印ABC,代码示例如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Business {
    private Lock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();
    private String type = "A"; //内部状态

    /*
     * 方法的基本要求为:
     * 1、该方法必须为原子的。
     * 2、当前状态必须满足条件。若不满足,则等待;满足,则执行业务代码。
     * 3、业务执行完毕后,修改状态,并唤醒指定条件下的线程。
     */
    public void printA() {
        lock.lock(); //锁,保证了线程安全。
        try {
            while (type != "A") { //type不为A,
                try {
                    conditionA.await(); //将当前线程阻塞于conditionA对象上,将被阻塞。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //type为A,则执行。
            System.out.println(Thread.currentThread().getName() + " 正在打印A");
            type = "B"; //将type设置为B。
            conditionB.signal(); //唤醒在等待conditionB对象上的一个线程。将信号传递出去。
        } finally {
            lock.unlock(); //解锁
        }
    }

    public void printB() {
        lock.lock(); //
        try {
            while (type != "B") { //type不为B,
                try {
                    conditionB.await(); //将当前线程阻塞于conditionB对象上,将被阻塞。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //type为B,则执行。
            System.out.println(Thread.currentThread().getName() + " 正在打印B");
            type = "C"; //将type设置为C。
            conditionC.signal(); //唤醒在等待conditionC对象上的一个线程。将信号传递出去。
        } finally {
            lock.unlock(); //解锁
        }
    }

    public void printC() {
        lock.lock(); //
        try {
            while (type != "C") {
                try {
                    conditionC.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println(Thread.currentThread().getName() + " 正在打印C");
            type = "A";
            conditionA.signal();
        } finally {
            lock.unlock(); //解锁
        }
    }
}


public class Test{

    public static void main(String[] args) {
        final Business business = new Business();//业务对象。

        //线程1号,打印10次A。
        Thread ta = new Thread(new Runnable() {

            @Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printA();
                }
            }
        });

        //线程2号,打印10次B。
        Thread tb = new Thread(new Runnable() {

            @Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printB();
                }
            }
        });

        //线程3号,打印10次C。
        Thread tc = new Thread(new Runnable() {

            @Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printC();
                }
            }
        });

        //执行3条线程。
        ta.start();
        tb.start();
        tc.start();
    }

}

执行代码,控制台依次显示了A、B、C,10次。可以看到3条线程之间共享Business类中的资源type,且3条线程之间进行了有效的协调。