在线程安全1中,我介绍了线程同步的意义和一种实现线程同步的方法:volatile。volatile关键字属于原子操作的一种,若对一个关键字使用volatile,很多时候会显得很“浪费”,因为只有在并发访问的情况下才需要“易变”读写,单线程访问时并不需要。在命名空间System.Threading命名空间中提供了InterLock类,该类中提供了一些原子方法。本文来介绍如何使用这些方法。
- 互锁
在抢占式系统中,一个线程在执行到任何阶段都有可能被其他线程“打断”,原子操作能够保证该操作不被其他线程打断,其他线程的“打断”只可能发生在该操作的之前或之后。InterLock类中的方法就是”原子“式的,最常用的方法有:
public static class InterLock{ // return (++location) public static int Increment(ref int location); // return(--location) public static int Decrement(ref int location); // return(location += value) //注意,也可能是负数,从而实现减法运算。 public static int Add(ref int location, int value) // int old = location; location = value; return old; public static int Exchange(ref int location, int value); //old = location1; //if(location1 = comparand) location1 = value; //return old; public static int CompareExchange(ref int location1, int value, int comparand); ... }
《CLR via C#》的作者在书中说他喜欢Interlocked中的方法,因为它不但很快,而且不会阻塞线程。为了验证InterLock真的很快,我们对变量进行一百万次的写,与Volatile.Write()来进行对比,看看是否真的快。
static void Main(string[] args){ TestInterLock(); Console.ReadLine(); } private static volatile int v = 0; private static int n = 0; static void TestInterLock(){ Stopwatch sw = Stopwatch.StartNew(); for (int i = 0; i < 1000000; i++) v++; Console.WriteLine("volatile write 1000000 times takes:{0}", sw.ElapsedMilliseconds); sw = Stopwatch.StartNew(); for (int i = 0; i < 1000000; i++) Interlocked.Increment(ref n); Console.WriteLine("InterLock write 1000000 times takes:{0}", sw.ElapsedMilliseconds); n = 0; sw = Stopwatch.StartNew(); for (int i = 0; i < 1000000; i++) n++; Console.WriteLine("n++ write 1000000 times takes:{0}", sw.ElapsedMilliseconds); }
运行结果如下:
volatile write 1000000 times takes:2
InterLock write 1000000 times takes:8
n++ write 1000000 times takes:2
我运行了好几次,结果会有些出入,但是大部分的结果都是volatile的写入速度和原生的n++的速度是一样的。InterLock也确实如Jeffrey Richter所说,很快,只是没有volatile关键字修饰的变量的读写快。这是在非并发情况下,下面来看一下并发情况下是否还是很快。
static void TestInterLock1(){ Stopwatch sw = Stopwatch.StartNew(); Task[] t2 = new[] { new Task(() => { for (int i = 0; i < 1000000; i++) Interlocked.Increment(ref n); }), new Task(() => { for (int i = 0; i < 1000000; i++) Interlocked.Increment(ref n);}) }; Task[] t1 = new[] { new Task(() => { for (int i = 0; i < 1000000; i++) v++; }), new Task(() => { for (int i = 0; i < 1000000; i++) v++;}) }; Task.WhenAll(t1) .ContinueWith(t => Console.WriteLine("volatile write 2000000 times takes:{0}", sw.ElapsedMilliseconds)); Task.WhenAll(t2) .ContinueWith(t => Console.WriteLine("InterLock write 2000000 times takes:{0}", sw.ElapsedMilliseconds)); t2[0].Start(); t1[0].Start(); t1[1].Start(); t2[1].Start(); }
先看运行结果,
volatile write 2000000 times takes:94
InterLock write 2000000 times takes:101
多次运行,运行时间会有不同,但是在并发情况下,volatile的写入和InterLock的写入速度几乎相同。上述代码写的如此丑陋,而不是直接写Task.Run(),是为了保证初始化部分都运行完成后,再Start(),且两个任务的先后顺序进行了打乱,最大限度减少误差。可以看到并发情况下,volatile和InterLock几乎一样,且在InterLock中的方法要比Volatile的功能要全,但是在串行时,Volatile的性能要比InterLock要好。结论是,若只对变量读写,没有替换或者其他复杂操作时,可以使用volatile关键字,但是一些复杂操作,需要原子操作时,就得使用InterLock中的方法了,如果使用volatile关键字修饰的变量来进行交换的话,很难保证原子性,只有引入锁才能保证线程同步。且InterLock中提供了几个重载方法,能够接受object类型,还有泛型版本。
可以利用InterLock来实现一个简单的自旋锁,代码如下:
public struct SimpleSpinLock{ private int m_Lock; public void Enter(){ while (true){ if (Interlocked.Exchange(ref m_Lock, 1) == 0) return; //此处可以添加黑科技 } } public void Leave(){ Volatile.Write(ref m_Lock, 0); } } //下面是如何使用SimpleSpinLock的例子 public class Simple{ private SimpleSpinLock m_lock = new SimpleSpinLock(); public void AccessResource(){ m_lock.Enter(); //执行某些程序,只有一个线程可以进入这里 m_lock.Leave(); } }
这个简单的自旋锁在一个线程调用Enter()时,其他线程在调用m_lock.Enter()方法时,if (Interlocked.Exchange(ref m_lock, 1) == 0)会失败,因为该方法会将m_lock和1交换,并返回旧值,在已有线程调用m_lock.Enter()时,m_lock的旧值是1,因此该方法会在whle(true)处自旋,不断尝试获得锁。该锁的问题是,该线程没有被阻塞(挂起),而是一直在占用CPU资源,其他需要CPU资源的线程无法运行(可以在while内,我加注释的地方,加入”黑科技“来尝试解决此问题。其思路是在线程自旋的过程中,立刻交出CPU资源,可通过调用Thread.Sleep(0)或者Thread.Yield()来实现。尝试获得锁的线程交出时间片,这样当前获得锁的线程能够有更多的资源来运行程序,从而运行结束并交出锁,具体细节在这里不展开)。因此,自旋锁只适合那些运行非常快的方法。
- Interlocked Anything 模式
Interlocked中全部是原子性操作,那是否提供了一个方法,该方法可以接受一个委托,保证该委托在运行时是原子的。答案是没有,但是可以利用Interlocked.CompareExchange来自己实现一个。我们先来看一下利用CompareExchange来实现原子性的Maximum方法。
public static int Maximum(ref int target, int value){ int currentValue = target, startValue, desireValue; do{ startValue = currentValue; //可以在此处添加任何想要保证“原子性”的操作,此处是求最大值。 desireValue = Math.Max(startValue, value); //注意,此处有可能被其他线程抢占,也有可能target的值被修改,因此if()语句会出问题,要使用Interlocked的方法。 //if (startValue = target) target = desireValue; currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue); } //若在此时target已被修改,则重新计算最大值 while (startValue != currentValue); return currentValue; }
此方法的思路是:从CPU将target的值读到寄存器中,到计算最大值结束,期间的任何时间target都有可能被其他线程修改。因此保证原子性就被转换成保证计算最大值时,target的值没有变过,如果变过,就重新计算。因此,在最开始的时候,startValue=currentValue,currentValue是开始计算时target的值。然后求得最大值,并保存到desireValue中。注意,此时target有可能被修改,因此调用CompareExchange方法,该方法会将target与startValue比较,如果此时两值相等,那相当于我之前说的,target从开始到最后没有改变,那么这个最大值是准确的,并将target的旧值付给currentValue,最后如果startValue==currentValue,则计算完成,否则继续循环。
《CLR via C#》的作者Jeff很喜欢上面的方法,他在实际开发中,都是使用上面的方法,并对其进行了包装,使之能够支持Interlocked Anything。其原理就是在desireValue=Math.Max()处替换成其他方法,只要在返回结果时保证旧值和最开始读取的值一致就可以。我们来看一下他的封装:
delegate int Morpher<TResult, in TArgument>(int startValue, TArgument argument, out TResult result); static TResult Morph<TResult, TArgumen>(ref int target, TArgumen argumen, Morpher<TResult, TArgumen> morpher){ TResult mophorResult; int currentValue = target, desireValue, startValue; do{ startValue = currentValue; desireValue = morpher(startValue, argumen, out mophorResult); currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue); } while (currentValue != startValue); return mophorResult; }
说实话,我并不能非常好了理解这个封装,并不是不能理解做法,而是不能确定此方法到底能不能实现效果,那我们来测试一下。测试的基本思路是对一个变量执行1000次的result+=10。分别是不带线程同步的和利用Morph方法对result+=10的方法进行互锁,保证其原子性。我省去了Morpher和Morph的声明部分。之所以要在DelayAdd和DelayAdd1方法中调用ThreadSleep(20),是为了模拟当在运行较长的方法时,Morph方法是否还能够保证该方法运行的原子性。
static void Main(string[] args){ Test(new TestAction(Add), "Add"); Test(new TestAction(MorphAdd), "MorphAdd"); Console.ReadLine(); } //DelayAdd1的变种,使之能够符合Morpher的签名 static int DelayAdd(int startValue, int argument, out int result){ Thread.Sleep(20); result = startValue + argument; return result; } static void DelayAdd1(int argument, ref int result){ Thread.Sleep(20); result += argument; } //测试的不具有线程同步的方法。 static void Add(ref int result){ DelayAdd1(10, ref result); } //具有线程同步的方法。 static void MorphAdd(ref int result){ Morph(ref result, 10, new Morpher<int, int>(DelayAdd)); } //要测试的委托签名 delegate void TestAction(ref int result); //公共测试方法 static void Test(TestAction action, string actionName){ int result = 0; var tList = new Task[1000]; for (int i = 0; i < 1000; i++) tList[i] = Task.Run(() => { action(ref result); }); Task.WhenAll(tList).GetAwaiter().OnCompleted( () => Console.WriteLine("{0}, Result is {1}", actionName, result)); }
运行,得到的结果是:
Add, Result is 8440
MorphAdd, Result is 10000
运行1000次result += 10,普通的Add不能够得到正确结果,但是MorphAdd可以。这是因为在1000次的Add中,某几个Add是同时调用的,result+=10在同一时间调用了多次,因此有156次的Add因为并行而被吞噬了。值得注意的是,MorphAdd方法因为需要线程同步,因此执行时间要慢很多。但是这些付出是值得的,因为这保证了结果的正确。
上述例子证明了Morph方法能够保证委托的原子性,且该方法既不会阻塞线程也不会长时间的自旋,推荐大家在实际中使用该方法。
本文中,我先介绍了Interlocked类中较常用的方法,以及Interlocked.Increment()方法与volatile关键字的对比,结论是虽然将变量设置为所有读写都是“易变的”看起来很浪费,但是该关键字能够保证在单线程时几乎没有性能损失,大部分情况下和原生的读写是一样的速度,且volatile比Interlocked类中提供的写要快2-4倍,但是在并发状态下其性能和volatile关键字是没有差别的。之后我介绍了用Interlocked类中的方法来实现简单的自旋锁,该锁的优点是在非竟态情况下非常快,但是在竟态情况下,未获得锁的线程会一直处于自旋状态,白白浪费CPU。最后介绍了《CLR via C#》书中提到的Interlocked Anything的方法(文中其他的知识点大多也是提取自《CLR via C#》),并测试了该方法确实可以保证委托的原子性,且不会阻塞线程,没有锁,不会造成死锁。至此线程同步中互锁构造就讲完了,后面我会给大家介绍内核构造的信号量和其他锁。