目录结构:
这篇文章是C#线程系列的第一篇,在开发中经常会涉及到线程的使用。那么读者有思考过下面的问题吗,如果有一个大的任务,如何分成若干个小的任务执行?如何获得每个子任务的计算结果?如何知道任务是否执行结束?如何处理异常?如果读者对C#的线程不熟悉,可能会对这些问题比较迷惑,接下来的这篇文章可以帮你解决这些问题。本篇文章的大部分内容都来自 CLR VIR C# ,感谢Jeffrey Richter做出的贡献。
1 线程池简介
创建线程和销毁线程是一个非常昂贵的操作,需要耗费大量的时间。另外,太多的线程会浪费内存资源,由于操作系统必须调度可运行的线程并执行上下文切换,所以太多的线程对性能也不利。为了改善这种情况,C#提供了线程池来解决这个问题。每个CLR都有一个线程池,这个线程池由CLR控制的所有AppDomain共享。如果一个进程加载了多个CLR,那么每个CLR都具有一个线程池。
CLR在初始化时,线程池中是没有线程的。当线程池接受到一个请求任务后,CLR的线程池会创建一个新的线程来处理它,在请求任务处理完成后,这个线程并不会被真正回收,它会进入空闲状态,等待执行下一个请求任务。如果你的应用程序向线程池发出许多请求,线程池会尝试使用最初创建的线程来服务所有的请求任务。然而,如果你的应用程序发出请求的速度超过了线程池处理他们的速度,就会创建大量的线程(这不是绝对的,创建线程还与可用的资源有关系)。然后,你的应用程序所有的请求都能由少量的线程来完成处理,所以线程池不必创建大量的线程。
如果应用程序停止向线程池发出请求,池中会出现大量什么都不做的线程。大量的空闲线程对内存资源显然是不利的,为了解决这个问题,CLR的线程池的线程提供了自动终结的功能。当一个线程闲着没事一段时间后(不同的CLR版本对这个时间的定义不一样),线程会自己醒来终止自己以释放资源。线程终止自己会产生一定的性能损失。然而线程终止自己是因为他闲得慌,表明应用程序本身没有做太多的事情,所以这个对性能影响不大。
线程池可以只容纳少量线程,也可以容纳大量线程,而且还能够在这两者间切换。线程池是启发式的。如果应用程序需要执行许多任务,同时有可用的CPU,那么线程池就会创建更多的线程,如果应用程序的负载减轻,线程池线程就会终止它们自己。
2 执行上下文(Execution Context)
每个线程都关联了一个执行上下文的数据结构。执行上下文包括的东西有安全设置,宿主设置,逻辑上下文。线程执行它的代码时,一些操作会收到执行上下文设置(尤其是安全设置)的影响。默认情况下,当一个线程(初始线程)使用另一个线程(辅助线程)执行任务时,前者的执行上下文流向到(复制到)辅助线程,这就确保了辅助线程执行的任何操作使用的是和初始线程相同的安全设置和宿主设置,以及在初始线程中设置到逻辑上下文中的数据都适合辅助线程,这种复制行为是很合理的。然而这种执行上下文的复制会对性能产生一定的影响,因为执行上下文中包含了大量的信息。
System.Threading.ExecutionContext类允许控制执行上下文如从一个线程“流向”另一个线程。
下来是该类的三个常用的方法:
[Serializable] public sealed class ExecutionContext : IDisposable, System.Runtime.Serialization.ISerializable { [System.Security.SecurityCritical] public static System.Threading.AsyncFlowControl SuppressFlow ();//抑制执行上下文流向异步线程 public static void RestoreFlow ();//恢复流动 public static bool IsFlowSuppressed ();//当前执行上下文的流动是否是抑制的 ... }
下面展示了如何阻止逻辑上下文的流动来影响线程逻辑上下文中的数据:
//放入一些数据到Main线程的逻辑上下文中 CallContext.LogicalSetData("name","jame"); //初始化一个线程池线程做一些事情 //线程池线程能访问逻辑调用上下文数据 ThreadPool.QueueUserWorkItem( state=> Console.WriteLine("name is {0}",CallContext.LogicalGetData("name"))//name is jame ); //阻止Main线程的执行上下文流动 ExecutionContext.SuppressFlow(); //初始化要由线程池线程做的工作 //线程池线程不能访问逻辑上下文中的数据 ThreadPool.QueueUserWorkItem( state => Console.WriteLine("name is {0}", CallContext.LogicalGetData("name"))//name is ); //恢复Main线程的执行上下文流动 ExecutionContext.RestoreFlow(); Console.ReadLine();
上面的案例验证了如何在调用者的线程中影响逻辑上下文数据的传递。
我们知道 ThreadPool.QueueUserWorkItem 方法会创建线程,该方法创建的线程默认是支持执行上下文流动的。然而,ThreadPool也提供了另一个方法 ThreadPool.UnsafeQueueUserWorkItem ,UnsafeQueueUserWorkItem创建的线程默认是不支持上下文流动的。
3 CancelTokenSource的使用
Microsoft.NET Framework提供了标准的取消操作,这个操作是协作式的,意味着要取消的操作必须显示调用取消。对于长时间运行的计算操作,这是一件很“棒”的事情。取消操作首先要创建一个 System.Threading.CancellationTokenSource 对象:
public sealed class CancellationTokenSource:IDisposable{ public CancellationTokenSource(); public void Dispose();//释放资源 public Boolean IsCancellationRequested{get;} public CancellationToken Token{get;} public void Cancel();//发出取消请求 public void Cancel(Boolean throwOnFirstException); ... }
这个对象包含了和管理取消有关的所有状态。构造好一个CancellationTokenSource(一个引用对象)之后,可从它的Token属性获得一个或多个CancellationToken(一个值类型)实例,并传给你的操作,使操作可以取消。
下面是使用CancellationTokenSource的简单案例
static void Main(string[] args) { //构建CancellationTokenSource实例 CancellationTokenSource cts = new CancellationTokenSource(); //将CancellationTokenSource实例的CancellationToken和其他参数传入Count方法 ThreadPool.QueueUserWorkItem(o => Count(cts.Token, 1000) ); Console.WriteLine("Press<Enter> to cancel the operation"); Console.ReadLine(); cts.Cancel();//发出取消请求 Console.ReadLine(); } static void Count(CancellationToken token, Int32 countTo) { for (Int32 index = 0; index <= countTo; index++) { //使用isCancellationRequest判断是否有取消请求 //若有,则退出循环,否则继续。 if (token.IsCancellationRequested) { Console.WriteLine("count is cancelled"); break; } Console.WriteLine(index); Thread.Sleep(500);//出于演示的目的,浪费一些时间 } Console.WriteLine("count is down"); }
在上面的案例中,我们显式的传入CancellationToken的实例,并且写出逻辑代码判断如何结束循环。
如果要执行一个不允许被取消的操作,可以使用CancellationToken静态None属性返回的CancellationToken实例,由于它不和任何CancellationTokenSource关联,所以没有CancellationTokenSource能够调用Cancel()方法。查询这个特殊的CancellationToken实例的 IsCancellationRequest 属性,总是返回false。
CancellationToken提供的Register方法,允许注册在CancellationTokenSource取消时回调的方法。
例如:
var cts=new CancellationTokenSource(); cts.Token.Register(()=>Console.WriteLine("canceled 1")); cts.Token.Register(()=>Console.WriteLine("canceled 2")); //出于测试的目的,取消它,让其回调两个方法 cts.Cancel();
得到以下结果:
Canceled 2 Canceled 1
我们在这里讨论了显示取消操作(CancellationTokenSource)的使用,需要注意和线程打断(interrupt)区分开来。两者都可以取消线程执行,但是显示取消操作必需是显示调用取消操作才能取消,而且还应该规定线程退出的逻辑代码,比如上面的:
if (token.IsCancellationRequested) { Console.WriteLine("count is cancelled"); break; }
线程打断是在调用interrupt后,直接终止线程的执行。如果需要被打断的线程处于阻塞状态,那么会抛出ThreadInterruptedException异常,所以打断线程是没办法保证其内部数据的状态的。而显示取消操作对数据的操作更方便,更优雅。
4 ThreadPool
ThreadPool代表线程池,它提供对任务执行,异步I/O的操作。这里笔者主要说一下任务执行,ThreadPool提供的QueueUserWorkItem注册的任务会被排队异步执行,并且会流动数据(执行上下文数据)到异步线程中。UnsafeQueueUserWorkItem则不会流动数据(执行上下文数据)到异步线程中。UnsafeQueueUserWorkItem一般都是涉及安全问题,如果不太关心安全问题,可以使用该方法提升性能。
当调用ThreadPool.QueueUserWorkItem发起一个异步的计算操作时,最大的问题就是不知道操作什么时候完成,以及没有提供能够在操作完成时获得返回值的机制。为了解决这个问题,Miscrosoft引入了任务(Task)的概念,接下介绍任务的使用。
5 Task和Task<TResult>
Task<TResult>派生自Task,Task<TResult>提供了可以从异步线程中获得返回值的机制。Task和Task<TResult>在内部机制中,都是基于线程池来完成异步操作,所以上面所介绍的概念(执行上下文,CancelTokenSource)都适合Task和Task<TResult>。Timer和Timer<TResult>的最大不同之处,就是Timer不能获得异步返回值,而Timer<TResult>可以,下面的部分笔者将会更多的介绍Task<TResult>的使用。
5.1 等待任务执行完成并获取结果
static void Main(string[] args) { //创建一个Task(还没有开始运行) Task<Int32> t = new Task<Int32>(n => { return Sum((Int32)n);},1000); //启动任务 t.Start(); //可选择显式等待任务完成 t.Wait(); //可获得结果(Result属性内部会调用wait) Console.WriteLine("The Sum is : {0}",t.Result); Console.ReadLine(); } static Int32 Sum(Int32 n) { Int32 sum = 0; for (; n > 0; n--) { checked { sum += n; };//如果n太大,就会抛出System.OverflowException } return sum; }
除了等待单个线程,Task类还提供了两个静态方法,运行线程等待一个Task对象数组。其中Task的静态WaitAny方法会阻塞调用线程,直到数组的中任何Task对象完成,方法返回Int32数组索引值,指明完成的是那个Task对象。Task还有WaitAll方法,它阻塞调用线程,直到数组中的所有Task对象执行完成。如果所有Task对象都完成就返回True,发生超时就返回false。
调用Wait方法或者Result属性时,有可能抛出一个System.AggregateException异常,AggregateException封装了异常对象的一个集合(如果父任务生成了多个子任务,而多个子任务都抛出了异常,这个集合便可能包含多个异常。)该类型的InnerExceptions属性返回一个ReadOnlyCollection<Exception>对象。不要混淆InnerExceptions属性和InnerException属性,后者是AggregateException从System.Exception继承的。
5.2 ContinueWith(XXX)方法的使用
伸缩性好的软件不应该使用线程阻塞,当判断一个线程是否执行结束时,在调用线程中调用Wait或者在任务尚未完成的时候调用查询任务的Result属性可能不利于系统的性能和伸缩性,很有可能造成线程阻塞。Microsoft提供了更优雅的解决方式,Microsoft提供了一些列Sysetm.Threading.Tasks.Task.ContinueWith重载方法,
ContinueWith(Action<Task,Object>, Object) ContinueWith(Action<Task,Object>, Object, CancellationToken) ContinueWith(Action<Task,Object>, Object, TaskContinuationOptions) ContinueWith(Action<Task,Object>, Object, TaskScheduler) ContinueWith(Action<Task>, CancellationToken, TaskContinuationOptions, TaskScheduler) ContinueWith(Action<Task,Object>, Object, CancellationToken, TaskContinuationOptions, TaskScheduler) ContinueWith(Action<Task>) ContinueWith<TResult>(Func<Task,Object,TResult>, Object) ...
看下面的案例,既能判断线程结束的时刻,也能不能阻塞调用线程。
static void Main(string[] args) { //创建一个Task(还没有开始运行) Task<Int32> t = new Task<Int32>(n => { return Sum((Int32)n);},1000); //当父任务执行完毕后,接受就会执行该任务 t.ContinueWith((task) => { Console.WriteLine("task done"); }); //启动任务 t.Start(); Console.ReadLine(); } static Int32 Sum(Int32 n) { Int32 sum = 0; for (; n > 0; n--) { checked { sum += n; };//如果n太大,就会抛出System.OverflowException } return sum; }
在ContinueWith中关联的方法会在父任务执行完毕后,被依次执行。我们也可以修改上面的代码,完成更实用的功能:
//当父任务执行完毕后,接受就会执行该任务 t.ContinueWith((task) => { Console.WriteLine("The sum is : {0}",task.Result); });
这样以来,我们既可以得到父任务的执行结果,又不会阻塞调用线程。
可以在调用ContinueWith时传递一组TaskContinuationOptions枚举值进行按位OR运算。该枚举值大约有15个值,它指定了在使用ContinueWith创建附属任务时的行为,详情可以参见:https://docs.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.taskcontinuationoptions?view=netframework-4.7.2
5.3 子任务
任务是支持父/子关系的,例如:
static void Main(string[] args) { Task<Int32[]> parents = new Task<Int32[]>(() => { var result=new Int32[3];//创建数组来存储结果 //这个任务创建并启动3个子任务 new Task(() => { result[0] = Sum(1000); }, TaskCreationOptions.AttachedToParent).Start(); new Task(() => { result[1] = Sum(2000); }, TaskCreationOptions.AttachedToParent).Start(); new Task(() => { result[2] = Sum(3000); }, TaskCreationOptions.AttachedToParent).Start(); //返回对数组的引用(数组元素可能还没有初始化) return result; }); //父任务及其子任务执行完成后,用一个延续任务查看结果 var cwt = parents.ContinueWith(parentTask => Array.ForEach(parentTask.Result,Console.WriteLine)); //开始任务 parents.Start(); Console.ReadLine(); } static Int32 Sum(Int32 n) { Int32 sum = 0; for (; n > 0; n--) { checked { sum += n; };//如果n太大,就会抛出System.OverflowException } return sum; }
在本例中,父任务创建并启动三个Task对象。一个Task任务(父任务)创建的一个或多个Task对象(子任务)认是*任务,子任务与创建他们的父任务无关。但TaskCreationOptions.AttachedToParent 标志将一个子任务和创建它的父任务关联,结果是除非所有子任务结束执行(以及子任务的子任务),否则父任务不认为是已经结束执行。
5.4 任务工厂(TaskFactory)
有时需要创建一组共享相同配置的Task对象。为避免机械地将相同的参数传递给每个Task的构造器,可创建一个任务工厂来封装通用的配置。System.Threading.Tasks命名空间定义了一个TaskFactory类型和TaskFactory<TResult>类型。要创建一组返回Void的任务,就构造一个TaskFactory;要创建一组具有特定返回类型的任务,就构造一个TaskFactory<TResult>,并通过构造TResult实参返回传递任务的返回类型。TaskFactory和TaskFactory<TResult>都派生自System.Object类型,也就是说他们是同级的。但Task和Task<TResult>不是同级的,Task<TResult>派生自Task类型。
在创建一个TaskFactory时,需要指定所有任务的都具有的默认值,也就是说需要向任务工厂传递希望任务具有的CancellationToken,TaskScheduler,TaskCreationOptions和TaskContinuationOptions设置,下面展示如何使用TaskFactory<TResult>:
static void Main(string[] args) { var cts = new CancellationTokenSource(); var tf = new TaskFactory<Int32>( cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); //创建并启动三个子任务 var childTask = new[] { tf.StartNew(()=>{return Sum(cts.Token,1000);}), tf.StartNew(()=>{return Sum(cts.Token,2000);}), tf.StartNew(()=>{return Sum(cts.Token,Int32.MaxValue);})//太大,会抛出OverflowException异常 }; //任何子任务抛出异常,就取消其他子任务 for (Int32 task = 0; task < childTask.Length; task++) { childTask[task].ContinueWith(t => { cts.Cancel(); }, TaskContinuationOptions.OnlyOnFaulted); }; //所有子任务完成后,从未出错/未取消的子任务获得返回的结果值 //然后将最大值传递给另一个任务来显示最大结果 tf.ContinueWhenAll( childTask, completedTasks => { return completedTasks. Where(t => !t.IsFaulted && !t.IsCanceled). Max(t => t.Result); }, CancellationToken.None ). ContinueWith( t => { Console.WriteLine("the maximum is:{0}", t.Result); }, TaskContinuationOptions.ExecuteSynchronously ); Console.ReadLine(); } static Int32 Sum(CancellationToken token,Int32 n) { Int32 sum = 0; for (; n > 0; n--) { token.ThrowIfCancellationRequested();//如果有取消请求就抛出异常 checked { sum += n; };//如果n太大,就会抛出System.OverflowException } return sum; }
上面的代码不能以调试的方式运行,否则看不到任何实际的效果,应该在以非调试方式运行,点击"调试"->"开始执行(不调试)"运行才能得到正确的结果。上述代码创建了一个TaskFactory<Int32>对象,该任务工厂创建3个Task对象。所有子任务都使用相同的配置:每个Task对象都共享相同的CancellationTokenSource标记,任务都被视为其父任务的子任务,TaskFactory创建的所有延续任务都以同步方式执行,TaskFactory创建的所有Task对象都使用默认的TaskScheduler。
5.5 任务调度器
任务基础结构非常灵活,其中TaskFactory对象功不可没。TaskScheduler对象负责执行被调度的任务,同时向Visual Studio调试器公开任务信息。FCL提供了两个派生自TaskScheduler的类型:线程池任务调度器(thread pool task scheduler),和同步上下文任务调试器(Synchronization context task scheduler)。默认情况下,所有应用程序使用的都是线程池调度器。
同步上下文任务调度器适合提供了图形用户的应用程序,例如Windows窗口,Windows Presentation Foundation(WPF)、Silverlight和Windows Store应用程序。它将所有任务都调度给应用的GUI线程,使所有任务代码都能成功更新UI组件(按钮,菜单项)。该调度器不使用线程池。可执行TaskScheduler的静态FromCurrentSynchronizationContext方法来获得对同步上下文任务调度器的引用。
下面这个简单的Windows窗口应用程序演示了如何使用同步上下文任务调度器。
internal sealed class MyForm : Form{ private readonly TaskScheduler m_syncContextTaskScheduler; public MyForm(){ //获得对一个同步上下文任务调度器的引用 m_syncContextTaskScheduler= TaskScheduler.FromContextSyncrozationContext(); Text="Syncronization Context Task Scheduler Demo"; Visible=true; width=400; height=100; } private CancellationTokenSource m_cts; protected override void OnMouseClick(MouseEventArgs e){ if(m_cts!=null){//一个操作正在执行它,取消它 m_cts.Cancel(); m_cts=null; }else{//操作没有开始 Text="Operation running"; m_cts=new CancellationTokenSource(); //这个任务使用默认任务调度器,在一个线程池上执行 Task<Int32> t=Task.Run(()=> Sum(m_cts.Token,2000),m_cts.Token); //这些任务使用同步上下文调度器,在GUI线程 t.ContinueWith(task=>Text="Result:"+task.Result, CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion, m_syncContextTaskScheduler); t.ContinueWith(task=>Text="Operation canceled", CancellationToken.None, TaskContinuationOptions.OnlyOnCanceled, m_syncContextTaskScheduler); t.ContinueWith(task=>Text="Operation faulted", CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, m_syncContextTaskScheduler); } base.OnMouseClick(e); } }
使用线程池线程很好,因为GUI线程在此期间不会被阻塞,能影响其他UI操作。但线程池线程执行的代码不应该尝试更新UI组件,否则会抛出InvalidOperationException异常。所以上面的方法是通过修改同步上下文调度器来实现在非GUI线程中修改GUI组件。修改GUI组件还可以通过如下的方法来实现。
SynchronizationContext.Current.post(delegate Method,Object obj);
6 Timer定时器
System.Threading命名空间中定义了Timer类,可用它让一个线程池线程定时调用一个方法。Timer类提供了如下几个相似的构造器:
public sealed class Timer : MarshalByRefObject,IDisposable{ public Timer(TimerCallback callback,Object state,Int32 dueTime,Int32 period); public Timer(TimerCallback callback,Object state,UInt32 dueTime,Int32 period); public Timer(TimerCallback callback,Object state,Int64 dueTime,Int32 period); public Timer(TimerCallback callback,Object state,TimeSpan dueTime,TimeSpan period); }
这个4个构造器可以完全一致的构造Timer对象。
callback:该参数标识希望由一个线程池线程回调的方法。
state:允许每次回调方法时都向它传递状态数据。
dueTime:告诉CLR在首次回调方法之前要等待多少毫秒。可以使用一个有符号或无符号的Int32为值,一个有符号的Int64位值或一个TimeSpan值。如果希望回调方法立即执行,那么应该参数dueTime指定为0。
period:每次回调方法之前应该等待多少毫秒,如果为这个参数传递Timeout.Infinite(-1),线程池线程只回调方法一次。
在内部,线程池为所有Timer对象都只使用了一个线程池。这个线程知道下一个Timer对象在什么时候到期(计时器还有多久触发)。下一个Timer对象到期时,线程就会被唤醒,在内部调用ThreadPool的QueueUserWorkItem,将一个工作项添加到线程池的队列中,使你的回调方法得到调用。如果回调方法的执行时间很长,计时器可能(上一个回调还未完成)再次触发。这可能造成多个线程池线程同时执行你的回调方法。例如:
static void Main(string[] args) { Int32 count = 0; Timer timer = new Timer(obj => { Console.WriteLine(count); Thread.Sleep(2000);//为了验证问题,休眠两秒 count++; }, null, 0,//立即执行 1000);//回调间隔为1秒钟 Console.ReadLine(); }
得到的输出如下:
0 0 1 2 3 4 5 5 7 ...
观察结果可以发现输出结果无规律,Timer会定时在间隔时间内从新调用回调方法,无论上一个回调方法是否完成,所以就出现了这种结果。如果我们希望实现函数节流(在一定的时间内,代码只运行一次),可以做出如下的修改:
Int32 count = 0; Boolean isRunning = false; Timer timer = new Timer(obj => { if (isRunning) { return;//如果已经在运行了,就退出 } isRunning = true;//立即锁住 Console.WriteLine(count); Thread.Sleep(2000);//为了验证问题,休眠两秒 count++; isRunning = false;//解锁 }, null, 0,//立即执行 1000);//回调间隔为1秒钟 Console.ReadLine();
然后我们就可以得到正确的输出结果:
0 1 2 3 ...
仔细观察上面的代码读者也需会发现,函数节流是不能准确控制两次执行操作的间隔时间的,
上面的方法可以解决问题,但如果有这样的需求呢,在每次执行完成后,再间隔一定时间后再执行下次操作。为了解决这样的问题,可以在构造Timer时,为period参数指定Timeout.Infinite,这样计时器就只触发一次。然后再在你的回调方法中,调用Change方法来指定一个新的dueTime,并再次为period参数指定Timeout.Infinite.
例如:
static Timer timer = null; static void Main(string[] args) { Int32 count = 0; timer = new Timer(obj => { Console.WriteLine(count); Thread.Sleep(2000);//为了验证问题,休眠两秒 count++; //更改启动时间为1秒后,间隔时间无限大 timer.Change(1000,Timeout.Infinite); }, null, Timeout.Infinite,//启动时间为无限长,防止提前运行 1000); //更改启动时间立即启动,间隔时间无限大 timer.Change(0,Timeout.Infinite); Console.ReadLine(); }
7 Parallel
System.Threading.Tasks.Parallel类封装了For,ForEach,Invoke方法,它的内部使用Task对象。使用这些并行方法可以提升性能。
例如,经常会看见类似如下的代码:
for(Int32 i=0;i<1000;i++){ DoWork(i); }
其实可以使用Parallel.For来提升性能:
Parallel.For(0,1000,i=>DoWork(i))
类似的如果有一个集合,不要这样写:
foreach(var item in collection){ DoWork(item); }
而要像下面这样写
//线程池的线程并行处理工作 Parallel.ForEach(collection,item=>DoWork(item));
如果既可以用For也可以用ForEach建议使用For,它执行的更快。
如果要执行多个多个方法,那么既可以像下面这样顺序执行:
//一个线程执行多个方法 Method1() Method2() Method3()
也可以并行执行,如下:
Parallel.Invoke( ()=>Method1(), ()=>Method2(), ()=>Method3());
Parallel的所有方法都让线程参与处理,从资源竞争的角度来说,这是一件好事。如果任何操作抛出未处理的异常,你调用的Parallel方法最后会抛出AggregateException异常,使用Parallel的方法的前提条件就是:工作项必须能并行执行!如果工作项不能并行执行,就不能使用Parallel的方法。
For和ForEach方法有一些版本允许传递3个委托。
任务局部初始化委托(localInit),为参与工作的每个任务都调用一次该方法,这个委托是在任务被要求处理工作项之前调用的。
主体委托(body),为参与工作的各个线程所处理的每一项都调用一次该委托。
任务局部总结委托(localFinally),为参与工作的每个任务都调用一次该委托。这个委托是在任务处理好派发给它的所有工作项之后调用的。即使主体引发一个未处理的异常,也会调用它。
下面展示了如何是这三个委托来计算一个目录中的所有文件的字节长度:
static void Main(string[] args) { Int64 result= DirectoryBytes(@"E:","*",SearchOption.AllDirectories); Console.WriteLine(result); Console.ReadLine(); } static Int64 DirectoryBytes(String path, String searchPattern, SearchOption searchOption) { var files = Directory.EnumerateFiles(path, searchPattern, searchOption); Int64 masterTotal = 0; ParallelLoopResult plr = Parallel.ForEach<String, Int64>(files, () =>//init { return 0; }, (file, loopstate, index, taskLocalTotal) =>//body { Int64 fileLength = 0; FileStream fs = null; try { fs = File.OpenRead(file); fileLength = fs.Length; } catch (Exception e) { Console.WriteLine(e.Message); } finally { if (fs != null) fs.Dispose(); } return fileLength + taskLocalTotal; }, (bytes) =>//finish { Interlocked.Add(ref masterTotal, bytes); }); return masterTotal; }
8 并行语言集成查询(PLINQ)
Microsoft的语言继承查询(Language Integrated Query,PLINQ)功能提供一个简洁的语言来查询数据集合。可用LING轻松的对数据进行筛选、排序、投射等等。使用LING只有一个线程处理数据集合的所有项;我们称之为顺序查询,要提高性能,可以使用并行LINQ(Parallel LINQ),它将顺序查询转化为并行查询,在内部使用任务(排队默认给TaskScheduler),将集合中的数据项的处理工作分散到多个CPU上,以便并发处理数据项。
静态的 System.Linq.ParallelEnumerable 类实现了PLINQ的所有功能,所以必须通过C#的using指令将System.LINQ命名空间导入你的源代码。要让自己的LINQ 通Objects 查询调用这些方法的并行版本,必须将自己的顺序查询(基于IEnumerable或者IEnumerable<T>)转化为并行查询(基于ParallelQuery或者ParallelQuery<T>),这是用ParallelEnumerable的AsParallel扩展方法来实现的,如下所示:
public static ParallelQuery<TSource> AsParallel<TSource>(this Enumerable<TSource> source); public static ParallelQuery AsParallel(this IEnumerable source);
例如:
static void Main(string[] args) { ObsoleteMethod(typeof(Object).Assembly); Console.ReadLine(); } static void ObsoleteMethod(Assembly assembly) { var query = //获得程序集中所有公开的类型,使用AsParallel将顺序查询转化为并行查询 from type in assembly.GetExportedTypes().AsParallel() //获得public,instance,static方法 from method in type.GetMethods(BindingFlags.Public | BindingFlags.Instance | BindingFlags.Static) //查询具有ObsoleteAttribute属性的方法 let absoletArr = typeof(ObsoleteAttribute) where Attribute.IsDefined(method, absoletArr) //按照名称排序 orderby type.FullName //获得方法上的ObsoleteAttribute特性信息 let vabsoletArr = (ObsoleteAttribute)Attribute.GetCustomAttribute(method, absoletArr) //返回结果 select String.Format("type={0},method={1},message={2}", absoletArr.FullName, method.ToString(), vabsoletArr.Message); //显示结果 foreach(var result in query) Console.WriteLine(result); }
上面使用AsParallel()将顺序查询转化为并行查询。如果需要将并行查询转化为顺序查询可以使用ParallelEnumerable的AsSequential方法,
public static IEnumerable<TSource> AsSequence<TSource>(this ParallelQuery<TSource> source)
如果希望以并行的方式处理查询结果可以使用ForAll方法
static void ForAll<TSource> (this ParallelQuery<TSource> source,Action<TSource> action)
该方法允许多个线程同时处理结果。可修改前面的代码来使用该方法:
query.ForAll(Console.WriteLine);
然而,让多个线程调用Console.WriteLine反而会损害性能,因为Console类内部会对线程进行同步处理,确保每次只有一个线程能访问控制台窗口,避免来自多个线程的文本在最后显示乱成一团。
由于PLINQ用多个线程处理数据项,所以数据项会被并发处理,结果被无序返回。如果需要让PLINQ保存数据项的顺序,可调用ParallelEnumerable的AsOrder方法,调用这个方法时,线程成组的处理数据项。然后,这些组被合并回去,同时保持顺序,这样会损害性能。以下操作符生成不排序的操作:Distinct,Except,Intersect,Union,Join,GroupBy,GroupJoin,ToLooku。在这些操作后强调排序,只需要再次使用AsOrdered方法。