C#多线程及同步示例简析

时间:2021-10-12 07:55:11

       60年代,在os中能拥有资源和独立运行的基本单位是进程,然而随着计算机技术的发展,进程出现了很多弊端,一是由于进程是资源拥有者,创建、撤消与切换存在较大的时空开销,因此需要引入轻型进程;二是由于对称多处理机(smp)出现,可以满足多个运行单位,而多个进程并行开销过大。
因此在80年代,出现了能独立运行的基本单位——线程(threads)。
       线程,有时被称为轻量级进程(lightweight process,lwp),是程序执行流的最小单元。一个标准的线程由线程id,当前指令指针(pc),寄存器集合和堆栈组成。另外,线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。由于线程之间的相互制约,致使线程在运行中呈现出间断性。线程也有就绪、阻塞和运行三种基本状态。就绪状态是指线程具备运行的所有条件,逻辑上可以运行,在等待处理机;运行状态是指线程占有处理机正在运行;阻塞状态是指线程在等待一个事件(如某个信号量),逻辑上不可执行。每一个程序都至少有一个线程,若程序只有一个线程,那就是程序本身。
       线程是程序中一个单一的顺序控制流程。进程内一个相对独立的、可调度的执行单元,是系统独立调度和分派cpu的基本单位指运行中的程序的调度单位。在单个程序中同时运行多个线程完成不同的工作,称为多线程。

C#多线程及同步示例简析

一、线程简义

1、进程与线程:进程作为操作系统执行程序的基本单位,拥有应用程序的资源,进程包含线程,进程的资源被线程共享,线程不拥有资源。

2、前台线程和后台线程:通过thread类新建线程默认为前台线程。当所有前台线程关闭时,所有的后台线程也会被直接终止,不会抛出异常。

3、挂起(suspend)和唤醒(resume):由于线程的执行顺序和程序的执行情况不可预知,所以使用挂起和唤醒容易发生死锁的情况,在实际应用中应该尽量少用。

4、阻塞线程:join,阻塞调用线程,直到该线程终止。

5、终止线程:abort:抛出 threadabortexception 异常让线程终止,终止后的线程不可唤醒。interrupt:抛出 threadinterruptexception 异常让线程终止,通过捕获异常可以继续执行。

6、线程优先级:abovenormal belownormal highest lowest normal,默认为normal。

二、线程的使用

线程函数通过委托传递,可以不带参数,也可以带参数(只能有一个参数),可以用一个类或结构体封装参数。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
namespace test
{
  class program
  {
    static void main(string[] args)
    {
      thread t1 = new thread(new threadstart(testmethod));
      thread t2 = new thread(new parameterizedthreadstart(testmethod));
      t1.isbackground = true;
      t2.isbackground = true;
      t1.start();
      t2.start("hello");
      console.readkey();
    }
 
    public static void testmethod()
    {
      console.writeline("不带参数的线程函数");
    }
 
    public static void testmethod(object data)
    {
      string datastr = data as string;
      console.writeline("带参数的线程函数,参数为:{0}", datastr);
    }
  }
}

三、线程池

由于线程的创建和销毁需要耗费一定的开销,过多的使用线程会造成内存资源的浪费,出于对性能的考虑,于是引入了线程池的概念。线程池维护一个请求队列,线程池的代码从队列提取任务,然后委派给线程池的一个线程执行,线程执行完不会被立即销毁,这样既可以在后台执行任务,又可以减少线程创建和销毁所带来的开销。

线程池线程默认为后台线程(isbackground)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class program
  {
    static void main(string[] args)
    {
      //将工作项加入到线程池队列中,这里可以传递一个线程参数
      threadpool.queueuserworkitem(testmethod, "hello");
      console.readkey();
    }
 
    public static void testmethod(object data)
    {
      string datastr = data as string;
      console.writeline(datastr);
    }
  }

四、task类

使用threadpool的queueuserworkitem()方法发起一次异步的线程执行很简单,但是该方法最大的问题是没有一个内建的机制让你知道操作什么时候完成,有没有一个内建的机制在操作完成后获得一个返回值。为此,可以使用system.threading.tasks中的task类。

构造一个task<tresult>对象,并为泛型tresult参数传递一个操作的返回类型。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class program
  {
    static void main(string[] args)
    {
      task<int32> t = new task<int32>(n => sum((int32)n), 1000);
      t.start();
      t.wait();
      console.writeline(t.result);
      console.readkey();
    }
 
    private static int32 sum(int32 n)
    {
      int32 sum = 0;
      for (; n > 0; --n)
        checked{ sum += n;} //结果太大,抛出异常
      return sum;
    }
  }

一个任务完成时,自动启动一个新任务。

一个任务完成后,它可以启动另一个任务,下面重写了前面的代码,不阻塞任何线程。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class program
  {
    static void main(string[] args)
    {
      task<int32> t = new task<int32>(n => sum((int32)n), 1000);
      t.start();
      //t.wait();
      task cwt = t.continuewith(task => console.writeline("the result is {0}",t.result));
      console.readkey();
    }
 
    private static int32 sum(int32 n)
    {
      int32 sum = 0;
      for (; n > 0; --n)
        checked{ sum += n;} //结果溢出,抛出异常
      return sum;
    }
  }

五、委托异步执行

委托的异步调用:begininvoke() 和 endinvoke()

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public delegate string mydelegate(object data);
  class program
  {
    static void main(string[] args)
    {
      mydelegate mydelegate = new mydelegate(testmethod);
      iasyncresult result = mydelegate.begininvoke("thread param", testcallback, "callback param");
 
      //异步执行完成
      string resultstr = mydelegate.endinvoke(result);
    }
 
    //线程函数
    public static string testmethod(object data)
    {
      string datastr = data as string;
      return datastr;
    }
 
    //异步回调函数
    public static void testcallback(iasyncresult data)
    {
      console.writeline(data.asyncstate);
    }
  }

六、线程同步

1)原子操作(interlocked):帮助保护免受计划程序切换上下文时某个线程正在更新可以由其他线程访问的变量或者在单独的处理器上同时执行两个线程就可能出现的错误。 此类的成员不会引发异常。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class program
  {
    static int counter = 1;
 
    static void main(string[] args)
    {
      thread t1 = new thread(new threadstart(f1));
      thread t2 = new thread(new threadstart(f2));
 
      t1.start();
      t2.start();
 
      t1.join();
      t2.join();
 
      system.console.readkey();
    }
 
    static void f1()
    {
      for (int i = 0; i < 5; i++)
      {
        interlocked.increment(ref counter);
        system.console.writeline("counter++ {0}", counter);
        thread.sleep(10);
      }
    }
 
    static void f2()
    {
      for (int i = 0; i < 5; i++)
      {
        interlocked.decrement(ref counter);
        system.console.writeline("counter-- {0}", counter);
        thread.sleep(10);
      }
    }
  }

2)lock()语句:避免锁定public类型,否则实例将超出代码控制的范围,定义private对象来锁定。而自定义类推荐用私有的只读静态对象,比如:private static readonly object obj = new object();为什么要设置成只读的呢?这时因为如果在lock代码段中改变obj的值,其它线程就畅通无阻了,因为互斥锁的对象变了,object.referenceequals必然返回false。array 类型提供 syncroot。许多集合类型也提供 syncroot。

3)monitor实现线程同步

通过monitor.enter() 和 monitor.exit()实现排它锁的获取和释放,获取之后独占资源,不允许其他线程访问。

还有一个tryenter方法,请求不到资源时不会阻塞等待,可以设置超时时间,获取不到直接返回false。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void monitorsomething()
    {
      try
      {
        monitor.enter(obj);
        dosomething();
      }
      catch(exception ex)
      {
        
      }
      finally
      {
        monitor.exit(obj);
      }
    }

4)readerwriterlock

当对资源操作读多写少的时候,为了提高资源的利用率,让读操作锁为共享锁,多个线程可以并发读取资源,而写操作为独占锁,只允许一个线程操作。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class synchronizedcache
  {
    private readerwriterlockslim cachelock = new readerwriterlockslim();
    private dictionary<int, string> innercache = new dictionary<int, string>();
 
    public string read(int key)
    {
      cachelock.enterreadlock();
      try
      {
        return innercache[key];
      }
      finally
      {
        cachelock.exitreaderlock();
      }
    }
 
    public void add(int key, string value)
    {
      cachelock.enterwritelock();
      try
      {
        innercache.add(key, value);
      }
      finally
      {
        cachelock.exitwritelock();
      }
    }
 
    public bool addwithtimeout(int key, string value, int timeout)
    {
      if (cachelock.tryenterwritelock(timeout))
      {
        try
        {
          innercache.add(key, value);
        }
        finally
        {
          cachelock.exitreaderlock();
        }
        return true;
      }
      else
      {
        return false;
      }
    }
 
    public addorupdatestatus addorupdate(int key, string value)
    {
      cachelock.enterupgradeablereadlock();
      try
      {
        string result = null;
        if (innercache.trygetvalue(key, out result))
        {
          if (result == value)
          {
            return addorupdatestatus.unchanged;
          }
          else
          {
            cachelock.enterwritelock();
            try
            {
              innercache[key] = value;
            }
            finally
            {
              cachelock.exitwritelock();
            }
            return addorupdatestatus.updated;
          }
        }
        else
        {
          cachelock.enterwritelock();
          try
          {
            innercache.add(key, value);
          }
          finally
          {
            cachelock.exitwritelock();
          }
          return addorupdatestatus.added;
        }
      }
      finally
      {
        cachelock.exitupgradeablereadlock();
      }
    }
 
    public void delete(int key)
    {
      cachelock.enterwritelock();
      try
      {
        innercache.remove(key);
      }
      finally
      {
        cachelock.exitwritelock();
      }
    }
 
    public enum addorupdatestatus
    {
      added,
      updated,
      unchanged
    };
  }

5)事件(event)类实现同步

事件类有两种状态,终止状态和非终止状态,终止状态时调用waitone可以请求成功,通过set将时间状态设置为终止状态。

 1).autoresetevent(自动重置事件)

 2).manualresetevent(手动重置事件)

 autoresetevent和manualresetevent这两个类经常用到, 他们的用法很类似,但也有区别。set方法将信号置为发送状态,reset方法将信号置为不发送状态,waitone等待信号的发送。可以通过构造函数的参数值来决定其初始状态,若为true则非阻塞状态,为false为阻塞状态。如果某个线程调用waitone方法,则当信号处于发送状态时,该线程会得到信号, 继续向下执行。其区别就在调用后,autoresetevent.waitone()每次只允许一个线程进入,当某个线程得到信号后,autoresetevent会自动又将信号置为不发送状态,则其他调用waitone的线程只有继续等待.也就是说,autoresetevent一次只唤醒一个线程;而manualresetevent则可以唤醒多个线程,因为当某个线程调用了manualresetevent.set()方法后,其他调用waitone的线程获得信号得以继续执行,而manualresetevent不会自动将信号置为不发送。也就是说,除非手工调用了manualresetevent.reset()方法,则manualresetevent将一直保持有信号状态,manualresetevent也就可以同时唤醒多个线程继续执行。

6)信号量(semaphore)

信号量是由内核对象维护的int变量,为0时,线程阻塞,大于0时解除阻塞,当一个信号量上的等待线程解除阻塞后,信号量计数+1。

线程通过waitone将信号量减1,通过release将信号量加1,使用很简单。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public thread thrd;
    //创建一个可授权2个许可证的信号量,且初始值为2
    static semaphore sem = new semaphore(2, 2);
 
    public mythread(string name)
    {
      thrd = new thread(this.run);
      thrd.name = name;
      thrd.start();
    }
    void run()
    {
      console.writeline(thrd.name + "正在等待一个许可证……");
      //申请一个许可证
      sem.waitone();
      console.writeline(thrd.name + "申请到许可证……");
      for (int i = 0; i < 4 ; i++)
      {
        console.writeline(thrd.name + ": " + i);
        thread.sleep(1000);
      }
      console.writeline(thrd.name + " 释放许可证……");
      //释放
      sem.release();
    }
  }
 
  class mysemaphore
  {
    public static void main()
    {
      mythread mythrd1 = new mythread("thrd #1");
      mythread mythrd2 = new mythread("thrd #2");
      mythread mythrd3 = new mythread("thrd #3");
      mythread mythrd4 = new mythread("thrd #4");
      mythrd1.thrd.join();
      mythrd2.thrd.join();
      mythrd3.thrd.join();
      mythrd4.thrd.join();
    }
  }

7)互斥体(mutex)

独占资源,可以把mutex看作一个出租车,乘客看作线程。乘客首先等车,然后上车,最后下车。当一个乘客在车上时,其他乘客就只有等他下车以后才可以上车。而线程与c# mutex对象的关系也正是如此,线程使用mutex.waitone()方法等待c# mutex对象被释放,如果它等待的c# mutex对象被释放了,它就自动拥有这个对象,直到它调用mutex.releasemutex()方法释放这个对象,而在此期间,其他想要获取这个c# mutex对象的线程都只有等待。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class test
  {
    /// <summary>
    /// 应用程序的主入口点。
    /// </summary>
    [stathread]
    static void main(string[] args)
    {
      bool flag = false;
      system.threading.mutex mutex = new system.threading.mutex(true, "test", out flag);
      //第一个参数:true--给调用线程赋予互斥体的初始所属权
      //第一个参数:互斥体的名称
      //第三个参数:返回值,如果调用线程已被授予互斥体的初始所属权,则返回true
      if (flag)
      {
        console.write("running");
      }
      else
      {
        console.write("another is running");
        system.threading.thread.sleep(5000);//线程挂起5秒钟
        environment.exit(1);//退出程序
      }
      console.readline();
    }
  }

8)跨进程间的同步

通过设置同步对象的名称就可以实现系统级的同步,不同应用程序通过同步对象的名称识别不同同步对象。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static void main(string[] args)
    {
      string mutexname = "interprocesssyncname";
      mutex syncnamed;   //声明一个已命名的互斥对象
       try
      {
        syncnamed = mutex.openexisting(mutexname);    //如果此命名互斥对象已存在则请求打开
      }
      catch (waithandlecannotbeopenedexception)
      {
        syncnamed = new mutex(false, mutexname);     //如果初次运行没有已命名的互斥对象则创建一个
      }
      task multesk = new task
        (
          () =>         //多任务并行计算中的匿名方法,用委托也可以
          {
            for (; ; )     //为了效果明显而设计
            {
              console.writeline("当前进程等待获取互斥访问权......");
              syncnamed.waitone();
              console.writeline("获取互斥访问权,访问资源完毕,按回车释放互斥资料访问权.");
              console.readline();
              syncnamed.releasemutex();
              console.writeline("已释放互斥访问权。");
            }
          }
        );
      multesk.start();
      multesk.wait();
    }

9)分布式的同步

可以使用redis任务队列或者redis相关特性

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
parallel.for(0, 1000000, i =>
 {
  stopwatch sw1 = new stopwatch();
   sw1.start();
 
    if (redishelper.getredisoperation().lock(key))
       {
     var tt = int.parse(redishelper.getredisoperation().stringget("calc"));
 
     tt++;
 
     redishelper.getredisoperation().stringset("calc", tt.tostring());
 
     redishelper.getredisoperation().unlock(key);
            }
            var v = sw1.elapsedmilliseconds;
            if (v >= 10 * 1000)
            {
              console.write("f");
        }
      sw1.stop();
 });

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:http://www.cnblogs.com/yswenli/archive/2017/08/24/7421475.html