这次的例子,是一个很简单的控制台,她将面对瞬间提交的百万的数据,而面不改色(CPU、内存非常平稳),队列中始终只保存最新的数据,每次只处理cpu 个数据(我的机器是双核的,所以,在我这里,就是每个CPU一个线程,真正的并行运行哦....),OK不废话,进入正题:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using Microsoft.Ccr.Core; namespace CCRDemo1 { class Program { static void Main(string[] args) { int maxiQueueDepth = ; // step1: 创建一个Dispatcher对象 Dispatcher dispatcher = new Dispatcher(, "调度器名称"); // step2: 创建一个与step1创建对象关联的DispatcherQueue对象 DispatcherQueue depthThrottledQueue = new DispatcherQueue( "任务队列的名称", // 关联到该队列的调度器 dispatcher, // 队列保存数据的策略:保存最近消息策略 TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks, // 队列的深度 maxiQueueDepth ); // step3: 创建一个能够接收整型数据的Port Port<int> intPort = new Port<int>(); // step4: 把Port与处理函数关联,然后再与DispatcherQueue关联 Arbiter.Activate(depthThrottledQueue, Arbiter.Receive(true, intPort, delegate(int i) // 这里用了一个匿名方法,作为处理函数 { Thread.Sleep(); Console.WriteLine("[{0}] {1}", DateTime.Now.ToString("o"), i); } ) ); // step5: 快速的提交大量的任务 Console.WriteLine("[{0}] 开始提交大量的任务", DateTime.Now.ToString("o")); for (int i = ; i < maxiQueueDepth * ; i++) { // 把数据Post到intPort内 intPort.Post(i); } Console.WriteLine("[{0}] 大量任务提交完毕。", DateTime.Now.ToString("o")); Console.WriteLine("Press any key to exit"); Console.ReadKey(); dispatcher.Dispose(); } } }
最后,我们要把数据、处理函数、任务队列 组合起来,这就是上面代码中的step4,这步其实做了2个工作:
step1. 为投递进来的元素创建一个容器。容器的类型(IPortElement)允许CCR在不知道元素类型的情况下将元素排队并将元素赋值给Task实例。
step2. 容器被放入队列。
step3. 如 果接收器列表不是null,并且其中有一个以上的接收器,port对象将会调用ReceiverTask.Evaluate方法来让接收器和它里面的仲裁 器层次检测元素是否可以被使用,在这个例子中,Evaluate方法将会返回true,并使用收到的元素和用户的delegate作为参数创建一个 Task<int>实例。
step4. port使用调用Evaluate方法返回的Task对象作为参数调用taskQueue.Enqueue,注意,当一个接收器是第一次被激活,它会被关联到由Arbiter.Activate方法提供的DispatcherQueue实例。
step1. DispatcherQueue向它所属的Dispatcher发信号,告诉Dispatcher一个新的任务可以被执行了。
step2. Dispatcher通知一个或者多个TaskExecutionWorker类型对象。每个TaskExecutionWorker对象管理一个操作系统线程。它将线程设置到一种高效的休眠状态,直到Dispatcher发出信号通知有元素可以被调度时。
step3. TaskExecutionWorker对象调用DispatcherQueue.Test方法从队列中获取一个任务。如果是可用的任务,TaskExecutionWorker对象则调用ITask.Execute。
step4. Task.Execute方法调用关联在task对象上的delegate,并将一个或者多个关联在task上的参数传递进去。
总之:在CCR中,线程池处理的任务,是由DispatcherQueue产生的;而DispathcerQueue有是根据用户线程通过Port或PortSet提交给的数据 和 初始化时指定的委托来产生任务的。因此可知影响任务调度的地方有3处:
using System; namespace Microsoft.Ccr.Core { public enum TaskExecutionPolicy { Unconstrained = , ConstrainQueueDepthDiscardTasks = , ConstrainQueueDepthThrottleExecution = , ConstrainSchedulingRateDiscardTasks = , ConstrainSchedulingRateThrottleExecution = , } }
1、ConstrainQueueDepthDiscardTasks 按队列深度丢弃最旧任务
2、ConstrainQueueDepthThrottleExecution 按照队列深度阻塞任务产生
3、ConstrainSchedulingRateDiscardTasks 按照固定速度处理消息且丢失未处理的最旧消息
4、ConstrainSchedulingRateThrottleExecution 按照固定速度处理消息且阻塞任务缠上
例子程序在运行的时候出现:数据会乱序 和 数据丢失的现象。(下面是例子程序的主要代码,与上一篇代码完全相同,此处贴出,是为了阅读方便)
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading; 6 using Microsoft.Ccr.Core; 7 8 namespace CCRDemo1 9 { class Program { static void Main(string[] args) { int maxiQueueDepth = ; // step1: 创建一个Dispatcher对象 Dispatcher dispatcher = new Dispatcher(, "调度器名称"); // step2: 创建一个与step1创建对象关联的DispatcherQueue对象 DispatcherQueue depthThrottledQueue = new DispatcherQueue( "任务队列的名称", // 关联到该队列的调度器 dispatcher, // 队列保存数据的策略:保存最近消息策略 TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks, // 队列的深度 maxiQueueDepth ); // step3: 创建一个能够接收整型数据的Port Port<int> intPort = new Port<int>(); // step4: 把Port与处理函数关联,然后再与DispatcherQueue关联 Arbiter.Activate(depthThrottledQueue, Arbiter.Receive(true, intPort, delegate(int i) // 这里用了一个匿名方法,作为处理函数 { Thread.Sleep(); Console.WriteLine("[{0}] {1}", DateTime.Now.ToString("o"), i); } ) ); // step5: 快速的提交大量的任务 Console.WriteLine("[{0}] 开始提交大量的任务", DateTime.Now.ToString("o")); for (int i = ; i < maxiQueueDepth * ; i++) { // 把数据Post到intPort内 intPort.Post(i); } Console.WriteLine("[{0}] 大量任务提交完毕。", DateTime.Now.ToString("o")); Console.WriteLine("Press any key to exit"); Console.ReadKey(); dispatcher.Dispose(); } } }
public sealed class Dispatcher : IDisposable { // 1、构造函数 public Dispatcher(); public Dispatcher(int threadCount, string threadPoolName); public Dispatcher(int threadCount, ThreadPriority priority, bool useBackgroundThreads, string threadPoolName); public Dispatcher(int threadCount, ThreadPriority priority, DispatcherOptions options, string threadPoolName); public Dispatcher(int threadCount, ThreadPriority priority, DispatcherOptions options, ApartmentState threadApartmentState, string threadPoolName); // 2、资源释放函数 public void Dispose(); // 3、属性 public static ICollection<Causality> ActiveCausalities { get; } public List<DispatcherQueue> DispatcherQueues { get; } public string Name { get; set; } public DispatcherOptions Options { get; set; } public int PendingTaskCount { get; set; } public long ProcessedTaskCount { get; set; } public static int ThreadsPerCpu { get; set; } public int WorkerThreadCount { get; set; } // 4、因果关系函数 public static void AddCausality(Causality causality); public static void AddCausalityBreak(); public static void ClearCausalities(); public static bool RemoveCausality(Causality causality); public static bool RemoveCausality(string name); }
1/// <summary> 2/// Constructs a Dispatcher instance using the default number of threads and no friendly tag 3/// 使用默认线程数和空名作为参数构造一个Dispatcher实例 4/// </summary> 5public Dispatcher() 6 : this(, null) 7{ 8} 9 10/// <summary> 11/// Constructs a Dispatcher instance. 12/// The instance is usable only after AddPort is called at least once 13/// 该实例仅在调用AddPort之后才生效 14/// </summary> 15/// <param name="threadCount"> 16/// Number of OS threads to use for processing CCR Tasks 17/// 处理CCR任务的操作系统线程数 18/// </param> 19/// <param name="threadPoolName"> 20/// Friendly name to use for the OS Threads and this dispatcher instance 21/// Dispatcher实例和系统线程的名称 22/// </param> 23public Dispatcher(int threadCount, string threadPoolName) 24 : this(threadCount, ThreadPriority.Normal, DispatcherOptions.None, threadPoolName) 25{ 26} 27 28/// <summary> 29/// Constructs a Dispatcher instance. 30/// The instance is usable only after AddPort is called at least once 31/// </summary> 32/// <param name="threadCount"> 33/// Number of OS threads to use for processing CCR Tasks 34/// </param> 35/// <param name="priority"> 36/// OS Thread priority to use for threads exexuting CCR tasks 37/// 系统线程执行CCR任务所使用的优先级 38/// </param> 39/// <param name="options"> 40/// Dispatcher scheduling options 41/// 调度选项 42/// </param> 43/// <param name="threadPoolName"> 44/// Friendly name to use for the OS Threads and this dispatcher instance 45/// </param> 46public Dispatcher(int threadCount, ThreadPriority priority, DispatcherOptions options, string threadPoolName) 47 : this(threadCount, priority, options, ApartmentState.Unknown, threadPoolName) 48{ 49} 50 51/// <summary> 52/// Constructs a Dispatcher instance. 53/// The instance is usable only after AddPort is called at least once 54/// </summary> 55/// <param name="threadCount"> 56/// Number of OS threads to use for processing CCR Tasks 57/// </param> 58/// <param name="priority"> 59/// OS Thread priority to use for threads exexuting CCR tasks 60/// </param> 61/// <param name="useBackgroundThreads"> 62/// If true, background threads are used, which do not prevent application exit 63/// 是否使用后台线程,若是用,则应用程序可以*的退出,而不用担心 64/// </param> 65/// <param name="threadPoolName"> 66/// Friendly name to use for the OS Threads and this dispatcher instance 67/// </param> 68public Dispatcher(int threadCount, ThreadPriority priority, bool useBackgroundThreads, string threadPoolName) 69 : this(threadCount, 70 priority, 71 useBackgroundThreads ? DispatcherOptions.UseBackgroundThreads : DispatcherOptions.None, 72 threadPoolName) 73{ 74} 75 76/// <summary> 77/// Constructs a Dispatcher instance. 78/// The instance is usable only after AddPort is called at least once 79/// </summary> 80/// <param name="threadCount"> 81/// Number of OS threads to use for processing CCR Tasks 82/// </param> 83/// <param name="priority"> 84/// OS Thread priority to use for threads exexuting CCR tasks 85/// </param> 86/// <param name="options"> 87/// Dispatcher scheduling options 88/// </param> 89/// <param name="threadApartmentState"> 90/// Thread apartment state. Use ApartmentState.Unknown when STA/MTA is not required for interop 91/// 线程单元状态,当COM interop不需要STA/MTA的时候可以使用ApartmentState.Unknown 92/// </param> 93/// <param name="threadPoolName"> 94/// Friendly name to use for the OS Threads and this dispatcher instance 95/// </param> 96public Dispatcher(int threadCount, ThreadPriority priority, DispatcherOptions options, 97 ApartmentState threadApartmentState, string threadPoolName) 98{ 99 this._startupCompleteEvent = new ManualResetEvent(false); this._dispatcherQueues = new List<DispatcherQueue>(); this._taskExecutionWorkers = new List<TaskExecutionWorker>(); this._nameToQueueTable = new Dictionary<string, DispatcherQueue>(); // 线程数 if (threadCount == ) {// 默认情况下 threadCount = Math.Max(NumberOfProcessorsInternal, ) * ThreadsPerCpu; } else if (threadCount < ) { throw new ArgumentException("Cannot create a negative number of threads. Pass 0 to use default.", "threadCount"); } if (threadPoolName == null) { this._name = string.Empty; } else { this._name = threadPoolName; } this._options = options; for (int i = ; i < threadCount; i++) { this.AddWorker(priority, threadApartmentState); } this.StartWorkers(); } /// <summary> /// Creates one TaskExecutionWorker instance associated with one OS thread /// 创建一个与线程想关联的TaskExecutionWorker实例 /// </summary> /// <remarks> /// This routine should only be called once per dispatcher instance /// 每个Dispatcher实例只能调用该函数一次 /// </remarks> private void AddWorker(ThreadPriority priority, ApartmentState apartmentState) { TaskExecutionWorker item = new TaskExecutionWorker(this); Thread thread = new Thread(new ThreadStart(item.ExecutionLoop)); thread.SetApartmentState(apartmentState); thread.Name = this._name; thread.Priority = priority; thread.IsBackground = DispatcherOptions.None < (this._options & DispatcherOptions.UseBackgroundThreads); item._thread = thread; this._taskExecutionWorkers.Add(item); this._cachedWorkerListCount++; }
1 /// <summary> 2 /// Stops all scheduling and disposes this dispatcher instance 3 /// 停止所有的调度并释放当前Dispatcher实例 4 /// </summary> 5 public void Dispose() 6 { 7 if (this._cachedWorkerListCount != ) 8 { 9 lock (this._taskExecutionWorkers) { foreach (TaskExecutionWorker worker in this._taskExecutionWorkers) { worker.Shutdown(); } this._cachedWorkerListCount = ; } if (this._startupCompleteEvent != null) { this._startupCompleteEvent.Close(); } this.Shutdown(true); } } /// <summary> /// Stops all dispatcher worker threads and cleans up the dispatcher /// 停止所有的工作线程并清理资源 /// </summary> private void Shutdown(bool wait) { //this.Dispose(); lock (this._taskExecutionWorkers) { this._hasShutdown = true; Monitor.PulseAll(this._taskExecutionWorkers); if (wait) { while (!this._hasShutdown) { Monitor.Wait(this._taskExecutionWorkers); } } } }
1、 ThreadPriority: 指定Thread 的调度优先级。
public enum ThreadPriority { // 可以将 System.Threading.Thread 安排在具有任何其他优先级的线程之后。 Lowest = , // 可以将 System.Threading.Thread 安排在具有 Normal 优先级的线程之后, // 在具有 Lowest 优先级的线程之前。 BelowNormal = , // 可以将 System.Threading.Thread 安排在具有 AboveNormal 优先级的线程之后, // 在具有 BelowNormal 优先级的线程之前。默认情况下,线程具有 // Normal 优先级。 Normal = , // 可以将 System.Threading.Thread 安排在具有 Highest 优先级的线程之后, // 在具有 Normal 优先级的线程之前。 AboveNormal = , // 可以将 System.Threading.Thread 安排在具有任何其他优先级的线程之前。 Highest = , }
2、DispatcherOptions: 线程池运行选项
public enum DispatcherOptions { None, /// <summary> /// 时候后台线程 /// </summary> UseBackgroundThreads, /// <summary> /// 按照CPU来分配线程 /// </summary> UseProcessorAffinity }
3、ApartmentState: 指定 System.Threading.Thread 的单元状态
public enum ApartmentState { // System.Threading.Thread 将创建并进入一个单线程单元。 STA = , // System.Threading.Thread 将创建并进入一个多线程单元。 MTA = , // 尚未设置 System.Threading.Thread.ApartmentState 属性。 Unknown = , }
二、预览 DispatcherQueue类内,主要实现了5部分的功能: 1、构造实例对象; public DispatcherQueue(); public DispatcherQueue(string name); public DispatcherQueue(string name, Dispatcher dispatcher); public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, double schedulingRate); public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, int maximumQueueDepth); 2、销毁相关资源;
public void Dispose(); protected virtual void Dispose(bool disposing);
public virtual bool Enqueue(ITask task); public virtual bool TryDequeue(out ITask task);
public virtual void Suspend(); public virtual void Resume();
public virtual Timer EnqueueTimer(TimeSpan timeSpan, Port<DateTime> timerPort); 这5个功能中,常用的是前面4个,因此本基础篇就只讲这4个功能相关的函数的使用和实现原理,最后一个功能不常用,计划与Dispatcher内的因果关系部分放在后面作为高级篇细说。
三、构造函数 DispatcherQueue类内根据所使用的线程池的种类不同,而分为2类: 1、一类是使用CLR的线程池的构造函数: /// <summary> /// Default constructor /// 默认构造函数 /// </summary> public DispatcherQueue() : this("Unnamed queue using CLR Threadpool") { } /// <summary> /// Constructs an instance of the dispatcher port using the CLR thread pool for task execution /// 构建一个不使用CCR的线程池,而是使用CLR线程池执行任务的实例 /// </summary> /// <param name="name"> /// 名称 /// </param> public DispatcherQueue(string name) { this._taskQueue = new Store<ITask>(); this._timescale = 1.0; this._timerTable = new Dictionary<long, Timer>(); this._name = name; } 2、一类是使用CCR线程池(也即操作系统线程池)的构造函数: /// <summary> /// Constructs an instance of the dispatcher port using the specified CCR dispatcher /// </summary> /// <param name="name"></param> /// <param name="dispatcher"> /// 指定的Dispatcher对象 /// </param> public DispatcherQueue(string name, Dispatcher dispatcher) : this(name, dispatcher, TaskExecutionPolicy.Unconstrained, , 1.0) { } /// <summary> /// Constructs an instance of the dispatcher port using the specified CCR dispatcher /// </summary> /// <param name="name">Friendly name</param> /// <param name="dispatcher"> /// Dispatcher instance for executing tasks /// 执行任务的Dispatcher实例 /// </param> /// <param name="policy"> /// Task scheduling policy /// 任务调度策略 /// </param> /// <param name="schedulingRate"> /// Average desired scheduling rate, in tasks per second. /// 期望的任务平均调度速率(每秒执行几个任务) /// Only valid when appropriate policy is specified /// 仅当指定对应策略的时候才生效 /// </param> public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, double schedulingRate) : this(name, dispatcher, policy, , schedulingRate) { } /// <summary> /// Constructs an instance of the dispatcher port using the specified CCR dispatcher /// </summary> /// <param name="name"> /// Friendly name /// </param> /// <param name="dispatcher"> /// Dispatcher instance for executing tasks /// </param> /// <param name="policy"> /// Task scheduling policy /// </param> /// <param name="maximumQueueDepth"> /// Maximum number of pending tasks. /// 最大待处理任务数 /// Only valid when appropriate policy is specified /// 仅当指定对应策略的时候才生效 /// </param> public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, int maximumQueueDepth) : this(name, dispatcher, policy, maximumQueueDepth, 0.0) { } /// <summary> /// Constructs an instance of the dispatcher port using the specified CCR dispatcher /// </summary> /// <param name="name"> /// Friendly name /// </param> /// <param name="dispatcher"> /// Dispatcher instance for executing tasks /// </param> /// <param name="policy"> /// Task scheduling policy /// </param> /// <param name="maximumQueueDepth"> /// Maximum number of pending tasks. /// Only used when appropriate policy is specified /// </param> /// <param name="schedulingRate"> /// Average desired scheduling rate, in tasks per second. /// Only used when appropriate policy is specified /// </param> private DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, int maximumQueueDepth, double schedulingRate) { // 1.初始化 任务队列、时间刻度、定时器表 this._taskQueue = new Store<ITask>(); this._timescale = 1.0; this._timerTable = new Dictionary<long, Timer>(); // 2.初始化 任务调度策略、 if (dispatcher == null) { throw new ArgumentNullException("dispatcher"); } if (((policy == TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks) || (policy == TaskExecutionPolicy.ConstrainQueueDepthThrottleExecution)) && (maximumQueueDepth <= )) { throw new ArgumentOutOfRangeException("maximumQueueDepth"); } if (((policy == TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks) || (policy == TaskExecutionPolicy.ConstrainSchedulingRateThrottleExecution)) && (schedulingRate <= 0.0)) { throw new ArgumentOutOfRangeException("schedulingRate"); } this._dispatcher = dispatcher; this._name = name; this._policy = policy; this._maximumQueueDepth = maximumQueueDepth; this._maximumSchedulingRate = schedulingRate; // 3.把DispatcherQueue关联到指定的Dispatcher上 dispatcher.AddQueue(name, this); // 4.判断是否需要开启CCR秒表 if (policy >= TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks) { this._watch = CcrStopwatch.StartNew(); } }
四、资源释放函数 DispatcherQueue内含任务队列,因此也需要做资源的释放,而且改函数的调用应该在Diapatcher的Dispose函数调用之前,详细原因看代码: /// <summary> /// Dispose releases resources associated with this instance /// 释放DispatcherQueue内的相关资源 /// </summary> public void Dispose() { this.Dispose(true); GC.SuppressFinalize(this); } /// <summary> /// Implementation of dispose /// </summary> /// <param name="disposing"></param> protected virtual void Dispose(bool disposing) { // 移除对应Dispatcher与当前DispatcherQueue的关联 if ((disposing && (this._dispatcher != null)) && !this._dispatcher.RemoveQueue(this._name)) { // 释放任务队列内未处理的任务 int elementCount = ; lock (this._taskQueue) { elementCount = this._taskQueue.ElementCount; this._taskQueue = null; } // 调整对应Dispatcher内的未处理任务数 this._dispatcher.AdjustPendingCount(-elementCount); } }
五、任务操作函数 用户除了可以通过Port向DispatcherQueue推入任务外,还可以自己生产ITask任务,然后把它推入DiapatcherQueue内调度执行。为此DispatcherQueue具备了进队、出队2个功能。而且前面第三篇所提到的任务调度策略,也是在进队这个函数内实现的。 /// <summary> /// Enqueue ITask instance /// 任务实例进队 /// </summary> /// <exception cref="T:Microsoft.Ccr.Core.PortNotFoundException"> /// Thrown if message type is not derived from ITask /// 若消息没有实现ITask接口,则抛出异常 /// </exception> /// <param name="task"> /// ITask instance /// 任务实例 /// </param> public virtual bool Enqueue(ITask task) { bool flag = true; // 1.空消息异常 if (task == null) { throw new ArgumentNullException("message"); } task.TaskQueue = this; // 2.CCR线程池 if (this._dispatcher == null) { lock (this._taskQueue) { this._scheduledTaskCount += 1L; } ThreadPool.QueueUserWorkItem(new WaitCallback(TaskExecutionWorker.ThreadPoolExecute), task); return flag; } // 3.任务进队列 if (this._taskQueue == null) { throw new ObjectDisposedException(typeof(DispatcherQueue).Name + ":" + this.Name); } lock (this._taskQueue) { // 4.根据不同的任务调度策略,来把任务插入队列不同的位置 int num; switch (this._policy) { case TaskExecutionPolicy.Unconstrained: this._taskQueue.ElementListAddLast(new PortElement<ITask>(task)); goto Label_0285; case TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks: if (this._taskQueue.ElementCount >= this._maximumQueueDepth) { Dispatcher.LogInfo( "DispatcherQueue.Enqueue: Discarding oldest task because queue depth limit reached"); this._taskQueue.ElementListRemoveFirst(); Interlocked.Decrement(ref this._dispatcher._pendingTaskCount); flag = false; } this._taskQueue.ElementListAddLast(new PortElement<ITask>(task)); goto Label_0285; case TaskExecutionPolicy.ConstrainQueueDepthThrottleExecution: if (this._taskQueue.ElementCount < this._maximumQueueDepth) { goto Label_0172; } Microsoft.Ccr.Core.Dispatcher.LogInfo( "DispatcherQueue.Enqueue: Forcing thread sleep because queue depth limit reached"); goto Label_015F; case TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks: this.RecalculateSchedulingRate(); if (this._currentSchedulingRate >= this._maximumSchedulingRate) { Microsoft.Ccr.Core.Dispatcher.LogInfo( "DispatcherQueue.Enqueue: Discarding task because task scheduling rate exceeded"); this._taskQueue.ElementListRemoveFirst(); Interlocked.Decrement(ref this._dispatcher._pendingTaskCount); this.RecalculateSchedulingRate(); flag = false; } this._scheduledItems++; this._taskQueue.ElementListAddLast(new PortElement<ITask>(task)); goto Label_0285; case TaskExecutionPolicy.ConstrainSchedulingRateThrottleExecution: this.RecalculateSchedulingRate(); if (this._currentSchedulingRate < this._maximumSchedulingRate) { goto Label_025E; } num = ; Microsoft.Ccr.Core.Dispatcher.LogInfo( "DispatcherQueue.Enqueue: Forcing thread sleep because task scheduling rate exceeded"); goto Label_0250; default: goto Label_0285; } Label_0142: Monitor.Exit(this._taskQueue); Thread.Sleep(); Monitor.Enter(this._taskQueue); Label_015F: if (this._taskQueue.ElementCount >= this._maximumQueueDepth) { goto Label_0142; } Label_0172: this._taskQueue.ElementListAddLast(new PortElement<ITask>(task)); goto Label_0285; Label_0219: Monitor.Exit(this._taskQueue); Thread.Sleep( + num); num *= ; if (num > 0x3e8) { num = 0x3e8; } Monitor.Enter(this._taskQueue); this.RecalculateSchedulingRate(); Label_0250: if (this._currentSchedulingRate > this._maximumSchedulingRate) { goto Label_0219; } Label_025E: this._scheduledItems++; this._taskQueue.ElementListAddLast(new PortElement<ITask>(task)); Label_0285: this._scheduledTaskCount += 1L; } this._dispatcher.Signal(); return flag; } // 重新计算调度速率 private void RecalculateSchedulingRate() { this._currentSchedulingRate = this._scheduledItems / this._watch.Elapsed.TotalSeconds; } /// <summary> /// Atomically removes an ITask instance if the port is non empty /// 若当前待执行任务队列不空,则原子地取走一个任务 /// </summary> /// <param name="task"> /// ITask instance if port is not empty. Null otherwise /// 任务实例 /// </param> /// <returns> /// True if port is not empty /// 若队列不空则返回True /// </returns> public virtual bool TryDequeue(out ITask task) { // 1.异常判断 if (this._dispatcher == null) { // 不支持CCR线程池 throw new InvalidOperationException(Resource1.DispatcherPortTestNotValidInThreadpoolMode); } if (this._taskQueue == null) { // 任务队列已经被释放了 throw new ObjectDisposedException(typeof(DispatcherQueue).Name + ":" + this.Name); } // 2.从任务队列的头部取一个任务 lock (this._taskQueue) { // 判断是否挂起 if (this._isSuspended) { task = null; return false; } if (this._taskQueue.ElementCount > ) { task = this._taskQueue.ElementListRemoveFirst().TypedItem; } else { task = null; return false; } } Interlocked.Decrement(ref this._dispatcher._pendingTaskCount); return true; }
六、运行状态控制函数 DispatcherQueue提供了挂起、恢复的操作,以调度线程池对任务的运行,不过要注意的是,挂起状态下,用户仍然可以向DispatcherQueue提交任务。 /// <summary> /// Suspend scheduling of tasks. Tasks can still be queued /// 挂起调度中的任务,但任务仍然保持排列 /// </summary> public virtual void Suspend() { lock (this._taskQueue) { this._isSuspended = true; } } /// <summary> /// Resumes execution of tasks, including any tasks queued while in paused state /// 恢复任务的执行,包含所有处于暂停状态的排队任务 /// </summary> public virtual void Resume() { lock (this._taskQueue) { this._isSuspended = false; } this._dispatcher.Signal(); }