1,线程概述
线程是程序汇中独立的指令流。线程有一个优先级,实际上正在处理的程序的位置计数器,一个存储其局部变量的栈。每个线程都有自己的栈。但应用程序的内存和堆由一个进程的所有线程共享。
进程包含资源,如windows句柄,文件句柄或其他内核对象。每个进程都分配了虚拟内存。一个进程至少包含一个线程。操作系统会调度线程。
总结:
同步代码区域(代码块):lock, Monitor, SpinLock, Mutex,WaitHandle,Semaphore,EventWaitHandle,AutoRestEvent/ManualResetEvent.
Barrier, ReadWriterLock(Slim)
多线程变量同步:InterLocked,
进程间同步:
Mutex, Semaphore,
2,异步委托:
创建线程的一种简单方式是定义一个委托,并异步调用它。委托时方法类型安全的引用。Delegate类还支持异步调用委托,在后头创建一个执行任务的线程。
委托使用线程池来完成异步调用。
public delegate int TakesAWhileDelegate(int data, int ms);
2.1投票:
IAsyncResult ar=al.BeginInvoke(1,3000, null, null);
int result=dl.EndInvoke(ar);
2.2 等待句柄 (WaitHandle)
1 class Program投票(ar.IsCompleted) 或者等待句柄(ar.AsyncWaitHandle.WaitOne(50, false)
2 {
3 public delegate int TakesAWhileDelegate(int data, int ms);
4 static int TakesAWhile(int data, int ms)
5 {
6 Console.WriteLine("TakesAWhile started");
7 Thread.Sleep(ms);
8 Console.WriteLine("TakesAWhile completed");
9 return ++data;
10 }
11 private static void Main(string[] args)
12 {
13 Console.WriteLine("Main Begin.");
14 TakesAWhileDelegate dl = TakesAWhile;
15 IAsyncResult ar = dl.BeginInvoke(2, 3000, null, null);
16
17 //ar.IsCompleted
18 //ar.AsyncWaitHandle.WaitOne(50)
19
20 dl.EndInvoke(ar);
21 Console.WriteLine("Main() end.");
22 Console.ReadLine();
23 }
24 }
2.3 异步回调 (dl.BeginInvoke(1,3000, TakesAWhileCompleted, dl) )
传入一个回调函数委托,来异步执行。
1 class Program回调方法
2 {
3 public delegate int TakesAWhileDelegate(int data, int ms);
4 static int TakesAWhile(int data, int ms)
5 {
6 Console.WriteLine("TakesAWhile started");
7 Thread.Sleep(ms);
8 Console.WriteLine("TakesAWhile completed");
9 return ++data;
10 }
11 private static void Main(string[] args)
12 {
13 Console.WriteLine("Main Begin.");
14 TakesAWhileDelegate dl = TakesAWhile;
15 dl.BeginInvoke(2, 3000, ar =>
16 {
17 if (ar == null)
18 throw new ArgumentNullException("ar");
19 TakesAWhileDelegate dl1 = ar.AsyncState as TakesAWhileDelegate;
20 Trace.Assert(dl1 != null, "Invalid object type");
21 int result = dl1.EndInvoke(ar);
22 Console.WriteLine("result: {0}", result);
23 }, null);
24
25 Console.WriteLine("Main() end.");
26 Console.ReadLine();
27 }
28 }
3,Thread类
3.1 给线程传递数据
1,使用带ParameterizedThreadStart委托参数的Thread构造函数。2,创建自定义类,把线程的方法定位实例方法,这样就可以初始化实例的数据,之后启动线程。
3.2 后台线程:
只要有一个前台线程在运行,应用程序的进程就在运行。如果多个前台线程在运行,而Main()方法结束了,应用程序的进程依然是激活的,直到所有前台线程完成其任务为止。
private static void Main(string[] args)前台线程示例
{
var t1 = new Thread(ThreadMain) {Name = "MyNewThread", IsBackground = false};
t1.Start();
Console.WriteLine("Main Thread ending now.");
}
static void ThreadMain()
{
Console.WriteLine("Thread {0} started", Thread.CurrentThread.Name);
Thread.Sleep(3000);
Console.WriteLine("Thread {0} Completed",Thread.CurrentThread.Name);
}
3.3 线程的优先级
4 线程池
5,任务
5.1 启动任务:
TaskFactory tf = new TaskFactory();
Task t1 = tf.StartNew(TaskMethod);
Task t2 = Task.Factory.StartNew(TaskMethod);
Task t3 = new Task(TaskMethod);
t3.Start();
5.2 连续的任务
Task t1=new Task(DoOnFirst);
Task t2=t1.ContinueWith(DoOnSecond);
5.3任务层次结构
6 Parallel 类
Parallel.For
Parallel.ForEach()
Parallel.Invoke(fun1,fun2);
1 private static void Main(string[] args)Parallel.Invoke Code
2 {
3 Parallel.Invoke(TaskMethod1,TaskMethod2);
4
5 Console.ReadLine();
6 }
7
8 static void TaskMethod1()
9 {
10 Console.WriteLine("1running in a task.");
11 Console.WriteLine("Task id: {0}", Task.CurrentId);
12 }
13 static void TaskMethod2()
14 {
15 Console.WriteLine("2running in a task.");
16 Console.WriteLine("Task id: {0}", Task.CurrentId);
17 }
7. 取消架构
8, 线程问题: 争用条件和死锁
8.1 争用条件:
1 public class StateObject死锁演示代码
2 {
3 private int state = 5;
4
5 public void ChangeState(int loop)
6 {
7 lock (this)
8 {
9 if (state == 5)
10 {
11 state++;
12 Trace.Assert(state == 6, "Race condition ocurred after " + loop + " loops " + Task.CurrentId);
13 if (loop % 1000000 == 0)
14 {
15 Console.WriteLine("after " + loop + " loops " + Task.CurrentId);
16 }
17 }
18 state = 5;
19 }
20
21 }
22 }
23
24 public class SampleTask
25 {
26 public void RaceCondition(object o)
27 {
28 Trace.Assert(o is StateObject, "o must be of type StateObject.");
29 StateObject state = o as StateObject;
30 int i = 0;
31 while (true)
32 {
33 state.ChangeState(i++);
34 }
35 }
36 }
37
38 public class SampleThread
39 {
40 public SampleThread(StateObject s1, StateObject s2)
41 {
42 this.s1 = s1;
43 this.s2 = s2;
44 }
45
46 private StateObject s1;
47 private StateObject s2;
48
49 public void Deadlock1()
50 {
51 int i = 0;
52 while (true)
53 {
54 lock (s1)
55 {
56 lock (s2)
57 {
58 s1.ChangeState(i);
59 s2.ChangeState(i++);
60 Console.WriteLine("still running, {0}", i);
61
62 }
63 }
64 //Thread.Yield();
65 }
66 }
67 public void Deadlock2()
68 {
69 int i = 0;
70 while (true)
71 {
72 lock (s2)
73 {
74 lock (s1)
75 {
76 s1.ChangeState(i);
77 s2.ChangeState(i++);
78 Console.WriteLine("still running, {0}", i);
79
80 }
81 }
82 //Thread.Yield();
83 }
84 }
85 }
86
87 private static void Main(string[] args)
88 {
89 var state1 = new StateObject();
90 var state2 = new StateObject();
91
92 SampleThread st = new SampleThread(state1, state2);
93
94 Task.Factory.StartNew(st.Deadlock1);
95 Task.Factory.StartNew(st.Deadlock2);
96
97 Console.ReadLine();
98 }
9, 同步
9.1 Lock 语句
栈是线程独立的,但不是私有的。所有线程的栈内所有内容,都可以被其他线程访问。
为什么不用 lock(this) ?
因为这通常超出我们的控制,因为其他人也有可能lock这个对象。一个私有的对象是更好的选择。避免lock一个公开类型,或者超出你代码的控制的实例。
Tips:可以提供线程安全的原子操作。
1 class Program线程不安全-问题代码
2 {
3 public class SharedState
4 {
5 public int State { get; set; }
6 }
7
8 public class Job
9 {
10 private SharedState sharedState;
11
12 public Job(SharedState sharedState)
13 {
14 this.sharedState = sharedState;
15 }
16
17 public void DoTheJob()
18 {
19 for (int i = 0; i < 50000; i++)
20 {
21 sharedState.State +=1;
22 }
23 }
24 }
25
26 private static void Main(string[] args)
27 {
28 int numTasks = 20
29 ;
30 var state = new SharedState();
31 var tasks = new Task[numTasks];
32 for (int j = 0; j < 5; j++)
33 {
34 state.State = 0;
35 for (int i = 0; i < numTasks; i++)
36 {
37 tasks[i] = new Task(new Job(state).DoTheJob);
38 tasks[i].Start();
39
40 }
41
42 for (int i = 0; i < numTasks; i++)
43 {
44 tasks[i].Wait();
45
46 }
47 Console.WriteLine("summarized {0}", state.State);
48 }
49
50
51 }
52
53 }
9.2 Interlocked类
Interlock类用于使变量的简单语句原子化,线程安全方式递增、递减、交换和读取。i++不是线程安全的(包含3个操作:从内存获取、递增1、存储回内存,这些操作都可以被线程调度器打断)。
9.3 Monitor类
lock语句由编译器解释为Monitor类: Moniter.Enter(obj) ; Monitor.Exit(obj);
Monitor类的一个优点:可以添加一个等待被锁定的超市值。Monitor.TryEnter(lockObj,500,ref lockToken);
1 public void DoTheJob()Monitor.TryEnter
2 {
3 for (int i = 0; i < 50000; i++)
4 {
5 bool isLocked = false;
6 goLabel:
7 Monitor.TryEnter(sharedState, 500, ref isLocked);
8 if (isLocked)
9 {
10 sharedState.State += 1;
11 Monitor.Exit(sharedState);
12 }
13 else
14 {
15 Console.WriteLine("lock failed.");
16 goto goLabel;
17 }
18
19 }
20 }
9.4 SpinLock结构
适合于有大量的锁定,而且锁定的时间非常短。用法非常接近于Monitor类。获得锁使用Enter()或者TryEnter(),释放锁使用Exit()方法。小心SpinLock的传送,因为是结构,所以会复制。
9.5 WaitHandle基类
Delegate BeginInvoke() 用waithandle.WaitOne(50,false)来bolck当前线程,
WaitHandle是一个抽象基类,用于等待一个信号量的设置。可以等待不同的信号,因为WaitHandle是一个基类,可以派生一些类。
private static void Main(string[] args)AsyncWaitHandle
{
Action ac = () =>
{
Console.WriteLine("Action Begin.");
Thread.Sleep(2000);
Console.WriteLine("Action End.");
};
AsyncCallback callback = (o) =>
{
var cb = (Action)o.AsyncState;
cb.EndInvoke(o);
Console.WriteLine("Callback finished.");
};
IAsyncResult ar = ac.BeginInvoke(callback, ac);
while (true)
{
Console.Write(".");
if (ar.AsyncWaitHandle.WaitOne(50, true))
{
Console.WriteLine("Can get the result now.");
break;
}
}
Console.ReadLine();
}
WaitOne() 等待一个,waitAll()等待多个对象,WaitAny等待多个对象的一个。WaitAll和WaitAny是静态方法。
WaitHandle基类有一个SafeWaitHandle属性,其中可以将本机句柄赋予一个操作系统资源,并等待该句柄。
Mutex、EventWaitHandle 和 Semaphore类继承自WaitHandle基类。所以可以等到使用它们。
9.6 Mutex类
Mutex(mutual exclusion,互斥)是.net Framework中提供多个集成同步访问的一个类。它非常类似于Monitor,只有一个线程能拥有锁定。只有一个线程能获得互斥锁定,访问受互斥访问的同步代码区域。
在Mutex类的构造函数中,可以指定互斥是否最初由主调线程拥有。定义互斥的名称,获得互斥是否存在的信息。
系统能识别有名称的Mutex
private static void Main(string[] args)Mutex
{
bool isNew;
using (Mutex mutex = new Mutex(false, "ProMutext", out isNew))
{
if (isNew)
{
Console.WriteLine("Get mutex lock.");
}
else
{
Console.WriteLine("can't get mutex lock.");
}
Thread.Sleep(3000);
}
Thread.Sleep(1000000);
Console.ReadLine();
}
9.7 Semaphore类
信号量非常类似于互斥,其区别是多个线程使用。信号量是一种技术的互斥锁定。使用信号量可以定义同时访问旗语锁定保护的资源的线程个数。
Semaphore类:可以命名,使用系统范围内的资源,允许不同进程间同步。
static void Main()Semaphore(多线程&跨进程同步)
{
int threadCount = 6;
int semaphoreCount = 4;
var semaphore = new Semaphore( semaphoreCount, semaphoreCount,"ProSemaphore");
var threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
{
threads[i] = new Thread(ThreadMain);
threads[i].Start(semaphore);
}
for (int i = 0; i < threadCount; i++)
{
threads[i].Join();
}
Console.WriteLine("All threads finished");
}
static void ThreadMain(object o)
{
Semaphore semaphore = o as Semaphore;
Trace.Assert(semaphore != null, "o must be a Semaphore type");
bool isCompleted = false;
while (!isCompleted)
{
if (semaphore.WaitOne(600))
{
try
{
Console.WriteLine("Thread {0} locks the semaphore",
Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(4000);
}
finally
{
semaphore.Release();
Console.WriteLine("Thread {0} releases the semaphore",
Thread.CurrentThread.ManagedThreadId);
isCompleted = true;
}
}
else
{
Console.WriteLine("Timeout for thread {0}; wait again",
Thread.CurrentThread.ManagedThreadId);
}
}
}
SemaphoreSlim类是对于较短等待时间进行了优化的轻型版本,不能跨进程。不能命名,不使用内核信号量,不能跨进程。
private static void Main(string[] args)SemaphoreSlim
{
int threadCount = 6;
int semaphoreCount = 4;
var semaphore = new SemaphoreSlim(semaphoreCount, semaphoreCount);
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
{
threads[i] = new Thread(ThreadMain);
threads[i].Start(semaphore);
}
for (int i = 0; i < threadCount; i++)
{
threads[i].Join();
}
Console.WriteLine("AllThread finished!");
Console.ReadLine();
}
static void ThreadMain(object o)
{
SemaphoreSlim semaphore = o as SemaphoreSlim;
Trace.Assert(semaphore != null, "o must be a Semphore type.");
bool isCompleted = false;
while (!isCompleted)
{
if (semaphore.Wait(100))
{
try
{
Console.WriteLine("thread {0} locks the semaphore", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(3000);
}
finally
{
semaphore.Release();
Console.WriteLine("Thread {0} release the semaphore", Thread.CurrentThread.ManagedThreadId);
isCompleted = true;
}
}
else
{
Console.WriteLine("Timeout for thread {0}; wait again ", Thread.CurrentThread.ManagedThreadId);
}
}
}
9.8 Event类
事件是另一个系统范围内的资源同步方法。为了从托管代码中使用系统事件,.net framework提供了ManualResetEvent、AutoResetEvent、ManualResetEventSlim和CountdownEvent类。
private static void Main(string[] args)ManualResetEvent
{
const int taskCount = 10;
var mEvents = new ManualResetEventSlim[taskCount];
var waitHandles = new WaitHandle[taskCount];
var calcs = new Calculator[taskCount];
TaskFactory taskFactory = new TaskFactory();
for (int i = 0; i < taskCount; i++)
{
mEvents[i] = new ManualResetEventSlim(false);
waitHandles[i] = mEvents[i].WaitHandle;
calcs[i] = new Calculator(mEvents[i]);
taskFactory.StartNew(calcs[i].Calculation, Tuple.Create(i + 1, i + 3));
}
for (int i = 0; i < taskCount; i++)
{
int index = WaitHandle.WaitAny(waitHandles);
if (index == WaitHandle.WaitTimeout)
{
Console.WriteLine("Timeout!!");
}
else
{
mEvents[index].Reset();
Console.WriteLine("finished task for {0}, result: {1}", index, calcs[index].Result);
Thread.Sleep(100);
}
}
Console.ReadLine();
}
public class Calculator
{
private ManualResetEventSlim mEvent;
public int Result { get; private set; }
public Calculator(ManualResetEventSlim ev)
{
this.mEvent = ev;
}
public void Calculation(Object obj)
{
Tuple<int, int> data = (Tuple<int, int>)obj;
Console.WriteLine("Task {0} starts calculation", Task.CurrentId);
Thread.Sleep((3000));
Result = data.Item1 + data.Item2;
Console.WriteLine("Task {0} is ready", Task.CurrentId);
mEvent.Set();
}
}
把任务分支到多个任务中,并在以后合并结果,使用新的CountdownEvent类很有用。
不需要位每个任务创建一个单独的事件对象,而只需要创建一个事件对象。
var mEvents = new ManualResetEventSlim[taskCount];
// var cEvent = new CountdownEvent(taskCount);
var waitHandles = new WaitHandle[taskCount];
var calcs = new Calculator[taskCount];
int index = WaitHandle.WaitAny(waitHandles);//wait
//tasks
mEvent.Set();//all thread set;
//continue
private static void Main(string[] args)CountdownEvent
{
const int taskCount = 10;
var cEvent = new CountdownEvent(taskCount);
var calcs = new Calculator[taskCount];
TaskFactory taskFactory = new TaskFactory();
for (int i = 0; i < taskCount; i++)
{
calcs[i] = new Calculator(cEvent);
taskFactory.StartNew(calcs[i].Calculation, Tuple.Create(i + 1, i + 3));
}
cEvent.Wait();
Console.WriteLine("All finished.");
Console.ReadLine();
}
public class Calculator
{
private CountdownEvent cEvent;
public int Result { get; private set; }
public Calculator(CountdownEvent ev)
{
this.cEvent = ev;
}
public void Calculation(Object obj)
{
Tuple<int, int> data = (Tuple<int, int>)obj;
Console.WriteLine("Task {0} starts calculation", Task.CurrentId);
Thread.Sleep((3000));
Result = data.Item1 + data.Item2;
Console.WriteLine("Task {0} is ready", Task.CurrentId);
cEvent.Signal();
}
}
9.9 Barrier 类
适合于工作有多个任务分支且以后又需要合并工作的情况。
var barrier = new Barrier(numberTasks + 1);
barrier.SignalAndWait();//wait
//tasks
barrier.RemoveParticipant();//2 left
barrier.RemoveParticipant();//1 left
//continue.
9.10 ReadWriterLockSlim类
允许多个读取器。同时只有一个写入器工作,此时读取器不能工作。
private static List<int> items = new List<int>() { 0, 1, 2, 3, 4, 5 };ReadWriterLockSlim
static ReaderWriterLockSlim rwl = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
static void ReadMethod(object reader)
{
try
{
rwl.EnterReadLock();
for (int i = 0; i < items.Count; i++)
{
Console.WriteLine("read {0}, loop: {1}, item:{2}", reader, i, items[i]);
Thread.Sleep(40);
}
}
finally
{
rwl.ExitReadLock();
}
}
static void WriterMethod(object writer)
{
try
{
while (!rwl.TryEnterWriteLock(50))
{
Console.WriteLine("Writer {0} waiting ,current reader count: {1}", writer, rwl.CurrentReadCount);
}
Console.WriteLine("Writer{0} acquired the lock.", writer);
for (int i = 0; i < items.Count; i++)
{
items[i]++;
Thread.Sleep(50);
}
Console.WriteLine("Writer {0} finished.", writer);
}
finally
{
rwl.ExitWriteLock();
}
}
private static void Main(string[] args)
{
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var tasks = new Task[6];
tasks[0] = taskFactory.StartNew(WriterMethod, 1);
tasks[1] = taskFactory.StartNew(ReadMethod, 1);
tasks[2] = taskFactory.StartNew(ReadMethod, 2);
tasks[3] = taskFactory.StartNew(WriterMethod, 2);
tasks[4] = taskFactory.StartNew(ReadMethod, 3);
tasks[5] = taskFactory.StartNew(ReadMethod, 4);
foreach (Task task in tasks)
{
task.Wait();
}
Console.WriteLine("All finished.");
Console.ReadLine();
}
10 Timer类