C# 线程、任务和同步

时间:2021-07-12 18:32:31


1,线程概述
线程是程序汇中独立的指令流。线程有一个优先级,实际上正在处理的程序的位置计数器,一个存储其局部变量的栈。每个线程都有自己的栈。但应用程序的内存和堆由一个进程的所有线程共享。
进程包含资源,如windows句柄,文件句柄或其他内核对象。每个进程都分配了虚拟内存。一个进程至少包含一个线程。操作系统会调度线程。


总结:
同步代码区域(代码块):lock,  Monitor, SpinLock, Mutex,WaitHandle,Semaphore,EventWaitHandle,AutoRestEvent/ManualResetEvent.
Barrier, ReadWriterLock(Slim)
多线程变量同步:InterLocked, 



进程间同步:
Mutex, Semaphore,



2,异步委托:
创建线程的一种简单方式是定义一个委托,并异步调用它。委托时方法类型安全的引用。Delegate类还支持异步调用委托,在后头创建一个执行任务的线程。
  委托使用线程池来完成异步调用。
  public delegate int TakesAWhileDelegate(int data, int ms);
2.1投票:
  IAsyncResult ar=al.BeginInvoke(1,3000, null, null);
      int result=dl.EndInvoke(ar);
2.2 等待句柄  (WaitHandle)

C# 线程、任务和同步C# 线程、任务和同步
 1     class Program
2 {
3 public delegate int TakesAWhileDelegate(int data, int ms);
4 static int TakesAWhile(int data, int ms)
5 {
6 Console.WriteLine("TakesAWhile started");
7 Thread.Sleep(ms);
8 Console.WriteLine("TakesAWhile completed");
9 return ++data;
10 }
11 private static void Main(string[] args)
12 {
13 Console.WriteLine("Main Begin.");
14 TakesAWhileDelegate dl = TakesAWhile;
15 IAsyncResult ar = dl.BeginInvoke(2, 3000, null, null);
16
17 //ar.IsCompleted
18 //ar.AsyncWaitHandle.WaitOne(50)
19
20 dl.EndInvoke(ar);
21 Console.WriteLine("Main() end.");
22 Console.ReadLine();
23 }
24 }
投票(ar.IsCompleted) 或者等待句柄(ar.AsyncWaitHandle.WaitOne(50, false)

2.3 异步回调 (dl.BeginInvoke(1,3000, TakesAWhileCompleted, dl) )
传入一个回调函数委托,来异步执行。

C# 线程、任务和同步C# 线程、任务和同步
 1     class Program
2 {
3 public delegate int TakesAWhileDelegate(int data, int ms);
4 static int TakesAWhile(int data, int ms)
5 {
6 Console.WriteLine("TakesAWhile started");
7 Thread.Sleep(ms);
8 Console.WriteLine("TakesAWhile completed");
9 return ++data;
10 }
11 private static void Main(string[] args)
12 {
13 Console.WriteLine("Main Begin.");
14 TakesAWhileDelegate dl = TakesAWhile;
15 dl.BeginInvoke(2, 3000, ar =>
16 {
17 if (ar == null)
18 throw new ArgumentNullException("ar");
19 TakesAWhileDelegate dl1 = ar.AsyncState as TakesAWhileDelegate;
20 Trace.Assert(dl1 != null, "Invalid object type");
21 int result = dl1.EndInvoke(ar);
22 Console.WriteLine("result: {0}", result);
23 }, null);
24
25 Console.WriteLine("Main() end.");
26 Console.ReadLine();
27 }
28 }
回调方法


3,Thread类
3.1 给线程传递数据
  1,使用带ParameterizedThreadStart委托参数的Thread构造函数。2,创建自定义类,把线程的方法定位实例方法,这样就可以初始化实例的数据,之后启动线程。
3.2 后台线程:
  只要有一个前台线程在运行,应用程序的进程就在运行。如果多个前台线程在运行,而Main()方法结束了,应用程序的进程依然是激活的,直到所有前台线程完成其任务为止。
  

C# 线程、任务和同步C# 线程、任务和同步
        private static void Main(string[] args)
{
var t1 = new Thread(ThreadMain) {Name = "MyNewThread", IsBackground = false};
t1.Start();
Console.WriteLine(
"Main Thread ending now.");

}

static void ThreadMain()
{
Console.WriteLine(
"Thread {0} started", Thread.CurrentThread.Name);
Thread.Sleep(
3000);
Console.WriteLine(
"Thread {0} Completed",Thread.CurrentThread.Name);
}
前台线程示例

3.3 线程的优先级 

4 线程池

5,任务
5.1 启动任务:
  

C# 线程、任务和同步启动任务代码C# 线程、任务和同步
            TaskFactory tf = new TaskFactory();
Task t1
= tf.StartNew(TaskMethod);

Task t2
= Task.Factory.StartNew(TaskMethod);

Task t3
= new Task(TaskMethod);
t3.Start();

 5.2 连续的任务
Task t1=new Task(DoOnFirst);
Task t2=t1.ContinueWith(DoOnSecond);

5.3任务层次结构


6 Parallel 类
Parallel.For
Parallel.ForEach()
Parallel.Invoke(fun1,fun2);

C# 线程、任务和同步C# 线程、任务和同步
 1         private static void Main(string[] args)
2 {
3 Parallel.Invoke(TaskMethod1,TaskMethod2);
4
5 Console.ReadLine();
6 }
7
8 static void TaskMethod1()
9 {
10 Console.WriteLine("1running in a task.");
11 Console.WriteLine("Task id: {0}", Task.CurrentId);
12 }
13 static void TaskMethod2()
14 {
15 Console.WriteLine("2running in a task.");
16 Console.WriteLine("Task id: {0}", Task.CurrentId);
17 }
Parallel.Invoke Code

7. 取消架构

8, 线程问题:  争用条件和死锁
8.1 争用条件:

C# 线程、任务和同步C# 线程、任务和同步
 1         public class StateObject
2 {
3 private int state = 5;
4
5 public void ChangeState(int loop)
6 {
7 lock (this)
8 {
9 if (state == 5)
10 {
11 state++;
12 Trace.Assert(state == 6, "Race condition ocurred after " + loop + " loops " + Task.CurrentId);
13 if (loop % 1000000 == 0)
14 {
15 Console.WriteLine("after " + loop + " loops " + Task.CurrentId);
16 }
17 }
18 state = 5;
19 }
20
21 }
22 }
23
24 public class SampleTask
25 {
26 public void RaceCondition(object o)
27 {
28 Trace.Assert(o is StateObject, "o must be of type StateObject.");
29 StateObject state = o as StateObject;
30 int i = 0;
31 while (true)
32 {
33 state.ChangeState(i++);
34 }
35 }
36 }
37
38 public class SampleThread
39 {
40 public SampleThread(StateObject s1, StateObject s2)
41 {
42 this.s1 = s1;
43 this.s2 = s2;
44 }
45
46 private StateObject s1;
47 private StateObject s2;
48
49 public void Deadlock1()
50 {
51 int i = 0;
52 while (true)
53 {
54 lock (s1)
55 {
56 lock (s2)
57 {
58 s1.ChangeState(i);
59 s2.ChangeState(i++);
60 Console.WriteLine("still running, {0}", i);
61
62 }
63 }
64 //Thread.Yield();
65 }
66 }
67 public void Deadlock2()
68 {
69 int i = 0;
70 while (true)
71 {
72 lock (s2)
73 {
74 lock (s1)
75 {
76 s1.ChangeState(i);
77 s2.ChangeState(i++);
78 Console.WriteLine("still running, {0}", i);
79
80 }
81 }
82 //Thread.Yield();
83 }
84 }
85 }
86
87 private static void Main(string[] args)
88 {
89 var state1 = new StateObject();
90 var state2 = new StateObject();
91
92 SampleThread st = new SampleThread(state1, state2);
93
94 Task.Factory.StartNew(st.Deadlock1);
95 Task.Factory.StartNew(st.Deadlock2);
96
97 Console.ReadLine();
98 }
死锁演示代码


9, 同步
9.1 Lock 语句
栈是线程独立的,但不是私有的。所有线程的栈内所有内容,都可以被其他线程访问。
为什么不用 lock(this) ?
因为这通常超出我们的控制,因为其他人也有可能lock这个对象。一个私有的对象是更好的选择。避免lock一个公开类型,或者超出你代码的控制的实例。
Tips:可以提供线程安全的原子操作。

C# 线程、任务和同步C# 线程、任务和同步
 1     class Program
2 {
3 public class SharedState
4 {
5 public int State { get; set; }
6 }
7
8 public class Job
9 {
10 private SharedState sharedState;
11
12 public Job(SharedState sharedState)
13 {
14 this.sharedState = sharedState;
15 }
16
17 public void DoTheJob()
18 {
19 for (int i = 0; i < 50000; i++)
20 {
21 sharedState.State +=1;
22 }
23 }
24 }
25
26 private static void Main(string[] args)
27 {
28 int numTasks = 20
29 ;
30 var state = new SharedState();
31 var tasks = new Task[numTasks];
32 for (int j = 0; j < 5; j++)
33 {
34 state.State = 0;
35 for (int i = 0; i < numTasks; i++)
36 {
37 tasks[i] = new Task(new Job(state).DoTheJob);
38 tasks[i].Start();
39
40 }
41
42 for (int i = 0; i < numTasks; i++)
43 {
44 tasks[i].Wait();
45
46 }
47 Console.WriteLine("summarized {0}", state.State);
48 }
49
50
51 }
52
53 }
线程不安全-问题代码

9.2 Interlocked类
Interlock类用于使变量的简单语句原子化,线程安全方式递增、递减、交换和读取。i++不是线程安全的(包含3个操作:从内存获取、递增1、存储回内存,这些操作都可以被线程调度器打断)。

9.3 Monitor类
lock语句由编译器解释为Monitor类: Moniter.Enter(obj) ;   Monitor.Exit(obj);
Monitor类的一个优点:可以添加一个等待被锁定的超市值。Monitor.TryEnter(lockObj,500,ref lockToken);

C# 线程、任务和同步C# 线程、任务和同步
 1             public void DoTheJob()
2 {
3 for (int i = 0; i < 50000; i++)
4 {
5 bool isLocked = false;
6 goLabel:
7 Monitor.TryEnter(sharedState, 500, ref isLocked);
8 if (isLocked)
9 {
10 sharedState.State += 1;
11 Monitor.Exit(sharedState);
12 }
13 else
14 {
15 Console.WriteLine("lock failed.");
16 goto goLabel;
17 }
18
19 }
20 }
Monitor.TryEnter

 9.4 SpinLock结构
适合于有大量的锁定,而且锁定的时间非常短。用法非常接近于Monitor类。获得锁使用Enter()或者TryEnter(),释放锁使用Exit()方法。小心SpinLock的传送,因为是结构,所以会复制。


9.5 WaitHandle基类
Delegate BeginInvoke() 用waithandle.WaitOne(50,false)来bolck当前线程,
WaitHandle是一个抽象基类,用于等待一个信号量的设置。可以等待不同的信号,因为WaitHandle是一个基类,可以派生一些类。

C# 线程、任务和同步C# 线程、任务和同步
        private static void Main(string[] args)
{
Action ac
= () =>
{
Console.WriteLine(
"Action Begin.");
Thread.Sleep(
2000);
Console.WriteLine(
"Action End.");

};

AsyncCallback callback
= (o) =>
{
var cb = (Action)o.AsyncState;
cb.EndInvoke(o);
Console.WriteLine(
"Callback finished.");

};

IAsyncResult ar
= ac.BeginInvoke(callback, ac);
while (true)
{
Console.Write(
".");
if (ar.AsyncWaitHandle.WaitOne(50, true))
{
Console.WriteLine(
"Can get the result now.");
break;
}
}


Console.ReadLine();

}
AsyncWaitHandle

WaitOne() 等待一个,waitAll()等待多个对象,WaitAny等待多个对象的一个。WaitAll和WaitAny是静态方法。

WaitHandle基类有一个SafeWaitHandle属性,其中可以将本机句柄赋予一个操作系统资源,并等待该句柄。
Mutex、EventWaitHandle 和 Semaphore类继承自WaitHandle基类。所以可以等到使用它们。

9.6 Mutex类
Mutex(mutual exclusion,互斥)是.net Framework中提供多个集成同步访问的一个类。它非常类似于Monitor,只有一个线程能拥有锁定。只有一个线程能获得互斥锁定,访问受互斥访问的同步代码区域。
    在Mutex类的构造函数中,可以指定互斥是否最初由主调线程拥有。定义互斥的名称,获得互斥是否存在的信息。
系统能识别有名称的Mutex

C# 线程、任务和同步C# 线程、任务和同步
        private static void Main(string[] args)
{
bool isNew;
using (Mutex mutex = new Mutex(false, "ProMutext", out isNew))
{
if (isNew)
{
Console.WriteLine(
"Get mutex lock.");

}
else
{
Console.WriteLine(
"can't get mutex lock.");
}
Thread.Sleep(
3000);
}
Thread.Sleep(
1000000);
Console.ReadLine();

}
Mutex

 

9.7 Semaphore类
信号量非常类似于互斥,其区别是多个线程使用。信号量是一种技术的互斥锁定。使用信号量可以定义同时访问旗语锁定保护的资源的线程个数。
Semaphore类:可以命名,使用系统范围内的资源,允许不同进程间同步。

C# 线程、任务和同步C# 线程、任务和同步
        static void Main()
{
int threadCount = 6;
int semaphoreCount = 4;
var semaphore = new Semaphore( semaphoreCount, semaphoreCount,"ProSemaphore");
var threads = new Thread[threadCount];

for (int i = 0; i < threadCount; i++)
{
threads[i]
= new Thread(ThreadMain);
threads[i].Start(semaphore);
}

for (int i = 0; i < threadCount; i++)
{
threads[i].Join();
}
Console.WriteLine(
"All threads finished");

}

static void ThreadMain(object o)
{
Semaphore semaphore
= o as Semaphore;
Trace.Assert(semaphore
!= null, "o must be a Semaphore type");
bool isCompleted = false;
while (!isCompleted)
{
if (semaphore.WaitOne(600))
{
try
{
Console.WriteLine(
"Thread {0} locks the semaphore",
Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(
4000);
}
finally
{
semaphore.Release();
Console.WriteLine(
"Thread {0} releases the semaphore",
Thread.CurrentThread.ManagedThreadId);
isCompleted
= true;
}
}
else
{
Console.WriteLine(
"Timeout for thread {0}; wait again",
Thread.CurrentThread.ManagedThreadId);
}
}
}
Semaphore(多线程&跨进程同步)

SemaphoreSlim类是对于较短等待时间进行了优化的轻型版本,不能跨进程。不能命名,不使用内核信号量,不能跨进程。

C# 线程、任务和同步C# 线程、任务和同步
        private static void Main(string[] args)
{
int threadCount = 6;
int semaphoreCount = 4;
var semaphore = new SemaphoreSlim(semaphoreCount, semaphoreCount);
Thread[] threads
= new Thread[threadCount];

for (int i = 0; i < threadCount; i++)
{
threads[i]
= new Thread(ThreadMain);
threads[i].Start(semaphore);
}

for (int i = 0; i < threadCount; i++)
{
threads[i].Join();
}
Console.WriteLine(
"AllThread finished!");

Console.ReadLine();
}

static void ThreadMain(object o)
{
SemaphoreSlim semaphore
= o as SemaphoreSlim;
Trace.Assert(semaphore
!= null, "o must be a Semphore type.");
bool isCompleted = false;
while (!isCompleted)
{
if (semaphore.Wait(100))
{
try
{
Console.WriteLine(
"thread {0} locks the semaphore", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(
3000);
}
finally
{
semaphore.Release();
Console.WriteLine(
"Thread {0} release the semaphore", Thread.CurrentThread.ManagedThreadId);
isCompleted
= true;
}
}
else
{
Console.WriteLine(
"Timeout for thread {0}; wait again ", Thread.CurrentThread.ManagedThreadId);
}
}
}
SemaphoreSlim



9.8 Event类
事件是另一个系统范围内的资源同步方法。为了从托管代码中使用系统事件,.net framework提供了ManualResetEvent、AutoResetEvent、ManualResetEventSlim和CountdownEvent类。

C# 线程、任务和同步C# 线程、任务和同步
       private static void Main(string[] args)
{
const int taskCount = 10;
var mEvents = new ManualResetEventSlim[taskCount];
var waitHandles = new WaitHandle[taskCount];
var calcs = new Calculator[taskCount];
TaskFactory taskFactory
= new TaskFactory();
for (int i = 0; i < taskCount; i++)
{
mEvents[i]
= new ManualResetEventSlim(false);
waitHandles[i]
= mEvents[i].WaitHandle;
calcs[i]
= new Calculator(mEvents[i]);

taskFactory.StartNew(calcs[i].Calculation, Tuple.Create(i
+ 1, i + 3));

}

for (int i = 0; i < taskCount; i++)
{
int index = WaitHandle.WaitAny(waitHandles);
if (index == WaitHandle.WaitTimeout)
{
Console.WriteLine(
"Timeout!!");

}
else
{
mEvents[index].Reset();
Console.WriteLine(
"finished task for {0}, result: {1}", index, calcs[index].Result);
Thread.Sleep(
100);
}
}

Console.ReadLine();
}

public class Calculator
{
private ManualResetEventSlim mEvent;
public int Result { get; private set; }

public Calculator(ManualResetEventSlim ev)
{
this.mEvent = ev;
}

public void Calculation(Object obj)
{
Tuple
<int, int> data = (Tuple<int, int>)obj;
Console.WriteLine(
"Task {0} starts calculation", Task.CurrentId);
Thread.Sleep((
3000));
Result
= data.Item1 + data.Item2;
Console.WriteLine(
"Task {0} is ready", Task.CurrentId);
mEvent.Set();
}
}
ManualResetEvent

把任务分支到多个任务中,并在以后合并结果,使用新的CountdownEvent类很有用。
不需要位每个任务创建一个单独的事件对象,而只需要创建一个事件对象。

var mEvents = new ManualResetEventSlim[taskCount];
// var cEvent = new CountdownEvent(taskCount);

var waitHandles = new WaitHandle[taskCount];
var calcs = new Calculator[taskCount];

int index = WaitHandle.WaitAny(waitHandles);//wait

//tasks
mEvent.Set();//all thread set;
//continue

C# 线程、任务和同步C# 线程、任务和同步
        private static void Main(string[] args)
{
const int taskCount = 10;
var cEvent = new CountdownEvent(taskCount);

var calcs = new Calculator[taskCount];
TaskFactory taskFactory
= new TaskFactory();
for (int i = 0; i < taskCount; i++)
{
calcs[i]
= new Calculator(cEvent);

taskFactory.StartNew(calcs[i].Calculation, Tuple.Create(i
+ 1, i + 3));

}

cEvent.Wait();
Console.WriteLine(
"All finished.");

Console.ReadLine();
}

public class Calculator
{
private CountdownEvent cEvent;
public int Result { get; private set; }

public Calculator(CountdownEvent ev)
{
this.cEvent = ev;
}

public void Calculation(Object obj)
{
Tuple
<int, int> data = (Tuple<int, int>)obj;
Console.WriteLine(
"Task {0} starts calculation", Task.CurrentId);
Thread.Sleep((
3000));
Result
= data.Item1 + data.Item2;
Console.WriteLine(
"Task {0} is ready", Task.CurrentId);
cEvent.Signal();
}
}
CountdownEvent

 9.9 Barrier 类
适合于工作有多个任务分支且以后又需要合并工作的情况。

 var barrier = new Barrier(numberTasks + 1);
 barrier.SignalAndWait();//wait
//tasks
barrier.RemoveParticipant();//2 left

barrier.RemoveParticipant();//1 left

//continue.

9.10 ReadWriterLockSlim类
允许多个读取器。同时只有一个写入器工作,此时读取器不能工作。

C# 线程、任务和同步C# 线程、任务和同步
        private static List<int> items = new List<int>() { 0, 1, 2, 3, 4, 5 };
static ReaderWriterLockSlim rwl = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

static void ReadMethod(object reader)
{
try
{
rwl.EnterReadLock();
for (int i = 0; i < items.Count; i++)
{
Console.WriteLine(
"read {0}, loop: {1}, item:{2}", reader, i, items[i]);
Thread.Sleep(
40);
}
}
finally
{
rwl.ExitReadLock();
}
}

static void WriterMethod(object writer)
{
try
{
while (!rwl.TryEnterWriteLock(50))
{
Console.WriteLine(
"Writer {0} waiting ,current reader count: {1}", writer, rwl.CurrentReadCount);
}
Console.WriteLine(
"Writer{0} acquired the lock.", writer);
for (int i = 0; i < items.Count; i++)
{
items[i]
++;
Thread.Sleep(
50);

}
Console.WriteLine(
"Writer {0} finished.", writer);

}
finally
{
rwl.ExitWriteLock();
}
}
private static void Main(string[] args)
{
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var tasks = new Task[6];
tasks[
0] = taskFactory.StartNew(WriterMethod, 1);
tasks[
1] = taskFactory.StartNew(ReadMethod, 1);
tasks[
2] = taskFactory.StartNew(ReadMethod, 2);
tasks[
3] = taskFactory.StartNew(WriterMethod, 2);
tasks[
4] = taskFactory.StartNew(ReadMethod, 3);
tasks[
5] = taskFactory.StartNew(ReadMethod, 4);
foreach (Task task in tasks)
{
task.Wait();

}


Console.WriteLine(
"All finished.");

Console.ReadLine();
}
ReadWriterLockSlim

 

 

10 Timer类