看了前两篇你肯定已经理解了 java 并发编程的低层构建。然而,在实际编程中,应该经可能的远离低层结构,毕竟太底层的东西用起来是比较容易出错的,特别是并发编程,既难以调试,也难以发现问题,我们还是使用由并发处理的专业人员实现的较高层次的结构要方便、安全得多。
阻塞队列
对于许多线程问题,都可以使用一个或多个队列来安全、优雅的进行数据的传递。比如经典的生产者--消费者问题,生产者不停的生成某些数据,消费者需要处理数据,在多线程环境中,如何安全的将数据从生产者线程传递到消费者线程?
无需使用锁和条件对象,java 自带的阻塞队列就能够完美的解决这个问题。阻塞队列中所有方法都是线程安全的,所以我们进行读取、写入操作时无需考虑并发问题。阻塞队列主要有以下几种方法:
方法 | 正常结果 | 异常结果 |
---|---|---|
add | 添加一个元素 | 队列满,抛出 IllegalStateException 异常 |
element | 返回队列头元素 | 队列空,抛出 NoSuckElementException 异常 |
offer | 添加一个元素,返回 true | 队列满,返回 false |
peek | 返回队列的头元素 | 队列空,返回 null |
poll | 移出并返回队列头元素 | 队列空,返回 null |
put | 添加一个元素 | 队列满,阻塞 |
remove | 移出并返回头元素 | 队列空,抛出 NoSuckElementException 异常 |
take | 移出并返回头元素 | 队列空,则阻塞 |
上面的方法主要分成了三类,第一类:异常情况下抛出异常;第二类:异常情况返回 false/null;第三类:异常情况下阻塞。可以根据自身情况选择合适的方法来操作队列。
阻塞队列的实现
在 java.util.concurrent 包中,提供了阻塞队列的几种实现,当前也可以自己实现 BlockingQueue 接口,实现自己的阻塞队列。
- LinkdedBlockingQueue:链式阻塞队列。一般情况下链式的结构容量都是没有上限的,但是也可以选择手动指定最大容量。
- LinkdedBlockingDeque:链式阻塞双端队列。
- PriorityBlockingQueue:优先级队列。按照优先级移出,无容量上限。
- ArrayBlockingQueue:数组队列,需指定容量。可选指定是否需要公平性,如果设置了公平性,等待了最长时间的线程会优先得到处理,但是会降低性能。
延迟队列
DelayQueue 也是阻塞队列的一种,不过它要求队列中的元素实现Delayed
接口。需要重新两个方法:
- long getDelay(TimeUnit unit)返回延迟的时间,负值表示延迟结束,只有延迟结束的情况下,元素才能从队列中移出。
- int compareTo(Delayed o)比较方法,DelayQueue 使用该方法对元素进行排序。
传递队列
在 Java SE 7 中新增了一个 TransferQueue 接口,允许生产者等待,直到消费者消费了某个元素。原本生产者消费者是没有关系的,生产者并不知道某个元素是否被消费者消费了。通过此接口可以让生产者知道某个元素确实被消费了。如果生产者调用:
q.transer(item)
方法,这个调用会阻塞,知道 item 被消费线程取出消费。LinkedTransferQueue 实现了此接口。
线程安全的集合
如果多个线程并发的操作集合,会很容易出现问题,我们可以选择锁来保护共享数据,但是更好的选择是使用线程安全的集合来作为替代。本节介绍 Java 类库中提供的线程安全的集合(上一节介绍的阻塞队列也在其中)。
这类集合,size 是通过便利得出的,较慢。而且如果 size 数量大于 20 亿,有可能超过 int 的范围,使用 size 方法无法获取到大小,在 java8 中引入了 mappingCount 方法,返回值类型为 long。
映射 map
映射是日常使用中非常常见的一种数据结构。共有以下几种线程安全的映射:
- ConcurrentSkipListMap:有序映射,根据键排序
- ConcurrentHashMap:无序映射
映射条目的原子更新
一旦涉及到多线程环境,做啥都比较麻烦,比如更新一个 map 中某个键值对的值,下面的操作显然是不正确的:
int old = map.get(key);
map.put(key,old+1);
假如有两个线程同时操作一个 key,虽然 put 方法是线程安全的,但是由于两个线程之前读取的 old 是一样的,这样就会导致某个线程的修改被覆盖掉。
有以下几种安全的更新方法:
- 使用 repalce(key,oldValue,newValue)方法,此方法会在 key,oldValue 完全匹配时将 oldValue 换为 newValue 返回 true,否则返回 false。
- 使用 AtomicLong 或者 LongAdder 作为映射的值,这两个的操作方法是原子性的,因此可以安全的修改值。 3.使用 compute 类似方法完成更新。比如下面的:
# 如果key不再map中,v的值为null
map.compute(key,(k,v)->v==null?1:v+1);
# 如果不存在key
map.computeIfAbsent(key,key->new LongAdder())
# 如果存在key
map.computeIfPresent(key,key->key+1)
# 和compute方法类似,不过不处理键
map.merge(key,value,(existingValue,newValue)->existingValue+newValue+1)
批操作
java8 引入的,即使有其他线程在处理映射,批操作也能安全的执行。批操作会遍历映射,处理便利过程中找到的元素,且无需冻结当前映射的快照。显然通过批操作获取的结果不是完全精确的,因为遍历过程中,元素可能会被改变。
有以下三种不同的操作:
- 搜索(search),遍历结果直到返回一个非 null 的结果
- 归约(reduce),组合所有键或值,需提供累加函数
- forEach,遍历所有的键值对
每个操作都有 4 个版本: - operationKeys:处理键
- operationValues:处理值
- operation:处理键值
- operationEntries:处理需要 map.Entry 对象
并发集合
线程安全的 set 集合只有以下一种:
- ConcurrentSkipListSet:有序 set
如果我们想要一个 hash 结构的,线程安全的 set,有以下几种办法.
- 通过 ConcurrentHashMap.<Key>newKeySet()生成一个 Set,比如:
Set<String> sets = ConcurrentHashMap.<String>newKeySet();
这其实只是 ConcurrentHashMap<Key,Boolean>的一个包装器,所有的值都为 true
- 通过现有映射对象的 keySet 方法,生成这个映射的键集。如果删除这个集的某个元素,映射上对于元素也会被删除。但是不能添加元素,因为没有相应的值。java8 新增了一个 keySet 方法,可以设置一个默认值,这样就能为向集合中增加元素。
数组
在 Concurrent 包中只有一个CopyOnWriteArrayList
数组。该数组所有的修改都会对底层数组进行复制,也就是每插入一个元素都会将原来的数组复制一份并加入新的元素。
当构建一个迭代器时,迭代器指向的是当前数组的引用,如果后来数组被修改了,迭代器指向的任然是旧的数组。
任何集合类都可以通过使用同步包装器变成线程安全的,如下:
//线程安全的列表
List<String> list1 = Collections.synchronizedList(new ArrayList<>());
//线程安全的map
Map<String,String> map1 = Collections.synchronizedMap(new HashMap<>());
//线程安全的set
Set<String> set1 = Collections.synchronizedSet(new HashSet<>());
并行数组算法
在 java 8 中,Arrays 类提供了大量的并行化操作。
- Arrays.parallelSort
对一个基本数据类型或对象的数组进行排序
- Arrays.paralletSetAll
用一个函数计算得到的值填充一个数组。这个函数接收元素索引,然后计算值。例如:
# 将所有值加上对于的序号
Arrays.parallelSetAll(arr,i->i+ arr[i]);
- parallelPrefix
用对应一个给定结合操作的前缀的累加结果替换各个数组元素。看文字描述不太容易看懂,这里用一个例子说明:
int[] arr = {1,2,3,4}
Arrays.parallelPrefix(arr,(x,y)->x*y);
// arr变成:[1,1*2,1*2*3,1*2*3*4]