并发处理相关知识

时间:2021-07-03 20:52:43
1,基本概念

Java并发的重要性毋庸置疑,Java并发的设计目的在于3个方面:

简单,意味着程序员尽可能少的操作底层或者实现起来要比较容易;

高效,意味着耗用资源要少,程序处理速度要快;

线程安全,意味着在多线程下能保证数据的正确性。

在Java并发中,有几个常见概念,需要在讲述并发之前进行解释:

 

临界资源和临界区

临界资源是一般是一种内存资源,一个时刻只允许一个进程(在java中,是线程)访问,一个线程正在使用临界资源的时候,另一个线程不能使用。临界资源是非可剥夺性资源,即使是操作系统(或JVM)也无法阻止这种资源的独享行为。

临界区是一种进程中范文临界资源的那段程序代码,注意,是程序代码,不是内存资源了,这就是临界资源与临界区的区别。我们规定临界区的使用原则(也即同步机制应遵循的准则)十六字诀:“空闲让进,忙则等待,有限等待,让权等待”–strling。让我们分别来解释一下:

(1)空闲让进:临界资源空闲时一定要让进程进入,不发生“互斥礼让”行为。

(2)忙则等待:临界资源正在使用时外面的进程等待。

(3)有限等待:进程等待进入临界区的时间是有限的,不会发生“饿死”的情况。

(4)让权等待:进程等待进入临界区是应该放弃CPU的使用。

并发

狭义的只就Java而言,Java多线程在访问同一资源时,出现竞争的问题,叫做并发问题,Java并发模型是围绕着在并发过程中如何处理原子性、可见性、有序性这3个特征来设计的。

 

线程安全

如果一个操作序列,不考虑耗时和资源消耗,在单线程执行和多线程执行的情况下,最终得到的结果永远是相同的,则这个操作序列叫做线程安全的。

如果存在不相同的概率,则就是非线程安全的。

 

原子性(Atomicity)

如果一个操作时不可分割的,那就是一个原子操作,也叫这个操作具有原子性。相反的,一个操作时可以分割的(如a++,它实际上是a=a+1),则就是非原子操作;原子操作是线程安全的,非原子操作都是非线程安全的,但是我们可以通过同步技术(lock)或同步数据模型(Concurrent容器等)把非原子操作序列变成线程安全的原子操作。

事实上,java并发主要研究的就是3个方面的问题:

1,怎么更好的使用原子操作;

2,怎么把非原子操作变得线程安全;

3,怎么提高原子操作和非原子操作的效率并减少资源消耗。

 

可见性(Visibility)

一个变量被多个线程共享,如果一个线程修改了这个变量的值,其它线程能够立即得知这个修改,则我们称这个修改具有可见性。

(可参考上一章《Java系列笔记(5)-线程》中的Java线程内存模型部分),Java线程内存模型的设计,是每个线程拥有一份自己的工作内存,当变量修改之后,将新值同步到主内存。但是对于普通变量而言,这种同步,并不能保证及时性,所以可能出现工作内存以及更改,主内存尚未同步的情况。

Java中,最简单的方法是使用volatile实现强制同步,它的实现方式是保证了变量在修改后立即同步到主内存,且每次使用该变量前,都先从主内存刷新该值。

另外,可以采用synchronized或final关键字实现可见性;

synchronized的实现原理在于,一个变量如果要执行unlock操作,必须先把改变量同步到主内存中(执行store和write)。因此一个变量如果被synchronized实现强制同步,则即使不用volatile,也可以实现强制可见性。

final的实现原理在于,一个变量被final修饰,则其值(或引用地址)不可以再被修改,所以其它线程如果能看到,也只是能看到这个变量的这个唯一值(对于对象而言,是唯一引用)。

需要注意,一个操作被保证了可见性,并不一定能保证原子性,比如:

volatile int a;
a
++;

在上面这段代码中,a是满足可见性的,但是a++仍然不是原子性操作。当有多个线程执行a++时,仍然存在并发问题。

 

有序性(Ordering)

Java线程的有序性,表现为两个方面:

在一个线程内部观察,所有操作都是有序的,所有指令按照“串行(as-if-serial,字面意思是“像排了序一样”,as-if-serial的真正含义是不管怎么重排序,一个单线程程序的执行结果都必须相同)” 的方式执行。

 

在线程间观察,也就是从某个线程观察另一个线程,则所有其他线程都可以交叉并行执行,是正序的,唯一例外的是被同步方法、同步块、volatile等字段修饰的强制同步的代码,需要在线程间保持有序。

 

注:关于指令重排序、as-if-serial、happens-before等,可以参考上一章《Java系列笔记(5)-线程》,也可以参考网上的众多资料,这里不再叙述。

 

JUC

java.util.concurrent包,这个包是从JDK1.5开始引入的,在此之前,这个包独立存在着,它是由Doug Lea开发的,名字叫backport-util-concurrent,在1.5开始引入java,命名路径为java.util.concurrent,其中的基本实现方式,也有所改变。主要包括以下类:(来源于:深入浅出Java Concurreny(http://www.blogjava.net/xylz/archive/2010/06/30/324915.html))

并发处理相关知识

 

JNI

Java native interface,java本地方法接口,由于Java本身是与平台无关的,所以在性能等方面有可能存在影响(虽然随着java的发展,这种情况很少),为了解决这种问题,使用C/C++编写了JNI接口,在java中可以直接调用这些代码的产生的机器码,从而避免严重影响性能的代码段。关于JNI,可以参考这篇文章:http://www.cnblogs.com/mandroid/archive/2011/06/15/2081093.html

 

CAS

CAS,compare and swap,比较和替换(也有人直接理解为compare and set,其实是一样的)。CAS是一种乐观锁做法,而且整个JUC的实现都是基于CAS机制的。

如果直接用synchronized加锁,这是一种悲观锁做法,所谓悲观锁,就是悲观的认为线程是绝对不安全的,必须保证在swap值之前,没有任何其它线程操作当前值。synchronized是一种独占锁,性能受限于这种悲观策略。这一点将在后面详述。

而CAS是一种乐观锁机制,所谓乐观锁,就是相信在compare 和swap之间,被其它线程影响的可能性不大,只要compare校验通过,就可以进行swap。

在Java中,compareAndSet的基本代码如下:

1 public final boolean compareAndSet(int expect, int update) {
2 return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
3 }

 

从代码中看,java的compareAndSet使用使用JNI中的unsafe接口来实现的,这是因为,现代CPU基本都提供了特殊的指令,能够做到自动更新共享数据的同时,检测其它线程的干扰,也就是说,CPU本身提供了compareAndSet功能。所以才能提供JNI的CAS接口。

有了JNI的CAS接口,基于该接口的JUC就能获得更高性能。

在 Intel 处理器中,比较并交换通过指令cmpxchg实现。比较是否和给定的数值一致,如果一致则修改,不一致则不修改。

 

AQS

AbstractQueuedSynchronizer,简称AQS,是J.U.C最复杂的一个类。这个类是CountDownLatch/FutureTask /ReentrantLock/RenntrantReadWriteLock/Semaphore的基础,是Lock和Executor实现的前提。参考:(http://www.blogjava.net/xylz/archive/2010/07/06/325390.html)

并发处理相关知识

非阻塞算法

任何一个线程的失败或挂起不应该影响其他线程的失败或挂起的算法叫做非阻塞算法。现代CPU能够提供非阻塞功能,它可以在自动更新共享数据的同时,检查其它线程的干扰。

 

2,volatile


正如前面所述,java中volatile字段的作用是保证并发过程中某个变量的可见性。而volatile保证可见性的方法如下:

1,Java内存模型不会对volatile指令进行重排序,从而保证对volatile变量的执行顺序,永远是按照其出现顺序执行的。重排序的依据是happens-before法则,happens-before法则共8条,其中有一条与volatile相关,就是:“对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作”。

注:happens-before法则:http://www.blogjava.net/xylz/archive/2010/07/03/325168.html

2,volatile变量不会被缓存在寄存器中(只有拥有线程可见)或者其他对CPU不可见的地方,每次总是从主存中读取volatile变量的结果。

不过需要注意的是:volatile字段只能保证可见性,不能保证原子性,也不能保证线程安全。

 

volatile的工作原理

下面这段话摘自《深入理解Java虚拟机》:

  “观察加入volatile关键字和没有加入volatile关键字时所生成的汇编代码发现,加入volatile关键字时,会多出一个lock前缀指令”

  lock前缀指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供3个功能:

  1)它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;

  2)它会强制将对缓存的修改操作立即写入主存;

  3)如果是写操作,它会导致其他CPU中对应的缓存行无效。

上面的说法解释了volatile的工作原理的起源。不过,建议大家复习一下本系列文章第3章JVM内存分配和第5章线程的内容,来理解下面的解释。与前面这两章中宏观的讲解内存分配和线程内存模型相区别,下面的部分专注于解析java内存模型和volatile的工作原理,但也能更好的理解以前的知识:

注:下面的图参考了:http://www.cnblogs.com/aigongsi/archive/2012/04/01/2429166.html,其中描述了内存模型的6种操作,比上一章中介绍的8种操作少了lock、unlock 2 种,这6种操作都是原子性的。

 并发处理相关知识

在上图中,如果是普通变量:

1,变量值从主内存(在堆中)load到本地内存(在当前线程的栈桢中);

2,之后,线程就不再和该变量在主内存中的值由任何关系,而是直接操作在副本变量上(这样速度很快),这时,如果主存中的count或本地内存中的副本发生任何变化,都不会影响到对方,也正是这个原因导致并发情况下出现数据不一致;

3,在修改完的某个时刻(线程退出之前),自动把本地内存中的变量副本值回写到对象在堆中的对应变量。

 

这6步操作中:

read和load是将主存变量加载到当前本地内存;

use和assign是执行线程代码,改变副本值,可以多次执行;

store和write是用本地内存回写到主存;

 

如果是volatile修饰的变量:

volatile仍然在执行一个从主存加载到工作内存,并且将变更的值写回主存的操作,但是:

1,volatile保证每次读取该变量前,都判断当前值是否已经失效(即是否已经与主存不一致),如果已经失效,则从主存load最新的变量;

2,volatile保证每次对该变量做出修改时,都立即写入主存;

 

需要注意的是,虽然volatile保证了上面的特性,但是它只是保证了可见性,却没有保证原子性,也就是说,read-load-use-assign-store-write,这些操作序列组合起来仍然是非原子操作。举个例子:

共享变量当前在主存中的值为count=10;线程1和线程2都对该值进行自增操作,按如下步骤进行:

1,线程1和2都读取最新值,得到值为count=10;

2,线程1被阻塞;

3,线程2执行自增,写回count=11;

4,线程1唤醒,由于之前已经完成了读取变量的操作,所以这里直接进行自增。于是也自增到11,回写主存,最终count=11;

与我们期望的两次自增count=12冲突;

目前来说,要保证原子性,只能通过synchronized、Lock接口、Atomic*来实现。

 

说了这么多,有同学可能会问,为什么volatile这也不行那也不行,陷阱这么多,我们还要用它呢?

volatile相对于synchronized,最大的好处是某些情况下它的性能高,而且使用起来直观简便。而且,如果你的“代码本身能保证原子性”,那么用volatile是个不错的选择:

这里所说的代码本身能保证原子性,是指:

1,对变量的写操作,不依赖于当前的值(就是说,不会先读取当前值,然后在当前值的基础上进行改变,比如,不是自增,而是赋值);

2,变量没有包含在其它变量的不变式中(这一点不是很好理解,可以参考这里:http://www.ibm.com/developerworks/cn/java/j-jtp06197.html)

 

一个最常见的volatile的应用场景是boolean的共享状态标志位,或者单例模式的双重检查锁(参考Java并发编程:volatile关键字解析,http://www.cnblogs.com/dolphin0520/p/3920373.html)

 

另外,有一个关于volatile的常见的坑就是:从上面的描述可以看出,volatile对于基本数据类型(值直接从主内存向工作内存copy)才有用。但是对于对象来说,似乎没有用,因为volatile只是保证对象引用的可见性,而对对象内部的字段,它保证不了任何事。即便是在使用ThreadLocal时,每个线程都有一份变量副本,这些副本本身也是存储在堆中的,线程栈桢中保存的仍然是基本数据类型和变量副本的引用。

所以,千万不要指望有了volatile修饰对象,对象就会像基本数据类型一样整体呈现原子性的工作了。

事实上,如果一个对象被volatile修饰,那么就表示它的引用具有了可见性。从而使得对于变量引用的任何变更,都在线程间可见。

这一点在后面将要介绍的AtomicReference中就有应用。

 

3,Atom


java中,可能有一些场景,操作非常简单,但是容易存在并发问题,比如i++,此时,如果依赖锁机制,可能带来性能损耗等问题,于是,如何更加简单的实现原子性操作,就成为java中需要面对的一个问题。

java中的atom操作,比如AtomicInteger,AtomicLong,AtomicBoolean,AtomicReference,AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray;这些操作中旺旺提供一些原子化操作,比如incrementAndGet(相当于i++),compareAndSet(安全赋值)等,相关方法和用法就不再赘述,网上有很多类似资料,或者直接读源代码也很容易懂。

在backport-util-concurrent没有被引入java1.5并成为JUC之前,这些原子类和原子操作方法,都是使用synchronized实现的。不过JUC出现之后,这些原子操作基于JNI提供了新的实现,以AtomicInteger为例,看看它是怎么做到的:

如果是读取值,很简单,将value声明为volatile的,就可以保证在没有锁的情况下,数据是线程可见的:

1     private volatile int value;public final int get() {
2 return value;
3 }

 

那么,涉及到值变更的操作呢?以AtomicInteger实现:++i为例:

并发处理相关知识
1     public final int incrementAndGet() {
2 for (;;) {
3 int current = get();
4 int next = current + 1;
5 if (compareAndSet(current, next))
6 return next;
7 }
8 }
并发处理相关知识

 

在这里采用了CAS操作,每次从内存中读取数据然后将此数据和+1后的结果进行CAS操作,如果成功就返回结果,否则重试直到成功为止。

而这里的comparAndSet(current,next)就是前面介绍CAS的时候所说的依赖JNI实现的乐观锁做法:

    public final boolean compareAndSet(int expect, int update) {  
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

 

除了基本数据类型的原子化操作以外,JUC还提供了数组的原子化、引用的原子化,以及Updater的原子化,分别为:

并发处理相关知识

下面主要介绍这3类原子化操作为什么要原子化以及分别是怎么实现的。

 

数组原子化

注意,Java中Atomic*Array,并不是对整个数组对象实现原子化(也没有必要这样做),而是对数组中的某个元素实现原子化。例如,对于一个整型原子数组,其中的原子方法,都是对每个元素的:

并发处理相关知识
1 public final int getAndDecrement(int i) {
2 while (true) {
3 int current = get(i);
4 int next = current - 1;
5 if (compareAndSet(i, current, next))
6 return current;
7 }
8 }
并发处理相关知识

 

引用原子化

有些同学可能会疑惑,引用的操作本身不就是原子的吗?一个对象的引用,从A切换到B,本身也不会出现非原子操作啊?这种想法本身没有什么问题,但是考虑下嘛的场景:对象a,当前执行引用a1,线程X期望将a的引用设置为a2,也就是a=a2,线程Y期望将a的引用设置为a3,也就是a=a3。

如果线程X和线程Y都不在意a到底是从哪个引用通过赋值改变过来的,也就是说,他们不在意a1->a2->a3,或者a1->a3->a2,那么就完全没有关系。

但是,如果他们在乎呢?

X要求,a必须从a1变为a2,也就是说compareAndSet(expect=a1,setValue=a2);Y要求a必须从a1变为a3,也就是说compareAndSet(expect=a1,setValue=a3)。如果严格遵循要求,应该出现X把a的引用设置为a2后,Y线程操作失败的情况,也就是说:

X:a==a1--> a=a2;

Y:a!=a1 --> Exception;

但是如果没有原子化,那么Y会直接将a赋值为a3,从而导致出现脏数据。

 

这就是原子引用AtomicReference存在的原因。

并发处理相关知识
1      public final V getAndSet(V newValue) {
2 while (true) {
3 V x = get();
4 if (compareAndSet(x, newValue))
5 return x;
6 }
7 }
并发处理相关知识

注意,AtomicReference要求引用也是volatile的。

Updater原子化

其它几个Atomic类,都是对被volatile修饰的基本数据类型的自身数据进行原子化操作,但是如果一个被volatile修饰的变量本身已经存在在类中,那要如何提供原子化操作呢?比如,一个Person,其中有个属性为age,private volatile int age;,如何对age提供原子化操作呢?

1 private AtomicIntegerFieldUpdater<Person> updater = AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");  
2 updater.getAndIncrement(5);//加5岁
3 updater.compareAndSet(person, 30, 35)//如果一个人的年龄是30,设置为35。

 

4,ThreadLocal


对于多线程的Java程序而言,难免存在多线程竞争资源的情况。对于竞争的资源,解决的方式往往分为以时间换空间或以空间换时间两种方式。

1,后者的做法是将一份资源复制成多份,占用多份空间,但是每个线程自己访问自己的资源,从而消除竞争,这种做法是ThreadLocal的做法,它虽然消除了竞争,但它是通过数据隔离的方法实现的,所以被隔离的各份数据是无法同步的,本节就要介绍这种做法。

2,也有很多资源是无法复制成多份或者不适合复制成多份的,如打印机资源。因此以时间换空间的做法就是只有一份资源,大家按照一定的顺序串行的去访问这个资源。这种方式的主要做法,就是在资源上加锁,加锁的方法,将在后面第9节介绍。

 

示例

下面通过一个典型的ThreadLocal的应用案例作为入口,来分析ThreadLocal的原理和用法(更详细代码请参考《Java并发编程:深入剖析ThreadLocal》http://www.cnblogs.com/dolphin0520/p/3920407.html):

设想下面的场景:

编写一个数据库连接器(或 http session管理器),要求多个线程能够连接和关闭数据库,优先考虑下面的方案:

并发处理相关知识
 1     class ConnectionManager {
2 private static Connection connect = null;
3 public static Connection openConnection() {
4 if(connect == null){
5 connect = DriverManager.getConnection();
6 }
7 return connect;
8 }
9 public static void closeConnection() {
10 if(connect!=null)
11 connect.close();
12 }
13 }
并发处理相关知识

这个方案中,多个线程公用ConnectionManager.openConnection()和ConnectionManager.closeConnnection(),由于没有同步控制,所以很容易出现并发问题,比如,同时创建了多个连接,或者线程1openConnection时,线程2恰好在执行closeConnection。

解决这个问题有两种方案:

1,对connectionManager中openConnection和closeConection加synchronized强制同步。这种方案解决了并发,却带来了新问题,由于synchronized导致了同一只可只有一个线程能访问被锁对象,所以其它线程只能等待。

2,去掉ConnectionManager中的static,使得每次访问Connectionmanager,都必须new一个对象,这样每个线程都用自己的独立对象,相互不影响。eg:

并发处理相关知识
1     public void insert() {
2 ConnectionManager connectionManager = new ConnectionManager();
3 Connection connection = connectionManager.openConnection();
4
5 //使用connection进行操作
6
7 connectionManager.closeConnection();
8 }
并发处理相关知识

 

这个确实解决了并发,并且也可以多线程同步执行,但是它存在严重的性能问题,每执行一次操作,就需要new一个对象然后再销毁。

ThreadLocal的引入,恰当的解决了上面的问题,ThreadLocal不是线程,它是一种变量,不过,它是线程变量的副本,它是一个泛型对象,例如,线程A创建时,初始化了一个对象user,那么ThreadLocal<User> userLocal就是user在线程A中的一个副本,userLocal中的值在初始时与user相同,但是在线程A运行过程中,userlocal的任何变化不会同步到user上,不会影响user的值。

如果采用ThreadLocal,上面的数据库连接管理器问题的解决方案是:

并发处理相关知识
 1     class ConnectionManager {
2 private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
3 public Connection initialValue() {
4 return DriverManager.getConnection(DB_URL);
5 }
6 };
7 public static Connection getConnection() {
8 return connectionHolder.get();
9 }
10
11 public static void closeConnection() {
12 if(connectionHolder.get()!=null)
13 connectionHolder.get().close();
14 }
15 }
并发处理相关知识

 

ThreadLocal的方法

ThreadLocal提供的方法很简单,主要有:

  1. public T get() { }
  2. public void set(T value) { }
  3. public void remove() { }
  4. protected T initialValue() { }

 

ThreadLocal的原理

分析ThreadLocal的源代码(分析过程参考这里:http://www.cnblogs.com/dolphin0520/p/3920407.html),可得,ThreadLocal的原理是:

1,在每个线程Thread内部有一个ThreadLocal.ThreadLocalMap类型的成员变量threadLocals,这个threadLocals就是用来存储实际的变量副本的,键值为当前ThreadLocal变量,value为变量副本(即T类型的变量)。

2,初始时,在Thread里面,threadLocals为空,当通过ThreadLocal变量调用get()方法或者set()方法,就会对Thread类中的threadLocals进行初始化,并且以当前ThreadLocal变量为键值,以ThreadLocal要保存的副本变量为value,存到threadLocals。

3,注意,一般在get之前,需要先执行set(),以保证threadlocals中有值,如果在get()之前,没有执行过set(),则ThreadLocal会自动调用setInitialValue()方法,setInitialValue()的源代码是这样的:

并发处理相关知识
 1         private T setInitialValue() {
2 T value = initialValue();
3 Thread t = Thread.currentThread();
4 ThreadLocalMap map = getMap(t);
5 if (map != null)
6 map.set(this, value);
7 else
8 createMap(t, value);
9 return value;
10 }
并发处理相关知识

 

它先取出当前线程的ThreadLocalMap,即threadLocals变量,然后将value set进去,所以,如果没有提前执行过set方法,initialValue()默认返回的又是null,所以可能导致运行过程中出现NPE。建议最好在声明ThreadLocal变量时,重写initialValue()方法,这样即使没有提前执行set,也能有个初始值(如前面ConnectionHolder中的代码)。

4,然后在当前线程里面,如果要使用副本变量,就可以通过get方法在threadLocals里面查找。

ThreadLocal泛型的变量类型,不能是基本数据类型,只能是类,如果一定要将基本上数据类型做泛型参数,则可以采用Integer、Long、Double等类。

 

使用ThreadLocal的步骤

1,、在多线程的类(如ThreadDemo类)中,创建一个ThreadLocal<Object>对对象xxxLocal,用来保存线程间需要隔离处理的对象xxx。

2、在ThreadDemo类中,创建一个获取要隔离访问的数据的方法getXxx(),在方法中判断,若ThreadLocal对象为null时候,应该new()一个隔离访问类型的对象,并强制转换为要应用的类型。

3、在ThreadDemo类的run()方法中,通过getXxx()方法获取要操作的数据,这样可以保证每个线程对应一个数据对象,在任何时刻都操作的是这个对象。

 

ThreadLocal实现变量副本的方法

ThreadLocal实现变量副本,并没有真的将原来的变量clone一份出来,而是采用了一种很灵活的方法,假设对每个单独的线程ThreadA而言,当前ThreadLocal为localXx(这是key),初始外部变量为va(这是value):

1,第一次执行set时,new了一个Entry(localXx, va),并添加到localXx的ThreadLocalMap中,此时,Entry.value的引用就是指向va的强引用;

2,此时如果执行localXx.get(),会得到va

3,此时,如果在当前线程ThreadA直接对va执行set操作,仍然会更新外部变量va的值,但如果在另外一个线程ThreadB中希望对va进行操作,则由于此时ThreadB直接执行get得到的是null,所以无法访问va,除非我们将va声明为final的,并set到ThreadB中;

3,后续再进行set时,比如set进来的新值为va1,则直接替换Entry中的value,得到Entry(localXx, va1),此时原来的va在ThreadLocal这里,已经得到释放了,当前ThreadLocal跟原来的va已经没有任何关系了。

4,如果此时再执行get操作,得到的就是新的va1;

 

从上面的步骤可以看出,ThreadLocal只是用原变量va做为初始值,但是它并未真的复制va,后续执行ThreadLocal.set之后,ThreadLocal中存放的已经是新set的对象了;

这也是为什么ThreadLocal只能对类对象有效的原因了,因为它的set,改变的是value的引用。

具体例子可以参考下面的代码:

下面的例子中User包含两个属性:name、age,重写了toString方法;

并发处理相关知识
 1     public class ThreadLocalTest {
2 ThreadLocal<User > userLocal = new ThreadLocal <User>();
3 public void set(User user) {
4 userLocal.set(user);
5 }
6 public User get() {
7 return userLocal.get();
8 }
9 public static void main( String[] args) throws InterruptedException {
10 final ThreadLocalTest test = new ThreadLocalTest();
11 final User user1 = new User( "AAA", 5 );//注意这个user1被声明成final的了
12 test.set(user1);
13 System.out.println(test.get()); //这里得到的是user1的初始值:AAA,5
14 Thread thread1 = new Thread() {
15 public void run() {
16 test.set(user1);
17 test.get().setName( "BBB");//这里get()得到的是user1,所以会影响外部主线程
18 System.out.println(test.get()); //BBB,5
19 User user2 = new User("CCC" , 5);
20 test.set(user2); //这里thread1的ThreadLocal.userLocal中存储的值变为user2了,外部主线程中仍然是user1
21 System.out.println(test.get()); // CCC, 5
22 test.get().setName( "DDD");//这里get()得到的是user2,不会影响外部主线程
23 System.out.println(test.get()); //DDD,5
24 };
25 };
26 thread1.start();
27 thread1.join();
28 // 这里得到的值user1,已经在上面设置BBB的时候已经被更新过了
29 // 但是不会受thread中更新CCC和DDD的影响,所以这里得到的是BBB,5
30 System.out.println(test.get());
31 }
32 }
并发处理相关知识

 

得到的结果为:

[AAA,5]

[BBB,5]

[CCC,5]

[DDD,5]

[BBB,5]

 

ThreadLocal的内存泄露问题

在第一次将T类型的变量value set到ThreadLocal时,它是将value set到ThreadLocalMap 中去的,但是需要注意ThreadLocalMap并不是Map接口的子类,它是一个ThreadLocal的内部类,其中的Entry是一种特殊实现:static class Entry extends WeakReference< ThreadLocal> 

对ThreadLocal.ThreadLocalMap.Entry执行set操作时,如果以前这个Entry(key,value)不存在,则会new一个Entry。如果这个Entry已经存在,则直接替换Entry.value的引用为新的value;

下面的分析和图来自于:http://www.cnblogs.com/onlywujun/p/3524675.html

如下图,每个thread中都存在一个map, map的类型是ThreadLocal.ThreadLocalMap. Map中的key为一个threadlocal实例. 这个Map的确使用了WeakReference(虚线),不过弱引用只是针对key. 每个key都弱引用指向threadlocal. 当把threadlocal实例置为null以后(或threadLocal实例被GC回收了,弱引用会被回收),没有任何强引用指向threadlocal实例,所以threadlocal将会被gc回收. 但是,我们的value却不能回收,因为存在一条从current thread连接过来的强引用. 只有当前thread结束以后, current thread就不会存在栈中,强引用断开, Current Thread, Map, value将全部被GC回收.

 所以得出一个结论就是只要这个线程对象被gc回收,就不会出现内存泄露,但在threadLocal设为null和线程结束这段时间不会被回收的,就发生了我们认为的内存泄露。其实这是一个对概念理解的不一致,也没什么好争论的。最要命的是线程对象不被回收的情况,这就发生了真正意义上的内存泄露。比如使用线程池的时候,线程结束是不会销毁的,会再次使用的。就可能出现内存泄露。

并发处理相关知识

注意:Java为了最小化减少内存泄露的可能性和影响,在ThreadLocal的get,set的时候都会执行一个for循环,遍历其中所有的entiry,清除线程Map里所有key为null的value。这也大大减小了出现内存泄露的风险。但最怕的情况就是,threadLocal对象设null了,开始发生“内存泄露”,然后使用线程池,这个线程结束,线程放回线程池中不销毁,这个线程一直不被使用,或者分配使用了又不再调用get,set方法,那么这个期间就会发生真正的内存泄露。

关于ThreadLocal内存泄露问题的数据,有兴趣的可以参考这里:http://liuinsect.iteye.com/blog/1827012

 

5,CountDownLatch和CyclicBarrier


CountDownLatch

CountDownLatch是一种Latch(门闩),它的操作类似于泄洪,或聚会。主要有两种场景:

1,泄洪:即一个门闩(计数器为1)挡住所有线程,放开后所有线程开始执行。在门闩打开之前,所有线程在池子里等着,等着这个门闩的计数器减少到0,门闩打开之后,所有线程开始同时执行;

这种场景一个典型例子是并发测试器(启动多个线程去执行测试用例,一声令下,同步开始执行,即下面的beginLatch):

并发处理相关知识
 1         int threadNum =10; //并发线程数
2 CountDownLatch beginLatch = new CountDownLatch(1 );// 用于触发各线程同时开始
3 CountDownLatch waitLatch = new CountDownLatch(threadNum);// 用于等待各线程执行结束
4 ExecutorService executor = Executors. newFixedThreadPool(threadNum);
5 for (int i = 0; i < threadNum; i++) {
6 Callable<String> thread = new SubTestThread(beginLatch, waitLatch, method, notifier);
7 executor. submit(thread);
8 }
9 beginLatch.countDown(); // 开始执行!
10 waitLatch.await(); // 等待结束
11 private class SubTestThread implements Callable< String> {
12 private CountDownLatch begin;
13 private CountDownLatch wait;
14 private FrameworkMethod method;
15 public SubTestThread(CountDownLatch begin, CountDownLatch wait, FrameworkMethod method) {
16 this.begin = begin;
17 this.wait = wait;
18 this.method = method;
19 }
20 @Override
21 public String call() throws Exception {
22 try {
23 begin.await();
24 runTest(method);
25 } catch (Exception e) {
26 throw e;
27 } finally {
28 wait.countDown();
29 }
30 return null ;
31 }
32 }
并发处理相关知识

 

2,聚会:即N个线程正在执行,一个门闩(计数器为N)挡住了后续操作,每个线程执行完毕后,计数器减1,当门闩计数器减到0时,表示所有线程都执行完毕(所有人到齐,party开始),可以开始执行后续动作了。

这种场景一个典型的例子是记账汇总,即多个子公司的账目,都要一一算完之后,才汇总到一起算总账。上面例子中的waitLatch就是这样的latch;

 

countDownLatch的真正原理在于latch是一种计数器,它的两个方法分别是countDown()和await(),其中countDown()是减数1,await()是等待减到0,当每次调用countDown()时,当前latch计数器减1,减到0之前,当前线程的await()会一直卡着(阻塞,WAITING状态),当计数器减少到0,唤醒当前线程,继续执行await()后面的代码;

await(long timeout, TimeUtil unit)是另一个await方法,特点是可以指定wait的时间,

-如果超出指定的等待时间,await()不再等待,返回值为false;

-如果在指定时间内,计数器减到0,则返回值为true;

-如果线程在等待中被中断或进入方法时已经设置了中断状态,则抛出InterruptedException异常。

 

CyclicBarrier是一种回环栅栏,它的作用类似于上面例子中的的waitLatch,即等到多个线程达到同一个点才继续执行后续操作,如:

并发处理相关知识
 1     public class CyclicBarrierTest {
2 public static class ComponentThread implements Runnable {
3 CyclicBarrier barrier;// 计数器
4 int ID; // 组件标识
5 int[] array; // 数据数组
6 // 构造方法
7 public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
8 this.barrier = barrier;
9 this.ID = ID;
10 this.array = array;
11 }
12 public void run() {
13 try {
14 array[ID] = new Random().nextInt(100);
15 System.out.println("Component " + ID + " generates: " + array[ID]);
16 // 在这里等待Barrier处
17 System.out.println("Component " + ID + " sleep");
18 barrier.await();
19 System.out.println("Component " + ID + " awaked");
20 // 计算数据数组中的当前值和后续值
21 int result = array[ID] + array[ID + 1];
22 System.out.println("Component " + ID + " result: " + result);
23 } catch (Exception ex) {
24 }
25 }
26 }
27 /**
28 * 测试CyclicBarrier的用法
29 */
30 public static void testCyclicBarrier() {
31 final int[] array = new int[3];
32 CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
33 // 在所有线程都到达Barrier时执行
34 public void run() {
35 System.out.println("testCyclicBarrier run");
36 array[2] = array[0] + array[1];
37 }
38 });
39 // 启动线程
40 new Thread(new ComponentThread(barrier, array, 0)).start();
41 new Thread(new ComponentThread(barrier, array, 1)).start();
42 }
43 public static void main(String[] args) {
44 CyclicBarrierTest.testCyclicBarrier();
45 }
46 }
并发处理相关知识

 

可见,cyclicBarrier与countDownLatch的后一种使用方法(聚会)很像,其实两者能够达到相同的目的。区别在于,cyclicBarrier可以重复使用,也就是说,当一次cyclicBarrier到达汇总点之后,可以再次开始,每次cyclicbarrier减数到0之后,会触发汇总任务执行,然后,会把计数器再恢复成原来的值,这也是“回环”的由来。

CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。

在实现方式上也有所不同,CountDownLatch是直接基于AQS编写的,他的await和countDown过程,分别是一次acquireShared和releaseShared的过程;而cyclicBarrier是基于锁、condition来实现的,让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。

(CyclicBarrier的原理,参考:Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例:http://www.cnblogs.com/skywang12345/p/3533995.html?utm_source=tuicool)


6,信号量


信号量(Semaphore)与锁类似,锁是一次允许一次一个线程访问(readWrite锁除外),而信号量用来控制一组资源有多个线程访问,比如一个店铺最多能接受5个客户 ,有10个客户要求访问的话,那么可以用信号量来控制。

Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

Semaphore类位于java.util.concurrent包下,它提供了2个构造器:

并发处理相关知识
1     public Semaphore(int permits) {          //参数permits表示许可数目,即同时可以允许多少线程进行访问
2 sync = new NonfairSync(permits);
3 }
4 public Semaphore(int permits, boolean fair) { //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
5 sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
6 }
并发处理相关知识

 

下面说一下Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:

  1. public void acquire() throws InterruptedException {  }     //获取一个许可
  2. public void acquire(int permits) throws InterruptedException { }    //获取permits个许可
  3. public void release() { }          //释放一个许可
  4. public void release(int permits) { }    //释放permits个许可

acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。

release()用来释放许可。注意,在释放许可之前,必须先获获得许可。

这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:

  1. public boolean tryAcquire() { };    //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
  2. public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
  3. public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
  4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

另外还可以通过availablePermits()方法得到可用的许可数目。

下面通过一个例子来看一下Semaphore的具体使用:

假若一个工厂有5台机器,但是有8个工人,一台机器同时只

能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:

并发处理相关知识
 1     public class Test {
2 public static void main(String[] args) {
3 int N = 8; //工人数
4 Semaphore semaphore = new Semaphore(5); //机器数目
5 for(int i=0;i<N;i++)
6 new Worker(i,semaphore).start();
7 }
8 static class Worker extends Thread{
9 private int num;
10 private Semaphore semaphore;
11 public Worker(int num,Semaphore semaphore){
12 this.num = num;
13 this.semaphore = semaphore;
14 }
15 @Override
16 public void run() {
17 try {
18 semaphore.acquire();
19 System.out.println("工人"+this.num+"占用一个机器在生产...");
20 Thread.sleep(2000);
21 System.out.println("工人"+this.num+"释放出机器");
22 semaphore.release();
23 } catch (InterruptedException e) {
24 e.printStackTrace();
25 }
26 }
27 }
28 }
并发处理相关知识

 

执行结果:

工人0占用一个机器在生产... 

工人1占用一个机器在生产... 

工人2占用一个机器在生产... 

工人4占用一个机器在生产... 

工人5占用一个机器在生产... 

工人0释放出机器 工人2释放出机器 

工人3占用一个机器在生产... 

工人7占用一个机器在生产... 

工人4释放出机器 工人5释放出机器 

工人1释放出机器 工人6占用一个机器在生产... 

工人3释放出机器 工人7释放出机器 

工人6释放出机器

7,Condition


在上一章“Java系列笔记(5)-线程”中,我们曾经说过,线程间通信并不是靠消息,而是靠共享内存,不过,本节要介绍一种更加高效的通信方式:Condition。

Condition 与上一章介绍的线程间通信的wait、notify等方法有相似之处,但也有不同。其相似之处在于,都建立与锁的基础上,

wait、notify都是在同步代码块中,建立在synchronized所作用的对象上。

而Condition直接作用在Lock对象上,因此建立一个Condition对象,必须通过lock.newCondition()来构造。

  1. Lock lock
    = new
    ReentrantLock();  
  2. Condition condition= lock.newCondition();

 在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现。

并发处理相关知识
 1     lock.lock();    //synchronized
2 try {
3 while(bool) {
4 condition.await();//this.wait();
5 }
6 System.out.println("this is condition test" );
7 condition.signal();//this.notify();
8 } finally {
9 lock.unlock();
10 }
并发处理相关知识

 

而且,对于wait、notify机制而言,只能作用于当前同步代码块,不能建立多重通信条件,而,使用Condition机制,可以建立多重通信条件。

下面的例子,是一个很有意思的并发缓冲区,其中用Condition建立了两个条件,一个写条件,一个读条件,这个例子的具体用法和意义,参考这里:http://blog.csdn.net/ghsau/article/details/7481142

 

并发处理相关知识
 1 class BoundedBuffer {  
2 final Lock lock = new ReentrantLock();//锁对象
3 final Condition notFull = lock.newCondition();//写线程条件
4 final Condition notEmpty = lock.newCondition();//读线程条件
5
6 final Object[] items = new Object[100];//缓存队列
7 int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/;
8
9 public void put(Object x) throws InterruptedException {
10 lock.lock();
11 try {
12 while (count == items.length)//如果队列满了
13 notFull.await();//阻塞写线程
14 items[putptr] = x;//赋值
15 if (++putptr == items.length) putptr = 0;//如果写索引写到队列的最后一个位置了,那么置为0
16 ++count;//个数++
17 notEmpty.signal();//唤醒读线程
18 } finally {
19 lock.unlock();
20 }
21 }
22
23 public Object take() throws InterruptedException {
24 lock.lock();
25 try {
26 while (count == 0)//如果队列为空
27 notEmpty.await();//阻塞读线程
28 Object x = items[takeptr];//取值
29 if (++takeptr == items.length) takeptr = 0;//如果读索引读到队列的最后一个位置了,那么置为0
30 --count;//个数--
31 notFull.signal();//唤醒写线程
32 return x;
33 } finally {
34 lock.unlock();
35 }
36 }
37 }
并发处理相关知识

 

可见,用两个条件,可以灵活的确定应该唤醒写线程还是读线程,这就是使用Condition的灵活之处。

8,Exchanger


从JDK1.5开始,Java开始提供一个叫Exchanger的工具套件,这个工具套件,可以真正用于两个线程之间交换数据。

Exchanger类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的。

Exchanger提供的方法非常简单,其接口就是两个方法:

 

  1. public V exchange(V x)
    throws InterruptedException
  2. public V exchange(V x,long timeout,TimeUnit unit)throws InterruptedException,TimeoutException

从官方的javadoc可以知道,当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行 对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达——完成交换正常返回;或者当前线程被中断——抛出中断 异常;又或者是等候超时——抛出超时异常。


转出自:http://www.cnblogs.com/zhguang/p/concurrent1.html