同步容包括Vector和Hashtable,这些同步的封装器类是由Collections.synchronizedXxx等工厂方法创建的。
这些类实现线程安全的试是: 将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。
同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。
容器上常见的复合操作:迭代(反复访问元素,直到遍历完容器中所有元素)、跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算(例如“若没有则添加”)。
有时候开发人员并不希望在迭代期间对容器加锁。例如,某些线程在可以访问容器之前,必须等待迭代过程结束,如果容器的规模很大,或者在每个元素上执行操作的时间很长,那么这些线程将长时间等待。
持有锁的时间越长,那么在锁上的竞争就可能越激烈,如果许多线程都在等待锁被释放,那么将极大地降低吞量和CPU的利用率。
编译器将字符中的连接操作转换为调用StringBuilder.apppend(Object),而这个方法又会调用容器的toString方法,标准容器的toString方法将迭代容器,并在每个元素上调用toString来生成容器内容的格式化表示。
如果状态与保护它的同步代码之间相隔越远,那么开发人员就越容易忘记在访问状态时使用正确的同步。
容器的hashCode和equals等方法也会间接地执行迭代操作,当容器作为另一个容器的元素或键值时,就会出现这种情况。同样,containsAll、removeAll和retainAl等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。所有这些间接的迭代操作都可能抛出ConcurrentModificationException。
通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。
虽然可以用List来模拟Queue的行为——事实上,下是通过LinkedList来实现Queue的,但还需要一个Queue的类,因为它能去年List的随机访问需求,从而实现更高效的并发。
与HashMap一样,ConcurrentHashMap也是一个基于散列的Map, 但它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器, 而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁(Lock Striping)。在这种机制中,任意数量的读取线程可以并发地访问Map,执行读取操作的线程和执行写入操作的线程可以并发地访问Map,并且一定数量的写入线程可以并发地修改Map。ConcurrentHashMap带来的结果是,在并发访问环境下将实现更高的吞吐量,而在单线程环境中只损失非常小的性能。
ConcurrentHashMap与其他并发容器一起增强了同步容器类:它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。ConcurrentHashMap返回的迭代器具有弱一致性(Weakly Consistent),而并非“及时失败”。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的夫互,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。
尽管有这些改进,但仍然有一些需要权衡的因素。对于一些需要在整个Map上进行计算的方法,例如size和isEmpty,这些方法的语义被略微减弱了以反映容器的并发我。由于size返回的结果在计算时可能已经过期了,它实际上只是一个估计值,因此允许size返回一个近似值而不是一个精确值。虽然这看上去有些令人不安,但事实上size和isEmpty这样的方法在并发环境下的用处很小,因为它们的返回值总在不断变化。因此,这些操作的需求被弱化了,以换取对其他更重要操作的性能优化,包括get、put、containsKey和remove等。
阻塞队列提供了可队守的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。队列可以是有界的也可以是*的,*队列永远都不会充满,因此*队列上的put方法也永远不会阻塞。
线程可能会阻塞或暂停执行,原因有很多种:等待I/O操作结束,等待获得一个锁,等待从Thread.sleep方法中醒来,或是等待另一个线程的计算结果。当线程阻塞时,它通常被挂起,并处于某种阻塞状态(BLOCKED、WAITING或TIMED_WAITING)。
中断是一种协作机制。一个线程不能强制其他线程停止正在执行的操作而去执行其他操作。
阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。
FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。
“运行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。
由于在构造函数或静态初始化方法中启动线程并不是一种好方法,因此提供了一个start方法来启动线程。
栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。
在不涉及I/O操作或共享数据访问的计算问题中,当线程数量为Ncpu或Ncpu+1时将获得最优的吞吐量,更多的线程并不会带来任何帮助,甚至在某种程度上会降低性能,因为多个线程将会在CPU和内存等资源上发生竞争。
ConcurrentHashMap是线程安全的,因此在访问底层Map时就不需要乾同步。
interface Computable<A, V> {
V compute(A arg) throws InterruptedException;
}
public class Memoizer<A, V> implements Computable<A, V> {
private final ConcurrentMap<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
当缓存的是Future而不是值时,将导致缓存污染(Cache Pollution)问题:如果某个计算被取消或者失败,那么在计算这个结果时将指明计算过程被取消或者失败。为了避免这种情况,如果Memoizer发现计算被取消,那么将把Future从缓存中移除。如果检测到RuntimeException,那么也会移除Future,这样将来的计算才可能成功。Memoizer同样没有解决缓存逾期的问题,但它可以通过使用FutureTask的子类来解决,在子类中为每个结果指定一个逾期时间,并定期扫描缓存中逾期的元素。(同样,它也没有解决缓存清理的问题,即移除旧的计算结果以便为新的计算结果腾出空间,从而使缓存不会消耗过多的内存。)