进程和线程
不同程序执行需要进行调度和独立的内存空间
在单核计算机中,CPU 是独占的,内存是共享的,这时候运行一个程序的时候是没有问题。但是运行多个程序的时候,为了不发生一个程序霸占整个 CPU 不释放的情况(如一个程序死循环无法结束了,那么其他程序就没有机会运行了),就需要开发者给不同程序划分不同的执行时间。为了避免不同程序之间互相操作数据或代码,导致程序被破坏的情况,就需要开发者给程序划分独立的内存范围。也就是程序需要开发者进行调度以及和划分独立的内存空间。
进程是应用程序的一个实例
为了避免每个开发者来进行这个工作,所以有了操作系统。操作系统负责整个计算机的程序调度,让每个程序都有机会使用CPU,同时使用来进程来为程序维护一个独立虚拟空间,确保程序间的运行不会互相干扰。所以进程就是程序的一个实例,拥有程序需要使用的资源集合,确保自己的资源不会被其他进程破坏。
线程是操作系统进行调度的最小单位
这时候一个进程一次只能处理一个任务,如果需要一边不停输出 hellowork,一边计时,那么需要启动两个进程。如果需要对一个队列同时入队出队,那么不仅需要两个进程,还需要两个进程可以访问相同的内存空间。所以为了进程可以并发地处理任务,同时共享相同的资源,就需要给进程一个更小的调度单位,也就是线程,因此,线程也叫轻量化进程。所以在现代计算机中,操作继续不会直接调度进程在 CPU 上执行,而是调度线程在 CPU 上执行,所以说,线程是操作系统进行调度的最小单位。
线程操作
新建线程、启动线程、线程优先级
public void Test()
{
var t = new Thread(() => { }); // 使用无参委托
var t2 = new Thread(state => { }); // 使用 object? 参数委托
var t3 = new Thread(DoWork);
var t4 = new Thread(DoWork2);
t.Priority = ThreadPriority.Highest; // 设置线程的优先级,默认是 ThreadPriority.Normal
t.Start(); // 不传入参数,启动线程
t2.Start("参数"); // 传入参数,启动线程
void DoWork() {}
void DoWork2(object? state) {}
}
阻塞线程的执行
- 当线程调用 Sleep() 或者等待锁时,进入阻塞状态。
public void Test()
{
var pool = new SemaphoreSlim(0, 1);
var t = new Thread(DoWork);
var t2 = new Thread(DoWork2);
t.Start();
t2.Start();
void DoWork()
{
pool.Wait(); // 等待信号量
}
void DoWork2()
{
Thread.Sleep(Timeout.Infinite); // 永久休眠
}
}
- Thread.Sleep() 不仅用于休眠,也可以用于让出当前 CPU 时间,让其他正在等待 CPU 的线程也有机会抢到 CPU 时间。
tip:相似的方法,Thread.Yield() 也有让出 CPU 时间的功能。
tip:不同的方法,Thread.SpinWait() 不会让出 CPU 控制权,而是进行自旋。
Thread.Sleep(0) 让出控制权给同等优先级的线程执行,如果没有,就继续执行本线程。
Thread.Sleep(1) 让出控制权给正在等待的线程执行。
Thread.Yield() 让出控制权给CPU上的其他线程。
Thread.SpinWait() 不让出控制权,在CPU上自旋一段时间。
中断阻塞中的线程
当线程处于阻塞状态时,其他线程调用阻塞线程的 Thread.Interrupt() 时,会中断线程并抛出 System.Threading.ThreadInterruptedException。
tip:如果线程没有处于阻塞状态,那么调用 Thread.Interrupt() 则不会有效果。
public void Test3()
{
var sleepSwitch = false;
var pool = new SemaphoreSlim(0, 1);
var t = new Thread(DoWork);
t.Start();
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 调用 {t.ManagedThreadId} 的 Interrupt()");
t.Interrupt();
Thread.Sleep(3000);
sleepSwitch = true;
var t2 = new Thread(DoWork2);
t2.Start();
Thread.Sleep(2000);
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 调用 {t2.ManagedThreadId} 的 Interrupt()");
t2.Interrupt();
void DoWork()
{
try
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: 开始执行");
while (!sleepSwitch)
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: 自旋 SpinWait()");
Thread.SpinWait(10000000); // 只是进行自旋,不阻塞线程,所以不会被中断
}
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: 休眠 Sleep()");
Thread.Sleep(Timeout.Infinite);
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
void DoWork2()
{
try
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: 开始执行");
pool.Wait();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
}
取消线程的执行
取消正在执行中或者阻塞中的线程有多种方法
- 调用 Thread.Interrupt() 中断线程
- 调用 CancellationTokenSource.Cancel() 或者超时取消
- 通过 WaitHandle 超时取消
- 取消正在执行的线程
/// <summary>
/// 使用 CancellationToken 取消处于死循环的线程,或者超时取消
/// </summary>
public void Test2()
{
var cts = new CancellationTokenSource(5000);
Task.Run(() =>
{
Console.WriteLine("按下 c 取消线程,或者五秒后取消");
if (Console.ReadKey().Key == ConsoleKey.C)
{
cts.Cancel();
}
});
var t = new Thread(DoWork);
t.Start(cts.Token);
void DoWork(object? state)
{
var ct = (CancellationToken)state;
while (!ct.IsCancellationRequested)
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 自旋");
Thread.SpinWait(10000000);
}
Console.WriteLine("结束执行");
}
}
- 取消正在阻塞或者执行的线程
/// <summary>
/// 使用 WaitHandle.WaitAny 取消被阻塞的线程,或者超时取消,或者使用 CancellationToken 协助式取消
/// </summary>
public void Test3()
{
var pool = new Semaphore(0, 1);
var cts = new CancellationTokenSource();
Task.Run(() =>
{
Console.WriteLine("按下 c 调用 CancellationTokenSource.Cancel() 取消线程,或者按下 v 调用 Semaphore.Release() 取消线程,或者五秒后取消");
switch (Console.ReadKey().Key)
{
case ConsoleKey.C:
cts.Cancel();
break;
case ConsoleKey.V:
pool.Release();
break;
}
if (Console.ReadKey().Key == ConsoleKey.C)
{
cts.Cancel();
}
});
var t = new Thread(DoWork);
t.Start();
void DoWork()
{
var signalIndex = WaitHandle.WaitAny(new WaitHandle[] { pool, cts.Token.WaitHandle }, 5000);
if (signalIndex == 0)
{
Console.WriteLine("调用 Semaphore.Release() 取消线程");
}
else if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("CancellationTokenSource.Cancel() 取消线程");
}
else if (signalIndex == WaitHandle.WaitTimeout)
{
Console.WriteLine("超时取消");
}
Console.WriteLine("结束运行");
}
}
线程异常和线程返回值
当调用 Thread.Abort() 或者 Thread.Interrupt() 就会抛出异常,线程执行的代码也会抛出异常,所以线程出现异常是很常见的。
当直接新建线程并执行,或者调用 ThreadPool.QueueUserWorkItem() 使用线程池线程执行代码,出现未捕获的异常时,会导致程序崩溃。
在线程中执行方法,是无法直接知道方法是否执行完毕,或者得到返回值的。
避免未捕获异常导致程序崩溃或者得到在其他线程执行方法的返回值,所以可以使用 Task.Run() 来执行代码,Task 已经处理了未捕获异常,也可以直接得到返回值。
也可以使用委托包装一下线程执行的代码,变成一个能安全执行的代码。
internal class ThreadExceptionTest
{
public async void Test()
{
ThreadPool.QueueUserWorkItem(_ => ThreadThrowException()); // 未捕获异常导致程序崩溃
var t = new Thread(_ => ThreadThrowException()); // 未捕获异常导致程序崩溃
t.IsBackground = true;
t.Start();
var _ = Task.Run(ThreadThrowException); // 未捕获异常也不会导致程序崩溃
string? r = null;
Exception? e = null;
var t2 = new Thread(_ => SafeExecute(ThreadReturnValue, out r, out e)); // 通过委托获取返回值
t2.Start();
t2.Join();
Console.WriteLine(r);
var t3 = new Thread(_ => SafeExecute(ThreadThrowException, out r, out e)); // 通过委托处理异常
t3.Start();
t3.Join();
Console.WriteLine(e);
Console.WriteLine(await SafeExecute(ThreadReturnValue)); // 通过委托获取返回值
try
{
await SafeExecute(ThreadThrowException); // 通过委托处理异常
}
catch (Exception exception)
{
Console.WriteLine(exception);
}
}
public string ThreadThrowException()
{
Thread.Sleep(1000);
throw new Exception("线程异常");
}
public string ThreadReturnValue()
{
Thread.Sleep(1000);
return "done";
}
/// <summary>
/// 捕获异常,并通过 out 获取返回值
/// </summary>
public void SafeExecute<T>(Func<T> func, out T? r, out Exception? e)
{
try
{
e = null;
r = func();
}
catch (Exception? exception)
{
r = default;
e = exception;
}
}
/// <summary>
/// 捕获异常,并通过 TaskCompletionSource 获取返回值
/// </summary>
public Task<T> SafeExecute<T>(Func<T> func)
{
var t = new TaskCompletionSource<T>();
try
{
t.TrySetResult(func());
}
catch (Exception e)
{
t.SetException(e);
}
return t.Task;
}
}
插槽和 ThreadStatic
.Net 提供了两种线程相关变量的方法。
- 插槽
Thread.AllocateDataSlot() Thread.AllocateDataSlot() 可以给方法设置一个线程插槽,插槽里面的值是线程相关的,也就是每个线程特有的,同一个变量不同线程无法互相修改。一般在静态构造方法中初始化。
Thread.GetData() Thread.SetData() 可以对插槽取值和赋值。
插槽是动态的,在运行时进行赋值的,而且 Thread.GetData() 返回值是 object,如果线程所需的值类型不固定,可以使用插槽。 - ThreadStaticAttribute
ThreadStaticAttribute 标记静态变量时,该变量是线程相关的,不同线程的静态变量值是不一样的。
[ThreadStatic] IDE 可以提供编译检查,性能和安全性更好,如果线程所需的值类型是固定的,就应该使用 [ThreadStatic]。
tip: 插槽和 [ThreadStatic] 中的值一般不初始化,因为跟线程相关,在哪个线程初始化,只有那个线程可以看到这个初始化后的值,所以初始化也就没啥意义了。
internal class ThreadDemo
{
/// <summary>
/// 测试 ThreadStaticAttribute
/// </summary>
public void Test()
{
Parallel.Invoke(StaticThreadDemo.Test, StaticThreadDemo.Test, StaticThreadDemo.Test); // 打印对应线程的ID,证明被 [ThreadStatic] 标记过的字段是线程相关的。
}
/// <summary>
/// 测试 LocalDataStoreSlot
/// </summary>
public void Test2()
{
Parallel.Invoke(StaticThreadDemo.Test2, StaticThreadDemo.Test2, StaticThreadDemo.Test2); // 打印对应线程的ID,证明 LocalDataStoreSlot 是线程相关的。
}
}
static class StaticThreadDemo
{
[ThreadStatic]
private static int? _threadId = null;
public static void Test()
{
_threadId = Thread.CurrentThread.ManagedThreadId;
Thread.Sleep(500);
Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId} ThreadStatic: {_threadId}");
}
private static LocalDataStoreSlot _localSlot;
static StaticThreadDemo()
{
_localSlot = Thread.AllocateDataSlot();
}
public static void Test2()
{
Thread.SetData(_localSlot, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(500);
Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId} LocalSlot:{Thread.GetData(_localSlot)}");
}
}
线程池操作
线程需要维护自己的栈和上下文,新建线程是有空间(一个线程大概需要 1M 内存)和时间(CPU 切换线程的时间)上的开销的,所以一般不会手动新建线程并执行代码,而是把代码交给线程池操作,线程池会根据电脑的 CPU 核数初始化线程数量,根据线程忙碌情况新增线程。
Task.Run() 最终也是通过线程池执行异步操作的。
让线程池里的线程执行代码
ThreadPool.QueueUserWorkItem((state) => { Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}"); });
使用 WaitHandle 控制线程池代码的执行
ThreadPool.RegisterWaitForSingleObject() 提供了一种方法,传入一个 WaitHandle 子类或者定时执行线程池的代码。
internal class ThreadPoolDemo
{
public void Test()
{
var ti = new TaskInfo
{
Info = "其他信息"
};
var are = new AutoResetEvent(false);
var handle = ThreadPool.RegisterWaitForSingleObject(are, DoWork, ti, 2000, false); // 定时 2s 执行
//var handle = ThreadPool.RegisterWaitForSingleObject(are, DoWork, ti, Timeout.Infinite, false); // 也可以不定时执行
ti.WaitHandle = handle;
Thread.Sleep(3000);
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 重新 signal AutoResetEvent");
are.Set();
Thread.Sleep(2000);
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 第二次 signal AutoResetEvent");
are.Set(); // 调用后没有反应,证明 CallBack 已经被取消注册
void DoWork(object? state, bool timeout)
{
if (timeout)
{
Console.WriteLine("超时");
}
else
{
var taskInfo = (TaskInfo)state;
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 执行完毕,取消 Callback");
taskInfo.WaitHandle.Unregister(null); // 取消回调,不然会回调会一直循环执行,而且应该用 Unregister 来取消,只在构造函数里面指定 executeOnlyOnce:true 的话,可能会无法 gc 回调。
}
}
}
class TaskInfo
{
public RegisteredWaitHandle WaitHandle { get; set; }
public string Info { get; set; }
}
}
最后
其实日常开发都是用 Task,回顾一下 Thread 可以写出更加优秀的异步代码,下次回顾一下线程同步的知识。
源码 https://github.com/yijidao/blog/tree/master/TPL/ThreadDemo/ThreadDemo3