Java多线程与并发知识点梳理

时间:2021-04-12 01:15:13


目录

1、多线程基础

1.1、线程生命周期

1.1.1、新建

1.1.2、就绪

1.1.3、运行

1.1.4、阻塞

1.1.5、死亡

1.2、终止线程的方式

1.2.1、正常运行结束

1.2.2、使用退出标志

1.2.3、使用interrupt()方法来中断线程

1.2.3、stop方法强制结束

1.3、sleep与wait方法区别

1.4、守护线程daemon

1.5、线程基本方法

1.5.1、wait

1.5.2、线程睡眠(sleep)

1.5.3、线程让步(yield)

1.5.4、notify与notifyAll

1.5.5、为什么wait, notify 和 notifyAll这些方法不在thread类里面

1.5.6、interrupted和 isInterrupted方法的区别

1.5.7、 有三个线程T1,T2,T3,怎么确保它们按顺序执行

1.6、多线程最佳实践

1.6.1、给你的线程起个有意义的名字

1.6.2、避免锁定和缩小同步的范围

1.6.3、多用同步类少用wait 和 notify

1.6.4、多用并发集合少用同步集合

2、线程池

2.1、线程池原理

2.2、线程复用原理

2.3、自定义线程池

2.3.1、线程池类

2.3.2、工作线程类

2.3.3、任务类

2.3.4、测试类

2.4、线程池组成

2.4.1、类关系图 

2.4.2、ThreadPoolExecutor

2.4.3、拒绝策略

2.4.4、工作过程

2.5、线程池的作用 

2.6、不建议使用 Executors静态工厂构建现成的线程池

2.7、线程池如何处理异常

2.8、线程池的工作队列

3、阻塞队列

3.1、阻塞队列API

3.2、阻塞队列家族

3.2.1、ArrayBlockingQueue

3.2.2、LinkedBlockingQueue

3.2.3、DelayQueue

3.2.4、SynchronousQueue

3.2.5、PriorityBlockingQueue

3.2.6、LinkedTransferQueue

3.2.7、LinkedBlockingDeque

4、线程工具类

4.1、FutureTask

4.1.1、构造方法

4.1.2、示例 

4.2、AQS(AbstractQueuedSynchronizer抽象队列同步器)

4.2.1、AQS原理

4.2.2、自定义同步器

4.3、Condition(更高效)

4.4、Semaphore(信号量-控制同时访问的线程个数)

4.5、CountDownLatch(线程计数器)

5、java锁

5.1、synchronized关键字

5.1.1、使用方式

5.1.2、底层原理

5.1.3、关于synchronized的其它知识点

5.2、Lock接口

5.2.1、synchronized与lock的区别

5.2.2、可重入锁ReentrantLock

5.2.3、读写锁ReadWriteLock

5.3、锁的思想(非实际锁)

5.3.1、公平锁、非公平锁

5.3.2、分段锁

5.3.3、可重入锁

5.3.4、乐观锁与悲观锁

5.3.5、共享锁和独占锁

5.3.6、自旋锁

5.4、锁状态(针对synchronized)

5.5、锁优化

5.5.1、减少锁持有时间

5.5.2、减小锁粒度

5.5.3、锁分离

5.5.4、锁粗化

5.5.5、锁消除


1、多线程基础

1.1、线程生命周期

1.1.1、新建

对应线程状态为new

1.1.2、就绪

当线程对象调用了start()方法之后,该线程处于就绪状态。Java虚拟机会为其创建方法调用栈和程序计数器,等待调度运行

1.1.3、运行

对应线程状态为RUNNABLE

1.1.4、阻塞

等待阻塞:WAITING

超时阻塞:TIMED_WAITING,调用下列方法之一会导致超时阻塞

  • Thread.sleep
  • 带时限(timeout)的 Object.wait
  • 带时限(timeout)的 Thread.join
  • LockSupport.parkNanos
  • LockSupport.parkUntil

阻塞:BLOCKED,等待监视器锁

1.1.5、死亡

1.2、终止线程的方式

1.2.1、正常运行结束

1.2.2、使用退出标志

while(!exit){
    //do something
}

这里的exit是自定义的变量

public class ThreadStopTest {
    public static boolean exit = false;
    public static void main(String[] args) {
        //这个线程的作用是每3秒打印出:helloworld
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!exit){
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("helloworld");
                }
            }
        }).start();
        //这个线程的作用是,10秒后停止上面那个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                exit = true;
            }
        }).start();
    }
}

1.2.3、使用interrupt()方法来中断线程

分两种情况:

a)线程处于阻塞状态:如使用了sleep,同步锁的wait,socket中的receiver,accept等方法时,会使线程处于阻塞状态。当调用线程的interrupt()方法时,会抛出InterruptException异常。阻塞中的那个方法抛出这个异常,通过代码捕获该异常,然后break跳出循环状态,从而让我们有机会结束这个线程的执行。通常很多人认为只要调用interrupt方法线程就会结束,实际上是错的, 一定要先捕获InterruptedException异常之后通过break来跳出循环,才能正常结束run方法。

public class StopThreadTest implements Runnable{
    public static void main(String[] args) throws InterruptedException {

        Thread t = new Thread(new StopThreadTest());
        t.start();
        Thread.sleep(3000);
        t.interrupt();
    }

    @Override
    public void run() {
        while(true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
            System.out.println("hello");
        }
    }
}

b)线程未处于阻塞状态:使用isInterrupted()判断线程的中断标志来退出循环。当使用interrupt()方法时,中断标志就会置true,和使用自定义的标志来控制循环是一样的道理

public class StopThreadTest implements Runnable{
    public static void main(String[] args) throws InterruptedException {

        Thread t = new Thread(new StopThreadTest());
        t.start();
        Thread.sleep(3000);
        t.interrupt();
    }

    @Override
    public void run() {
        while(!Thread.currentThread().isInterrupted()){
            System.out.println("hello");
        }
    }
}

1.2.4、stop方法强制结束

程序中可以直接使用thread.stop()来强行终止线程,但是stop方法是很危险的,就象突然关闭计算机电源,而不是按正常程序关机一样,可能会产生不可预料的结果

不安全主要是:thread.stop()调用之后,创建子线程的线程就会抛出ThreadDeatherror的错误,并且会释放子线程所持有的所有锁。一般任何进行加锁的代码块,都是为了保护数据的一致性,如果在调用thread.stop()后导致了该线程所持有的所有锁的突然释放(不可控制),那么被保护数据就有可能呈现不一致性,其他线程在使用这些被破坏的数据时,有可能导致一些很奇怪的应用程序错误。因此,并不推荐使用stop方法来终止线程

可能会造成死锁

1.3、sleep与wait方法区别

  • 对于sleep()方法,我们首先要知道该方法是属于Thread类中的。而wait()方法,则是属于Object类中的
  • sleep()方法导致了程序暂停执行指定的时间,让出cpu该其他线程,但是他的监控状态依然保持者,当指定的时间到了又会自动恢复运行状态
  • 在调用sleep()方法的过程中,线程不会释放对象锁
  • 而当调用wait()方法的时候,线程会放弃对象锁,进入等待此对象的等待锁定池,只有针对此对象调用notify()方法后本线程才进入对象锁定池准备获取对象锁进入运行状态

1.4、守护线程daemon

  • 守护线程也称服务线程,他是后台线程,它有一个特性,即为非守护线程提供服务,在没有非守护线程可服务时会自动离开
  • 垃圾回收线程就是一个经典的守护线程,当我们的程序中不再有任何运行的Thread,程序就不会再产生垃圾,垃圾回收器也就无事可做,所以当垃圾回收线程是JVM上仅剩的线程时,垃圾回收线程会自动离开
  • 守护线程不依赖于终端,但是依赖于系统,与系统“同生共死”。当JVM中所有的线程都是守护线程的时候,JVM就可以退出了;如果还有一个或以上的非守护线程则JVM不会退出
public class StopThreadTest implements Runnable{


    public static void main(String[] args) throws InterruptedException {

        Thread t1 = new Thread(new StopThreadTest());
        Thread t2 = new Thread(new StopThreadTest());
        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();

        System.out.println(t1.isDaemon());
        System.out.println(t2.isDaemon());
        System.out.println(Thread.currentThread().isDaemon());

    }

    @Override
    public void run() {

        while(true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId());
        }
    }
}

输出结果:

true

true

false

如果t1和t2都设置为守护线程,那么当main线程结束后(main线程是非守护线程),系统中就没有非守护线程了,所以t1和t2线程都会自动退出,不会一直打印各自的线程ID

1.5、线程基本方法

1.5.1、wait

调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回,需要注意的是调用wait()方法后,会释放对象的锁。因此,wait方法一般用在同步方法或同步代码块中

1.5.2、线程睡眠(sleep)

sleep导致当前线程休眠,与wait方法不同的是sleep不会释放当前占有的锁,sleep(long)会导致线程进入TIMED-WATING(超时等待)状态,而wait()方法会导致当前线程进入WATING状态

1.5.3、线程让步(yield)

yield会使当前线程让出CPU执行时间片,与其他线程一起重新竞争CPU时间片。一般情况下,优先级高的线程有更大的可能性成功竞争得到CPU时间片,但这又不是绝对的,有的操作系统对线程优先级并不敏感

1.5.4、notify与notifyAll

Object 类中的 notify() 方法,唤醒在此对象监视器上等待的单个线程,如果所有线程都在此对象上等待,则会选择唤醒其中一个线程,选择是任意的,被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争。类似的方法还有 notifyAll() ,唤醒再次监视器上等待的所有线程

1.5.5、为什么wait, notify 和 notifyAll这些方法不在thread类里面

这是个设计相关的问题,一个很明显的原因是JAVA提供的锁是对象级的而不是线程级的,每个对象都有锁,通过线程获得。如果线程需要等待某些锁那么调用对象中的wait()方法就有意义了。如果wait()方法定义在Thread类中,线程正在等待的是哪个锁 就不明显了。简单的说,由于wait,notify和notifyAll都是锁级别的操作,所以把他们定义在Object类中因为锁属于对象

1.5.6、interrupted和 isInterrupted方法的区别

首先,打断一个线程调用interrupt方法,interrupted和 isInterrupted都返回boolean值(是否中断),interrupted和 isInterrupted的主要区别是前者会将中断状态清除而后者不会

任何抛出了InterruptedException异常的方法都会将中断状态清零,所以我们测试两者区别的时候,不要用Thread.sleep

测试代码:

Thread t = new Thread(() -> {
            //不要用Thread.sleep
            /* try {

                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/

            int i = 0;
            for(;;){
                i++;
                if(i>99999){
                    break;
                }
            }

            //System.out.println(Thread.interrupted());
            //System.out.println(Thread.interrupted());

        });

        t.start();

        t.interrupt();


        System.out.println(t.isInterrupted());
        System.out.println(t.isInterrupted());

1.5.7、 有三个线程T1,T2,T3,怎么确保它们按顺序执行

利用Thread对象的join方法,当执行join方法时,父线程会阻塞,知道子线程运行结束

1.6、多线程最佳实践

1.6.1、给你的线程起个有意义的名字

这样可以方便找bug或追踪。OrderProcessor, QuoteProcessor or TradeProcessor 这种名字比 Thread-1. Thread-2 and Thread-3 好多了,给线程起一个和它要完成的任务相关的名字,所有的主要框架甚至JDK都遵循这个最佳实践

1.6.2、避免锁定和缩小同步的范围

锁花费的代价高昂且上下文切换更耗费时间空间,试试最低限度的使用同步和锁,缩小临界区。因此相对于同步方法我更喜欢同步块,它给我拥有对锁的绝对控制权

1.6.3、多用同步类少用wait 和 notify

首先,CountDownLatch, Semaphore, CyclicBarrier 和 Exchanger 这些同步类简化了编码操作,而用wait和notify很难实现对复杂控制流的控制。其次,这些类是由最好的企业编写和维护在后续的JDK中它们还会不断 优化和完善,使用这些更高等级的同步工具你的程序可以不费吹灰之力获得优化

1.6.4、多用并发集合少用同步集合

这是另外一个容易遵循且受益巨大的最佳实践,并发集合比同步集合的可扩展性更好,所以在并发编程时使用并发集合效果更好。如果下一次你需要用到map,你应该首先想到用ConcurrentHashMap

ConcurrentHashMap,它内部细分了若干个小的HashMap,称之为段(Segment)。默认情况下一个ConcurrentHashMap被进一步细分为16个段,既就是锁的并发度

2、线程池

2.1、线程池原理

线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行

他的主要特点为:线程复用;控制最大并发数;管理线程

2.2、线程复用原理

每一个 Thread 的类都有一个 start 方法。 当调用start启动线程时Java虚拟机会调用该类的 run 方法。 那么该类的 run() 方法中就是调用了 Runnable 对象的 run() 方法。 我们可以继承重写 Thread 类,在其 start 方法中添加不断循环调用传递过来的 Runnable 对象。 这就是线程池的实现原理。循环方法中不断获取 Runnable 是用 Queue 实现的,在获取下一个 Runnable 之前可以是阻塞的

2.3、自定义线程池

2.3.1、线程池类

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * 自定义定长线程池
 */
public class MyThreadPool {
    /**
     * 线程池数组
     */
    private Thread[] threadArr;
    private int threadNum;
    /**
     * 线程队列
     */
    private BlockingQueue<Runnable> runnableQueue = new ArrayBlockingQueue<Runnable>(100);

    /**
     * 定长线程池构造方法
     */
    public MyThreadPool(int threadNum){
        this.threadNum = threadNum;
        initThreadPool();
    }

    /**
     * 初始化线程池
     */
    private void initThreadPool(){
        threadArr = new WorkThread[threadNum];
        for (int i = 0; i < threadNum; i++) {
            threadArr[i] = new WorkThread(this);
            threadArr[i].start();
        }
    }

    /**
     * 线程池执行器
     */
    public void execute(Runnable r){
        runnableQueue.add(r);
    }

    public BlockingQueue<Runnable> getRunnableQueue() {
        return runnableQueue;
    }

    public void setRunnableQueue(BlockingQueue<Runnable> runnableQueue) {
        this.runnableQueue = runnableQueue;
    }
}

2.3.2、工作线程类

public class WorkThread extends Thread{
    private MyThreadPool myThreadPool;
    public WorkThread(MyThreadPool myThreadPool){
        this.myThreadPool = myThreadPool;
    }

    @Override
    public void run() {
        while(true){
            try {
                Runnable r = myThreadPool.getRunnableQueue().take();
                r.run();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

2.3.3、任务类

public class MyTask implements Runnable{
    private String name;

    public MyTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name);
        }
    }
}

2.3.4、测试类

public class MyThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        MyThreadPool myThreadPool = new MyThreadPool(5);

        MyTask task1 = new MyTask("name1");
        MyTask task2 = new MyTask("name2");
        MyTask task3 = new MyTask("name3");
        MyTask task4 = new MyTask("name4");
        MyTask task5 = new MyTask("name5");

        MyTask task6 = new MyTask("name6");
        MyTask task7 = new MyTask("name7");

        myThreadPool.execute(task1);
        myThreadPool.execute(task2);
        myThreadPool.execute(task3);
        myThreadPool.execute(task4);
        myThreadPool.execute(task5);
        myThreadPool.execute(task6);
        myThreadPool.execute(task7);
    }

}

2.4、线程池组成

一般的线程池主要分为以下4个组成部分:

  • 线程池管理器:用于创建并管理线程池
  • 工作线程:线程池中的线程
  • 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
  • 任务队列:用于存放待处理的任务,提供一种缓冲机制 Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable和Future、FutureTask这几个类

2.4.1、类关系图 

Java多线程与并发知识点梳理

2.4.2、ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
  • corePoolSize:指定了线程池中的线程数量
  • maximumPoolSize:指定了线程池中的最大线程数量
  • keepAliveTime:当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即多次时间内会被销毁
  •  unit:keepAliveTime的单位
  •  workQueue:任务队列,被提交但尚未被执行的任务
  •  threadFactory:线程工厂,用于创建线程,一般用默认的即可
  •  handler:拒绝策略,当任务太多来不及处理(即工作队列消费的速度赶不上生产的速度),如何拒绝任务
int corePoolSize = 5;
        int maximumPoolSize = 10;
        long keepAliveTime = 60;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100,false);
        ThreadFactory threadFactory =  r -> new Thread(r,"bobo");

        ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();



        ExecutorService es = new ThreadPoolExecutor(corePoolSize, 
                                                    maximumPoolSize, 
                                                    keepAliveTime, 
                                                    unit, 
                                                    workQueue,
                                                    threadFactory,
                                                    callerRunsPolicy);

        for (int i = 0; i < 1000; i++) {
            final int a =i;
            es.execute(()->{
                System.out.println(a);
            });
        }

2.4.3、拒绝策略

线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题

JDK内置的拒绝策略如下:

  • AbortPolicy : 直接抛出异常,阻止系统正常运行(默认的)
  • CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降
  • DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务
  •  DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案
  • 以上内置拒绝策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际需要,完全可以自己扩展RejectedExecutionHandler接口

2.4.4、工作过程

1、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们

2、当调用 execute() 方法添加一个任务时,线程池会做如下判断:

    a)如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务

    b)如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列

    c)如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务

    d)如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异RejectExecutionException

3、当一个线程完成任务时,它会从队列中取下一个任务来执行。

4、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小

2.5、线程池的作用 

假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能

  • 线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的
  • 显著减少了创建线程的数目
  • 管理线程,避免增加创建线程和销毁线程的资源损耗

2.6、不建议使用 Executors静态工厂构建现成的线程池

阿里巴巴Java开发手册,明确指出不允许使用Executors静态工厂构建线程池
原因如下:
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors返回的线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadPool:允许的请求队列(底层实现是LinkedBlockingQueue)长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
  • CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM
     

注意:使用*队列的线程池会导致内存飙升

2.7、线程池如何处理异常

Java多线程与并发知识点梳理

2.8、线程池的工作队列

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • DelayQueue
  • PriorityBlockingQueue
  • SynchronousQueue

3、阻塞队列

3.1、阻塞队列API

Java多线程与并发知识点梳理

3.2、阻塞队列家族

队列

有界性


数据结构

ArrayBlockingQueue

bounded(有界)

加锁

arrayList

LinkedBlockingQueue

optionally-bounded

加锁

linkedList

PriorityBlockingQueue

unbounded

加锁

heap

DelayQueue

unbounded

加锁

heap

SynchronousQueue

bounded

加锁


LinkedTransferQueue

unbounded

加锁

heap

LinkedBlockingDeque

unbounded

无锁

heap

3.2.1、ArrayBlockingQueue

是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先申请获取锁的线程先得到锁,那么这个锁就是公平的。反之,这个锁就是不公平的】

ArrayBlockingQueue的构造方法:

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

capacity:队列大小,即队列只能容纳capacity个元素

fail:是否公平

3.2.2、LinkedBlockingQueue

一个由链表结构组成的有界队列,此队列的默认长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序

LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能

LinkedBlockingQueue构造方法:

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

3.2.3、DelayQueue

一个实现延迟获取的*队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。DelayQueue内部使用非线程安全的优先队列(PriorityQueue)

DelayQueue可以运用在以下应用场景:

  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了
  • 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的
public static void main(String[] args) {

        BlockingQueue<Task> blockingQueue = new DelayQueue<Task>();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        long begin = System.currentTimeMillis();
                        Task element = blockingQueue.take();
                        System.out.println("get----"+element.getTaskId()+"----"+(System.currentTimeMillis()-begin));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }


            }
        }).start();

        Random r = new Random();
        Task t1 = new Task(r.nextInt(20),r.nextInt(30)+1,TimeUnit.SECONDS);
        Task t2 = new Task(r.nextInt(20),r.nextInt(30)+1,TimeUnit.SECONDS);
        Task t3 = new Task(r.nextInt(20),r.nextInt(30)+1,TimeUnit.SECONDS);
        Task t4 = new Task(r.nextInt(20),r.nextInt(30)+1,TimeUnit.SECONDS);
        Task t5 = new Task(r.nextInt(20),r.nextInt(30)+1,TimeUnit.SECONDS);
        try {
            blockingQueue.put(t1);
            blockingQueue.put(t2);
            blockingQueue.put(t3);
            blockingQueue.put(t4);
            blockingQueue.put(t5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("完成");
    }
class Task implements Delayed{

    private Integer taskId;
    /**
     * 触发时间
     */
    private long triggerTime;

    /**
     *
     * @param delayTime 延迟时间
     * @param unit 时间单位
     */
    public Task(Integer taskId,long delayTime,TimeUnit unit){
        System.out.println(taskId+"----"+delayTime);
        this.taskId = taskId;
        //计算触发时间
        this.triggerTime = System.currentTimeMillis()+unit.toMillis(delayTime);
    }

    /**
     * 获取延迟时间,该方法由阻塞队列周期执行
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return triggerTime-System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        Task t = (Task)o;
        return this.taskId.compareTo(t.getTaskId());
    }

    public Integer getTaskId() {
        return taskId;
    }

    public void setTaskId(Integer taskId) {
        this.taskId = taskId;
    }
}

输出:

3----13
5----15
1----3
17----20
0----16
完成
get----0----16008
get----1----0
get----3----0
get----5----0
get----17----4000

3.2.4、SynchronousQueue

一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收

3.2.5、PriorityBlockingQueue

 一个支持元素优先级排序的*队列,默认使用元素的compareTo方法进行比较来确定元素的优先级,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序

BlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>(10,(a,b)->{return a.compareTo(b);});

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Integer element = blockingQueue.take();
                        System.out.println(element);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }


            }
        }).start();

        Random r = new Random();
        for (int i = 0; i < 50; i++) {
            int a = r.nextInt(100);
            try {
                blockingQueue.put(a);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("完成");

3.2.6、LinkedTransferQueue

一个由链表结构组成的*阻塞队列,相当于其它队列,LinkedTransferQueue实现TransferQueue接口,而TransferQueue

继承BlockingQueue接口

TransferQueue接口的方法:

  • tryTransfer(E):将元素立刻给消费者。准确的说就是立刻给一个等待接收元素的线程,如果没有消费者就会返回false,而不将元素放入队列
  • transfer(E):将元素给消费者,如果没有消费者就会等待
  • tryTransfer(E,long,TimeUnit):将元素立刻给消费者,如果没有就等待指定时间。给失败返回false
  • hasWaitingConsumer():返回当前是否有消费者在等待元素
  • getWaitingConsumerCount():返回等待元素的消费者个数

3.2.7、LinkedBlockingDeque

 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

4、线程工具类

4.1、FutureTask

4.1.1、构造方法

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

4.1.2、示例 

FutureTask<String> ft = new FutureTask<>(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务完成了");
        }, "success");

        ft.run();

        try {
            //此方法会阻塞
            String s = ft.get();
            System.out.println(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

4.2、AQS(AbstractQueuedSynchronizer抽象队列同步器)

AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch

AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现,AQS这里只定义了一个接口

ReentrantLock 内部有两个内部类,分别是 FairSync 和 NoFairSync,对应公平锁和非公平锁。他们都继承自 Sync。Sync 又继承自AQS

AQS 中有两个重要的成员:

  • 成员变量 state:用于表示锁现在的状态,用 volatile 修饰,保证内存一致性。同时所用对 state 的操作都是使用 CAS 进行的。state 为0表示没有任何线程持有这个锁,线程持有该锁后将 state 加1,释放时减1。多次持有释放则多次加减
  • 还有一个双向链表,链表除了头结点外,每一个节点都记录了线程的信息,代表一个等待线程。这是一个 FIFO 的链表

AQS定义两种资源共享方式

  • Exclusive独占资源-ReentrantLock Exclusive(独占,只有一个线程能执行,如ReentrantLock)
  • Share共享资源-Semaphore/CountDownLatch Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)

4.2.1、AQS原理

请求锁时有三种可能:

  • 如果没有线程持有锁,则请求成功,当前线程直接获取到锁。
  • 如果当前线程已经持有锁,则使用 CAS 将 state 值加1,表示自己再次申请了锁,释放锁时减1。这就是可重入性的实现。
  • 如果由其他线程持有锁,那么将自己添加进等待队列

4.2.2、自定义同步器

a)自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了

b)自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false

c)同步器的实现是AQS核心(state资源状态计数):

  • 以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的
  • 以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作

d)ReentrantReadWriteLock实现独占和共享两种方式:

自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire+tryRelease或tryAcquireShared+tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

4.3、Condition(更高效)

  • Condition是个接口,基本的方法就是await()和signal()方法
  • Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() 
  • 调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用
  • Conditon中的await()对应Object的wait()
  • Condition中的signal()对应Object的notify()
  • Condition中的signalAll()对应Object的notifyAll()

4.4、Semaphore(信号量-控制同时访问的线程个数)

通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可

Semaphore类中比较重要的几个方法:

  • public void acquire(): 用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可
  • public void acquire(int permits):获取permits个许可
  • public void release() { } :释放许可。注意,在释放许可之前,必须先获获得许可
  • public void release(int permits) { }:释放permits个许可

上面4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:

  • public boolean tryAcquire():尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
  • public boolean tryAcquire(long timeout, TimeUnit unit):尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
  • public boolean tryAcquire(int permits):尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false 4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit): 尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
  • 还可以通过availablePermits()方法得到可用的许可数目

例子:

若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现

package com.bobo.group.test;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {

    /**
     * 定义机器数量
     */
    public static final int MACHINE_AMOUNT=5;
    /**
     * 定义工人数量
     */
    public static final int WORKER_AMOUNT=8;

    public Semaphore s = new Semaphore(MACHINE_AMOUNT);

    /**
     * 工人工作的方法
     */
    public void work(int number){
        System.out.println("工人"+number+"--开始work--");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("工人"+number+"--结束work--");
        s.release();
    }


    public static void main(String[] args) {
        SemaphoreTest st = new SemaphoreTest();
        for (int i = 0; i < WORKER_AMOUNT; i++) {
            final int number=i;
            new Thread(()->{
                try {
                    st.s.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                st.work(number);
            }).start();
        }

    }
}

4.5、CountDownLatch(线程计数器)

  • CountDownLatch是JDK提供的一个同步工具
  • 有countDown方法和await方法,CountDownLatch在初始化时,需要指定用给定一个整数作为计数器。当调用countDown方法时,计数器会被减1;当调用await方法时,如果计数器大于0时,线程会被阻塞,一直到计数器被countDown方法减到0时,线程才会继续执行。计数器是无法重置的,当计数器被减到0时,调用await方法都会直接返回
  • await方法的另一个重载,传入等待的超时时间,可以不用一直阻塞

5、java锁

5.1、synchronized关键字

5.1.1、使用方式

作用于静态方法:用类的class对象作为锁

作用于实例方法:用this对象作为锁

作用于实例代码块:用this对象作为锁

5.1.2、底层原理

Java多线程与并发知识点梳理

每个对象都有个monitor对象,加锁就是在竞争monitor对象

代码块加锁是在前后分别加上monitorenter和monitorexit指令来实现的

方法加锁是通过一个标记位ACC_SYNCHRONIZED 来判断的

monitor对象存在于每个Java对象的对象头中(存储的指针的指向),synchronized锁便是通过这种方式获取锁的,也是为什么Java中任意对象可以作为锁的原因,同时也是notify/notifyAll/wait等方法存在于*对象Object中的原因

monitor 对象由 C++ 实现。其中有三个关键字段:
_owner 记录当前持有锁的线程
_EntryList 是一个队列,记录所有阻塞等待锁的线程
_WaitSet 也是一个队列,记录调用 wait() 方法并还未被通知的线程。

Monitor的操作机制如下:
多个线程竞争锁时,会先进入 EntryList 队列。竞争成功的线程被标记为 Owner。其他线程继续在此队列中阻塞等待
如果 Owner 线程调用 wait() 方法,则其释放对象锁并进入 WaitSet 中等待被唤醒。Owner 被置空,EntryList 中的线程再次竞争锁
如果 Owner 线程执行完了,便会释放锁,Owner 被置空,EntryList 中的线程再次竞争锁

5.1.3、关于synchronized的其它知识点

  • 不能继承:父类的方法中有synchronized关键字,子类不能继承下来,需要显式指定
  • synchronized是一个重量级操作,需要调用操作系统相关接口,性能是低效的,有可能给线程加锁消耗的时间比有用操作消耗的时间更多
  • Java6,synchronized进行了很多的优化,有适应自旋、锁消除、锁粗化、轻量级锁及偏向锁等,效率有了本质上的提高。在之后推出的Java7与8中,均对该关键字的实现机理做了优化。引入了偏向锁和轻量级锁。都是在对象头中有标记位,不需要经过操作系统加锁
  • synchronized是悲观锁,synchronized和ReentrantLock等独占锁就是悲观锁思想的实现
  • synchronized是可重入锁
  • synchronized是非公平锁

5.2、Lock接口

5.2.1、synchronized与lock的区别

  • 首先synchronized是java内置关键字,在jvm层面,Lock是个java类
  • synchronized无法判断是否获取到锁,Lock可以判断是否获取到锁
  • synchronized会自动释放锁(执行完代码或发生异常会释放锁),Lock需在finally中手工释放锁,否则容易造成线程死锁,利用lock()与unlock()方法
  • 用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去,而Lock锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了
  • synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可中断、可以设置是否公平
  • Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码少量的同步问题
  • ReentrantLock类可以唤醒指定条件的线程,而object的唤醒是随机的
  • lock的实现原理:计数值、双向链表、CAS+自旋

5.2.2、可重入锁ReentrantLock

ReentrantLock是Lock接口的实现类,Lock接口和ReadWriteLock接口无继承关系

5.2.3、读写锁ReadWriteLock

读写锁分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,多个写锁互斥。这是由jvm自己控制的,你只要上好相应的锁即可

ReadWriteLock是接口,ReentrantReadWriteLock是具体的实现类

5.3、锁的思想(非实际锁)

5.3.1、公平锁、非公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁

非公平锁性能比公平锁高5~10倍,因为公平锁需要在多核的情况下维护一个队列

synchronized是非公平锁,ReentrantLock 默认的lock()方法采用的是非公平锁

5.3.2、分段锁

分段锁也并非一种实际的锁,而是一种思想,ConcurrentHashMap是学习分段锁的最好实践

分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发操作
当需要put元素的时候,并不是对整个hashmap进行加锁,而是先通过hashcode来知道他要放在哪一个分段中,然后对这个分段进行加锁,所以当多线程put的时候,只要不是放在一个分段中,就实现了真正的并行的插入
 

5.3.3、可重入锁

假如一把锁锁了n个地方,那么只要得到这把锁,那n个地方都可以访问

同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。ReentrantLock 和synchronized 都是 可重入锁

5.3.4、乐观锁与悲观锁

乐观锁适用于读比较多的场景,悲观锁适用于写比较多的场景,不加锁会带来大量的性能提升
乐观锁常见的两种实现方式:版本号机制或CAS算法实现

a)版本号机制

  • 一般是在数据表中加上一个数据版本号version字段,当数据被修改时,version值会加一
  • 当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功

b)CAS算法

  • 即 compare and swap(比较与交换),是一种有名的无锁算法,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步
  • check and act模式,先检查后操作模式    首先检查一个变量的值,然后再基于这个值做一些操作,check then act操作必须是原子的
  • 原子就是说”check“操作和”act“被当做一个原子代码块执行。不存在多个线程同时执行原子块
  • CAS有3个操作数,内存值V,预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做
  • CAS算法会导致ABA问题,而版本号机制不会

c) cas补充

内存值v被volatile修饰,保证了可见性

如何保证原子性(先比较再替换):

首先判断内存值与预期值是否相等,如果相等,则将内存值修改成B,预期值也改为B;如果不等,只将预期值改为V

cas问题:

  • 自旋开销大
  • 只能保证一个变量的原子操作,解决办法:将多个变量封装成对象,通过 AtomicReference 来保证原子性
  • ABA问题:在cas比较过程中,另一个线程将内存值V由A改成了B,又由B改成了A,解决办法:JDK提供AtomicStampedReference类用于控制变量的版本,不过还不如直接使用互斥锁效率高;或者如果ABA问题对程序影响不大的话,就不用解决

5.3.5、共享锁和独占锁

a)独占锁

独占锁模式下,每次只能有一个线程能持有锁,ReentrantLock就是以独占方式实现的互斥锁。独占锁是一种悲观保守的加锁策略,它避免了读/读冲突,如果某个只读线程获取锁,则其他读线程都只能等待,这种情况下就限制了不必要的并发性,因为读操作并不会影响数据的一致性

b)共享锁

共享锁则允许多个线程同时获取锁,并发访问 共享资源,如:ReadWriteLock的读锁是共享锁。共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源

5.3.6、自旋锁

  • 自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU
  • 获取不到锁的线程要么阻塞,要么自旋

5.4、锁状态(针对synchronized)

锁的状态总共有四种:无锁状态、偏向锁、轻量级锁和重量级锁

5.5、锁优化

5.5.1、减少锁持有时间

只用在有线程安全要求的程序上加锁

5.5.2、减小锁粒度

将大对象(这个对象可能会被很多线程访问),拆成小对象,大大增加并行度,降低锁竞争。降低了锁的竞争,偏向锁,轻量级锁成功率才会提高。最最典型的减小锁粒度的案例就是ConcurrentHashMap

5.5.3、锁分离

最常见的锁分离就是读写锁ReadWriteLock,根据功能进行分离成读锁和写锁,这样读读不互斥,读写互斥,写写互斥,即保证了线程安全,又提高了性能,具体也请查看[高并发Java 五] JDK并发包1。读写分离思想可以延伸,只要操作互不影响,锁就可以分离。比如LinkedBlockingQueue 从头部取出,从尾部放数据

5.5.4、锁粗化

通常情况下,为了保证多线程间的有效并发,会要求每个线程持有锁的时间尽量短,即在使用完公共资源后,应该立即释放锁。但是,凡事都有一个度,如果对同一个锁不停的进行请求、同步和释放,其本身也会消耗系统宝贵的资源,反而不利于性能的优化 

5.5.5、锁消除

锁消除是在编译器级别的事情。在即时编译器时,如果发现不可能被共享的对象,则可以消除这些对象的锁操作,多数是因为程序员编码不规范引起