Java:concurrent包下面的Collection接口框架图( CopyOnWriteArraySet, CopyOnWriteArrayList,ConcurrentLinkedQueue,BlockingQueue)

时间:2022-09-29 16:23:51

Java集合大致可分为Set、List和Map三种体系,其中Set代表无序、不可重复的集合;List代表有序、重复的集合;而Map则代表具有映射关系的集合。Java 5之后,增加了Queue体系集合,代表一种队列集合实现。

JDK1.5版本中,加入java.uill.concurrent包,其中包含集合的线程安全方式的实现。本文仅探讨concurrent包下面的Collection接口实现。

目录

1. concurrent包下面Collection子接口、类框架图
2. CopyOnWriteArraySet类
3. CopyOnWriteArrayList类
4. ConcurrentLinkedQueue类
5. BlockingQueue接口
    5.1 ArrayBlockingQueue
    5.2 LinkedBlockingQueue
    5.3 DelayQueue
    5.4 PriorityBlockingQueue
    5.5 SynchronousQueue

1. concurrent包下面Collection子接口、类框架图

Java:concurrent包下面的Collection接口框架图( CopyOnWriteArraySet, CopyOnWriteArrayList,ConcurrentLinkedQueue,BlockingQueue)

2. CopyOnWriteArraySet类

特点:

  • 它最适合于 set 大小通常保持很小、只读操作远多于可变操作以及需要在遍历期间防止线程间冲突的应用程序。
  • 它是线程安全的。
  • 因为通常需要复制整个基础数组,所以可变操作(添加、设置、移除,等等)的开销巨大。
  • 迭代器不支持可变移除操作。
  • 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。

以下代码使用了一个写时复制(copy-on-write)的 set,以维护在状态更新时执行某项操作的一组 Handler 对象。

class Handler { void handle(); ... }

class X {
private final CopyOnWriteArraySet<Handler> handlers = new CopyOnWriteArraySet<Handler>();
public void addHandler(Handler h) { handlers.add(h); } private long internalState;
private synchronized void changeState() { internalState = ...; } public void update() {
changeState();
for (Handler handler : handlers)
handler.handle();
}
}

3. CopyOnWriteArrayList

CopyOnWriteArrayList是concurrent包中提供的一个非阻塞型的,线程安全的List实现。其中所有可变操作(添加、设置,等等)都是通过对基础数组进行一次新的复制来实现的。

CopyOnWriteArrayList在进行数据修改时,都不会对数据进行锁定,每次修改时,先拷贝整个数组,然后修改其中的一些元素,完成上述操作后,替换整个数组的指针。

对CopyOnWriteArrayList进行读取时,也不进行数据锁定,直接返回需要查询的数据,如果需要返回整个数组,那么会将整个数组拷贝一份,再返回,保证内部array在任何情况下都是只读的。

CopyOnWriteArrayLIst在少量修改,频繁读取的场景下,有很好的并发性能。

4. ConcurrentLinkedQueue类

一个基于链接节点的、*的、线程安全的队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部是队列中时间最长的元素。队列的尾部是队列中时间最短的元素。新的元素插入到队列的尾部,队列检索操作从队列头部获得元素。当许多线程共享访问一个公共collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许 null 元素。

此实现采用了有效的“无等待 (wait-free)”算法。与大多数 collection 不同,size 方法不是 一个固定时间的操作。由于这些队列的异步特性,确定当前元素的数量需要遍历这些元素。

5. BlockingQueue接口

BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。

BlockingQueue特点:

如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒。同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作。

  • BlockingQueue 不接受 null 元素。
  • BlockingQueue 可以是限定容量的。
  • BlockingQueue 实现主要用于生产者-使用者队列。
  • BlockingQueue 实现是线程安全的。

BlockingQueue常用方法:

  • boolean add(E o): 将指定的元素添加到此队列中(如果立即可行),在成功时返回 true,其他情况则抛出 IllegalStateException。
  • boolean offer(E o): 如果可能的话,将指定元素插入此队列中。成功返回true,否则返回false。
  • void put(E o): 将指定元素添加到此队列中,如果没有可用空间,将一直等待(如果有必要)
  • E poll(long timeout, TimeUnit unit): 检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要)。
  • E take(): 检索并移除此队列的头部,如果此队列不存在任何元素,则一直等待。

以下是基于典型的生产者-使用者场景的一个用例:

class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
} class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
} class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}

5.1 ArrayBlockingQueue类

一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。

这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。

此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。

5.2 LinkedBlockingQueue类

一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列检索操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。

可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。

5.3 DelayQueue类

Delayed 元素的一个*阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满。此队列不允许使用 null 元素。
DelayQueue使用场景较少,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

5.4 PriorityBlockingQueue类

一个*的阻塞队列,它使用与类 PriorityQueue 相同的顺序规则:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),并且提供了阻塞检索的操作。虽然此队列逻辑上是*的,但是由于资源被耗尽,所以试图执行添加操作可能会失败(导致 OutOfMemoryError)。此类不允许使用 null 元素。依赖自然顺序的优先级队列也不允许插入不可比较的对象(因为这样做会抛出 ClassCastException)。

5.5 SynchronousQueue类

一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要取得元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)添加元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队线程元素;如果没有已排队线程,则不添加元素并且头为 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空集合。此队列不允许 null 元素。

同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。

对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。公平通常会降低吞吐量,但是可以减小可变性并避免得不到服务。