【C#】C#线程_基元线程的同步构造

时间:2022-02-09 19:31:34

目录结构:

contents structure [+]

1.简介

1.1 为什么需要使用线程同步

当多个线程同步访问共享数据时,可能会出现数据损坏的现象,使用线程同步可以防止线程损坏。虽然线程防止了数据的损坏,然而这并不是没有代价的。如果一些数据由两个线程同时访问,那么那些不可能访问到这些数据的线程就更本不需要进行线程同步。

1.2 线程同步的缺点

不需要线程同步是最理想的情况,因为线程同步存在许多问题。

第一,线程同步比较繁琐,而且容易写错。在代码中必须标识出所有可能会被多个线程访问的数据,并且用额外的代码包围起来,并获取和释放一个同步锁。
第二,线程同步会损害性能。获取和释放锁都是需要时间的,因为要调用一些额外的方法,而且不同的CPU必须进行协调,以决定哪个线程先取得锁。
第三,线程同步通常只允许一个线程访问资源,这是同步锁的意义所在,但也是问题所在,因为阻塞一个线程通常会创建更多的线程。假如,一个线程池线程试图获取一个暂时无法获取到的锁,线程池就有可能会创建新的线程(如果线程池还有其他的任务需要执行),使CPU继续执行其他的任务。如果创建了大量的额外线程,那么还会造成CPU频繁切换上下文,更影响性能(这一点也是为什么要异步函数的原因)。

线程同步是一件不好的事,所以在设计应用程序时候,应该尽量避免使用线程同步。如果一个共享数据必须要由多个线程访问,那么可以考虑使用值类型,因为值类型总是被复制,每个操作线程都有其副本。

2 基元线程同步

2.1 什么是基元线程同步

这里解释一下什么基元,基元就是指可以在代码中使用的最简单的构造。基元线程同步就是指在线程中使用最简单的同步方式构造线程同步。基元有两种构造模式:用户模式构造和内核模式构造。

2.2 基元用户模式构造和内核模式构造的比较

这两种构造模式中,基元用户模式的构造明显比内核模式构造的效率高。下面将会分析其优缺点。

基元用户模式构造,优点:使用特殊的CPU指令来调节线程,并且在用户模式的基元构造上阻塞的线程池永远不会认为其阻塞,所以线程池不会创建新的线程来替换临时的阻塞,同时,这些CPU指令只阻塞相当短的一段时间。缺点:只有Windows操作系统内核(内核是操作系统最基本的部分)才能停止一个线程的运行,然而在用户模式中运行的线程可能会被抢占,所以想要取得资源但是取不到资源的线程,就可能会一直“自旋”,这可能会浪费大量的CPU时间。

内核模式构造,优点:当线程通过内核模式获取其他线程的资源时,如果资源不可用,内核会阻止这个线程使其阻塞(这样就不会浪费CPU时间),当资源可用时候,Windows内核会恢复这个线程,允许它访问资源。缺点:因为内核模式构造是由Windows操作系统自己提供的函数来实现的,所以,当应用程序调用Windows自身的函数时,线程会从用户模式切换为内核模式(或相反)会导致巨大的性能损失,这是正是为什么要避免使用内核模式的原因。

如果能够集合内核模式和基元用户模式构造的有点创建一个新的构造模式就好了,这种构造模式确实存在,其名为“混合构造”。本文重点是介绍基元线程的构造。混合构造将会留在后面的文章中介绍。

3.用户模式构造

Windows保证对以下数据类型的变量读写是原子性的:Boolean,Char,(S)Byte,(U)Int16,(U)Int32,(U)IntPtr,Single以及引用类型。这意味着变量中的所有字节都是一次性读取或写入。
例如:

Int32 x=0;

如果有个线程执行如下代码:

x=0x012345;

那么线程会一次性(原子性)地从0变成0x012345。另一个线程不可能看到处于中间状态的值。

.NET Framework提供了如下的简单用户模式构造:易变构造和互锁构造。下面将会一一介绍:

3.1 易变构造(Volatile)

先来看看如下的代码:

    internal static class StrangeBehaviour {
        private static volatile Boolean s_stopWorker = false;

        public static void Main(String[] args) {
            Console.WriteLine("Main: letting worker run for 5 seconds");
            Thread t = new Thread(Worker);
            t.Start();
            Thread.Sleep(5000);
            s_stopWorker = true;
            Console.WriteLine("Main:waiting for worker to stop");
            t.Join();
            Console.ReadLine();
        }

        private static void Worker(Object o) {
            Int32 x = 0;
            while (!s_stopWorker) x++;
            Console.WriteLine("Worker: stopped when x={0}",x);
        }
    }

如果以调试方式的运行(或是不加上优化运行),那么程序将会按照期望的那样结束。如果加上优化的话,那么结局将会不同,打开C#编译器的 platform:x86 和 /optimize+ 开关来编译,然后运行,会发现程序一直运行,并不会停止。
接下来解释其过程,当Worker方法开始工作时,s_stopWorker是false,程序进入循环,在5秒钟后,主线程改变s_stopWorker为true,所以Worker方法中的线程应该立即停止。然而,因而加了优化,所以每次当while(!s_stopWorker)执行时,并不会去检查s_stopWorker的真实值,只检查一次(也就是在循环开始的时候),之后其会记住s_stopWorker的值,本例中为false,所以程序会一直执行。

System.Threading.Volatile类提供了如下类似静态方法(有许多的重载方法):

public static class Volatile{
 public static void Write(ref Int32 location,Int32 value)
 public static Int32 Read(ref Int32 location)
 public static void Write(ref Boolean location,Boolean value)
 public static Int32 Read(ref Boolean location)
 ....
}

他们会禁止C#编译器,JIT编译器和CPU平常执行的一些优化。他们总是对变量本身进行直接操作,而不是对其中间值进行操作。换句话说,使用Volatile对字段操作,总是直接对RAM进行操作,而不会CPU寄存器进行操作(不使用CPU寄存器,速度肯定会比使用CPU寄存器慢)。
可以像这样重写上面的方法:

    internal static class StrangeBehaviour {
        private static volatile Boolean s_stopWorker = false;

        public static void Main(String[] args) {
            Console.WriteLine("Main: letting worker run for 5 seconds");
            Thread t = new Thread(Worker);
            t.Start();
            Thread.Sleep(5000);
            Volatile.Write(ref s_stopWorker,true);
            Console.WriteLine("Main:waiting for worker to stop");
            t.Join();
            Console.ReadLine();
        }

        private static void Worker(Object o) {
            Int32 x = 0;
            while (!Volatile.Read(ref s_stopWorker)) x++;
            Console.WriteLine("Worker: stopped when x={0}",x);
        }
    }

除了可以使用Volatile的Read和Write方法对线程共享数据进行操作,也可以使用volatile关键字,使用volatile关键字修饰字段时候,告诉编译器读取数据时候总是以Volatile.Read方式读取,写数据时总是以Volatile.Write方法写入。

3.2 互锁构造(Interlocked)

我们已经知道了Volatile的Read和Write操作是分别执行一次原子性的读取和写入操作。Interlocked也是一种可以用来执行原子操作的基元用户模式。
下面代码用来演示Interlocked的方法异步查询几个Web服务器,并同时处理返回的数据。下面的代码很短,而且不会阻塞任何线程:
MutiWebRequest类:

   class MutiWebRequest {
        //辅助类用于协调所有的异步操作
        private AsyncCoordinator m_ac = new AsyncCoordinator();

        //这是想要查询的Web服务器及其响应(异常或Int32)的集合
        //注意:多个线程访问该字典不需要以同步方式进行,
        //因为构造后键是只读的
        private Dictionary<String, Object> m_servers = new Dictionary<string, object>() {
            {"https://www.wintellect.com/",null},
            {"http://Microsoft.com",null},
            {"http://baidu.com",null}
        };

        public MutiWebRequest(Int32 timeout = Timeout.Infinite)
        {
            //以异步方式发起一次性的所有请求
            var http=new HttpClient();
            foreach(var server in m_servers.Keys){
                m_ac.AboutToBegin();
                http.GetByteArrayAsync(server).ContinueWith(task => {
                    ComputeResult(server,task);
                }); ;
            }
            //告诉AsyncCoordinator所有操作都已发起,并在所有操作完成、
            //调用Cancel或发生超时的时候调用AllDone
            m_ac.AllBegun(AllDown, timeout);
        }

        private void ComputeResult(String server, Task<Byte[]> task) {
            Object result;
            if (task.Exception != null)
            {
                result = task.Exception.InnerException;
            }
            else {
                //在线程池上处理I/O完成
                //在此添加自己的计算密集型算法....
                result = task.Result.Length;//本例只返回长度
            }

            //保存结果
            m_servers[server] = result;
            m_ac.JustEnded();
        }

        //调用这个方法指出结果以无关紧要
        public void Cancel() { m_ac.Cancel(); }

        //所有Web服务器都已经响应,调用了Cancel或者发生了超时
        private void AllDown(CoordinationStatus status) {
            switch (status) {
                case CoordinationStatus.Cancel:
                    Console.WriteLine("operation canceled");
                    break;
                case CoordinationStatus.Timeout:
                    Console.WriteLine("operation timeout");
                    break;
                case CoordinationStatus.AllDown:
                    Console.WriteLine("operation completed; result below:");
                    foreach (var server in m_servers) {
                        Console.Write("{0}",server.Key);
                        Object result = server.Value;
                        if (result is Exception)
                        {
                            Console.WriteLine("failed due to {0}.", result, GetType().Name);
                        }
                        else {
                            Console.WriteLine("returned {0:N0} bytes.",result);
                        }
                    }
                    break;
            }
        }
    }

CoordiationStatus枚举:

enum CoordinationStatus{AllDown,Timeout,Cancel}

AsyncCoordinator类:

    class AsyncCoordinator {
        private Int32 m_opCount = 1;
        private Int32 m_statusReported = 0;
        private Action<CoordinationStatus> m_callback = null;
        private System.Threading.Timer m_timer = null;

        //该方法在一个操作发起之前调用
        public void AboutToBegin(Int32 opsToAdd = 1) {
            Interlocked.Add(ref m_opCount,opsToAdd);
        }

        //该方法在一个操作发起之后调用
        public void JustEnded() {
            if (Interlocked.Decrement(ref m_opCount) == 0) {
                ReportStatus(CoordinationStatus.AllDown);
            }
        }

        //该方法在所有操作发起后调用
        public void AllBegun(Action<CoordinationStatus> callback,Int32 timeout=Timeout.Infinite) {
            m_callback = callback;
            if (timeout != Timeout.Infinite) {
                m_timer = new System.Threading.Timer(TimerExpired, null, timeout, Timeout.Infinite);
            }
            JustEnded();
        }

        public void TimerExpired(Object o) {
            ReportStatus(CoordinationStatus.Timeout);
        }

        public void Cancel() {
            ReportStatus(CoordinationStatus.Cancel);
        }
        private void ReportStatus(CoordinationStatus status) {
            //如果状态从未报告过,就报告
            if (Interlocked.Exchange(ref m_statusReported,1) == 0) {
                m_callback(status);
            }
        }
    }

3.3 线程简单的自旋锁

Interlocked的方法非常好用,但主要用于操作Int32值。如果需要原子性地操作类对象中的一组字段,该怎么办呢?在这种情况下,需要采用一个方法阻止所有线程,只允许其中一个线程进入并对字段进行操作。
例如:

internal struct SimpleSpinLock{
    private Int32 m_ResourceInUse;//0=false(HttpClient),1=true
    public void Enter(){
        while(true){
            //总是将资源设置为“正在使用”(1)
            //只有从“未使用”变成“正在使用”才会返回1
            if(Interlocked.Exchange(ref m_ResourceInUse,1)==1) return;
            //在这里添加“黑科技”
        }
    }
    public void Leave(){
        //将资源标记为“未使用”
        Volatile.Write(ref m_ResourceInUse,0);
    }
}

下面展示了如何使用

public sealed class SomeResource{
    private SimpleSpinLock m_sl=new SimpleSpinLock();

    public void AccessResource(){
        m_sl.Enter();
        //一次只有一个线程进入
        m_sl.Leave();
    }
}

上面封装的自旋锁SimpleSpinLock并不是很好,当一个线程未获取到资源后,就进入自旋,直到其他资源变得可用。看上面的SimpleSpinLock可以知道,如果一个线程未获取到资源,就会进入疯狂自旋模式,将会耗费大量的CPU时间。

3.3.1 SpinLock自旋锁

上面的SimpleSpinLock提供了一种简单的自旋锁模式,SimpleSpinLock并未加什么特殊的而优化。幸运的是,在FCL中已经给我们提供好了一个System.Threading.SpinWait的结构,它封装了上面SimpleSpinLock中关于“黑科技”的最新研究。FCL还包含了System.Threading.SpinLock结构,它和上面的SimpleSpinLock结构相似,只是使用了SpinWait来增强性能。SpinWait还提供了超时支持。而且SpinWait和SpinLock都是值类型,因此它是轻量级的、内存友好的对象。

3.4 Interlocked Anything模式

Interlocked类并没有提供一组丰富的操作方法,比如Mutilple,Divide,Minimun,Maximum...方法。虽然Interlocked并没有提供这些方法,但一个已知的模式是Interlocked.CompareExchange方法,以原子的方式在Int32上执行任何操作。该方法还提供了丰富的重载版本。该模式类似于修改数据库记录时使用的乐观并发模式。

例如下面是一个原子的Maximun方法:

        public static Int32 Maximum(ref Int32 target, Int32 value) {
            Int32 currentVal = target, startVal, desireVal;

            do{
                //记录这一次循环迭代的起始值(startVal)
                startVal=currentVal;

                desireVal=Math.Max(startVal,value);

                //注意:线程在这里可能会被抢占,所以以下代码不是原子的
                //if(target==startVal) target=desireVal;

                //应该使用下面原子的CompareExchange方法,它返回target在被方法修改之前的值
                currentVal=Interlocked.CompareExchange(ref target,desireVal,startVal);
                
                //如果target的值在这一次循环中被其他线程改变,就重复
            }while(startVal!=currentVal);


            return desireVal;
        }

下面是一个泛型方法Morph来封装这个Interlocked.CompareExchange模式,使其实用性更强:

        delegate Int32 Morpher<TResult, TArgument>(Int32 startValue, TArgument argument, out TResult morpherResult);

        static TResult Morph<TResult, TArgument>(ref Int32 target, TArgument argument, Morpher<TResult, TArgument> morpher)
        {
            TResult morphResult;
            Int32 currentVal = target, startVal, desiredVal;
            do{
                startVal=currentVal;
                desiredVal=morpher(startVal,argument,out morphResult);
                currentVal=Interlocked.CompareExchange(ref target,desiredVal,startVal);
            }while(startVal!=currentVal);

            return morphResult;
        }

4.内核模式构造

4.1 EventWaitHandle构造

Event其实只是由内核维护的Boolean变量。事件为false,在事件上等待的线程就阻塞;事件为True就解除阻塞。有两种事件,自己重置事件和手动重置事件。当一个自己重置事件为true时,它只唤醒一个阻塞的线程,因为在解除第一个线程的阻塞后,内核自动将事件重置为false,造成其他线程阻塞。而手动重置事件为true时,它解除正在等待它的所有线程的线程,因为内核不将事件自动重置为false;

4.2 Semaphore构造

System.Threading.Semaphore限制对线程的资源访问的数量。Semaphore(信号量)其实就是由内核维护的Int32变量。当信号量为0时,在信号量上等待的线程会阻塞;信号量大于0时候,解除阻塞。在信号量上等待的线程解除阻塞时,内核自动从信号量的计数中减1。信号量还关联了一个最大的Int32值,当前计数绝不允许超过最大计数。

4.3 Mutex构造

Mutex(互斥体)代表一个互斥的锁。它的工作方式和AutoResetEvent(或者计数为1的Semaphore相似),三者都是一次只能释放一个正在等待的线程。

互斥体有一些逻辑,这造成他们比其他构造复杂。首先Mutex对象会调查调用线程的Int32 ID,记录时那个线程获得了它。一个线程在调用ReleaseMutex时,Mutex确保调用线程就是获取Mutex的那个线程。如若不然,Mutex对象的状态就不会改变,而ReleaseMutex会抛出一个System.ApplicationException。

5.基元用户模式和内核模式性能比较

在上面的笔者已经说过基元用户模式的性能要优于内核模式,接下来笔者选择基元用户模式中的System.Threading.SpinLock自旋锁,和内核模式中的System.Threading.AutoResetEvent事件锁进行比较。
 

   class Program
    {
        static void Main(string[] args)
        {
            Int32 x = 0;
            const Int32 iterations = 10000000;

            Stopwatch sw = new Stopwatch();
            //X递增1000万次
            for (Int32 i = 0; i < iterations; i++)
            {
                x++;
            }
            Console.WriteLine("Incrementing x : {0:N}",sw.ElapsedMilliseconds);

            x = 0;
            sw.Restart();
            //X递增1000万次,加上调用一个什么都不做方法的开销
            for (Int32 i = 0; i < iterations; i++) {
                M(); x++; M();
            }
            Console.WriteLine("Incrementing x in M :{0:N}",sw.ElapsedMilliseconds);

            x = 0;
            SpinLock sl = new SpinLock();
            Boolean locktoken = false;
            sw.Restart();
            //X递增1000万次,加上调用一个无竞争的SpinLock开销
            for (Int32 i = 0; i < iterations; i++)
            {
                locktoken = false;
                sl.Enter(ref locktoken); x++; sl.Exit();
            }
            Console.WriteLine("Incrementing x in SpinLock : {0:N}", sw.ElapsedMilliseconds);

            x = 0;
            AutoResetEvent autoresetevent = new AutoResetEvent(true);
            sw.Restart();
            //X递增1000万次,加上调用一个无竞争的AutoResetEvent开销
            for (Int32 i = 0; i < iterations; i++) {
                autoresetevent.WaitOne(); x++; autoresetevent.Set();
            }
            Console.WriteLine("Incrementing x in AutoResetEvent : {0:N}", sw.ElapsedMilliseconds);

            Console.ReadLine();
        }
        static void M() {
            //该方法什么都不做
        }
    }
//笔者平台的结果如下:
            ///result:
            ///Incrementing x : 0.00
            ///Incrementing x in M :60.00
            ///Incrementing x in SpinLock : 2,537.00
            ///Incrementing x in AutoResetEvent : 12,757.00 

单纯的递增几乎不需要花费事件,但是在调用前后加上一个空方法就要将近多花费60倍的时间,使用基元用户模式的自旋锁就要将近多花费42(6537/60)倍,如果使用内核模式又要多花费5(12757/2537)倍的时间。