Java并发编程实战 第5章 构建基础模块

时间:2023-12-17 12:29:44

同步容器类

Vector和HashTable和Collections.synchronizedXXX

都是使用监视器模式实现的。

暂且不考虑性能问题,使用同步容器类要注意:

  • 只能保证单个操作的同步。

这会引起两个问题:

第一个问题:

如果有一个功能,需要计算得到最后一个的值,有这个方法:

list.get(list.size -1)

这是一个复合操作,其实是两步的。存在并发问题。可能会导致ArrayIndexOutOfBoundsException。

如何解决:

synchronized(list)

{

list.get(list.size -1)

}

这样就占用了list的监视器锁,和list的其他方法一起达到了同步。

第二个问题:

对list进行迭代:

不论是使用for循环,还是使用iterator,都是多步操作,所以会有并发隐患。(作者说还有会导致迭代的hashcode equals containsAll等方法,也会存在这种问题。其实我查看了Vector的源代码,这些方法上是加了synchronize的)。

看看java文档的描述:由 Vector 的 iterator 和 listIterator 方法所返回的迭代器是快速失败的:如果在迭代器创建后的任意时间从结构上修改了向量(通过迭代器自身的 remove 或 add 方法之外的任何其他方式),则迭代器将抛出 ConcurrentModificationException。

解决方法,和上面一样。

并发容器

ConcurrentHashMap

CopyOnWriteArrayList

Queue接口 不可阻塞

ConcurrentLinkedQueue Queue实现非阻塞 先进先出 可并发

BlockingQueue接口 继承自Queue 可阻塞

ArrayBlockingQueue可阻塞 可并发 先进先出 有界

LinkedBlockingQueue可阻塞 可并发 先进先出 *

PriorityQueue Queue实现非阻塞 优先级 不可并发 有界

ConcurrentHashMap

ConcurrentHashMap使用分段锁(可以理解为不是在每个Map上的get put上加锁 而是在Map的entry的put get上加锁)的策略,并没有在每个方法上使用同一个锁。

ConcurrentHashMap返回的迭代器不会返回ConcurrentModificationException。具有弱一致性。弱一致性的迭代器可以容忍并发的修改。当创建迭代器时会遍历所有的已有元素(应该是拷贝一份),并可以(但是不保证)在迭代器被构造后将修改操作反应给容器。

有一些方法,如size和isEmpty,为了保证并发的特性,被减弱了。有可能返回的是一个过期的值。这些不常用的需求被弱化了,以换取一些更加常用的功能的并发性,如get,put,containsKey,remove等。

CopyOnWriteArrayList

ArrayList 的一个线程安全的变体,其中所有可变操作(添加、设置,等等)都是通过对基础数组进行一次新的复制来实现的。

这一般需要很大的开销,但是当遍历操作的数量大大超过可变操作的数量时,这种方法可能比其他替代方法更 有效。在不能或不想进行同步遍历,但又需要从并发线程中排除冲突时,它也很有用。"快照"风格的迭代器方法在创建迭代器时使用了对数组状态的引用。此数组在迭代器的生存期内绝不会更改,因此不可能发生冲突,并且迭代器保证不会抛出 ConcurrentModificationException。自创建迭代器以后,迭代器就不会反映列表的添加、移除或者更改。不支持迭代器上更改元素的操作(移除、设置和添加)。这些方法将抛出 UnsupportedOperationException。

它的迭代器不需要对数组进行加锁或复制。

串行线程封闭

阻塞队列实现的生产者消费者模式,可以达到串行线性封闭。生产者和消费者不可能同时访问流转的对象。如果生产者内部或者消费者内部不会并发的处理流转对象,那么对象在一个时间点只会有一个线程在访问和操作。

双端队列

Deque接口

BlockingDeque接口

ArrayDeque类

LinkedBlockingDeque类

同步工具类

闭锁:

CountDownLatch

注:传入1的时候可以作为开关。前提是在其他线程的第一步先执行开关的await。使用开关的countDown方法就可以打开开关。

使用Future接口(实现类为FutureTask)也可以实现闭锁。其实就是使用get()方法。这个方法wait的。

信号量:

一个计数信号量。从概念上讲,信号量维护了一个许可数量。通过初始化的时候,设置一个许可的数量。acquire()可以获得许可,在许可可用前会阻塞,每个 release() 释放一个许可,从而可能释放一个正在阻塞的获取者。

Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:

class Pool {

private static final MAX_AVAILABLE = 100;

private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException {

available.acquire();

return getNextAvailableItem();

}

public void putItem(Object x) {

if (markAsUnused(x))

available.release();

}

// Not a particularly efficient data structure; just for demo

protected Object[] items = ... whatever kinds of items being managed

protected boolean[] used = new boolean[MAX_AVAILABLE];

protected synchronized Object getNextAvailableItem() {

for (int i = 0; i < MAX_AVAILABLE; ++i) {

if (!used[i]) {

used[i] = true;

return items[i];

}

}

return null; // not reached

}

protected synchronized boolean markAsUnused(Object item) {

for (int i = 0; i < MAX_AVAILABLE; ++i) {

if (item == items[i]) {

if (used[i]) {

used[i] = false;

return true;

} else

return false;

}

}

return false;

}

}

将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。相当于一个不可重入的锁。

ps:我的理解:信号量+queue可以实现线程池。

CyclicBarrier

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

  1. class Solver {
  2.    final int N;
  3.    final float[][] data;
  4.    final CyclicBarrier barrier;
  5.    class Worker implements Runnable {
  6.       int myRow;
  7.       Worker(int row) {
  8.          myRow = row;
  9.       }
  10.       public void run() {
  11.          while (!done()) {
  12.             processRow(myRow);
  13.             try {
  14.                barrier.await();
  15.             } catch (InterruptedException ex) {
  16.                return;
  17.             } catch (BrokenBarrierException ex) {
  18.                return;
  19.             }
  20.          }
  21.       }
  22.    }
  23.    public Solver(float[][] matrix) {
  24.         data = matrix;
  25.         N = matrix.length;
  26.         barrier = new CyclicBarrier(N,
  27.                                     new Runnable() {
  28.                                       public void run() {
  29.                                         mergeRows(...);
  30.                                       }
  31.                                     });
  32.         for (int i = 0; i < N; ++i)
  33.           new Thread(new Worker(i)).start();
  34.         waitUntilDone();
  35.       }
  36. }

如果任何一个线程在await的时候被中断,或者调用await超时,那么所有的线程的await方法都将终止并且抛出BrokenBarrierException(栅栏已经破碎)。

构建高效且可伸缩的结果缓存

几乎所有的服务器应用都会使用某种形式的缓存。重用之前的计算结果能降低延迟,提高吞吐量,但却要消耗更多内存。看上去简单的缓存,可能会将性能瓶颈转变成伸缩性瓶颈,即使缓存是用来提高单线程性能的。本文将开发一个高效且可伸缩的缓存,用于改进一个高计算开销的计算,我们会从HashMap开始,逐步完善功能,分析它们的并发问题,并讨论如何修改它们。

下面基于一个计算任务开始缓存的设计

public interface Computable <A, R>{

R compute(A a) throws InterruptedException;

}

public class Function implements Computable <String, BigInteger>{

@Override

public BigInteger compute(String a) throws InterruptedException {

return new BigInteger(a);

}

}

第一阶段 HashMap

public class Memorizer1<A, V> implements Computable<A, V>{

private final Computable<A, V> compute;

private final Map<A, V> cache;

public Memorizer1(Computable<A, V> compute){

this.compute = compute;

cache = new HashMap<A, V>();

}

@Override

public synchronized V compute(A a) throws InterruptedException {

V result = cache.get(a);

if(result == null){

result = compute.compute(a);

cache.put(a, result);

}

return result;

}

}

如上所示,Memorizer1将Computable实现类的计算结果缓存在Map<A, V> cache。因为HashMap不是线程安全的,为了保证并发性,Memorizer1用了个很保守的方法,对整个compute方法进行同步。这导致了Memorizer1会有很明显的可伸缩性问题,当有很多线程调用compute方法,将排一列很长的队,考虑到这么多线程的阻塞,线程状态切换,内存占用,这种方式甚至不如不使用缓存。

第二阶段 ConcurrentHashMap

public class Memorizer2<A, V> implements Computable<A, V>{

private final Computable<A, V> compute;

private final Map<A, V> cache;

public Memorizer2(Computable<A, V> compute){

this.compute = compute;

cache = new ConcurrentHashMap<A, V>();

}

@Override

public V compute(A a) throws InterruptedException {

V result = cache.get(a);

if(result == null){

result = compute.compute(a);

cache.put(a, result);

}

return result;

}

}

Memorizer2比Memorizer1拥有更好的并发性,并且具有良好的伸缩性。但它仍然有一些不足——当两个线程同时计算同一个值,它们并不知道有其它线程在做同一的事,存在着资源被浪费的可能。这个不足,对于缓存的对象只提供单次初始化,会带来安全性问题。

第三阶段 ConcurrentHashMap+FutureTask 
事实上,第二阶段的功能已经符合大部分情况的功能,但是当计算时间很长导致很多线程进行同一个运算,或者缓存的对象只提供单次初始化,问题就会很棘手,在这里,我们引入FutureTask来让进行运算的线程获知是否已经有其它正在,或已经进行该运算的线程。

public class Memorizer3<A, V> implements Computable<A, V>{

private final Computable<A, V> compute;

private final Map<A, FutureTask<V>> cache;

public Memorizer3(Computable<A, V> compute){

this.compute = compute;

cache = new ConcurrentHashMap<A, FutureTask<V>>();

}

@Override

public V compute(A a) throws InterruptedException {

V f = cache.get(a);

if(f == null){

Callable<V> eval = new Callable<V>(){

public V call() throw InterruptedException{

return c.compute(arg);

}

}

FutureTask<V> ft = new FutureTask<V>(eval);

f = ft;

cache.put(a, ft);

ft.run();

}

try{

return f.get();

}cache(ExecutionException e){

throw launderThrowable(e.getCause());

}

}

}

Memorizer3缓存的不是计算的结果,而是进行运算的FutureTask。因此Memorizer3首先检查有没有执行该任务的FutureTask。如果有,则直接获得FutureTask,如果计算已经完成,FutureTask.get()方法可以立刻获得结果,如果计算未完成,后进入的线程阻塞直到get()返回结果;如果没有,则创建一个FutureTask进行运算,后续进了的同样的运算可以直接拿到结果或者等待运算完成获得结果。 
Memorizer3的实现近乎完美,但是仍然存在一个问题,当A线程判断没有缓存是,进入到cache.put(a, ft);这一步前,B线程恰好判断缓存为空,B线程创建的FutureTask会把A创建的FutureTask覆盖掉。虽然这相比Memorizer2已经是小概率事件,但是问题还是没根本解决。

第四阶段 ConcurrentHashMap + FutureTask + Map原子操作

第三阶段的ConcurrentHashMap + FutureTask由于存在"先检查再执行"的操作,会有并发问题,我们给cache使用复合操作("若没有则添加"),避免该问题。

public class Memorizer4<A, V> implements Computable<A, V>{

private final Computable<A, V> compute;

private final Map<A, FutureTask<V>> cache;

public Memorizer4(Computable<A, V> compute){

this.compute = compute;

cache = new ConcurrentHashMap<A, FutureTask<V>>();

}

@Override

public V compute(A a) throws InterruptedException {

while(true){

V f = cache.get(a);

if(f == null){

Callable<V> eval = new Callable<V>(){

public V call() throw InterruptedException{

return c.compute(arg);

}

}

FutureTask<V> ft = new FutureTask<V>(eval);

f = cache.putIfAbsent(a, ft);

if(f == null){

f = ft;

ft.run();

}

}

try{

return f.get();

}catch(CancellationException e){

cache.remove(arg, f);

}catch(ExecutionException e){

throw launderThrowable(e.getCause());

}

}

}

}

Memorizer4做了两点改进: 
1. 插入时会再次检查是否有缓存,并且这是个复合操作

f = cache.putIfAbsent(a, ft);

if(f == null){

f = ft;

ft.run();

这里考虑到了一种情况,如果正在运行的FutureTask被终止,那进行该运算的所有请求都会出问题,始料未及的遭遇CancellationException异常。Memorizer4的compute操作是一个循环,当在get()阻塞的线程catch到CancellationException异常,则会再一次申请一个创建FutureTask的机会。

至此,整个设计过程就结束了。我们得到了一个在极端环境下依然能够保证高效且可伸缩运行的结果缓存。