一步一步掌握线程机制(六)---Atomic变量和Thread局部变量

时间:2023-12-26 18:32:19

前面我们已经讲过如何让对象具有Thread安全性,让它们能够在同一时间在两个或以上的Thread中使用。Thread的安全性在多线程设计中非常重要,因为race condition是非常难以重现和修正的,我们很难发现,更加难以改正,除非将这个代码的设计推翻来过。

同步最大的问题不是我们在需要同步的地方没有使用同步,而是在不需要同步的地方使用了同步,导致效率极度低下。所以,我们要想办法限制同步,因为无谓的同步比起无谓的运算还更加让人无语。

但是否有办法完全避免同步呢?

在有些情况下是可以的。我们可以使用之前的volatile关键字来解决这个问题,因为volatile修饰的变量是被完整的存储的,在读取它们的时候,能够确保它们是有效的,也就是最近一次存入的值。但这也是可以避免同步的唯一情况,如果有多个线程同时访问同一份数据,就必须明确的同步化所有对该数据的访问以防止各种race condition。

为什么无法完全避免呢?

每组线程都有自己的一组寄存器,但系统将某个线程分配给CPU时,它会把该线程持有的信息加载到CPU的寄存器中,在分配不同的线程给CPU前,它会将寄存器的信息保存下来,所以线程之间绝不会共享保存在寄存器中的数值,但是通过使用volatile,我们可以确保变量不会保持在寄存器中,这点我们在之前的文章中已经说过了,这就能够确保变量是真正的共享于线程之间。但是同步为什么能够解决这个问题呢?因为当虚拟机进入synchronized方法或者synchronized块的时候,它必须重新加载原本已经缓冲到自有寄存器上的数据,也就是存入到主存储器中。

也就是说,除了使用volatile和同步,我们就没有方法保证被线程共享的数据在访问上的安全性,但事实证明,volatile并不是值得推荐的解决方法,所以也只剩下同步了。

既然这样,我们唯一能够做到的就是学会恰当的使用同步。

同步的目的就是防止race condition导致数据在不一致或者变动中间状态被使用到,这段期间会禁止线程间的竞争。但这个保证会因为一个微妙的问题而变得不可信:线程间可能在同步的程序代码运行前就开始竞争。

      并不是所有的race condition都应该避免,只有在无Thread安全性的程序段中的race condition才会被认为是问题。我们可以使用两种方法来解决这个问题:使用synchronized程序代码来防止race condition的发生或者将程序设计成无需同步(或仅使用最少的同步)就具有Thread安全性。
      对于第二种方法,我们应该尽可能缩小同步的范围并重新组织程序代码以便让具有Thread安全性的段落能够被移出synchronized块之外,这点非常重要,如果有足够的程序代码能够被移出synchronized块之外,我们甚至根本就不需要进行同步。
      我们可以使用volatile来减少同步,但是volatile只能针对单一的载入或者存储操作,但很多情况下都不是这样子,所以它的使用是比较不常见的。
      J2SE 5.0提供了一组atomic class来处理更加复杂的情况。相对于只能处理单一的atomic操作,这些class能够让多个操作被atomic地对待,这样我们也就有可能不需要同步就能实现同步机制所做到的一切。
      我们可以用AtomicInteger,AtomicLong,AtomicBoolean和AtomicReference这四个class实现的四个基本的atomic类型来处理integer,long,boolean和对象。这些class都提供了两个构造器:默认的构造器的值为0,false,false或者null,另一个构造器是以程序设计者所指定的值来初始化和创建变量。set()和get()这两个方法就提供了volatile变量所具有的的功能:能够atomic的设定与取得值,因为它们能够确保数据的读写是从主存储器上运行的。但是这些class还提供了更多的操作来适应volatile无法解决的情况。
      getAndSet()能够在返回初始值的时候atomic的设定变量成新值,完全不需要任何的同步lock。compareAndSet()与weakCompareAndSet()是有条件的修改程序的方法,这两个方法都要取用两个参数:在方法启动时预期数据所具有的的值,以及要把数据所设定成的值。它们都只会在变量具有预期值的时候才会设定成新值,如果当前值不等于预期值,该变量就不会被重新赋值并且返回false。这两个方法之间有什么区别吗?第二个方法少了一项保证:如果方法返回的值false,该变量不会被变动,但是这并不表示现有值不是预期值,也就是说,这个方法不管初始值是否是预期值都可能会无法更新改值。
      incrementAndGet(),decrementAndGet(),getAndIncrement()和getAndDecrement()提供了前置递增,前置递减,后递增和后递减,之所以有这些方法,是因为这些操作都不是atomic的。
      addAndGet()和getAndAdd()提供了"前置"和"后置"的运算符给指定值的加法运算,它们能够让程序对变量增或者减一个指定值,包括了负值,所以我们就不需要一个相对的减法运算。
      atomic package目前没有实现atomic字符或者浮点变量,但是我们可以使用AtomicInteger来处理字符,就像是字符常量一样,但是使用atomic的浮点数需要atomic带有只读浮点数值的受管理对象。我们也没有实现atomic数组,并没有功能能够对整个数组做atomic化的变动,最多就是通过使用AtomicInteger,AtomicLong和AtomicReference的数组来模型化,但是数组的大小必须在构造时就指定好,并且在操作过程中必须提供索引。至于Boolean的atomic数组,同样也可以通过AtomicInteger来实现。
     atomic package还有两个类:AtomicMarkableReference和AtomicStampedReterence。这两个类能够让mark或stamp跟在任何对象的引用上,更精确点讲,AtomicMarkableReference提供了一种包括对象引用结合boolean的数据结构,而AtomicStampedReference提供了一种包括对象引用结合integer的数据结构。
     其实,atomic class的这些方法在本质上是一样的。在使用的时候,get()方法需要我们传入一个数组作为参数,stamp或者mark被存储在数组的第一个元素而引用被正常返回。其他的get()方法就只是返回引用,mark或者stamp。
    set()与compareAndSet()方法需要额外的参数来代表mark或者stamp。最后,这些class都带有attemptMark()或attemptStamp()方法,用来依据期待的引用设定mark或者stamp。
     到了这里,我们也许会欣喜的将每个程序或者class改成只用atomic变量,事实上,这种尝试并不只是替换变量那么简单。atomic class并不是同步工具的直接替代品,它们的使用会让我们的程序设计更加复杂,就算只是一些简单的class也是这样。
    我们来举一个例子:
public class ScoreLabel extends JLabel implements CharacterListener {
private volatile int score = 0;
private int char2type = -1;
private CharacterSource generator = null, typist = null;
private Lock scoreLock = new ReentrantLock(); public ScoreLabel(CharacterSource generator, CharacterSource typist) {
this.generator = generator;
this.typist = typist;
if (generator != null) {
generator.addCharacterListener(this);
}
if (typist != null) {
typist.addCharacterListener(this);
}
} public ScoreLabel() {
this(null, null);
} public void resetGenerator(CharacterSource newCharactor) {
try {
scoreLock.lock();
if (generator != null) {
generator.removeCharacterListener(this);
}
generator = newCharactor;
if (generator != null) {
generator.addCharacterListener(this);
}
} finally {
scoreLock.unlock();
}
} public void resetTypist(CharacterSource newTypist) {
if (typist != null) {
typist.removeCharacterListener(this);
typist = newTypist;
}
if (typist != null) {
typist.addCharacterListener(this);
}
} public synchronized void resetScore() {
score = 0;
char2type = -1;
setScore();
} private synchronized void setScore() {
SwingUtilities.invokeLater(new Runnable() { @Override
public void run() {
setText(Integer.toString(score));
}
});
} @Override
public synchronized void newCharacter(CharacterEvent ce) {
if (ce.source == generator) {
if (char2type != -1) {
score--;
setScore();
}
char2type = ce.character;
} else {
if (char2type != ce.character) {
score--;
} else {
score++;
char2type = -1;
}
setScore();
}
}
}

为了修改这个类,我们需要三个修改:简单的变量代换,算法的变更和重新尝试操作,每一个修改都要保持class的synchronized版本语义的完整,而这些都是依赖于程序代码所有的效果,所以我们必须确保程序代码的最终效果和synchronized版本是一致的,这个目的也是重构的基本原则:在不影响代码外在表现下对代码进行内在的修改,也是面向对象的核心思想。

      变量代换是最简单的操作,我们只要将之前所使用的变量替换成atomic变量。像是我们这里就可以将resetScore()中的score和char2type这两个变量修改成atomic变量。但有意思的是,将这两个变量一起变更的动作并不是atomic地完成,还是有可能会让char2type变量的变更在完成前就变更了score。这似乎是个问题,但实际上并不是这样,因为我们还保持住这个类在synchronized版本上的语义。要记住,同步的目的是为了消除有问题的race condition,有一些race condition根本就不是问题。我们再来举个例子:resetScore()和newCharacter()方法都是synchronized,但这也只是意味着两者不会同时运行,被拖延住的newCharacter()方法的调用还是可能会因为到达的顺序或者取得lock的顺序而延迟运行,所以打字输入的事件可能会等到resetScore()方法完成后才会被传递,但这时传递到的只是个已经过时的事件,这些也是出于同样的原因:在resetScore()方法中同时变更两个变量这个动作并没有被atomic地处理。
       第二个修改是变更算法。
      我们来看看resetGenerator()和resetTypist()这两个方法的新的实现。之前我们对这两个方法所做的,就是尝试将两者的同步lock分离。这的确是个不错的主意,这两个方法都没有变动score或者char2type变量,事实上,它们甚至也没有变动到相互共享的变量,因为resetGenerator()方法和resetTypist()的同步lock只是用来保护此方法不受多个Thread同时地调用。但是,如果只是简单的将generator变量变成AtomicReference,那么之前我们解决过的问题都会重新引发。
      之所以会是这样,是因为resetGenerator()这个方法封装的状态并不只是generator变量的值,让generator变量变成AtomicReference表示我们知道对该变量的操作是atomic地发生,但当我们要从resetGenerator()方法中完全的删除掉同步时,我们还必须确保整个由此方法所封装住的状态还是一致的,而这些所谓的状态,包括了在字母源产生器上ScoreLabel对象的登记(this对象),在这个方法完成后,我们要确保this对象只有被登记过一次到唯一一个产生器上,也就是被分配到generator的instance变量上的那一个。
     想一下这个具体的情景:现有的产生器是generatorA,某个线程以generatorB产生器调用resetGenerator(),而另一个线程以称为generatorC的产生器来调用此方法。
     我们之前的代码是这样的:
if (generator != null) {
generator.removeCharacterListener(this);
}
generator = newCharactor;
if (generator != null) {
generator.addCharacterListener(this);
}

这段代码最大的问题就是:两个线程同时要求generatorA删除this对象,实际上它会被删除两次,ScoreLabel对象同样也会加入generatorB和generatorC。这两个结果都是错的。

    但是我们前面使用了synchronized来防止这样的错误,但如果是没有使用synchronized的前提下:
if (newGenerator != null) {
newGenerator.addCharacterListener(this);
}
oldGenerator = generator.getAndSet(newGenerator);
if (oldGenerator != null) {
oldGenerator.removeCharacterListener(this);
}

当它被两个线程同时调用时,ScoreLabel对象会被generatorB和generatorC登记,各个线程随后会atomic地设定当前的产生器,因为它们是同时运行,可能会有不同的结果:假设第一个线程先运行,它会从getAndSet()中取回generatorA,然后将ScoreLabel对象从generatorA的监听器中删除,而第二个线程从getAndSet()中取回generatorB并从generatorB的监听器删除ScoreLabel。如果第二个线程先运行,变量会稍有不同,但结果永远会是一样的:不管哪一个对象被分配给genrator的instance变量,它就是ScoreLabel对象所监听的那一个,并且是唯一的一个。

      但是这里会有一个副作用:交换之后监听器会从旧的数据来源中被删除掉,且监听器会在交换前被加入到新的数据来源,它现在有可能接收到既不是现有的产生器也不是打字输入来源所产出的字符。之前newCharacter()方法会检查来源是否为产生器的来源,并在不是的时候会假设来源是打字输入来源。现在就不再是这样,newCharacter()方法必须在处理它之前确认字母的来源,它也必须忽略掉来自不正确的监听器的字母。
      所以我们的最后一步就是重新尝试操作:
@Override
public synchronized void newCharacter(CharacterEvent ce) {
int oldChar2type;
if (ce.source == generator.get()) {
oldChar2type = char2type.getAndSet(ce.character);
if (oldChar2type != -1) {
score.decrementAndGet();
setScore();
}
} else if (ce.source == typist.get()) {
while (true) {
oldChar2type = char2type.get();
if (oldChar2type != ce.character) {
score.decrementAndGet();
break;
} else if (char2type.compareAndSet(oldChar2type, -1)) {
score.incrementAndGet();
break;
}
}
setScore();
}

newCharacter()这个方法的修改是最大的,因为它现在必须要丢弃任何不是来自于所属来源的事件。

      重点是打字输入事件的处理。我们需要检查输入的字母是否是正确的,如果不是,玩家就会被惩罚,这是通过atomic地递减分数来完成的。如果字母被正确的输入,玩家无法被立即的给予奖赏,相对的,char2type变量要先被更新,分数只有在char2type被正确更新时才会更新。如果更新的操作失败了,这代表着在我们处理此事件时,有其他事件已经被其他线程处理过了,并且是成功处理完。
      什么叫成功的处理完?它表示我们必须重新处理事件,因为我们是基于这样的假设:假设正在使用的该变量值不会被变更并且程序代码完成时也是这样,所有已经被我们设定为具有特定值的变量就确实应该是那个值,但因为这与其他线程冲突,这些假设也就被破坏了。通过重新尝试处理事件,就好像从未遇到冲突一样。
      所以我们才需要将这段代码封装在一个无穷循环里:程序不会离开循环直到事件被成功处理掉。显然在多个事件间存在race condition,循环会确保没有一个事件会被漏掉或者处理了超过一次。只要我们确实只处理有效的事件一次,事件被处理的顺序就不重要了,因为在处理完每个事件后,数据就会保持在完好的状态。实际上,当我们使用同步的时候,也是一样的情形:多个事件并没有以指定的顺序进行,它们只是根据授予lock的顺序来进行。
      整个代码如:
public class ScoreLabel extends JLabel implements CharacterListener {
private AtomicInteger score = new AtomicInteger(0);
private AtomicInteger char2type = new AtomicInteger(-1);
private AtomicReference<CharacterSource> generator = null;
private AtomicReference<CharacterSource> typist = null; public ScoreLabel(CharacterSource generator, CharacterSource typist) {
this.generator = new AtomicReference<CharacterSource>();
this.typist = new AtomicReference<CharacterSource>(); if (generator != null) {
generator.addCharacterListener(this);
}
if (typist != null) {
typist.addCharacterListener(this);
}
} public ScoreLabel() {
this(null, null);
} public void resetGenerator(CharacterSource newGenerator) {
CharacterSource oldGenerator;
if (newGenerator != null) {
newGenerator.addCharacterListener(this);
}
oldGenerator = generator.getAndSet(newGenerator);
if (oldGenerator != null) {
oldGenerator.removeCharacterListener(this);
}
} public void resetTypist(CharacterSource newTypist) {
CharacterSource oldTypist;
if (newTypist != null) {
newTypist.addCharacterListener(this);
}
oldTypist = typist.getAndSet(newTypist);
if (oldTypist != null) {
oldTypist.removeCharacterListener(this);
}
} public synchronized void resetScore() {
score.set(0);
char2type.set(-1);
setScore();
} private synchronized void setScore() {
SwingUtilities.invokeLater(new Runnable() { @Override
public void run() {
setText(Integer.toString(score.get()));
}
});
} @Override
public synchronized void newCharacter(CharacterEvent ce) {
int oldChar2type;
if (ce.source == generator.get()) {
oldChar2type = char2type.getAndSet(ce.character);
if (oldChar2type != -1) {
score.decrementAndGet();
setScore();
}
} else if (ce.source == typist.get()) {
while (true) {
oldChar2type = char2type.get();
if (oldChar2type != ce.character) {
score.decrementAndGet();
break;
} else if (char2type.compareAndSet(oldChar2type, -1)) {
score.incrementAndGet();
break;
}
}
setScore();
}
}
}
     atomic变量的目的只是避免同步的性能因素,但是为什么它在无穷循环中的时候还比较快呢?从技术上来讲,答案肯定不是那个无穷循环,额外的循环只会发生在atomic操作失败的时候,这是因为与其他线程发生冲突。要发生一个真正的无穷循环,就需要无穷的冲突。如果使用同步,这也会是一个问题:无数的线程要访问lock同样也会让程序无法正常操作。另一个方面,实际上,atomic class与同步间的性能差异通常不是很大。
     所以,我们有必要平衡同步和atomic变量的使用。在使用同步的时候,线程在取得lock前 会被block住而不能执行。这能够让程序因为其他的线程被阻挡不能执行而atomic地来运行。当使用atomic变量的时候,线程是能够并行的运行相同的代码,atomic变量的目的不是消除不具有线程安全性的race condition,它们的目的是要让程序代码具有线程安全性,所以就不用特地去防止race condition。
     使用atomic变量正是应了这样的一句老话:天下没有白吃的午餐!我们避开了同步,但是在所运行的工作量上却付出了代价。这就是所谓的乐观同步:我们的程序代码抓住保护变量的值并作出在这瞬间没有其他修改的假设,然后程序代码就计算出该变量的新值并尝试更新该变量。如果有其他线程同时修改了这个变量,这个更新就失败并且程序必须重新执行这些步骤,并且使用变量的最新修改过的值。
     上面例子中出现了数据交换,也就是在取得旧值的同时atomic地设定新值的能力。这是通过使用getAndSet()方法来完成的,使用这个方法就能确保只有一个线程能够取得并使用该值。
     如果有更复杂的数据交换时该如何处理?如果值的设定要依据该旧值又该如何处理?
     我们可以通过把get()和compareAndSet()方法放在循环中来处理。get()方法用来取得旧值,以计算新值,然后再通过使用compareAndSet()方法来设定新值----只有在旧值没有被更动的时候才会设定为新值,如果这个方法失败,整个操作可以重新进行,因为当前的线程在失败时都没有动到任何数据。虽然调用过get()方法,计算过新值,数据的交换并不是被个别地atomic,如果它成功,这个顺序可以被认为是atomic的,因为它只在没有其他线程变动该值时才会成功。
     compareAndSet()这个方法处理的其实就是所谓比较与设定这种情况。它是只有在当前值是预期值的时候才atomic地设定值的能力,这个方法是在atmoic层提供条件支持能力的重要方法。,它甚至能够用来实现出由mutex所提供的同步能力。
     如果是更加复杂的比较该如何处理?如果比较是要依据旧值或者外部值该如何处理?
     我们依然可以通过把get()和compareAndSet()方法放在循环中来处理。因为数据交换和比较其实是差不多的,唯一的区别就是get()方法取得旧值的目的是为了用来比较或者只用来完成atomic地交换,而复杂的比较是可以用来观察是否要继续操作。
     虽然atomic class可用的数据类型列表数量是相当大的,但它依然不是完整的。它不能支持字符和浮点数据类型,虽然支持一般对象类型,但是没有对更复杂的对象类型提供支持,像是String这类对象就具有很多方便的操作。但是,我们是在JAVA中编程,也就是面向对象编程,我们完全可以将数据类型封装进只读的数据对象中来对任何新类型实现atomic的支持,然后此数据对象通过改变atomic引用到新的数据对象,就可以被atomic地变动。但这也仅仅在嵌入数据对象的值不会被任何方式变动的时候才有效。任何对数据对象的变动必须只能通过改变引用到不同的对象来完成,旧对象的值是不变的。所有由数据对象所封装的值,不管是直接还是非直接,必须是只读的才能使这个技巧有效的运作。
     所以,我们是不可能atomic地改变浮点数值,但是我们可以atomic地改变对象引用到不同的浮点数值,只要浮点数值是只读的,它就具有线程安全性。
     这就是我们实现的浮点数值的atomic class:
public class AtomicDouble extends Number{
private AtomicReference<Double> value;
public AtomicDouble(){
this(0.0);
} public AtomicDouble(double initVal){
value = new AtomicReference<Double>(new Double(initVal));
} public double get(){
return value.get().doubleValue();
} public void set(double newVal){
value.set(new Double(newVal));
} public boolean compareAndSet(double expect, double update){
Double origVal, newVal; newVal = new Double(update);
while(true){
origVal = value.get(); if(Double.compare(origVal.doubleValue(), expect) == 0){
if(value.compareAndSet(origVal, newVal)){
return true;
}else{
return false;
}
}
}
} public boolean weakCompareAndSet(double expect, double update){
return compareAndSet(expect, update);
} public double getAndSet(double setVal){
Double origVal, newVal; newVal = new Double(setVal);
while(true){
origVal = value.get();
if(value.compareAndSet(origVal, newVal)){
return origVal.doubleValue();
}
}
} public double getAndAdd(double delta){
Double origVal, newVal; while(true){
origVal = value.get();
newVal = new Double(origVal.doubleValue() + delta);
if(value.compareAndSet(origVal, newVal)){
return origVal.doubleValue();
}
}
} public double addAndGet(double delta){
Double origVal, newVal; while(true){
origVal = value.get();
newVal = new Double(origVal.doubleValue() + delta);
if(value.compareAndSet(origVal, newVal)){
return newVal.doubleValue();
}
}
} public double getAndIncrement(){
return getAndAdd((double)1.0);
} public double getAndDecrement(){
return addAndGet((double)-1.0);
} public double incrementAndGet(){
return addAndGet((double)1.0);
} public double decrementAndGet(){
return addAndGet((double)-1.0);
} public double getAndMultiply(double multiple){
Double origVal, newVal; while(true){
origVal = value.get();
newVal = new Double(origVal.doubleValue() * multiple);
if(value.compareAndSet(origVal, newVal)){
return origVal.doubleValue();
}
}
} public double multiplyAndGet(double multiple){
Double origVal, newVal; while(true){
origVal = value.get();
newVal = new Double(origVal.doubleValue() * multiple);
if(value.compareAndSet(origVal, newVal)){
return newVal.doubleValue();
}
}
}
}

到现在为止,我们还只是对个别的变量做atomic地设定,还没有做到对一群数据atomic地设定。如果是这样,我们就必须通过创建封装这些要被变动值的对象来完成,之后这些值就可以通过atomic地变动对这些值的atomic引用来做到同时地改变。这样的运行方式其实和上面实现的AtomicDouble是一样的。

     这同样也是需要在值没有以任何方式直接改变的情况下才会有效。任何对数据对象的改变是通过改变引用到不同的对象上来完成的,也就是所谓的额数据交换,旧的对象值必须没有被变动过,不管是直接还是间接封装的值都必须是只读的才能让这个技巧有效运作。
      明白了这点,我们也清楚的知道了以高级atomic数据类型来执行大量的数据变更,我们会用到大量的临时对象,因为为了确保所有的操作是atomic的,我们必须在临时变量上来完成所有的计算,并且所有的值都是使用数据交换来做atomic地变更。实际上,对象的创建远比我们想象中要多得多:对每个事务动作都需要创建一个新的对象,每个atomic地比对和设定操作在失败而必须重新尝试的时候也需要创建新的对象。
      所以,我们必须明白一件事:使用atomic变量会让我们的代码非常复杂,我们必须在它与使用同步间取得平衡:是否可以接受所有临时对象的创建?此技巧是否比同步好?
      我们最后来讲一下Thread的局部变量。
      任何线程都可以在任意的时间定义该线程私有的局部变量,而其他线程可以定义相同的变量以创建该变量自有的拷贝。这就意味着线程的局部变量无法用在线程间共享状态,对某个线程私有的变量所做的改变并不会反映在其他线程所持有的拷贝上,但这意味着对该变量的访问觉不需要同步化,因为它不可能让多个线程同时访问。
      我们可以利用java.lang.ThreadLocal这个类来模型化:
public class ThreadLocal<T>{
protected T initialValue();
public T get();
public void set(T value);
public void remove();
}

一般情况下,我们都是subclass这个ThreadLocal并覆写initialValue()这个方法来返回应该在线程第一次访问此变量时返回的值。我们还可以通过继承自这个类来让子线程继承父线程的局部变量。

      事实上,ThreadLocal因为性能非常差的原因我们很少使用到,但是它在实现一些线程问题的时候还是非常有用的,像是Android的消息队列,就是使用到了这点。