《实战Java高并发程序设计》笔记

时间:2021-05-05 23:48:48

线程锁

  • 非公平锁 :大多数情况下,锁的申请都是非公平的。如果多个线程几乎是同时申请同一个锁,系统是在这个锁的申请队列中随机挑选一个。因此不能保证其公平性

  • 公平锁:公平锁会按照申请锁的时间先后顺序,保证先到先得。公平锁不会造成饥饿现象,但是公平锁需要系统维持一个有序的队列,所以公平锁的实现成本高

synchronized

synchronized的作用是实现线程间的同步,它的工作时对同步的代码加锁,使得每一次只能有一个线程进入同步块,从而保证线程间的安全。

  1. 特点

    • synchronized是非公平锁
  2. 常用方法

    • 同步代码块:相当于给对象加锁,进入同步代码前要获得给定对象的锁
    • 直接作用于实例方法:相当于对当前实例加锁,进入同步代码前要获得当前类的锁
    • 直接作用于静态方法:相当于对当前类加锁,进入同步代码前要获得当前类的锁
public class VolatileThread implements Runnable {
    static VolatileThread instance = new VolatileThread();
    static volatile int i = 0;
    static volatile int m = 0;
    static volatile int n = 0;

    /** * synchronized 作用于实例方法,相当于给某个对象加锁 */
    public synchronized void increase() {
        i++;
    }

    /** * synchronized 作用于静态方法,相当于给某个类加锁 */
    public static synchronized void increaseM() {
        m++;
    }


    @Override
    public void run() {
        for (int j = 0; j < 10000000; j++) {
            increase();
            increaseM();

            /** * synchronize 作用于代码块,相当于给对象加锁 */
            synchronized (this) {
                n++;
            }
        }
        System.out.println("---------" );
        System.out.println("一个线程执行完毕:" + i);
        System.out.println("一个线程执行完毕:" + m);
        System.out.println("一个线程执行完毕:" + n);
    }

    /** * volatile 的作用是,告诉虚拟机该变量是易变的,所有的线程都能看到该变量的改变 */

    public static void main(String[] a) throws Exception {
        Thread t1 = new Thread(new VolatileThread());
        Thread t2 = new Thread(new VolatileThread());
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("---------" + i);
        System.out.println("---------" + m);
        System.out.println("---------" + n);

        i = 0;
        m = 0;
        n = 0;
        Thread t3 = new Thread(instance);
        Thread t4 = new Thread(instance);
        t3.start();
        t4.start();
        t3.join();
        t4.join();

        System.out.println("---------" );
        System.out.println("---------" + i);
        System.out.println("---------" + m);
        System.out.println("---------" + n);
    }
}
--------- 一个线程执行完毕:19113402 一个线程执行完毕:19289139 一个线程执行完毕:19230795 ---------
一个线程执行完毕:19825393
一个线程执行完毕:20000000
一个线程执行完毕:19940130
---------19825393
---------20000000
---------19940130 ---------
一个线程执行完毕:19941228
一个线程执行完毕:19941227
一个线程执行完毕:19942571 ---------
一个线程执行完毕:20000000
一个线程执行完毕:20000000
一个线程执行完毕:20000000 ---------
---------20000000
---------20000000
---------20000000

重入锁 ReentrantLock

ReentrantLock 默认是非公平锁,可设置为公平锁

  1. 特点

    • 灵活性:远比 synchronized 好
    • 支持中断响应
    • 锁申请等待限时
  2. 常用方法

    • lock():获得锁,如果锁已经被占用,则等待
    • lockInterruptibly() :获得锁,但优先响应中断
    • tryLock() :尝试获得锁,如果成功,则返回true,失败返回false。该方法不等待,直接返回。
    • tryLock(long time, TimeUnit unit) : 在给定的时间内尝试获得锁。
    • unlock() :释放锁。
  3. ReenrantLock实现三要素

    • 原子状态:原子状态使用CAS操作来存储当前锁的状态,判断锁是否已经被其他线程占用了。
    • 等待队列:所有没有请求到锁的线程,都会进入等待队列。待有线程释放锁后,系统就能从队列中唤醒一个线程,继续工作。
    • 阻塞原语park()和unpark():用于挂起线程和恢复线程。
public class TryLock implements Runnable {
    private ReentrantLock lock1 = new ReentrantLock();
    private ReentrantLock lock2 = new ReentrantLock();

    private int lock;

    public TryLock(int lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        if (lock == 1) {
            //循环尝试获取锁完成工作
            while (true) {
                try {
                    if (lock1.tryLock()) {

                        try {
                            Thread.sleep(500);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    if (lock2.tryLock()) {
                        try {
                            System.out.println(Thread.currentThread().getId() + ": My job done!");
                            return;
                        } finally {
                            lock2.unlock();
                        }
                    }
                }finally {
                    lock1.unlock();
                }
            }

        } else {
            while(true){
                try {
                    if (lock2.tryLock()) {
                        try{
                            Thread.sleep(500);
                        }catch (Exception e){}

                        if(lock1.tryLock()){
                            try {
                                System.out.println(Thread.currentThread().getId() + ": My job done!");
                                return;
                            }finally {
                                lock1.unlock();
                            }
                        }
                    }
                }finally {
                    lock2.unlock();
                }
            }
        }
    }

    public static void main(String[] a){
        TryLock tryLock1 = new TryLock(1);
        TryLock tryLock2 = new TryLock(2);
        /** 两个线程都想获取到两个锁完成工作,但是都先后分别占据lock1和lock2,处于饥饿状态, 直至先有一方休眠500秒后,释放一个锁,另外一个线程首先完成工作,然后释放两个锁。 然后前一个线程也能获取到两个锁完成工作。 **/
        Thread t1 = new Thread(tryLock1);
        Thread t2 = new Thread(tryLock2);
        t1.start();
        t2.start();
    }
}

条件 condiction

和synchronized的wait()、notify()作用相同,Condiction是和重入锁ReentrantLock关联。

  1. 常用方法
    • await():使当前线程进入等待,同时释放当前锁,当其他线程中使用了signal()或者signalAll(),线程会重新获得锁并继续进行。或者当线程被中断时,也能跳出等待。
    • awaitUninterruptibly():和await()方法基本相同,但是线程不会在等待过程中响应中断。
    • sinal():唤醒一个在等待中的线程。
public class ReentrantLockCondition implements Runnable {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    @Override
    public void run() {
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + " get lock");
            //线程要调用Condition.await()时,必须先获得锁。调用await()后,线程进入等待,同时释放锁
            condition.await();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + " finally block");
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        ReentrantLockCondition reentrantLockCondition = new ReentrantLockCondition();
        Thread t1 = new Thread(reentrantLockCondition, "t1");
        t1.start();
        Thread.sleep(2000);
        lock.lock();
        System.out.println("sinal");
        //线程要调用Condition.signal()时,也必须先获得锁。signal()后,线程进入等待,同时释放锁
        condition.signal();
        lock.unlock();
    }
}
  • 线程要调用Condition.await()时,必须先获得锁。调用await()后,线程进入等待,同时释放锁
  • 线程要调用Condition.signal()时,也必须先获得锁。signal()后,线程进入等待,同时释放锁
  • 调用了signal()方法后,系统会在Condition对象的等待队列里唤醒一个线程。一旦线程被唤醒,它会尝试获取与之绑定的重入锁。

运行结果

t1 get lock
sinal
t1 finally block

信号量 Semaphore

一个计数信号量。 在概念上,信号量维持一组许可证。 Semaphore只保留可用数量的计数,并相应地执行。
信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。

  1. 构造函数

    • public Semaphore(int permits)
    • public Semaphore(int permits, boolean fair)
  2. 常用方法

    • accquire() :获取一个准入的许可证。若无法获取则进入等待,直到有线程释放一个许可或者当前线程被中断。
    • accquireUninterrptibly():和accquire()方法类似,但是不能响应中断。
    • tryAccquire():尝试获取一个许可证,成功则返回true,失败则返回false,失败后不进入等待。
    • tryAccquire(long time, TimeUnit unit):和tryAccquire()类似,但有个等待时间。
    • release() : 释放一个许可证。
public class SemaphoreDemo implements Runnable {
    private Semaphore semaphore = new Semaphore(5);

    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getId() + " get semaphore");
            Thread.sleep(2000);
            semaphore.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        SemaphoreDemo semaphoreDemo = new SemaphoreDemo();

        for (int i = 0; i < 20; i++) {
            exec.submit(semaphoreDemo);
        }
    }
}

ReadWriteLock 读写锁

ReadWriteLock 是JDK1.5 中提供的读写分离锁。读写分离锁可以有效地减少锁竞争,以提高系统性能。

  1. 读写锁的访问束约情况
-
非阻塞 阻塞
阻塞 阻塞

- 读-读不互斥: 读读之间不阻塞;
- 读-写互斥:读阻塞写,写也会阻塞读;
- 写-写互斥:写写互斥。

public class ReadWriteLockDemo {
    //读写锁
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    //读锁
    private static ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
    //写锁
    private static ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

    private int value;

    public int handleRead(Lock lock) throws InterruptedException {
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getId() + ", get readLock");
            Thread.sleep(1000);
            return value;
        } finally {
            lock.unlock();
        }
    }

    public void handleWriteLock(Lock lock, int value) throws InterruptedException {
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getId() + ", get writeLock");
            Thread.sleep(1000);
            this.value = value;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReadWriteLockDemo demo = new ReadWriteLockDemo();

        Runnable readRunanble = new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleRead(readLock);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        Runnable writeRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleWriteLock(writeLock, new Random().nextInt());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < 18; i++) {
            //读-读之间是非阻塞的,所以可以并行
            new Thread(readRunanble).start();
        }

        for (int i = 18 ; i<20; i++){
            //写-读、写-写都是阻塞的,到这里执行时,不是并行的,执行比较慢
            new Thread(writeRunnable).start();
        }
    }
}

线程复用

线程池

多线程的软件设计方法确实可以最大限度地发挥现代多核处理器的计算能力,提高生产系统的吞吐量和性能。但是,若不加控制和管理的随意使用线程,对系统的性能反而产生不利的影响。使用线程池后,创建线程变成从线程池里获取空闲线程,关闭线程变成向线程池归还线程。相关链接:
https://blog.csdn.net/javazejian/article/details/77410889?locationNum=1&fps=1

  1. 线程池的作用

    • 当创建线程不受控制时,盲目创建大量的线程,会耗用系统宝贵的内存资源和CPU,如果处理不当会导致Out of Memory异常。同时大量的线程回收也会给GC带来压力,延长GC的停顿时间。
    • 线程的创建和销毁耗费时间。如果为每一个小任务都创建一个小线程,很有可能出现创建和销毁线程所占用的时间大于工程实际工作所消耗的时间;
  2. 常用方法

    • newFixedThreadPool(): 该方法返回一个固定线程数量的线程池。
    • newSingleThreadExcecutor(): 该方法返回一个只有一个线程的线程池。
    • newCacheThreadPool(): 该方法返回一个可以根据实际情况调整线程数据量的线程池。
    • newSingleThreadScheduledExcecutor(): 该方法返回一个ScheduleExecutorService对象,线程池大小是1。
    • newScheduledPool(): 该方法返回ScheduleExecutorService对象,但是可以指定线程数量。

核心线程池的内部实现

对于核心的几个线程池,无论是newFixedThreadPool()、newSingleThreadExcecutor()或其他方法,其内部实现都是使用了ThreadPoolExecutor实现。

    public ThreadPoolExecutor(int corePoolSize,
                              int maxinumPoolSize,
                              long keepAliveTime,
                              TimeUtil unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

参数含义:
- corePoolSize: 指定了线程池中的线程数量。
- maximumPoolSize:线程池最大的线程数量;
- keepAliveTime: 当线程池数量超过corePoolSize时,多余的空闲线程的存活时间。即,超过corePoolSize的空闲线程,在多久时间后会被销毁。
- unit: keepAliveTime的单元;
- workQueue: 任务队列,被提交但尚未被执行的任务。
- threadFactory: 线程工厂,用户创建线程,一般用默认的即可。
- handler: 拒绝策略。当任务太多来不及处理,如何拒绝任务。

任务队列

workQueue是指提交了但是未被执行的任务队列, 它是一个BlockingQueue接口实现的对象,仅用于保存Runnable对象。

  • 直接提交队列(SynchronousQueue):使用SynchronousQueue,提交的任务队列不会保存,会直接把新任务提交给线程执行。如果线程池没有空闲的线程,就尝试创建新线程。如果线程数量达到上线,则执行拒绝策略。
    使用直接提交队列的线程池方法:newCachedThreadPool() 。newCachedThreadPool()返回corePoolSize=0,maximumPoolSize无穷大的线程池。如果任务提交速度远大于处理数度,则会出现耗尽系统资源情况。

  • 有界的任务队列(ArrayBlockingQueue): 当有新的任务要执行时,如果线程池的线程数量小于corePoolSize时,直接创建新线程;如果线程数大于corePoolSize,会把任务加入等待队列,如果等待队列已满,无法加入,则在总线程数不大于maximumPoolSize情况下,创建新的线程,若线程数达到maximumPoolSize,则执行拒绝策略。除非系统任务很繁忙,否则ArrayBlockingQueue保持核心线程数在corePoolSize

  • *的任务队列(LinkedBlockingQueue):LinkedBlockingQueue和ArrayBlockingQueue相比,除非系统资源耗尽,否则不会出现任务入队列失败情况。当有新任务到来,如果线程池线程数小于corePoolSize,则创建新线程;如果线程数大于corePoolSize,则把任务加入队列等待。若任务创建和处理时间差异太大,*队列持续快速增长,直到耗尽内存
    使用*任务队列的线程池方法:newFixedThreadPool()。如果任务提交速度远大于处理数度,则会出现耗尽系统资源情况。

  • 优先任务队列(PriorityBlockingQueue): 优先任务队列是根据优先级来执行的特殊*队列

拒绝机制

当线程池中的线程用完,同时任务队列也满了,需要一套拒绝机制保证系统正常运行。

  1. JDK内置4种拒绝机制:

    • AbortPolicy: 中断策略,该机制会直接抛出RuntimeException异常,阻止系统正常工作。
    • CallerRunsPolicy:线程池拒绝接收任务,让提交任务的线程(Caller)去执行(Run)任务。
    • DisCardOldestPolicy:丢弃队列中最老的任务,并且尝试再次提交当前任务。
    • DiscardPolicy:直接丢弃无法处理的任务。
  2. 自定义拒绝策略

    JDK4种拒绝策略都实现了RejectedExecutionHandler接口,若是仍无法满足实际应用,完全可以自己扩展RejectExecutionHandler接口。

// RejectExecutionHandler 接口定义:
public interface RejectedExecutionHandler{
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
// r 为请求执行的任务,executor为当前的线程池

拓展线程池

ThreadPoolExecutor是一个可以拓展的线程池,提供了beforeExecute()、afterExecute()和 terminated()三个接口对线程池进行控制。

JDK的并发容器

并发开发实例

  1. java多线程之火车售票系统模拟实例
  2. Java代码实践12306售票算法