JUC-并发编程的艺术

时间:2023-01-17 20:51:35

本文转自:https://my.oschina.net/tjt/blog/726522

Java并发容器和框架

ConcurrentHashMap的实现原理与使用

  • 在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非 常低下,基于以上两个原因,便有了ConcurrentHashMap
  • 多线程会导致HashMap的Entry链表 形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获 取Entry。
  • HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable 的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同 步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方 法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。
  • HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的 线程都必须竞争同一把锁
  • ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存 储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数 据也能被其他线程访问。
  • ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重 入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数 据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种 数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元 素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时, 必须首先获得与它对应的Segment锁
  • 1.初始 segments数组的源代码
    if ( concurrencyLevel > MAX_SEGMENTS ) {
    concurrencyLevel = MAX_SEGMENTS;
    }
    int sshift = 0;
    int ssize = 1;
    while ( ssize < concurrencyLevel ) {
    ++sshift;
    ssize <<= 1;
    }
    segmentShift = 32 - sshift;
    segmentMask = ssize - 1;
    this.segments = Segment.newArray( ssize );
    • 由上面的代码可知,segments数组的长度ssize是通过concurrencyLevel计算得出的。为了能 通过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方 (power-of-two size),所以必须计算出一个大于或等于concurrencyLevel的最小的2的N次方值 来作为segments数组的长度。假如concurrencyLevel等于14、15或16,ssize都会等于16,即容器里 锁的个数也是16。
  • 2.初始化segmentShift和segmentMask 这两个全局变量需要在定位segment时的散列算法里使用,sshift等于ssize从1向左移位的 次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。 segmentShift用于定位参与散列运算的位数,segmentShift等于32减sshift,所以等于28,这里之所 以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的,后面的测试中我们 可以看到这点。segmentMask是散列运算的掩码,等于ssize减1,即15,掩码的二进制各个位的 值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是 65535,对应的二进制是16位,每个位都是1。
  • 3.初始化每个segment
    • 注意前面是初始化整个segment数组
  • initialCapacity是ConcurrentHashMap的初始化容量(默认16),loadfactor是每个segment的负 载因子(默认0.75)

    if (initialCapacity > MAXIMUM_CAPACITY)
    initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
    ++c;
    // 变量cap就是segment里HashEntry数组的长度, 不是1,就是2的N次方。

    int cap = 1;
    while (cap < c)
    cap <<= 1;
    for (int i = 0; i < this.segments.length; ++i)
    this.segments[i] = new Segment<K,V>(cap, loadFactor);
  • 定位Segment
  • ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素 的时候,必须先通过散列算法定位到Segment ConcurrentHashMap会首先使用 Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。 目的是减少散列冲突,使元素能够均匀地分布在不同的Segment上, 从而提高容器的存取效率。假如散列的质量差到极点,那么所有的元素都在一个Segment中, 不仅存取元素缓慢,分段锁也会失去意义。
    private static int hash(int h) {
    h += (h << 15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h << 3);
    h ^= (h >>> 6);
    h += (h << 2) + (h << 14);
    return h ^ (h >>> 16);
    }
  • ConcurrentHashMap通过以下散列算法定位segment。
    final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
    }

    ConcurrentHashMap的操作

  • 1.get操作
    • 先经过一次再散列,然后使用这个散列值通过散 列运算定位到Segment,再通过散列算法定位到元素, 整个get过程不需要加锁,除非读到的值是空才会加锁重读( 它的get方法里将要使用的共享变量都定义成volatile类型, 能够在线 程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写,但是get方法不需要写 ),所以高效
    • 之所以不会读到过期的值,是因为根据Java内存模 型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取 volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景。
      public V get(Object key) {
      int hash = hash(key.hashCode());
      return segmentFor(hash).get(key, hash);
      }
  • 2.put操作
    • 为了线程安全,在操作共享变量时必 须加锁。put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个 步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容( 为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只 对某个segment进行扩容。 ),第二步定位添加元素的位 置,然后将其放在HashEntry数组里。

Java的阻塞队列

  • 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞 的插入和移除方法。
    • 1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不 满。
    • 2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列 变为非空。
  • 阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是 从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
  • 在阻塞队列不可用时,这两个附加操作提供了4种处理方式
  • ·抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
  • ·返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移 除方法,则是从队列里取出一个元素,如果没有则返回null。
  • ·一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者 线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队 列会阻塞住消费者线程,直到队列不为空。
  • ·超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程 一段时间,如果超过了指定的时间,生产者线程就会退出。
  • JDK 7提供了7个阻塞队列,如下。
  • ·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。 默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照 阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平 的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问 队列。为了保证公平性,通常会降低吞吐量。
  • ·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。此队列按照先进先出的原则对元素进行排序。
  • ·PriorityBlockingQueue:一个支持优先级排序的*阻塞队列。 PriorityBlockingQueue是一个支持优先级的*阻塞队列。默认情况下元素采取自然顺序 升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始 化 PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证 同优先级元素的顺序。
  • ·DelayQueue:一个使用优先级队列实现的*阻塞队列。 DelayQueue是一个支持延时获取元素的*阻塞队列。队列使用PriorityQueue来实现。队 列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。 只有在延迟期满时才能从队列中提取元素。
    • DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。
    • ·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
    • ·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从 DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。
  • ·SynchronousQueue:一个不存储元素的阻塞队列。 SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作, 否则不能继续添加元素. SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费 者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于 LinkedBlockingQueue和ArrayBlockingQueue。
  • ·LinkedTransferQueue:一个由链表结构组成的*阻塞队列。
  • ·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

Fork/Join框架

  • 工作窃取算法: 干完活的线程去帮其他线程干活,就去其他线程的队列 里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被 窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿 任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
  • Fork/Join使用两个类来完成上述任务
    • ①ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务 中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继 承它的子类,Fork/Join框架提供了以下两个子类。
    • ·RecursiveAction:用于没有返回结果的任务。
    • ·RecursiveTask:用于有返回结果的任务。
    • ②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当 一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任 务。

Java中的13个原子操作类

  • Atomic包原子更新基本类型类
    • ·AtomicBoolean:原子更新布尔类型。
    • ·AtomicInteger:原子更新整型。
    • ·AtomicLong:原子更新长整型。
    • 以AtomicInteger为例 举例一些api
      • 1 ·int addAndGet(int delta):以原子方式将输入的数值与实例中的值(AtomicInteger里的 value)相加,并返回结果。
      • 2·boolean compareAndSet(int expect,int update):如果输入的数值等于预期值,则以原子方 式将该值设置为输入的值。
      • 3·int getAndIncrement():以原子方式将当前值加1,注意,这里返回的是自增前的值。
      • 4·void lazySet(int newValue):最终会设置成newValue,使用lazySet设置值后,可能导致其他 线程在之后的一小段时间内还是可以读到旧的值。关于该方法的更多信息可以参考并发编程 网翻译的一篇文章《AtomicLong.lazySet是如何工作的》
      • 5·int getAndSet(int newValue):以原子方式设置为newValue的值,并返回旧值。
  • Atomic包 原子更新数组
    • AtomicIntegerArray:原子更新整型数组里的元素。
    • 1 ·int addAndGet(int i,int delta):以原子方式将输入值与数组中索引i的元素相加。
    • 2·boolean compareAndSet(int i,int expect,int update):如果当前值等于预期值,则以原子 方式将数组位置i的元素设置成update值。
    • ·AtomicLongArray:原子更新长整型数组里的元素。
    • ·AtomicReferenceArray:原子更新引用类型数组里的元素。
  • 原子更新引用类型(还是compareAndSet方法)
    • ·AtomicReference:原子更新引用类型。
    • ·AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
    • ·AtomicMarkableReference:原子更新带有标记位的引用类型。可以原子更新一个布尔类 型的标记位和引用类型。构造方法是AtomicMarkableReference(V initialRef,boolean initialMark)。

java中的线程池

  • 线程池的实现原理
    • 1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
    • 2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
    • 3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。 -ThreadPoolExecutor执行execute()方法的示意图
  • 1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤 需要获取全局锁)。
  • 2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
  • 3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执 行这一步骤需要获取全局锁)。
  • 4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。
    • ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能 地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后 (当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而 步骤2不需要获取全局锁。
  • 源码分析
    public void execute ( Runnable command ) {
    if ( command == null ) {
    throw new NullPointerException();
    }
    // 如果线程数小于基本线程数,则创建线程并执行当前任务
    if ( poolSize >= corePoolSize || !addIfUnderCorePoolSize( command ) ) {
    // 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
    if ( runState == RUNNING && workQueue.offer( command ) ) {
    if ( runState != RUNNING || poolSize == 0 ) {
    ensureQueuedTaskHandled( command );
    }
    }
    // 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
    else if ( !addIfUnderMaximumPoolSize( command ) ){
    // 抛出RejectedExecutionException异常
    reject( command ); // is shutdown or saturated
    }
    }
    }
  • 工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务 后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点。
    public void run() {
    try {
    Runnable task = firstTask;
    firstTask = null;
    while (task != null || (task = getTask()) != null) {
    runTask(task);
    task = null;
    }
    } finally {
    workerDone(this);
    }
    }

    线程池的使用

    • 线程池的创建: new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

      参数说明:

      1. runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择 ( ArrayBlockingQueue,LinkedBlockingQueue(静态工厂方法Executors.newFixedThreadPool()使用了这个队列),SynchronousQueue(Executors.newCachedThreadPool使用了这个队列。),PriorityBlockingQueue )。
      1. ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线 程设置有意义的名字 代码如下: new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
      1. RejectedExecutionHandler (饱和策略):当队列和线程池都满了,说明线程池处于饱和状 态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法 处理新任务时抛出异常。( AbortPolicy:直接抛出异常。 ·CallerRunsPolicy:只用调用者所在线程来运行任务。 ·DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。 ·DiscardPolicy:不处理,丢弃掉。)

        合理配置线程池

  • 可以从以下几个角度分析:
    • ·任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
    • ·任务的优先级:高、中和低。
    • ·任务的执行时间:长、中和短。
    • ·任务的依赖性:是否依赖其他系统资源,如数据库连接。
  • 1.性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的 线程,如配置N cpu +1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配 置尽可能多的线程,如2*N cpu 。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务 和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
  • 2.优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高 的任务先执行。( 如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能 执行。 )
    1. 执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让 执行时间短的任务先执行。
  • 4.依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越 长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
  • 建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点 儿,比如几千。
原文地址:https://my.oschina.net/tjt/blog/726522