Java 并发工具包-java.util.concurrent-源码jdk1.7全面解析

时间:2022-08-15 17:36:32

先来看看类图:

Java 并发工具包-java.util.concurrent-源码jdk1.7全面解析


其实从类图我们能发现concurrent包(除去java.util.concurrent.atomic 和 java.util.concurrent.locks)中的内容并没有特别多,大概分为四类:BlockingQueue阻塞队列体系、Executor线程组执行框架、Future线程返回值体系、其他各种单独的并发工具等。


我们把它分为7个章节来描述:1、并发容器;2、同步设备;3、执行器和线程池;4、Fork-join框架;5、锁 Lock;6、原子对象;7、其它。


接下来我们分别概述:


1、并发容器



这些容器的关键方法大部分都实现了线程安全的功能,却不使用同步关键字(synchronized)。值得注意的是Queue接口本身定义的几个常用方法的区别,

  1. add方法和offer方法的区别在于超出容量限制时前者抛出异常,后者返回false;
  2. remove方法和poll方法都从队列中拿掉元素并返回,但是他们的区别在于空队列下操作前者抛出异常,而后者返回null;
  3. element方法和peek方法都返回队列顶端的元素,但是不把元素从队列中删掉,区别在于前者在空队列的时候抛出异常,后者返回null。
概述:

1.1阻塞队列

  • BlockingQueue.class,阻塞队列接口
  • BlockingDeque.class,双端阻塞队列接口
  • ArrayBlockingQueue.class,阻塞队列,数组实现
  • LinkedBlockingDeque.class,阻塞双端队列,链表实现
  • LinkedBlockingQueue.class,阻塞队列,链表实现
  • DelayQueue.class,阻塞队列,并且元素是Delay的子类,保证元素在达到一定时间后才可以取得到
  • PriorityBlockingQueue.class,优先级阻塞队列
  • SynchronousQueue.class,同步队列,但是队列长度为0,生产者放入队列的操作会被阻塞,直到消费者过来取,所以这个队列根本不需要空间存放元素;有点像一个独木桥,一次只能一人通过,还不能在桥上停留

1.2非阻塞队列

  • ConcurrentLinkedDeque.class,非阻塞双端队列,链表实现
  • ConcurrentLinkedQueue.class,非阻塞队列,链表实现

1.3转移队列

  • TransferQueue.class,转移队列接口,生产者要等消费者消费的队列,生产者尝试把元素直接转移给消费者
  • LinkedTransferQueue.class,转移队列的链表实现,它比SynchronousQueue更快

1.4其它容器

  • ConcurrentMap.class,并发Map的接口,定义了putIfAbsent(k,v)、remove(k,v)、replace(k,oldV,newV)、replace(k,v)这四个并发场景下特定的方法
  • ConcurrentHashMap.class,并发HashMap
  • ConcurrentNavigableMap.class,NavigableMap的实现类,返回最接近的一个元素
  • ConcurrentSkipListMap.class,它也是NavigableMap的实现类(要求元素之间可以比较),同时它比ConcurrentHashMap更加scalable——ConcurrentHashMap并不保证它的操作时间,并且你可以自己来调整它的load factor;但是ConcurrentSkipListMap可以保证O(log n)的性能,同时不能自己来调整它的并发参数,只有你确实需要快速的遍历操作,并且可以承受额外的插入开销的时候,才去使用它
  • ConcurrentSkipListSet.class,和上面类似,只不过map变成了set
  • CopyOnWriteArrayList.class,copy-on-write模式的array list,每当需要插入元素,不在原list上操作,而是会新建立一个list,适合读远远大于写并且写时间并苛刻的场景
  • CopyOnWriteArraySet.class,和上面类似,list变成set而已

1.1、阻塞队列

  • BlockingQueue.class,阻塞队列接口

阻塞队列 BlockingQueue

java.util.concurrent 包里的 BlockingQueue 接口表示一个线程安放入和提取实例的队列。本小节我将给你演示如何使用这个 BlockingQueue。
本节不会讨论如何在 Java 中实现一个你自己的 BlockingQueue。如果你对那个感兴趣,参考《Java 并发指南

BlockingQueue 用法

BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:

Java 并发工具包-java.util.concurrent-源码jdk1.7全面解析
一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。
负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。

BlockingQueue 的方法

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
  抛异常 特定值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除 remove(o) poll(o) take(o) poll(timeout, timeunit)
检查 element(o) peek(o)    

四组不同的行为方式解释:
  1. 抛异常:如果试图的操作无法立即执行,抛一个异常。
  2. 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  3. 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  4. 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。
可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

BlockingQueue 的实现

BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 接口的实现(Java 6):
其他:
  • BlockingDeque.class,双端阻塞队列接口

阻塞双端队列 BlockingDeque

java.util.concurrent 包里的 BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。本小节我将给你演示如何使用 BlockingDeque。
BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。
deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

BlockingDeque 的使用

在线程既是一个队列的生产者又是这个队列的消费者的时候可以使用到 BlockingDeque。如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这个时候也可以使用 BlockingDeque。BlockingDeque 图解:

Java 并发工具包-java.util.concurrent-源码jdk1.7全面解析
一个 BlockingDeque - 线程在双端队列的两端都可以插入和提取元素。
一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。

BlockingDeque 的方法

BlockingDeque 具有 4 组不同的方法用于插入、移除以及对双端队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
  抛异常 特定值 阻塞 超时
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
移除 removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
检查 getFirst(o) peekFirst(o)    



  抛异常 特定值 阻塞 超时
插入 addLast(o) offerLast(o) putLast(o) offerLast(o, timeout, timeunit)
移除 removeLast(o) pollLast(o) takeLast(o) pollLast(timeout, timeunit)
检查 getLast(o) peekLast(o)    

四组不同的行为方式解释:
  1. 抛异常:如果试图的操作无法立即执行,抛一个异常。
  2. 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  3. 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  4. 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

BlockingDeque 继承自 BlockingQueue

BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你可以像使用一个 BlockingQueue 那样使用 BlockingDeque。如果你这么干的话,各种插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法一样。
以下是 BlockingDeque 对 BlockingQueue 接口的方法的具体内部实现:
BlockingQueue BlockingDeque
add() addLast()
offer() x 2 offerLast() x 2
put() putLast()
   
remove() removeFirst()
poll() x 2 pollFirst()
take() takeFirst()
   
element() getFirst()
peek() peekFirst()

BlockingDeque 的实现

既然 BlockingDeque 是一个接口,那么你想要使用它的话就得使用它的众多的实现类的其中一个。java.util.concurrent 包提供了以下 BlockingDeque 接口的实现类:



  • ArrayBlockingQueue.class,阻塞队列,数组实现

数组阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue 类实现了 BlockingQueue 接口。
ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。
ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

  • LinkedBlockingDeque.class,阻塞双端队列,链表实现

链阻塞双端队列 LinkedBlockingDeque

LinkedBlockingDeque 类实现了 BlockingDeque 接口。
deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。(译者注:唐僧啊,受不了。)
LinkedBlockingDeque 是一个双端队列,在它为空的时候,一个试图从中抽取数据的线程将会阻塞,无论该线程是试图从哪一端抽取数据。

  • LinkedBlockingQueue.class,阻塞队列,链表实现

 链阻塞队列 LinkedBlockingQueue

LinkedBlockingQueue 类实现了 BlockingQueue 接口。
LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

  • DelayQueue.class,阻塞队列,并且元素是Delay的子类,保证元素在达到一定时间后才可以取得到

DelayQueue

DelayQueue 实现了 BlockingQueue 接口。
DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口,

  • PriorityBlockingQueue.class,优先级阻塞队列

具有优先级的阻塞队列 PriorityBlockingQueue

PriorityBlockingQueue 类实现了 BlockingQueue 接口。
PriorityBlockingQueue 是一个*的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。
所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。
注意 PriorityBlockingQueue 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。
同时注意,如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。


  • SynchronousQueue.class,同步队列  但是队列长度为0,生产者放入队列的操作会被阻塞,直到消费者过来取,所以这个队列根本不需要空间存放元素;有点像一个独木桥,一次只能一人通过,还不能在桥上停留   

SynchronousQueue

SynchronousQueue 类实现了 BlockingQueue 接口。
SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。


1.2、非阻塞队列

  • ConcurrentLinkedDeque.class,非阻塞双端队列,链表实现
  • ConcurrentLinkedQueue.class,非阻塞队列,链表实现

1.3、转移队列

  • TransferQueue.class,转移队列接口,生产者要等消费者消费的队列,生产者尝试把元素直接转移给消费者
  • LinkedTransferQueue.class,转移队列的链表实现,它比SynchronousQueue更快

1.4、其它容器

  • ConcurrentMap.class,并发Map的接口,定义了putIfAbsent(k,v)、remove(k,v)、replace(k,oldV,newV)、replace(k,v)这四个并发场景下特定的方法
  • ConcurrentHashMap.class,并发HashMap
  • ConcurrentNavigableMap.class,NavigableMap的实现类,返回最接近的一个元素
  • ConcurrentSkipListMap.class,它也是NavigableMap的实现类(要求元素之间可以比较),同时它比ConcurrentHashMap更加scalable——ConcurrentHashMap并不保证它的操作时间,并且你可以自己来调整它的load factor;但是ConcurrentSkipListMap可以保证O(log n)的性能,同时不能自己来调整它的并发参数,只有你确实需要快速的遍历操作,并且可以承受额外的插入开销的时候,才去使用它
  • ConcurrentSkipListSet.class,和上面类似,只不过map变成了set
  • CopyOnWriteArrayList.class,copy-on-write模式的array list,每当需要插入元素,不在原list上操作,而是会新建立一个list,适合读远远大于写并且写时间并苛刻的场景
  • CopyOnWriteArraySet.class,和上面类似,list变成set而已

并发 Map(映射) ConcurrentMap

java.util.concurrent.ConcurrentMap

java.util.concurrent.ConcurrentMap 接口表示了一个能够对别人的访问(插入和提取)进行并发处理的 java.util.Map。
ConcurrentMap 除了从其父接口 java.util.Map 继承来的方法之外还有一些额外的原子性方法。

ConcurrentMap 的实现

既然 ConcurrentMap 是个接口,你想要使用它的话就得使用它的实现类之一。java.util.concurrent 包具备 ConcurrentMap 接口的以下实现类:
  • ConcurrentHashMap

ConcurrentHashMap

ConcurrentHashMap 和 java.util.HashTable 类很相似,但 ConcurrentHashMap 能够提供比 HashTable 更好的并发性能。在你从中读取对象的时候 ConcurrentHashMap 并不会把整个 Map 锁住。此外,在你向其中写入对象的时候,ConcurrentHashMap 也不会锁住整个 Map。它的内部只是把 Map 中正在被写入的部分进行锁定。
另外一个不同点是,在被遍历的时候,即使是 ConcurrentHashMap 被改动,它也不会抛 ConcurrentModificationException。尽管 Iterator 的设计不是为多个线程的同时使用。
更多关于 ConcurrentMap 和 ConcurrentHashMap 的细节请参考官方文档。


并发导航映射 ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap 是一个支持并发访问的 java.util.NavigableMap,它还能让它的子 map 具备并发访问的能力。所谓的 "子 map" 指的是诸如 headMap(),subMap(),tailMap() 之类的方法返回的 map。

NavigableMap 中的方法不再赘述,本小节我们来看一下 ConcurrentNavigableMap 添加的方法。



2、同步设备

这些类大部分都是帮助做线程之间同步的,简单描述,就像是提供了一个篱笆,线程执行到这个篱笆的时候都得等一等,等到条件满足以后再往后走。

  • CountDownLatch.class,一个线程调用await方法以后,会阻塞地等待计数器被调用countDown直到变成0,功能上和下面的CyclicBarrier有点像
  • CyclicBarrier.class,也是计数等待,只不过它是利用await方法本身来实现计数器“+1”的操作,一旦计数器上显示的数字达到Barrier可以打破的界限,就会抛出BrokenBarrierException,线程就可以继续往下执行;请参见我写过的这篇文章《同步、异步转化和任务执行》中的Barrier模式
  • Semaphore.class,功能上很简单,acquire()和release()两个方法,一个尝试获取许可,一个释放许可,Semaphore构造方法提供了传入一个表示该信号量所具备的许可数量。
  • Exchanger.class,这个类的实例就像是两列飞驰的火车(线程)之间开了一个神奇的小窗口,通过小窗口(exchange方法)可以让两列火车安全地交换数据。
  • Phaser.class,功能上和第1、2个差不多,但是可以重用,且更加灵活,稍微有点复杂(CountDownLatch是不断-1,CyclicBarrier是不断+1,而Phaser定义了两个概念,phase和party),我在下面画了张图,希望能够帮助理解:
    • 一个是phase,表示当前在哪一个阶段,每碰到一次barrier就会触发advance操作(触发前调用onAdvance方法),一旦越过这道barrier就会触发phase+1,这很容易理解;
    • 另一个是party,很多文章说它就是线程数,但是其实这并不准确,它更像一个用于判断advance是否被允许发生的计数器:
      • 任何时候都有一个party的总数,即注册(registered)的party数,它可以在Phaser构造器里指定,也可以任意时刻调用方法动态增减;
      • 每一个party都有unarrived和arrived两种状态,可以通过调用arriveXXX方法使得它从unarrived变成arrived;
      • 每一个线程到达barrier后会等待(调用arriveAndAwaitAdvance方法),一旦所有party都到达(即arrived的party数量等于registered的数量),就会触发advance操作,同时barrier被打破,线程继续向下执行,party重新变为unarrived状态,重新等待所有party的到达;
      • 在绝大多数情况下一个线程就只负责操控一个party的到达,因此很多文章说party指的就是线程,但是这是不准确的,因为一个线程完全可以操控多个party,只要它执行多次的arrive方法。



2.1、闭锁 CountDownLatch

java.util.concurrent.CountDownLatch 是一个并发构造,它允许一个或多个线程等待一系列指定操作的完成。
CountDownLatch 以一个给定的数量初始化。countDown() 每被调用一次,这一数量就减一。通过调用 await() 方法之一,线程可以阻塞等待这一数量到达零。



2.2、栅栏 CyclicBarrier
java.util.concurrent.CyclicBarrier 类是一种同步机制,它能够对处理一些算法的线程实现同步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。



2.3、交换机 Exchanger

java.util.concurrent.Exchanger 类表示一种两个线程可以进行互相交换对象的会和点。这种机制图示如下:

Java 并发工具包-java.util.concurrent-源码jdk1.7全面解析
两个线程通过一个 Exchanger 交换对象。
交换对象的动作由 Exchanger 的两个 exchange() 方法的其中一个完成。



2.4、信号量 Semaphore
java.util.concurrent.Semaphore 类是一个计数信号量。这就意味着它具备两个主要方法:
  • acquire()
  • release()
计数信号量由一个指定数量的 "许可" 初始化。每调用一次 acquire(),一个许可会被调用线程取走。每调用一次 release(),一个许可会被返还给信号量。因此,在没有任何 release() 调用时,最多有 N 个线程能够通过 acquire() 方法,N 是该信号量初始化时的许可的指定数量。这些许可只是一个简单的计数器。这里没啥奇特的地方。

Semaphore 用法

信号量主要有两种用途:
  1. 保护一个重要(代码)部分防止一次超过 N 个线程进入。
  2. 在两个线程之间发送信号。

保护重要部分

如果你将信号量用于保护一个重要部分,试图进入这一部分的代码通常会首先尝试获得一个许可,然后才能进入重要部分(代码块),执行完之后,再把许可释放掉。

在线程之间发送信号

如果你将一个信号量用于在两个线程之间传送信号,通常你应该用一个线程调用 acquire() 方法,而另一个线程调用 release() 方法。
如果没有可用的许可,acquire() 调用将会阻塞,直到一个许可被另一个线程释放出来。同理,如果无法往信号量释放更多许可时,一个 release() 调用也会阻塞。
通过这个可以对多个线程进行协调。比如,如果线程 1 将一个对象插入到了一个共享列表(list)之后之后调用了 acquire(),而线程 2 则在从该列表中获取一个对象之前调用了 release(),这时你其实已经创建了一个阻塞队列。信号量中可用的许可的数量也就等同于该阻塞队列能够持有的元素个数。

公平

没有办法保证线程能够公平地可从信号量中获得许可。也就是说,无法担保掉第一个调用 acquire() 的线程会是第一个获得一个许可的线程。如果第一个线程在等待一个许可时发生阻塞,而第二个线程前来索要一个许可的时候刚好有一个许可被释放出来,那么它就可能会在第一个线程之前获得许可。
如果你想要强制公平,Semaphore 类有一个具有一个布尔类型的参数的构造子,通过这个参数以告知 Semaphore 是否要强制公平。强制公平会影响到并发性能,所以除非你确实需要它否则不要启用它。

更多方法

java.util.concurrent.Semaphore 类还有很多方法,比如:
  • availablePermits()
  • acquireUninterruptibly()
  • drainPermits()
  • hasQueuedThreads()
  • getQueuedThreads()
  • tryAcquire()
  • 等等

这些方法的细节请参考 Java 文档。


3、执行器和线程池


3.1、ExecutorService

  • Future.class,异步计算的结果对象,get方法会阻塞线程直至真正的结果返回
  • Callable.class,用于异步执行的可执行对象,call方法有返回值,它和Runnable接口很像,都提供了在其他线程中执行的方法,二者的区别在于:
    • Runnable没有返回值,Callable有
    • Callable的call方法声明了异常抛出,而Runnable没有
  • RunnableFuture.class,实现自Runnable和Future的子接口,成功执行run方法可以完成它自身这个Future并允许访问其结果,它把任务执行和结果对象放到一起了
  • FutureTask.class,RunnableFuture的实现类,可取消的异步计算任务,仅在计算完成时才能获取结果,一旦计算完成,就不能再重新开始或取消计算;它的取消任务方法cancel(boolean mayInterruptIfRunning)接收一个boolean参数表示在取消的过程中是否需要设置中断
  • Executor.class,执行提交任务的对象,只有一个execute方法
  • Executors.class,辅助类和工厂类,帮助生成下面这些ExecutorService
  • ExecutorService.class,Executor的子接口,管理执行异步任务的执行器,AbstractExecutorService提供了默认实现
  • AbstractExecutorService.class,ExecutorService的实现类,提供执行方法的默认实现,包括:
    • ① submit的几个重载方法,返回Future对象,接收Runnable或者Callable参数
    • ② invokeXXX方法,这类方法返回的时候,任务都已结束,即要么全部的入参task都执行完了,要么cancel了
  • ThreadPoolExecutor.class,线程池,AbstractExecutorService的子类,除了从AbstractExecutorService继承下来的①、②两类提交任务执行的方法以外,还有:
    • ③ 实现自Executor接口的execute方法,接收一个Runnable参数,没有返回值
  • RejectedExecutionHandler.class,当任务无法被执行的时候,定义处理逻辑的地方,前面已经提到过了
  • ThreadFactory.class,线程工厂,用于创建线程
3.2、ScheduledExecutor
  • Delayed.class,延迟执行的接口,只有long getDelay(TimeUnit unit)这样一个接口方法
  • ScheduledFuture.class,Delayed和Future的共同子接口
  • RunnableScheduledFuture.class,ScheduledFuture和RunnableFuture的共同子接口,增加了一个方法boolean isPeriodic(),返回它是否是一个周期性任务,一个周期性任务的特点在于它可以反复执行
  • ScheduledExecutorService.class,ExecutorService的子接口,它允许任务延迟执行,相应地,它返回ScheduledFuture
  • ScheduledThreadPoolExecutor.class,可以延迟执行任务的线程池

3.3、CompletionService

  • CompletionService.class,它是对ExecutorService的改进,因为ExecutorService只是负责处理任务并把每个任务的结果对象(Future)给你,却并没有说要帮你“管理”这些结果对象,这就意味着你得自己建立一个对象容器存放这些结果对象,很麻烦;CompletionService像是集成了一个Queue的功能,你可以调用Queue一样的方法——poll来获取结果对象,还有一个方法是take,它和poll差不多,区别在于take方法在没有结果对象的时候会返回空,而poll方法会block住线程直到有结果对象返回
  • ExecutorCompletionService.class,是CompletionService的实现类
4、Fork-join框架


这是一个JDK7引入的并行框架,它把流程划分成fork(分解)+join(合并)两个步骤(怎么那么像MapReduce?),传统线程池来实现一个并行任务的时候,经常需要花费大量的时间去等待其他线程执行任务的完成,但是fork-join框架使用work stealing技术缓解了这个问题:

  1. 每个工作线程都有一个双端队列,当分给每个任务一个线程去执行的时候,这个任务会放到这个队列的头部;
  2. 当这个任务执行完毕,需要和另外一个任务的结果执行合并操作,可是那个任务却没有执行的时候,不会干等,而是把另一个任务放到队列的头部去,让它尽快执行;
  3. 当工作线程的队列为空,它会尝试从其他线程的队列尾部偷一个任务过来;
  4. 取得的任务可以被进一步分解。
  • ForkJoinPool.class,ForkJoin框架的任务池,ExecutorService的实现类
  • ForkJoinTask.class,Future的子类,框架任务的抽象
  • ForkJoinWorkerThread.class,工作线程
  • RecursiveTask.class,ForkJoinTask的实现类,compute方法有返回值,下文中有例子
  • RecursiveAction.class,ForkJoinTask的实现类,compute方法无返回值,只需要覆写compute方法,对于可继续分解的子任务,调用coInvoke方法完成(参数是RecursiveAction子类对象的可变数组)
4.1概述、

使用 ForkJoinPool 进行分叉和合并

ForkJoinPool 在 Java 7 中被引入。它和  ExecutorService  很相似,除了一点不同。ForkJoinPool 让我们可以很方便地把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。可能听起来有些抽象,因此本节中我们将会解释 ForkJoinPool 是如何工作的,还有任务分割是如何进行的。

分叉和合并解释

在我们开始看 ForkJoinPool 之前我们先来简要解释一下分叉和合并的原理。
分叉和合并原理包含两个递归进行的步骤。两个步骤分别是分叉步骤和合并步骤。

分叉

一个使用了分叉和合并原理的任务可以将自己分叉(分割)为更小的子任务,这些子任务可以被并发执行。如下图所示:
Java 并发工具包-java.util.concurrent-源码jdk1.7全面解析
通过把自己分割成多个子任务,每个子任务可以由不同的 CPU 并行执行,或者被同一个 CPU 上的不同线程执行。
只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一定开销,因此对于小型任务,这个分割的消耗可能比每个子任务并发执行的消耗还要大。
什么时候把一个任务分割成子任务是有意义的,这个界限也称作一个阀值。这要看每个任务对有意义阀值的决定。很大程度上取决于它要做的工作的种类。

合并

当一个任务将自己分割成若干子任务之后,该任务将进入等待所有子任务的结束之中。
一旦子任务执行结束,该任务可以把所有结果合并到同一个结果。图示如下:
Java 并发工具包-java.util.concurrent-源码jdk1.7全面解析
当然,并非所有类型的任务都会返回一个结果。如果这个任务并不返回一个结果,它只需等待所有子任务执行完毕。也就不需要结果的合并啦。



4.2、ForkJoinPool
ForkJoinPool 是一个特殊的线程池,它的设计是为了更好的配合 分叉-和-合并 任务分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整类名为 java.util.concurrent.ForkJoinPool。


创建一个 ForkJoinPool

你可以通过其构造子创建一个 ForkJoinPool。作为传递给 ForkJoinPool 构造子的一个参数,你可以定义你期望的并行级别。并行级别表示你想要传递给 ForkJoinPool 的任务所需的线程或 CPU 数量。以下是一个 ForkJoinPool 示例:
[java] view plain copy print?
  1. ForkJoinPool forkJoinPool = new ForkJoinPool(4);  

这个示例创建了一个并行级别为 4 的 ForkJoinPool。

提交任务到 ForkJoinPool

就像提交任务到 ExecutorService 那样,把任务提交到 ForkJoinPool。你可以提交两种类型的任务。一种是没有任何返回值的(一个 "行动"),另一种是有返回值的(一个"任务")。这两种类型分别由 RecursiveAction 和 RecursiveTask 表示。接下来介绍如何使用这两种类型的任务,以及如何对它们进行提交。


4.3、RecursiveAction

RecursiveAction 是一种没有任何返回值的任务。它只是做一些工作,比如写数据到磁盘,然后就退出了。
一个 RecursiveAction 可以把自己的工作分割成更小的几块,这样它们可以由独立的线程或者 CPU 执行。
你可以通过继承来实现一个 RecursiveAction。



4.4、RecursiveTask

RecursiveTask 是一种会返回结果的任务。它可以将自己的工作分割为若干更小任务,并将这些子任务的执行结果合并到一个集体结果。可以有几个水平的分割和合并。



5、锁 Lock

  • AbstractOwnableSynchronizer.class,这三个AbstractXXXSynchronizer都是为了创建锁和相关的同步器而提供的基础,锁,还有前面提到的同步设备都借用了它们的实现逻辑
  • AbstractQueuedLongSynchronizer.class,AbstractOwnableSynchronizer的子类,所有的同步状态都是用long变量来维护的,而不是int,在需要64位的属性来表示状态的时候会很有用
  • AbstractQueuedSynchronizer.class,为实现依赖于先进先出队列的阻塞锁和相关同步器(信号量、事件等等)提供的一个框架,它依靠int值来表示状态
  • Lock.class,Lock比synchronized关键字更灵活,而且在吞吐量大的时候效率更高,根据JSR-133的定义,它happens-before的语义和synchronized关键字效果是一模一样的,它唯一的缺点似乎是缺乏了从lock到finally块中unlock这样容易遗漏的固定使用搭配的约束,除了lock和unlock方法以外,还有这样两个值得注意的方法:
    • lockInterruptibly:如果当前线程没有被中断,就获取锁;否则抛出InterruptedException,并且清除中断
    • tryLock,只在锁空闲的时候才获取这个锁,否则返回false,所以它不会block代码的执行
  • ReadWriteLock.class,读写锁,读写分开,读锁是共享锁,写锁是独占锁;对于读-写都要保证严格的实时性和同步性的情况,并且读频率远远大过写,使用读写锁会比普通互斥锁有更好的性能。
  • ReentrantLock.class,可重入锁(lock行为可以嵌套,但是需要和unlock行为一一对应),有几点需要注意:
    • 构造器支持传入一个表示是否是公平锁的boolean参数,公平锁保证一个阻塞的线程最终能够获得锁,因为是有序的,所以总是可以按照请求的顺序获得锁;不公平锁意味着后请求锁的线程可能在其前面排列的休眠线程恢复前拿到锁,这样就有可能提高并发的性能
    • 还提供了一些监视锁状态的方法,比如isFair、isLocked、hasWaiters、getQueueLength等等
  • ReentrantReadWriteLock.class,可重入读写锁
  • Condition.class,使用锁的newCondition方法可以返回一个该锁的Condition对象,如果说锁对象是取代和增强了synchronized关键字的功能的话,那么Condition则是对象wait/notify/notifyAll方法的替代。在下面这个例子中,lock生成了两个condition,一个表示不满,一个表示不空;在put方法调用的时候,需要检查数组是不是已经满了,满了的话就得等待,直到“不满”这个condition被唤醒(notFull.await());在take方法调用的时候,需要检查数组是不是已经空了,如果空了就得等待,直到“不空”这个condition被唤醒(notEmpty.await())
java.util.concurrent.locks.Lock 是一个类似于 synchronized 块的线程同步机制。但是 Lock 比 synchronized 块更加灵活、精细。
顺便说一下,在《Java 并发指南》中我对如何实现你自己的锁进行了描述。


5.1、Java Lock 

既然 Lock 是一个接口,在你的程序里需要使用它的实现类之一来使用它。

首先创建了一个 Lock 对象。之后调用了它的 lock() 方法。这时候这个 lock 实例就被锁住啦。任何其他再过来调用 lock() 方法的线程将会被阻塞住,直到锁定 lock 实例的线程调用了 unlock() 方法。最后 unlock() 被调用了,lock 对象解锁了,其他线程可以对它进行锁定了。

Java Lock 实现

java.util.concurrent.locks 包提供了以下对 Lock 接口的实现类:
  • ReentrantLock

Lock 和 synchronized 代码块的主要不同点

一个 Lock 对象和一个 synchronized 代码块之间的主要不同点是:
  • synchronized 代码块不能够保证进入访问等待的线程的先后顺序。
  • 你不能够传递任何参数给一个 synchronized 代码块的入口。因此,对于 synchronized 代码块的访问等待设置超时时间是不可能的事情。
  • synchronized 块必须被完整地包含在单个方法里。而一个 Lock 对象可以把它的 lock() 和 unlock() 方法的调用放在不同的方法里。

Lock 的方法

Lock 接口具有以下主要方法:
  • lock()
  • lockInterruptibly()
  • tryLock()
  • tryLock(long timeout, TimeUnit timeUnit)
  • unlock()
lock() 将 Lock 实例锁定。如果该 Lock 实例已被锁定,调用 lock() 方法的线程将会阻塞,直到 Lock 实例解锁。
lockInterruptibly() 方法将会被调用线程锁定,除非该线程被打断。此外,如果一个线程在通过这个方法来锁定 Lock 对象时进入阻塞等待,而它被打断了的话,该线程将会退出这个方法调用。
tryLock() 方法试图立即锁定 Lock 实例。如果锁定成功,它将返回 true,如果 Lock 实例已被锁定该方法返回 false。这一方法永不阻塞。
tryLock(long timeout, TimeUnit timeUnit) 的工作类似于 tryLock() 方法,除了它在放弃锁定 Lock 之前等待一个给定的超时时间之外。

unlock() 方法对 Lock 实例解锁。一个 Lock 实现将只允许锁定了该对象的线程来调用此方法。其他(没有锁定该 Lock 对象的线程)线程对 unlock() 方法的调用将会抛一个未检查异常(RuntimeException)。




5.2、读写锁 ReadWriteLock

java.util.concurrent.locks.ReadWriteLock 读写锁是一种先进的线程锁机制。它能够允许多个线程在同一时间对某特定资源进行读取,但同一时间内只能有一个线程对其进行写入。
读写锁的理念在于多个线程能够对一个共享资源进行读取,而不会导致并发问题。并发问题的发生场景在于对一个共享资源的读和写操作的同时进行,或者多个写操作并发进行。
本节只讨论 Java 内置 ReadWriteLock。如果你想了解 ReadWriteLock 背后的实现原理,请参考我的《Java 并发指南》主题中的《 读写锁 》小节。

ReadWriteLock 锁规则

一个线程在对受保护资源在读或者写之前对 ReadWriteLock 锁定的规则如下:
  • 读锁:如果没有任何写操作线程锁定 ReadWriteLock,并且没有任何写操作线程要求一个写锁(但还没有获得该锁)。因此,可以有多个读操作线程对该锁进行锁定。
  • 写锁:如果没有任何读操作或者写操作。因此,在写操作的时候,只能有一个线程对该锁进行锁定。

ReadWriteLock 实现

ReadWriteLock 是个接口,如果你想用它的话就得去使用它的实现类之一。java.util.concurrent.locks 包提供了 ReadWriteLock 接口的以下实现类:
  • ReentrantReadWriteLock
6、原子对象

这些对象都的行为在不使用同步的情况下保证了原子性。值得一提的有两点:

  1. weakCompareAndSet方法:compareAndSet方法很明确,但是这个是啥?根据JSR规范,调用weakCompareAndSet时并不能保证happen-before的一致性,因此允许存在重排序指令等等虚拟机优化导致这个操作失败(较弱的原子更新操作),但是从Java源代码看,它的实现其实和compareAndSet是一模一样的;
  2. lazySet方法:延时设置变量值,这个等价于set方法,但是由于字段是volatile类型的,因此次字段的修改会比普通字段(非volatile字段)有稍微的性能损耗,所以如果不需要立即读取设置的新值,那么此方法就很有用。
  • AtomicBoolean.class
  • AtomicInteger.class
  • AtomicIntegerArray.class
  • AtomicIntegerFieldUpdater.class
  • AtomicLong.class
  • AtomicLongArray.class
  • AtomicLongFieldUpdater.class
  • AtomicMarkableReference.class,它是用来高效表述Object-boolean这样的对象标志位数据结构的,一个对象引用+一个bit标志位
  • AtomicReference.class
  • AtomicReferenceArray.class
  • AtomicReferenceFieldUpdater.class
  • AtomicStampedReference.class,它和前面的AtomicMarkableReference类似,但是它是用来高效表述Object-int这样的“对象+版本号”数据结构,特别用于解决ABA问题
6.1、原子性布尔 AtomicBoolean
AtomicBoolean 类为我们提供了一个可以用原子方式进行读和写的布尔值,它还拥有一些先进的原子性操作,比如 compareAndSet()。AtomicBoolean 类位于 java.util.concurrent.atomic 包,完整类名是为 java.util.concurrent.atomic.AtomicBoolean。本小节描述的 AtomicBoolean 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。


6.2、原子性整型 AtomicInteger   
AtomicInteger 类为我们提供了一个可以进行原子性读和写操作的 int 变量,它还包含一系列先进的原子性操作,比如 compareAndSet()。AtomicInteger 类位于 java.util.concurrent.atomic 包,因此其完整类名为 java.util.concurrent.atomic.AtomicInteger。本小节描述的 AtomicInteger 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。


6.3、原子性长整型 AtomicLong


AtomicLong 类为我们提供了一个可以进行原子性读和写操作的 long 变量,它还包含一系列先进的原子性操作,比如 compareAndSet()AtomicLong 类位于 java.util.concurrent.atomic 包,因此其完整类名为 java.util.concurrent.atomic.AtomicLong。本小节描述的 AtomicLong 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。




6.4、原子性引用型 AtomicReference


AtomicReference 提供了一个可以被原子性读和写的对象引用变量。原子性的意思是多个想要改变同一个 AtomicReference 的线程不会导致 AtomicReference 处于不一致的状态。AtomicReference 还有一个 compareAndSet() 方法,通过它你可以将当前引用于一个期望值(引用)进行比较,如果相等,在该 AtomicReference 对象内部设置一个新的引用。




7、其它:


  • ThreadLocalRandom.class,随机数生成器,它和Random类差不多,但是它的性能要高得多,因为它的种子内部生成后,就不再修改,而且随机对象不共享,就会减少很多消耗和争用,由于种子内部生成,因此生成随机数的方法略有不同:ThreadLocalRandom.current().nextX(…)
  • TimeUnit.class
   计时

   TimeUnit类为指定和控制基于超时的操作提供了多重粒度(包括纳秒级)。该包中的大多数类除了包含不确定的等待之外,还包含基于超时的操作。在使用超时的所有情况中,超时指定了在表明已超时前该方法应该等待的最新少时间。在超时发生后,实现会尽力”的检测超时。但是,在检测超时与超时之后再次实际执行线程之间可能要经过不确定的时间。




请勿随便转载!谢谢!