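Concepts
The Task Parallel Library (TPL) was introduced in .NET Framework 4.0 and improved in 4.5. If the thread pool is an abstraction layer that hides the details of working with threads, the TPL is a further abstraction layer on top of the thread pool: it hides the low-level code that interacts with the pool and offers a more convenient API.
More details
The core concept of the TPL is the task. A task represents an asynchronous operation that can run in several ways, with or without a dedicated thread. The TPL hides how a task is implemented, so we do not need to know how it actually executes. Used incorrectly, however, tasks can cause subtle bugs that are hard to diagnose.
Tasks can be combined with other tasks in several ways, and this convenient composition API is a key advantage over the older patterns (APM and EAP).
Exception handling in tasks: a task may consist of several tasks, which may in turn have child tasks of their own. AggregateException collects all exceptions thrown by the underlying tasks and lets each one be handled individually.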
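As a minimal sketch of the exception handling described above, the following program waits on two failing tasks and reads every underlying exception from the AggregateException. The names AggregateExceptionDemo, Fail and CollectErrors are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class AggregateExceptionDemo
{
    // Hypothetical helper that always fails, used only to produce an exception.
    static int Fail(string message)
    {
        throw new InvalidOperationException(message);
    }

    // Runs two failing tasks and returns the message of every exception
    // carried by the resulting AggregateException.
    public static string[] CollectErrors()
    {
        Task<int> first = Task.Run(() => Fail("first failed"));
        Task<int> second = Task.Run(() => Fail("second failed"));
        var messages = new List<string>();
        try
        {
            Task.WaitAll(first, second);
        }
        catch (AggregateException ex)
        {
            // Flatten unwraps nested AggregateExceptions (for example from child tasks).
            foreach (Exception inner in ex.Flatten().InnerExceptions)
            {
                messages.Add(inner.Message);
            }
        }
        return messages.ToArray();
    }

    static void Main()
    {
        foreach (string message in CollectErrors())
        {
            Console.WriteLine(message);
        }
    }
}
```

AggregateException.Handle can be used instead of the loop when only some exception types should be treated as handled.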
Prerequisites
Use .NET Framework 4.5 or later.
using directives
using System;
using System.Collections.Generic;
using System.ComponentModel; // BackgroundWorker, needed by the EAP example
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using static System.Threading.Thread;
using System.Threading;
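Creating tasks
new Task(...) creates a task that does not run until Start is called; Task.Run and Task.Factory.StartNew create a task and start it immediately. Marking a task as long-running with TaskCreationOptions.LongRunning is a hint to the scheduler; the default scheduler then typically runs the task on a dedicated thread instead of a thread-pool thread.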
class Program
{
static void Main(string[] args)
{
Task t1 = new Task(() => TaskMethod("Task1"));
Task t2 = new Task(() => TaskMethod("Task2"));
t1.Start();
t2.Start();
Task.Run(() => TaskMethod("Task3"));
Task.Factory.StartNew(() => TaskMethod("Task4"));
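Task.Factory.StartNew(() => TaskMethod("Task5"), TaskCreationOptions.LongRunning);//hint that the task is long-running; the default scheduler then typically uses a dedicated thread instead of a thread-pool thread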
Console.ReadLine();
}
static void TaskMethod(string name)
{
Console.WriteLine($"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
}
}
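The difference between a task built with the constructor and one returned by Task.Run can be checked directly. The sketch below (class and method names are hypothetical) shows that a constructed task stays in the Created state until Start is called, and that calling Start on a task returned by Task.Run throws InvalidOperationException because that task has already been scheduled.

```csharp
using System;
using System.Threading.Tasks;

static class TaskCreationDemo
{
    // A task built with the constructor has not been scheduled yet.
    public static TaskStatus StatusBeforeStart()
    {
        var cold = new Task(() => { });
        return cold.Status;
    }

    // After Start and Wait the same kind of task has run to completion.
    public static TaskStatus StatusAfterStart()
    {
        var cold = new Task(() => { });
        cold.Start();
        cold.Wait();
        return cold.Status;
    }

    // Task.Run returns an already scheduled task, so Start is not allowed on it.
    public static bool StartOnRunningTaskThrows()
    {
        Task hot = Task.Run(() => { });
        try
        {
            hot.Start();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    static void Main()
    {
        Console.WriteLine(StatusBeforeStart());
        Console.WriteLine(StatusAfterStart());
        Console.WriteLine(StartOnRunningTaskThrows());
    }
}
```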
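Basic operations with tasks
Create a Task<int> and read its Result property to get the return value; until the task has produced a value, the calling thread blocks. Poll Task.IsCompleted (or inspect Task.Status) to follow the task's state. Call Task.RunSynchronously() to run the task on the calling thread, which is the main thread in this example.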
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Task1 result:"+ TaskMethod("Task1"));
Task<int> task = CreatTask("Task2");
task.Start();
Console.WriteLine("Task2 result:" + task.Result);
task = CreatTask("Task3");
task.RunSynchronously();//runs on the calling thread (the main thread here)
Console.WriteLine("Task3 result:" + task.Result);
task = CreatTask("Task4");
Console.WriteLine(task.Status) ;
task.Start();
while (!task.IsCompleted)
{
Console.WriteLine(task.Status);
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Console.WriteLine(task.Status);
Console.WriteLine("Task4 result:" + task.Result);
Console.ReadLine();
}
static Task<int> CreatTask(string name)
{
return new Task<int>(()=>TaskMethod(name));
}
static int TaskMethod(string name)
{
Console.WriteLine($"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
Thread.Sleep(TimeSpan.FromSeconds(2));
return 42;
}
}
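Combining tasks
Create a task and attach a continuation to it with Task.ContinueWith; the continuation runs only after the first task (the antecedent) has finished. TaskContinuationOptions.OnlyOnRanToCompletion schedules the continuation only if the antecedent ran to completion, that is, it neither faulted nor was canceled. TaskContinuationOptions.ExecuteSynchronously asks for the continuation to run on the thread that completes the antecedent; if the antecedent has already completed when the continuation is attached, the continuation runs on the thread that attaches it.
Passing TaskCreationOptions.AttachedToParent when creating a task inside another task makes it a child of that task; the parent is considered complete only when all of its attached children have completed.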
class Program
{
static void Main(string[] args)
{
var t1 = new Task<int>(() => TaskMethod("task1", 3));
var t2 = new Task<int>(() => TaskMethod("task2", 2));
t1.ContinueWith(t => Console.WriteLine($"The first answer is {t.Result}. Thread id :{CurrentThread.ManagedThreadId},is thread pool thread:{CurrentThread.IsThreadPoolThread}"), TaskContinuationOptions.OnlyOnRanToCompletion);//run the continuation only if the antecedent ran to completion (not faulted or canceled)
t1.Start();
t2.Start();
Thread.Sleep(TimeSpan.FromSeconds(4));
Task continuation = t2.ContinueWith(t => Console.WriteLine($"The second answer is {t.Result}. Thread id :{CurrentThread.ManagedThreadId},is thread pool thread:{CurrentThread.IsThreadPoolThread}"), TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);//ExecuteSynchronously: t2 has already completed here, so the continuation runs on the thread that attaches it
continuation.GetAwaiter().OnCompleted(() => Console.WriteLine($"Continuation Task Completed! Thread id is: {CurrentThread.ManagedThreadId}. Is thread pool thread: {CurrentThread.IsThreadPoolThread}"));
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine();
t1 = new Task<int>(() =>
{
var innerTask = Task.Factory.StartNew(() => TaskMethod("second Task", 5), TaskCreationOptions.AttachedToParent);
innerTask.ContinueWith(t => TaskMethod("Third Task", 2), TaskContinuationOptions.AttachedToParent);
return TaskMethod("First Task", 2);
});
t1.Start();
while (!t1.IsCompleted)
{
Console.WriteLine(t1.Status);
Thread.Sleep(TimeSpan.FromSeconds(0.5));
}
Console.WriteLine(t1.Status);
Console.ReadLine();
}
static int TaskMethod(string name, int seconds)
{
Console.WriteLine($"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
return seconds * 42;
}
}
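One detail worth checking when working with attached children: a parent started with Task.Factory.StartNew waits for its attached children, while a parent started with Task.Run does not, because Task.Run creates its task with the DenyChildAttach option. The sketch below illustrates this under the assumption that a 500 ms child is long enough to outlive the parent body; AttachedChildDemo and ChildFinishedWithParent are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class AttachedChildDemo
{
    // Returns true if the child had already finished when the parent
    // was observed as complete.
    public static bool ChildFinishedWithParent(bool useTaskRun)
    {
        Task child = null;
        Action body = () =>
        {
            // The child asks to be attached to the task that creates it.
            child = Task.Factory.StartNew(
                () => Thread.Sleep(500),
                TaskCreationOptions.AttachedToParent);
        };

        // Task.Run denies child attachment; Task.Factory.StartNew allows it.
        Task parent = useTaskRun ? Task.Run(body) : Task.Factory.StartNew(body);
        parent.Wait();
        return child.IsCompleted;
    }

    static void Main()
    {
        Console.WriteLine("StartNew parent waited for child: " + ChildFinishedWithParent(false));
        Console.WriteLine("Task.Run parent waited for child: " + ChildFinishedWithParent(true));
    }
}
```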
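Converting the APM (Asynchronous Programming Model) pattern to tasks
Convert an APM API into a task. The sample exercises three overloads of Task.Factory.FromAsync. Note that delegate BeginInvoke/EndInvoke, which the sample relies on, is supported on .NET Framework only; on .NET Core and .NET 5 or later it throws PlatformNotSupportedException.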
class Program
{
static void Main(string[] args)
{
int threadId;
AsynchronouTask a = Test;
IncompatibleAsynchronousTask c = Test;
Console.WriteLine("Option1");
Task<string> task1 = Task<string>.Factory.FromAsync(a.BeginInvoke("Async 1",callBack,"async1 state"),a.EndInvoke);
task1.ContinueWith(t => Console.WriteLine($"Callback is finished. now running a continuation!result:{t.Result}"));
while (!task1.IsCompleted)
{
//Console.WriteLine(task1.Status);
}
Console.WriteLine(task1.Status);
Console.WriteLine("----------------------------");
Console.WriteLine();
Console.WriteLine("Option2");
Task<string> task2 = Task<string>.Factory.FromAsync(a.BeginInvoke,a.EndInvoke,"Async 2","async2 state");
task2.ContinueWith(t => Console.WriteLine($"Callback is finished. now running a continuation!result:{t.Result}"));
while (!task2.IsCompleted)
{
//Console.WriteLine(task2.Status);
}
Console.WriteLine(task2.Status);
Console.WriteLine("----------------------------");
Console.WriteLine();
Console.WriteLine("Option3");
IAsyncResult ar = c.BeginInvoke(out threadId,callBack,"async3 state");
Task<string> task3 = Task<string>.Factory.FromAsync(ar,_=>c.EndInvoke(out threadId,ar));
task3.ContinueWith(t => Console.WriteLine($"Callback is finished. now running a continuation!result:{t.Result}"));
while (!task3.IsCompleted)
{
//Console.WriteLine(task3.Status);
}
Console.WriteLine(task3.Status);
Console.ReadLine();
}
delegate string AsynchronouTask(string threadName);
delegate string IncompatibleAsynchronousTask(out int threadId);
static void callBack(IAsyncResult ar)
{
Console.WriteLine("Starting callback……");
Console.WriteLine("State passed to a callback:"+ar.AsyncState);
Console.WriteLine("Is thread pool thread:"+CurrentThread.IsThreadPoolThread);
}
static string Test(string threadName)
{
Console.WriteLine("TN: Starting Test……");
Console.WriteLine("Is thread pool thread:"+CurrentThread.IsThreadPoolThread);
Sleep(2000);
CurrentThread.Name = threadName;
return "Thread name is:" + CurrentThread.Name;
}
static string Test(out int threadId)
{
Console.WriteLine("TI: Starting Test……");
Console.WriteLine("Is thread pool thread:" + CurrentThread.IsThreadPoolThread);
Sleep(2000);
threadId = CurrentThread.ManagedThreadId;
return "Thread Id is:" + threadId;
}
}
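The same FromAsync overload family also works with APM methods that ship in the base class library. As a sketch that does not depend on delegate BeginInvoke, the following wraps Stream.BeginRead/EndRead in a Task<int>; FromAsyncDemo and ReadAll are hypothetical names, and a MemoryStream is assumed only to keep the example self-contained.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

static class FromAsyncDemo
{
    // Reads the whole text back from an in-memory stream through the
    // APM pair BeginRead/EndRead, wrapped in a task by FromAsync.
    public static string ReadAll(string text)
    {
        byte[] source = Encoding.UTF8.GetBytes(text);
        byte[] buffer = new byte[source.Length];
        using (var stream = new MemoryStream(source))
        {
            // Arguments after the two method groups are passed to BeginRead:
            // buffer, offset, count, followed by the state object (null here).
            Task<int> read = Task<int>.Factory.FromAsync(
                stream.BeginRead, stream.EndRead,
                buffer, 0, buffer.Length, null);

            int count = read.Result; // blocks until the read has completed
            return Encoding.UTF8.GetString(buffer, 0, count);
        }
    }

    static void Main()
    {
        Console.WriteLine(ReadAll("hello from APM"));
    }
}
```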
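Converting the EAP (Event-based Asynchronous Pattern) pattern to tasks
Use TaskCompletionSource<T> to wrap an event-based operation, here a BackgroundWorker from the System.ComponentModel namespace, in a task: the completion event handler sets the task's result, exception, or canceled state.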
class Program
{
static void Main(string[] args)
{
var tcs = new TaskCompletionSource<int>();
var worker = new BackgroundWorker();
worker.DoWork += (sender, evenArgs) =>
{
evenArgs.Result = TaskMethod("BackgroundWorker",3);
};
worker.RunWorkerCompleted += (sender, evenArgs) =>
{
if (evenArgs.Error != null)
{
tcs.SetException(evenArgs.Error);
}
else if (evenArgs.Cancelled)
{
tcs.SetCanceled();
}
else
{
tcs.SetResult((int)evenArgs.Result);
}
};
worker.RunWorkerAsync();
int result = tcs.Task.Result;
Console.WriteLine(result);
Console.ReadLine();
}
static int TaskMethod(string name,int seconds)
{
Console.WriteLine($"Task {name} on a thread id:{CurrentThread.ManagedThreadId},is thread pool thread:{CurrentThread.IsThreadPoolThread}");
Sleep(TimeSpan.FromSeconds(seconds));
return seconds * 42;
}
}
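The wrapping logic can be pulled out into a reusable helper. The following is one possible sketch, not the only way to do it: RunWithBackgroundWorker is a hypothetical name, and the Try* methods of TaskCompletionSource<T> are used so that a second completion attempt would not throw.

```csharp
using System;
using System.ComponentModel;
using System.Threading.Tasks;

static class EapToTaskDemo
{
    // Runs 'work' on a BackgroundWorker and exposes the outcome as a task.
    public static Task<T> RunWithBackgroundWorker<T>(Func<T> work)
    {
        var tcs = new TaskCompletionSource<T>();
        var worker = new BackgroundWorker();
        worker.DoWork += (sender, e) => e.Result = work();
        worker.RunWorkerCompleted += (sender, e) =>
        {
            // Check Error first: reading e.Result after a failure would throw.
            if (e.Error != null) tcs.TrySetException(e.Error);
            else if (e.Cancelled) tcs.TrySetCanceled();
            else tcs.TrySetResult((T)e.Result);
            worker.Dispose();
        };
        worker.RunWorkerAsync();
        return tcs.Task;
    }

    static void Main()
    {
        Task<int> ok = RunWithBackgroundWorker(() => 6 * 7);
        Console.WriteLine(ok.Result);

        Task<int> failed = RunWithBackgroundWorker<int>(
            () => { throw new InvalidOperationException("worker failed"); });
        try
        {
            Console.WriteLine(failed.Result);
        }
        catch (AggregateException ex)
        {
            Console.WriteLine(ex.InnerException.Message);
        }
    }
}
```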
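Implementing cancellation
Cancel a task-based asynchronous operation. In this sample TaskMethod notices the cancellation request and returns -1 instead of throwing, so the second task still ends in the RanToCompletion state, with -1 as its result.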
class Program
{
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
var longTask = new Task<int>(() =>
TaskMethod("Task1", 10, cts.Token), cts.Token);
Console.WriteLine(longTask.Status);
cts.Cancel();
//longTask.Start();//throws InvalidOperationException because the task has already been canceled
Console.WriteLine(longTask.Status);
Console.WriteLine("First task has been cancelled before execution.");
cts = new CancellationTokenSource();
longTask = new Task<int>(() =>
TaskMethod("Task2", 10, cts.Token), cts.Token);
longTask.Start();
for (int i = 0; i < 5; i++)
{
Thread.Sleep(TimeSpan.FromSeconds(0.5));
Console.WriteLine(longTask.Status);
}
cts.Cancel();
Console.WriteLine("Task2 is canceled.");
for (int i = 0; i < 5; i++)
{
Thread.Sleep(TimeSpan.FromSeconds(0.5));
Console.WriteLine(longTask.Status);
}
Console.WriteLine($"A task has been completed with result {longTask.Result}");
Console.ReadLine();
}
static int TaskMethod(string name, int seconds, CancellationToken token)
{
Console.WriteLine($"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
for (int i = 0; i < seconds; i++)
{
Thread.Sleep(TimeSpan.FromSeconds(1));
if (token.IsCancellationRequested)
return -1;
}
return seconds * 42;
}
}
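How the task body reacts to the cancellation request decides the final task state. The sketch below compares the two options under the assumption that the same token is passed both to the body and to Task.Run: returning a value leaves the task in RanToCompletion, while CancellationToken.ThrowIfCancellationRequested moves it to Canceled. CancellationDemo and RunAndCancel are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class CancellationDemo
{
    // Starts a looping task, cancels it once it is running,
    // and returns the status the task ends up in.
    public static TaskStatus RunAndCancel(bool throwOnCancel)
    {
        using (var cts = new CancellationTokenSource())
        using (var started = new ManualResetEventSlim(false))
        {
            CancellationToken token = cts.Token;
            Task<int> task = Task.Run(() =>
            {
                started.Set(); // tell the caller that the body is executing
                while (true)
                {
                    Thread.Sleep(50);
                    if (throwOnCancel) token.ThrowIfCancellationRequested();
                    else if (token.IsCancellationRequested) return -1;
                }
            }, token);

            started.Wait(); // make sure cancellation happens during execution
            cts.Cancel();
            try
            {
                task.Wait();
            }
            catch (AggregateException)
            {
                // A canceled task surfaces a TaskCanceledException here.
            }
            return task.Status;
        }
    }

    static void Main()
    {
        Console.WriteLine("Return on cancel: " + RunAndCancel(false));
        Console.WriteLine("Throw on cancel:  " + RunAndCancel(true));
    }
}
```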
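Running tasks in parallel
Task.WhenAll creates a third task that completes once all the given tasks have completed and returns their results as an array. Task.WhenAny returns the first task to complete; in the loop below, each completed task is removed from the list and the loop then waits for the remaining ones. By using a timer task as one of the tasks, you can cancel all the others when the timer finishes first, which gives a simple form of timeout management.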
class Program
{
static void Main(string[] args)
{
var t1 = new Task<int>(() => TaskMethod("Task1", 2));
var t2 = new Task<int>(() => TaskMethod("Task2", 3));
var whenAllTask = Task.WhenAll(t1, t2);
whenAllTask.ContinueWith(t => Console.WriteLine($"The first task answer is {t.Result[0]}. The second task answer is {t.Result[1]}"), TaskContinuationOptions.OnlyOnRanToCompletion);
t1.Start();
t2.Start();
Thread.Sleep(TimeSpan.FromSeconds(5));
var tasks = new List<Task<int>>();
for (int i = 0; i < 4; i++)
{
int counter = i;
var task = new Task<int>(() => TaskMethod("Tasks:Task " + counter, counter));
tasks.Add(task);
task.Start();
}
while (tasks.Count > 0)
{
var completedTask = Task.WhenAny(tasks).Result;
tasks.Remove(completedTask);
Console.WriteLine($"A task has been completed with result {completedTask.Result}");
}
Console.ReadLine();
}
static int TaskMethod(string name, int seconds)
{
Console.WriteLine($"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
return seconds * 42;
}
}
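The listing above does not show the timeout idea, so here is a minimal sketch of it, assuming Task.Delay as the timer task and a cooperative work loop that checks the token every 10 ms. TimeoutDemo and RunWithTimeout are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class TimeoutDemo
{
    // Races a unit of work against a timer task and cancels the work
    // if the timer finishes first.
    public static string RunWithTimeout(int workMs, int timeoutMs)
    {
        using (var cts = new CancellationTokenSource())
        {
            CancellationToken token = cts.Token;
            Task<int> work = Task.Run(() =>
            {
                // Simulated work in 10 ms slices so cancellation is noticed quickly.
                for (int elapsed = 0; elapsed < workMs; elapsed += 10)
                {
                    token.ThrowIfCancellationRequested();
                    Thread.Sleep(10);
                }
                return 42;
            }, token);

            Task timer = Task.Delay(timeoutMs);
            Task first = Task.WhenAny(work, timer).Result;
            if (first == work)
            {
                return "result " + work.Result;
            }

            cts.Cancel(); // the timer won: ask the work to stop
            try
            {
                work.Wait();
            }
            catch (AggregateException)
            {
                // Expected when the work was canceled.
            }
            return "timed out, work status " + work.Status;
        }
    }

    static void Main()
    {
        Console.WriteLine(RunWithTimeout(20, 2000));
        Console.WriteLine(RunWithTimeout(2000, 50));
    }
}
```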
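Accessing the UI thread
Clicking the first button, Sync, freezes the whole window; after a while it reports the error "The calling thread cannot access this object because a different thread owns it.", because UI elements may only be accessed from the thread that created them. Clicking the second button, Async, does not freeze the window but still reports the error. Clicking the third button, AsyncOk, returns the result successfully, and the window stays responsive to other events while waiting.
WPF XAML: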
<Window x:Class="TaskSchedulerTest.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
xmlns:local="clr-namespace:TaskSchedulerTest"
mc:Ignorable="d"
Title="MainWindow" Height="350" Width="525">
<Grid>
<TextBlock Name="ContentTextBlock" HorizontalAlignment="Left" Margin="44,134,0,0" VerticalAlignment="Top"
Width="425" Height="40"/>
<Button Name="btnSync" Content="Sync" HorizontalAlignment="Left" Margin="45,190,0,0" VerticalAlignment="Top" Width="75" Click="btnSync_Click"/>
<Button Name="btnAsync" Content="Async" HorizontalAlignment="Left" Margin="165,190,0,0" VerticalAlignment="Top" Width="75" Click="btnAsync_Click"/>
<Button Name="btnAsyncOk" Content="AsyncOk" HorizontalAlignment="Left" Margin="285,190,0,0" VerticalAlignment="Top" Width="75" Click="btnAsyncOk_Click"/>
</Grid>
</Window>
Code-behind:
public partial class MainWindow : Window
{
public MainWindow()
{
InitializeComponent();
}
private void btnSync_Click(object sender, RoutedEventArgs e)
{
ContentTextBlock.Text = string.Empty;
try
{
string result = TaskMethod().Result;
ContentTextBlock.Text = result;
}
catch (Exception ex)
{
ContentTextBlock.Text = ex.InnerException.Message;
}
}
private void btnAsync_Click(object sender, RoutedEventArgs e)
{
ContentTextBlock.Text = string.Empty;
Mouse.OverrideCursor = Cursors.Wait;
Task<string> task = TaskMethod();
task.ContinueWith(t=>
{
ContentTextBlock.Text = t.Exception.InnerException.Message;
Mouse.OverrideCursor = null;
}, CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted,TaskScheduler.FromCurrentSynchronizationContext());
}
private void btnAsyncOk_Click(object sender, RoutedEventArgs e)
{
ContentTextBlock.Text = string.Empty;
Mouse.OverrideCursor = Cursors.Wait;
Task<string> task = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext());
task.ContinueWith(t=>Mouse.OverrideCursor=null,CancellationToken.None,TaskContinuationOptions.None,TaskScheduler.FromCurrentSynchronizationContext());
}
private Task<string> TaskMethod()
{
return TaskMethod(TaskScheduler.Default);
}
private Task<string> TaskMethod(TaskScheduler scheduler)
{
Task delay = Task.Delay(TimeSpan.FromSeconds(5));
return delay.ContinueWith(d=>
{
string str = $"Task is running on thread id :{CurrentThread.ManagedThreadId}. Is thread pool: {CurrentThread.IsThreadPoolThread}";
ContentTextBlock.Text = str;
return str;
},scheduler);
}
}
Note: this article was written after reading 《C#多线程编程实战》 and draws on parts of that book. It is a good book; many thanks to its author!