高并发第十一弹:J.U.C -AQS(AbstractQueuedSynchronizer) 组件:Lock,ReentrantLock,ReentrantReadWriteLock,StampedLock

时间:2021-11-16 05:30:05

既然说到J.U.C 的AQS(AbstractQueuedSynchronizer)   不说 Lock 是不可能的.不过实话来说,一般 JKD8 以后我一般都不用Lock了.毕竟sychronized 的效率已经很高了.Lock在我的实际开发中的需求很少,但还是需要了解一下的.

JAVA的两种锁

ReentrantLock与synchronized的区别

  可重入性:两者的锁都是可重入的,差别不大,有线程进入锁,计数器自增1,等下降为0时才可以释放锁

  锁的实现:synchronized是基于JVM实现的(用户很难见到,无法了解其实现),ReentrantLock是JDK实现的,其实就是我们敲代码实现的。

  性能区别:在最初的时候,二者的性能差别差很多,当synchronized引入了偏向锁、轻量级锁(自选锁)后,二者的性能差别不大,官方推荐synchronized(写法更容易、在优化时其实是借用了ReentrantLock的CAS技术,试图在用户态就把问题解决,避免进入内核态造成线程阻塞)

功能区别:

  (1)便利性:synchronized更便利,它是由编译器保证加锁与释放。ReentrantLock是需要手动释放锁,所以为了避免忘记手工释放锁造成死锁,所以最好在finally中声明释放锁。

  (2)锁的细粒度和灵活度,ReentrantLock优于synchronized 

ReentrantLock独有的功能

  可以指定是公平锁还是非公平锁,sync只能是非公平锁。(所谓公平锁就是先等待的线程先获得锁)

  提供了一个Condition类,可以分组唤醒需要唤醒的线程。不像是synchronized要么随机唤醒一个线程,要么全部唤醒。

  提供能够中断等待锁的线程的机制,通过lock.lockInterruptibly()实现,这种机制 ReentrantLock是一种自选锁,通过循环调用CAS操作来实现加锁。性能比较好的原因是避免了进入内核态的阻塞状态。

建议(纯个人粗见):

  除非需要用Lock的3个独有的功能,为了安全和省心一点还是用synchronized吧.最后会有一个 Lock和synchronized和Atomic的性能对比.也可以作为参考

那还是回归主题

怎么使用ReentrantLock呢

  构造方法

  ReentrantLock()创建一个 ReentrantLock的实例。

  ReentrantLock(boolean fair) 根据给定的公平政策创建一个 ReentrantLock的实例。 

//创建锁
private final static Lock lock = new ReentrantLock();
//使用锁
private static void method() {
    lock.lock();
    try {
       .......
    } finally {
        lock.unlock();
    }
}

基本方法

基本使用 上面也有了   看一下Condition的使用

Condition的使用

Condition因素出 Object监视器方法( waitnotifynotifyAll )成不同的对象,以得到具有多个等待集的每个对象,通过将它们与使用任意的组合的效果 Lock实现。 Lock替换 synchronized方法和语句的使用, Condition取代了对象监视器方法的使用。

条件(也称为条件队列条件变量 )为一个线程暂停执行(“等待”)提供了一种方法,直到另一个线程通知某些状态现在可能为真。 因为访问此共享状态信息发生在不同的线程中,所以它必须被保护,因此某种形式的锁与该条件相关联。 等待条件的关键属性是它原子地释放相关的锁并挂起当前线程,就像Object.wait

一个Condition实例本质上绑定到一个锁。 要获得特定Condition实例的Condition实例,请使用其newCondition()方法。

例如,假设我们有一个有限的缓冲区,它支持puttake方法。 如果在一个空的缓冲区尝试一个take ,则线程将阻塞直到一个项目可用; 如果put试图在一个完整的缓冲区,那么线程将阻塞,直到空间变得可用。 我们希望在单独的等待集中等待put线程和take线程,以便我们可以在缓冲区中的项目或空间可用的时候使用仅通知单个线程的优化。 这可以使用两个Condition实例来实现。

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock(); try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally { lock.unlock(); }
   }

   public Object take() throws InterruptedException {
     lock.lock(); try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally { lock.unlock(); }
   }
 } 

下面是一个更明晰的例子

public static void main(String[] args) {
    ReentrantLock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();//创建condition
    //线程1
    new Thread(() -> {
        try {
            reentrantLock.lock();
            log.info("wait signal"); // 1
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("get signal"); // 4
        reentrantLock.unlock();
    }).start();
    //线程2
    new Thread(() -> {
        reentrantLock.lock();
        log.info("get lock"); // 2
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        condition.signalAll();//发送信号
        log.info("send signal"); // 3
        reentrantLock.unlock();
    }).start();
}

输出过程讲解:

1、线程1调用了reentrantLock.lock(),线程进入AQS等待队列,输出1号log

2、接着调用了awiat方法,线程从AQS队列中移除,锁释放,直接加入condition的等待队列中

3、线程2因为线程1释放了锁,拿到了锁,输出2号log

4、线程2执行condition.signalAll()发送信号,输出3号log

5、condition队列中线程1的节点接收到信号,从condition队列中拿出来放入到了AQS的等待队列,这时线程1并没有被唤醒。

6、线程2调用unlock释放锁,因为AQS队列中只有线程1,因此AQS释放锁按照从头到尾的顺序,唤醒线程1 7、线程1继续执行,输出4号log,并进行unlock操作。

ReentrantReadWriteLock的使用

在没有任何读写锁的时候才可以取得写入锁(悲观读取,容易写线程饥饿),也就是说如果一直存在读操作,那么写锁一直在等待没有读的情况出现,这样我的写锁就永远也获取不到,就会造成等待获取写锁的线程饥饿。 平时使用的场景并不多。 

private final Map<String, Data> map = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();//读锁
    private final Lock writeLock = lock.writeLock();//写锁

    //加读锁
    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }
    //加写锁
    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

StampedLock

好文章:StampedLock将是解决同步问题的新宠

 

介绍: 一种基于能力的锁,具有三种模式用于控制读/写访问。 StampedLock的状态由版本和模式组成。 锁定采集方法返回一个表示和控制相对于锁定状态的访问的印记; 这些方法的“尝试”版本可能会返回特殊值为零以表示获取访问失败。 锁定释放和转换方法要求邮票作为参数,如果它们与锁的状态不匹配则失败。 这三种模式是:
  • 写作。 方法writeLock()可能阻止等待独占访问,返回可以在方法unlockWrite(long)中使用的邮票来释放锁定。 不定时的和定时版本tryWriteLock ,还提供。 当锁保持写入模式时,不能获得读取锁定,并且所有乐观读取验证都将失败。
  • 读。 方法readLock()可能阻止等待非独占访问,返回可用于方法unlockRead(long)释放锁的戳记 。 不定时的和定时版本tryReadLock ,还提供。
  • 乐观阅读 方法tryOptimisticRead()只有当锁当前未保持在写入模式时才返回非零标记。 方法validate(long)返回true,如果在获取给定的邮票时尚未在写入模式中获取锁定。 这种模式可以被认为是一个非常弱的版本的读锁,可以随时由作家打破。 对简单的只读代码段使用乐观模式通常会减少争用并提高吞吐量。 然而,其使用本质上是脆弱的。 乐观阅读部分只能读取字段并将其保存在局部变量中,以供后验证使用。 以乐观模式读取的字段可能会非常不一致,因此只有在熟悉数据表示以检查一致性和/或重复调用方法validate()时,使用情况才适用。 例如,当首次读取对象或数组引用,然后访问其字段,元素或方法之一时,通常需要这样的步骤。

 

JDK上提供的例子

class Point {
   private double x, y;
   private final StampedLock sl = new StampedLock();
   void move(double deltaX, double deltaY) { // an exclusively locked method
     long stamp = sl.writeLock();
     try {
       x += deltaX;
       y += deltaY;
     } finally {
       sl.unlockWrite(stamp);
     }
   }
  //下面看看乐观读锁案例
   double distanceFromOrigin() { // A read-only method
     long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
     double currentX = x, currentY = y; //将两个字段读入本地局部变量
     if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
        stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
        try {
          currentX = x; // 将两个字段读入本地局部变量
          currentY = y; // 将两个字段读入本地局部变量
        } finally {
           sl.unlockRead(stamp);
        }
     }
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }
//下面是悲观读锁案例
   void moveIfAtOrigin(double newX, double newY) { // upgrade
     // Could instead start with optimistic, not read mode
     long stamp = sl.readLock();
     try {
       while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
         long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
         if (ws != 0L) { //这是确认转为写锁是否成功
           stamp = ws; //如果成功 替换票据
           x = newX; //进行状态改变
           y = newY; //进行状态改变
           break;
         }
         else { //如果不能成功转换为写锁
           sl.unlockRead(stamp); //我们显式释放读锁
           stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
         }
       }
     } finally {
       sl.unlock(stamp); //释放读锁或写锁
     }
   }
 }
  1. synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定;
  2. ReentrantLock、ReentrantReadWriteLock,、StampedLock都是对象层面的锁定,要保证锁定一定会被释放,就必须将unLock()放到finally{}中;
  3. StampedLock 对吞吐量有巨大的改进,特别是在读线程越来越多的场景下;
  4. StampedLock有一个复杂的API,对于加锁操作,很容易误用其他方法;
  5. 当只有少量竞争者的时候,synchronized是一个很好的通用的锁实现;
  6. 当线程增长能够预估,ReentrantLock是一个很好的通用的锁实现;

StampedLock 可以说是Lock的一个很好的补充,吞吐量以及性能上的提升足以打动很多人了,但并不是说要替代之前Lock的东西,毕竟他还是有些应用场景的,起码API比StampedLock容易入手.

 

锁的选择

1、当只有少量竞争者,使用synchronized

2、竞争者不少但是线程增长的趋势是能预估的,使用ReetrantLock

3、synchronized不会造成死锁,jvm会自动释放死锁。 

下面有一个例子 关闭 synchronized ,reentrantLock,Atomic的性能对比

高并发第十一弹:J.U.C -AQS(AbstractQueuedSynchronizer) 组件:Lock,ReentrantLock,ReentrantReadWriteLock,StampedLock高并发第十一弹:J.U.C -AQS(AbstractQueuedSynchronizer) 组件:Lock,ReentrantLock,ReentrantReadWriteLock,StampedLock
package com.rong.juc;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 
 * @ClassName: ReentrantLockDemo
 * @Description:TODO(这里用一句话描述这个类的作用)
 * @author: rongbo
 * @date: 2018年9月23日 下午10:57:27
 * 
 *
 */
public class ReentrantLockTest {
    public static void test(int round, int threadNum, CyclicBarrier cyclicBarrier) {
        new SyncTest("Sync", round, threadNum, cyclicBarrier).testTime();
        new LockTest("Lock", round, threadNum, cyclicBarrier).testTime();
        new AtomicTest("Atom", round, threadNum, cyclicBarrier).testTime();
    }

    public static void main(String args[]) {

        for (int i = 0; i < 5; i++) {
            int round = 10000 * (i + 1);
            int threadNum = 5 * (i + 1);
            CyclicBarrier cb = new CyclicBarrier(threadNum * 2 + 1);
            System.out.println("==========================");
            System.out.println("round:" + round + " thread:" + threadNum);
            test(round, threadNum, cb);

        }
    }
}

class SyncTest extends TestTemplate {
    public SyncTest(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
        super(_id, _round, _threadNum, _cb);
    }

    @Override
    /**
     * synchronized关键字不在方法签名里面,所以不涉及重载问题
     */
    synchronized long getValue() {
        return super.countValue;
    }

    @Override
    synchronized void sumValue() {
        super.countValue += preInit[index++ % round];
    }
}

class LockTest extends TestTemplate {
    ReentrantLock lock = new ReentrantLock();

    public LockTest(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
        super(_id, _round, _threadNum, _cb);
    }

    /**
     * synchronized关键字不在方法签名里面,所以不涉及重载问题
     */
    @Override
    long getValue() {
        try {
            lock.lock();
            return super.countValue;
        } finally {
            lock.unlock();
        }
    }

    @Override
    void sumValue() {
        try {
            lock.lock();
            super.countValue += preInit[index++ % round];
        } finally {
            lock.unlock();
        }
    }
}

class AtomicTest extends TestTemplate {
    public AtomicTest(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
        super(_id, _round, _threadNum, _cb);
    }

    @Override
    /**
     * synchronized关键字不在方法签名里面,所以不涉及重载问题
     */
    long getValue() {
        return super.countValueAtmoic.get();
    }

    @Override
    void sumValue() {
        super.countValueAtmoic.addAndGet(super.preInit[indexAtomic.get() % round]);
    }
}

abstract class TestTemplate {
    private String id;
    protected int round;
    private int threadNum;
    protected long countValue;
    protected AtomicLong countValueAtmoic = new AtomicLong(0);
    protected int[] preInit;
    protected int index;
    protected AtomicInteger indexAtomic = new AtomicInteger(0);
    Random r = new Random(47);
    // 任务栅栏,同批任务,先到达wait的任务挂起,一直等到全部任务到达制定的wait地点后,才能全部唤醒,继续执行
    private CyclicBarrier cb;

    public TestTemplate(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
        this.id = _id;
        this.round = _round;
        this.threadNum = _threadNum;
        cb = _cb;
        preInit = new int[round];
        for (int i = 0; i < preInit.length; i++) {
            preInit[i] = r.nextInt(100);
        }
    }

    abstract void sumValue();

    /*
     * 对long的操作是非原子的,原子操作只针对32位 long是64位,底层操作的时候分2个32位读写,因此不是线程安全
     */
    abstract long getValue();

    public void testTime() {
        ExecutorService se = Executors.newCachedThreadPool();
        long start = System.nanoTime();
        // 同时开启2*ThreadNum个数的读写线程
        for (int i = 0; i < threadNum; i++) {
            se.execute(new Runnable() {
                public void run() {
                    for (int i = 0; i < round; i++) {
                        sumValue();
                    }

                    // 每个线程执行完同步方法后就等待
                    try {
                        cb.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }

                }
            });
            se.execute(new Runnable() {
                public void run() {

                    getValue();
                    try {
                        // 每个线程执行完同步方法后就等待
                        cb.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }

                }
            });
        }

        try {
            // 当前统计线程也wait,所以CyclicBarrier的初始值是threadNum*2+1
            cb.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        // 所有线程执行完成之后,才会跑到这一步
        long duration = System.nanoTime() - start;
        System.out.println(id + " = " + duration);

    }

}
View Code

老电脑 ,性能很差

结果:

==========================
round:10000 thread:5
Sync = 4578771
Lock = 6079408
Atom = 2358938
==========================
round:20000 thread:10
Sync = 10253723
Lock = 6668266
Atom = 3977932
==========================
round:30000 thread:15
Sync = 19530498
Lock = 13254122
Atom = 11142416
==========================
round:40000 thread:20
Sync = 31596091
Lock = 24663350
Atom = 18504161
==========================
round:50000 thread:25
Sync = 55158877
Lock = 36521455
Atom = 32352693

 

StampedLock 和ReentrantLock的对比

就刚刚 个例子 自己试一下吧..还是需要自动动手后才 记得住的 Q_Q