[.NET] 自己实现任务池(模仿线程池)

时间:2021-02-19 18:02:04

线程池虽然好用,但限制也不少:

(1)总觉得默认的 MaxThread 小了一点,每次使用都要手工调大= =

(2)任务不能等待完成

(3)任务一旦加入不能取消,甚至不知道是正在排队/正在执行/执行完毕

(4)最恶心的一点:线程池里的线程全是 MTA 线程,很多依赖 STA 的 COM 组件都不能用,例如 Clipboard、WebBrowser 等

实在不能忍了,自己写了个“任务池”,模拟线程池的功能。不同点有:

(1)没有数量上限,新进的任务不需要排队(但任务太多可能影响系统性能)

(2)任务的创建和开始可以分开,也可以创建时就开始。

(3)任务可等待

(4)任务可强制取消,虽然非常非常非常非常非常不建议这么做,这可能造成不可预知的问题

(5)可以选择任务线程的 ApartmentState (MTA / STA / Unknown = 无所谓)

相同点在于:

(1)保证(近乎绝对的)线程安全,逻辑上没有线程不安全的地方

(2)线程可重用

不足之处在于:

(1)还是应该添加限制线程数目的功能,避免对系统性能造成太大影响

(2)任务应该支持“尝试取消”操作

(3)有些 TTask 类里的字段应该对用户只读,但语法上实在做不到= =(没有 friend 关键字)

 

Version 1.0

  1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.Collections.ObjectModel;
5 using System.Linq;
6 using System.Net.Mime;
7 using System.Runtime.CompilerServices;
8 using System.Text;
9 using System.Threading;
10
11
12 // ReSharper Disable InconsistentNaming
13
14 namespace Efficiency.Core {
15
16
//+ delegate TTaskDelegate
/// <summary>
/// Signature of the work item a task executes.
/// </summary>
/// <param name="task">The task that is being executed.</param>
/// <param name="param">The user argument stored in <see cref="TTask.Param"/>.</param>
public delegate void TTaskDelegate(TTask task, object param);

//+ class TTask
/// <summary>
/// A unit of work for <c>TTaskPool</c>: a callback, its argument, the requested
/// apartment state, and the bookkeeping needed to wait for or force-exit it.
/// </summary>
public class TTask {

    // Values stored in Status (kept as plain ints because the field is
    // updated through Interlocked operations).
    private const int StatusWorking = 0;
    private const int StatusForceExited = 2;

    /// <summary>
    /// Event that <see cref="Wait"/> blocks on. It is auto-reset, so one
    /// signal releases a single waiter.
    /// </summary>
    public readonly AutoResetEvent Event = new AutoResetEvent(false);

    /// <summary>Apartment state requested for the thread that runs this task.</summary>
    public readonly ApartmentState ApartmentState;

    /// <summary>The work to execute.</summary>
    public readonly TTaskDelegate Callback;

    /// <summary>User argument handed to <see cref="Callback"/>.</summary>
    public readonly object Param;

    /// <summary>
    /// Thread associated with this task. It is <c>null</c> until something
    /// assigns it; <see cref="ForceExit()"/> aborts this thread.
    /// </summary>
    public Thread Thread { get; set; }

    //-- .ctor()
    /// <summary>Creates a task; it does not start running.</summary>
    /// <param name="callback">The work to execute.</param>
    /// <param name="param">Argument passed to <paramref name="callback"/>.</param>
    /// <param name="state">Apartment state requested for the executing thread.</param>
    public TTask(TTaskDelegate callback, object param, ApartmentState state)
    {
        this.ApartmentState = state;
        this.Param = param;
        this.Callback = callback;
    }

    /// <summary>
    /// Task state, updated with <see cref="Interlocked"/> operations:
    /// 0 = working, 1 = marked finished, 2 = marked force exited.
    /// </summary>
    public int Status = 0;

    /// <summary>
    /// <c>true</c> once the task has been marked finished or force exited.
    /// </summary>
    // CompareExchange(x, 0, 0) is an atomic read with a full fence, so a value
    // written by another thread through Interlocked is observed here.
    public bool IsFinished => (Interlocked.CompareExchange(ref this.Status, 0, 0) != StatusWorking);

    /// <summary>Blocks until <see cref="Event"/> is signalled.</summary>
    /// <param name="timeout">Timeout in milliseconds; -1 waits indefinitely.</param>
    /// <returns><c>true</c> if the event was signalled, <c>false</c> on timeout.</returns>
    public bool Wait(int timeout = -1) => this.Event.WaitOne(timeout);

    //-- void ForceExit()
    /// <summary>
    /// Marks the task as force exited and aborts its thread, ignoring any
    /// error raised by the abort request (best effort).
    /// </summary>
    public void ForceExit()
    {
        Exception ignored;
        this.ForceExit(out ignored);
    }

    //-- bool ForceExit(out Exception)
    /// <summary>
    /// Marks the task as force exited and aborts its thread.
    /// </summary>
    /// <param name="exception">
    /// The exception thrown by the abort request, or <c>null</c> if none was thrown.
    /// </param>
    /// <returns>
    /// <c>false</c> if the abort request threw; otherwise <c>true</c>. That includes
    /// the cases where the task was already finished / force exited, or where no
    /// thread is attached and there is therefore nothing to abort.
    /// </returns>
    public bool ForceExit(out Exception exception)
    {
        exception = null;

        // Only the caller that wins the Working -> ForceExited transition may abort.
        if (Interlocked.CompareExchange(ref this.Status, StatusForceExited, StatusWorking) != StatusWorking) {
            return true;
        }

        // Read the property once: it can be assigned concurrently by another thread.
        Thread worker = this.Thread;
        if (worker == null) {
            // Nothing to abort; the task stays marked as force exited.
            return true;
        }

        try {
            worker.Abort();
            return true;
        }
        catch (Exception ex) {
            exception = ex;
            return false;
        }
    }
} // class TTask
76
77
//+ class TTaskPool
/// <summary>
/// A simple pool of reusable worker threads without an upper limit: a task is
/// handed to an idle worker when one exists, otherwise a new background thread
/// is created for it. STA and MTA workers are kept in separate sets.
/// </summary>
public static class TTaskPool {


    //+ class TTaskPool.TTaskThreadContext
    /// <summary>Per-worker state shared between the worker and the inserting thread.</summary>
    private class TTaskThreadContext {
        /// <summary>When non-zero, workers leave their loop at the next check.</summary>
        public static volatile int CanExit = 0;

        /// <summary>Signalled by the inserter after a new task was stored in <see cref="Task"/>.</summary>
        public readonly AutoResetEvent WaitEvent = new AutoResetEvent(false);

        /// <summary>Idle queue this worker returns itself to after each task.</summary>
        public readonly Queue<Thread> ThreadQueue;

        /// <summary>Thread-to-context map this worker is registered in.</summary>
        public readonly Dictionary<Thread, TTaskThreadContext> Contexts;

        /// <summary>Lock guarding <see cref="Contexts"/>.</summary>
        public readonly ReaderWriterLock ContextLock;

        /// <summary>The task the worker has to run next.</summary>
        public TTask Task;

        //-- .ctor()
        public TTaskThreadContext(
            Queue<Thread> threadQueue,
            Dictionary<Thread, TTaskThreadContext> contexts,
            ReaderWriterLock contextLock,
            TTask task)
        {
            this.ThreadQueue = threadQueue;
            this.Contexts = contexts;
            this.ContextLock = contextLock;
            this.Task = task;
        }

    } // class TTaskPool.TTaskThreadContext



    private static readonly Queue<Thread> m_STAQueue = new Queue<Thread>();
    private static readonly ReaderWriterLock m_STARWLock = new ReaderWriterLock();
    private static readonly Dictionary<Thread, TTaskThreadContext> m_STAContext =
        new Dictionary<Thread, TTaskThreadContext>();

    private static readonly Queue<Thread> m_MTAQueue = new Queue<Thread>();
    private static readonly ReaderWriterLock m_MTARWLock = new ReaderWriterLock();
    private static readonly Dictionary<Thread, TTaskThreadContext> m_MTAContext =
        new Dictionary<Thread, TTaskThreadContext>();


    private static int s_PeakSTATaskCount = 0;
    /// <summary>Number of STA worker threads created so far (never decremented).</summary>
    public static int PeakSTATaskCount => s_PeakSTATaskCount;

    private static int s_PeakMTATaskCount = 0;
    /// <summary>Number of MTA worker threads created so far (never decremented).</summary>
    public static int PeakMTATaskCount => s_PeakMTATaskCount;


    /// <summary>Sum of <see cref="PeakMTATaskCount"/> and <see cref="PeakSTATaskCount"/>.</summary>
    public static int PeakTaskCount => PeakMTATaskCount + PeakSTATaskCount;


    //-- TTask CreateTask(TTaskDelegate, object)
    /// <summary>Creates a task without starting it; start it with <see cref="InsertTask"/>.</summary>
    /// <param name="callback">The work to execute.</param>
    /// <param name="param">Argument passed to <paramref name="callback"/>.</param>
    /// <param name="state">Requested apartment state; <c>Unknown</c> means "any".</param>
    /// <returns>The new, not yet started task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is <c>null</c>.</exception>
    [MethodImpl(0x100)]     // 0x100 == MethodImplOptions.AggressiveInlining
    public static TTask CreateTask(
        TTaskDelegate callback,
        object param = null,
        ApartmentState state = ApartmentState.Unknown)
    {
        if (callback == null) {
            throw new ArgumentNullException(nameof(callback));
        }
        return new TTask(callback, param, state);
    }


    //-- bool TryInsertTask(TTask, Queue<Thread>, Dictionary<Thread, TTaskThreadContext>, ReaderWriterLock, ApartmentState, bool)
    /// <summary>
    /// Hands <paramref name="task"/> to an idle worker from <paramref name="queue"/>;
    /// when none is idle and <paramref name="force"/> is set, starts a new worker for it.
    /// </summary>
    /// <returns><c>false</c> only when no worker was idle and <paramref name="force"/> is <c>false</c>.</returns>
    [MethodImpl(0x100)]     // 0x100 == MethodImplOptions.AggressiveInlining
    private static bool TryInsertTask(
        TTask task,
        Queue<Thread> queue,
        Dictionary<Thread, TTaskThreadContext> dict,
        ReaderWriterLock rwlock,
        ApartmentState state,
        bool force)
    {
        Thread idle = null;
        lock (queue) {
            if (queue.Count > 0) {
                idle = queue.Dequeue();
            }
        }

        if (idle != null) {
            // Reuse: look up the worker's context, store the task, wake the worker.
            TTaskThreadContext idleContext;
            rwlock.AcquireReaderLock(-1);
            try {
                idleContext = dict[idle];
            }
            finally {
                rwlock.ReleaseReaderLock();
            }
            idleContext.Task = task;
            idleContext.WaitEvent.Set();
            return true;
        }

        if (!force) {
            return false;
        }

        Thread thread = new Thread(TTaskThreadRoutine);
        thread.SetApartmentState(state);
        thread.IsBackground = true;
        if (state == ApartmentState.STA) {
            Interlocked.Increment(ref s_PeakSTATaskCount);
        }
        else {
            Interlocked.Increment(ref s_PeakMTATaskCount);
        }

        TTaskThreadContext context = new TTaskThreadContext(queue, dict, rwlock, task);
        rwlock.AcquireWriterLock(-1);
        try {
            dict.Add(thread, context);
        }
        finally {
            rwlock.ReleaseWriterLock();
        }
        thread.Start(context);
        return true;
    }


    //-- void InsertTask(TTask)
    /// <summary>
    /// Starts <paramref name="task"/> on a worker with the requested apartment state.
    /// For <c>Unknown</c> an idle MTA worker is preferred, then an idle STA worker,
    /// and finally a new MTA worker is created.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
    public static void InsertTask(TTask task)
    {
        if (task == null) {
            throw new ArgumentNullException(nameof(task));
        }

        switch (task.ApartmentState) {
            case (ApartmentState.STA):
                TryInsertTask(task, m_STAQueue, m_STAContext, m_STARWLock, ApartmentState.STA, true);
                break;

            case (ApartmentState.MTA):
                TryInsertTask(task, m_MTAQueue, m_MTAContext, m_MTARWLock, ApartmentState.MTA, true);
                break;

            default:
                if (TryInsertTask(task, m_MTAQueue, m_MTAContext, m_MTARWLock, ApartmentState.MTA, false)) return;
                if (TryInsertTask(task, m_STAQueue, m_STAContext, m_STARWLock, ApartmentState.STA, false)) return;
                TryInsertTask(task, m_MTAQueue, m_MTAContext, m_MTARWLock, ApartmentState.MTA, true);
                break;
        }

    }


    //-- TTask CreateInsertTask(TTaskDelegate, object, ApartmentState)
    /// <summary>Creates a task and starts it immediately.</summary>
    /// <param name="callback">The work to execute.</param>
    /// <param name="param">Argument passed to <paramref name="callback"/>.</param>
    /// <param name="state">Requested apartment state; <c>Unknown</c> means "any".</param>
    /// <returns>The started task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is <c>null</c>.</exception>
    [MethodImpl(0x100)]     // 0x100 == MethodImplOptions.AggressiveInlining
    public static TTask CreateInsertTask(
        TTaskDelegate callback,
        object param = null,
        ApartmentState state = ApartmentState.Unknown)
    {
        if (callback == null) {
            throw new ArgumentNullException(nameof(callback));
        }
        TTask task = new TTask(callback, param, state);
        InsertTask(task);
        return task;
    }


    //-- void TTaskThreadRoutine(object)
    /// <summary>
    /// Worker loop: run the current task, publish its completion, park in the
    /// idle queue, and wait for the next task.
    /// </summary>
    /// <param name="threadContext">The worker's <see cref="TTaskThreadContext"/>.</param>
    private static void TTaskThreadRoutine(object threadContext)
    {
        TTaskThreadContext context = (TTaskThreadContext)threadContext;
        Thread thisThread = Thread.CurrentThread;
        try {
            while (true) {
                if (TTaskThreadContext.CanExit != 0) return;
                thisThread.Priority = ThreadPriority.Normal;

                TTask task = context.Task;

                // Publish the thread BEFORE running the callback; otherwise
                // TTask.ForceExit has no thread to abort while the task runs.
                task.Thread = thisThread;

                int previousStatus;
                try {
                    task.Callback.Invoke(task, task.Param);
                }
                finally {
                    // Runs on normal completion and also while an exception
                    // (including ThreadAbortException) unwinds, so waiters are
                    // always released. Status is updated first so that a woken
                    // waiter observes IsFinished == true.
                    previousStatus = Interlocked.CompareExchange(ref task.Status, 1, 0);
                    task.Event.Set();
                }

                // The callback may have changed this; restore the pool default.
                thisThread.IsBackground = true;

                // Do not keep the finished task alive while the worker idles.
                context.Task = null;

                // Status 2: ForceExit marked the task and may still be about to
                // abort this thread. Retire instead of taking another task that
                // the late abort would kill.
                if (previousStatus == 2) return;
                if (TTaskThreadContext.CanExit != 0) return;

                lock (context.ThreadQueue) {
                    context.ThreadQueue.Enqueue(thisThread);
                }
                context.WaitEvent.WaitOne(-1);
            }
        }
        finally {
            // The worker is going away (retired, aborted, or CanExit): drop its
            // registration so dead threads do not accumulate in the map.
            context.ContextLock.AcquireWriterLock(-1);
            try {
                context.Contexts.Remove(thisThread);
            }
            finally {
                context.ContextLock.ReleaseWriterLock();
            }
        }
    }


} // class TTaskPool
263
264 } // namespace Efficiency.Core