池(Pool)是一个很常见的提高性能的方式。比如线程池连接池等,之所以有这些池是因 为线程和数据库连接的创建和关闭是一种比较昂贵的行为。对于这种昂贵的资源我们往往会考虑在一个池容器中放置一些资源,在用的时候去拿,在不够的时候添 点,在用完就归还,这样就可以避免不断的创建资源和销毁资源。
如果您做过相关实验的话可能会觉得不以为然,似乎开1000个线程也用不了几百毫秒。我们要这么想,对于一个高并发的环境来说,每一秒假设有100 个请求,每个请求需要使用(开和关)10个线程,也就是一秒需要处理1000个线程的开和关,每个线程独立堆栈1M,可以想象在这一秒中内存分配和回收是 多么夸张,这个开销不能说不昂贵。
首先,要理解线程池线程分为两类工作线程和IO线程,可以单独设置最小线程数和最大线程数:
ThreadPool.SetMinThreads(2, 2);
ThreadPool.SetMaxThreads(4, 4);
最大线程数很好理解,就是线程池最多创建这些线程,如果最大4个线程,现在这4个线程都在运行的话,后续进来的线程只能排队等待了。那么为什么有最 小线程一说法呢?其实之所以使用线程池是不希望线程在创建后运行结束后理解回收,这样的话以后要用的时候还需要创建,我们可以让线程池至少保留几个线程,
即使没有线程在工作也保留。上述语句我们设置线程池一开始就保持2个工作线程和2个IO线程,最大不超过4个线程。
至于线程池的使用相当简单先来看一段代码:
for (int i = 0; i < totalThreads; i++)
{
ThreadPool.QueueUserWorkItem(o =>
{
Thread.Sleep(1000);
int a, b;
ThreadPool.GetAvailableThreads(out
a, out b);
Console.WriteLine(string.Format("({0}/{1}) #{2} : {3}", a, b,
Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToString("mm:ss")));
});
}
Console.WriteLine("Main thread finished");
Console.ReadLine();
代码里面用到了一个事先定义的静态字段:
static readonly int totalThreads = 10;
代码运行结果如下:
每一个线程都休眠一秒然后输出当前线程池可用的工作线程和IO线程以及当前线程的托管ID和时间。我们通过这段代码可以发现线程池的几个特性:
1) 线程池中的线程都是后台线程,如果没有在主线程使用ReadLine的话,程序马上会退出。
2) 线程池一开始就占用了2个线程,一秒后占用了4个线程,工作线程将会由3-6四个线程来处理。
3) 线程池最多使用了4个工作线程和0个IO线程。
那么,我们如何知道线程池中的线程都运行结束了呢,可以想到上文用过的Monitor结构:
Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < totalThreads; i++)
{
ThreadPool.QueueUserWorkItem(o =>
{
Thread.Sleep(1000);
int a, b;
ThreadPool.GetAvailableThreads(out
a, out b);
Console.WriteLine(string.Format("({0}/{1}) #{2} : {3}", a, b,
Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToString("mm:ss")));
lock (locker)
{
runningThreads--;
Monitor.Pulse(locker);
}
});
}
lock (locker)
{
while (runningThreads > 0)
Monitor.Wait(locker);
}
Console.WriteLine(sw.ElapsedMilliseconds);
Console.ReadLine();
程序中用到了两个辅助字段:
static object locker
= new object();
static int runningThreads
= totalThreads;
程序运行结果如下:
我们看到,10个线程使用了3.5秒全部执行完毕。20个线程呢?
需要6秒。细细分析这2个图我们不难发现,新的线程不是在不够用的时候立即创建而是延迟了0.5秒左右的时间,这是因为线程池会等待一下看是不是有 线程在这段时间内可用,如果实在没有的话再创建。其实可以这么理解这6秒,前一秒只有2个线程,后4秒有4个线程执行了16个,最后1秒又只有2个线程 了,所以一共是2+4*4+2=20,6秒处理了20个线程。
ThreadPool还有一个很有用的方法可以注册一个信号量,我们发出信号后所有关联的线程才执行,否则就一直等待,还可以指定等待的时间:
首先定义信号量和存储结果的字段:
static ManualResetEvent mre = new ManualResetEvent(false);
static int result = 0;
程序如下:
Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < totalThreads; i++)
{
ThreadPool.RegisterWaitForSingleObject(mre, (state,
istimeout) =>
{
Thread.Sleep(1000);
int a, b;
ThreadPool.GetAvailableThreads(out a, out b);
Interlocked.Increment(ref result);
Console.WriteLine(string.Format("({0}/{1}) #{2} : {3}", a, b,
Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToString("mm:ss")));
lock
(locker)
{
runningThreads--;
Monitor.Pulse(locker);
}
}, null, 500, true);
}
Thread.Sleep(1000);
result = 10;
mre.Set();
lock (locker)
{
while (runningThreads > 0)
Monitor.Wait(locker);
}
Console.WriteLine(sw.ElapsedMilliseconds);
Console.WriteLine(result);
Console.ReadLine();
第一个参数就是信号量,第二个参数就是方法主体(接受两个参数分别是传给线程的一个状态变量以及线程执行的时候是否超时),第三个参数是状态变量, 第四个参数超时时间我们设置了500毫秒,由于主线程在1秒后发出信号,超时500毫秒,所以这些线程并没等到信号的发出500毫秒之后就运行了。之所以 程序的运行结果为30是因为即使500毫秒之后线程超时开始执行但是也要等1秒才累加结果,在这个时候主线程早已把结果更新为10了,所以累加从10开始 而不是0开始。最后布尔参数为true表明接受到信号后只线程执行一次。
观察到,所有线程执行完毕花了7秒的时间,除去开始的等待时间0.5秒,相比之前的例子还多了0.5秒的时间。这是为什么呢?请大家帮忙分析分析。 还有一个更奇怪的问题是,RegisterWaitForSingleObject消耗的是IO线程而不是工作线程,难道微软觉得
RegisterWaitForSingleObject常见于IO操作的应用还是不希望不浪费工作线程?
这节我们按照线程池的核心思想来自定义一个简单的线程池:
1) 池中使用的线程不少于一定数量,不多于一定数量
2) 池中线程不够的时候创建,富裕的时候收回
3) 任务排队,没有可用线程时,任务等待
我们的目的只是实现这些“需求”,不去考虑性能(比如等待一段时间再去创建新的线程等策略)以及特殊的处理(异常),在实现这个需求的过程中我们也回顾了线程以及线程同步的基本概念。
首先,把任务委托和任务需要的状态数据封装一个对象:
public class WorkItem
{
public WaitCallback Action { get; set; }
public object State { get; set; }
public WorkItem(WaitCallback action,
object state)
{
this.Action = action;
this.State = state;
}
}
然后来创建一个对象作为线程池中的一个线程:
public class SimpleThreadPoolThread
{
******* object locker = new object();
******* AutoResetEvent are = new AutoResetEvent(false);
******* WorkItem wi;
******* Thread t;
******* bool b = true;
******* bool isWorking;
public bool IsWorking
{
get
{
lock
(locker)
{
return isWorking;
}
}
}
public event Action<SimpleThreadPoolThread>
WorkComplete;
public SimpleThreadPoolThread()
{
lock (locker)
{
// 当前没有实际任务
isWorking =
false;
}
t = new Thread(Work) { IsBackground
= true };
t.Start();
}
public void SetWork(WorkItem wi)
{
this.wi = wi;
}
public void StartWork()
{
// 发出信号
are.Set();
}
public void StopWork()
{
// 空任务
wi = null;
// 停止线程循环
b = false;
// 发出信号结束线程
are.Set();
}
******* void Work()
{
while (b)
{
// 没任务,等待信号
are.WaitOne();
if (wi !=
null)
{
lock (locker)
{
// 开始
isWorking = true;
}
// 执行任务
wi.Action(wi.State);
lock (locker)
{
// 结束
isWorking = false;
}
// 结束事件
WorkComplete(this);
}
}
}
代码的细节可以看注释,对这段代码的整体结构作一个说明:
1) 由于这个线程是被线程池中任务复用的,所以线程的任务处于循环中,除非线程池打算回收这个线程,否则不会退出循环结束任务
2) 使用自动信号量让线程没任务的时候等待,由线程池在外部设置任务后发出信号来执行实际的任务,执行完毕后继续等待
3) 线程公开一个完成的事件,线程池可以挂接处理方法,在任务完成后更新线程池状态
4) 线程池中的所有线程都是后台线程
下面再来实现线程池:
public class SimpleThreadPool : IDisposable
{
******* object locker = new object();
******* bool b = true;
******* int minThreads;
******* int maxThreads;
******* int currentActiveThreadCount;
******* List<SimpleThreadPoolThread>
simpleThreadPoolThreadList = new List<SimpleThreadPoolThread>();
******* Queue<WorkItem> workItemQueue = new
Queue<WorkItem>();
public int CurrentActiveThreadCount
{
get
{
lock
(locker)
{
return currentActiveThreadCount;
}
}
}
public int CurrentThreadCount
{
get
{
lock
(locker)
{
return simpleThreadPoolThreadList.Count;
}
}
}
public int CurrentQueuedWorkCount
{
get
{
lock
(locker)
{
return workItemQueue.Count;
}
}
}
public SimpleThreadPool()
{
minThreads = 4;
maxThreads = 25;
Init();
}
public SimpleThreadPool(int minThreads,
int maxThreads)
{
if (minThreads > maxThreads)
throw new
ArgumentException("minThreads > maxThreads",
"minThreads,maxThreads");
this.minThreads = minThreads;
this.maxThreads = maxThreads;
Init();
}
public void QueueUserWorkItem(WorkItem
wi)
{
lock (locker)
{
// 任务入列
workItemQueue.Enqueue(wi);
}
}
******* void Init()
{
lock (locker)
{
// 一开始创建最小线程
for (int i =
0; i < minThreads; i++)
{
CreateThread();
}
currentActiveThreadCount = 0;
}
new Thread(Work) { IsBackground =
true }.Start();
}
******* SimpleThreadPoolThread
CreateThread()
{
SimpleThreadPoolThread t = new
SimpleThreadPoolThread();
// 挂接任务结束事件
t.WorkComplete += new
Action<SimpleThreadPoolThread>(t_WorkComplete);
// 线程入列
simpleThreadPoolThreadList.Add(t);
return t;
}
******* void Work()
{
// 线程池主循环
while (b)
{
Thread.Sleep(100);
lock
(locker)
{
// 如果队列中有任务并且当前线程小于最大线程
if (workItemQueue.Count > 0 && CurrentActiveThreadCount <
maxThreads)
{
WorkItem wi = workItemQueue.Dequeue();
// 寻找闲置线程
SimpleThreadPoolThread availableThread =
simpleThreadPoolThreadList.FirstOrDefault(t => t.IsWorking == false);
// 无则创建
if (availableThread == null)
availableThread = CreateThread();
// 设置任务
availableThread.SetWork(wi);
// 开始任务
availableThread.StartWork();
// 增加个活动线程
currentActiveThreadCount++;
}
}
}
}
******* void
t_WorkComplete(SimpleThreadPoolThread t)
{
lock (locker)
{
// 减少个活动线程
currentActiveThreadCount--;
// 如果当前线程数有所富裕并且比最小线程多
if
((workItemQueue.Count + currentActiveThreadCount) < minThreads &&
CurrentThreadCount > minThreads)
{
// 停止已完成的线程
t.StopWork();
// 从线程池删除线程
simpleThreadPoolThreadList.Remove(t);
}
}
}
public void Dispose()
{
// 所有线程停止
foreach (var t in
simpleThreadPoolThreadList)
{
t.StopWork();
}
// 线程池主循环停止
b = false;
}
}
线程池的结构如下:
1) 在构造方法中可以设置线程池最小和最大线程
2) 维护一个任务队列和一个线程池中线程的列表
3) 初始化线程池的时候就创建最小线程数量定义的线程
4) 线程池主循环每20毫秒就去处理一次,如果有任务并且线程池还可以处理任务的话,先是找闲置线程,找不到则创建一个
5) 通过设置任务委托以及发出信号量来开始任务
6) 线程池提供了三个属性来查看当前活动线程数,当前总线程数和当前队列中的任务数
7) 任务完成的回调事件中我们判断如果当前线程有富裕并且比最小线程多则回收线程
8) 线程池是IDispose对象,在Dispose()方法中停止所有线程后停止线程池主循环
写一段代码来测试线程池:
using (SimpleThreadPool t = new SimpleThreadPool(2, 4))
{
Stopwatch sw2 = Stopwatch.StartNew();
for (int i = 0; i < 10; i++)
{
t.QueueUserWorkItem(new
WorkItem((index =>
{
Console.WriteLine(string.Format("#{0}
: {1} / {2}", Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToString("mm:ss"), index));
Console.WriteLine(string.Format("CurrentActiveThread: {0} / CurrentThread:
{1} / CurrentQueuedWork: {2}", t.CurrentActiveThreadCount,
t.CurrentThreadCount, t.CurrentQueuedWorkCount));
Thread.Sleep(1000);
}), i));
}
while (t.CurrentQueuedWorkCount > 0 ||
t.CurrentActiveThreadCount > 0)
{
Thread.Sleep(10);
}
Console.WriteLine("All work completed");
Console.WriteLine(string.Format("CurrentActiveThread:
{0} / CurrentThread: {1} / CurrentQueuedWork: {2}",
t.CurrentActiveThreadCount, t.CurrentThreadCount, t.CurrentQueuedWorkCount));
Console.WriteLine(sw2.ElapsedMilliseconds);
}
代码中我们向线程池推入10个任务,每个任务需要1秒执行,任务执行前输出当前任务的所属线程的Id,当前时间以及状态值。然后再输出线程池的几个状态属性。主线程循环等待所有任务完成后再次输出线程池状态属性以及所有任务完成耗费的时间:
我们可以看到:
1) 线程池中的线程总数从2到4到2
2) 线程池中活动的线程数从2到4到0
3) 线程池中排队的任务数从9到0
4) 所有线程完成一共使用了3秒时间
相比.NET内置的线程池,性能虽然有0.5秒的提高(可以见前文,.NET线程池在创建新的线程之前会等待0.5秒左右的时间),但其实一个好的
线程池的实现需要考虑很多策略(什么时候去创建新线程,什么时候去回收老线程),.NET的ThreadPool在整体性能上做的很好,所以不建议随便去 使用自定义的线程池。本例更只能作为实验和演示。
ThreadPool(线程池)是一个静态类,它没有定义任何的构造方法(),我们只能够使用它的静态方法,这是因为,这是因为ThreadPool是托管线程池,是由CLR管理的。
ThreadPool使用WaitCallback委托,它所要做的工作是在后台进行的。使工作项的排队和运行更容易,可以给工作者线程传递一个状态对象(提供数据)。状态对象是私有的作用域位于线程层,所以不需要进行同步。
ThreadPool 目标是为了减除线程的初始化开销,实现并行处理。.NET类库中的ThreadPool是异步IO的基础,比如,在System.Net.Socket 中,我们可以使用BeginAccept , EndAccept将Socket需要阻塞的操作放到系统的线程池中运行,而在执行结束以后通知主线程。
一个ThreadPool里面注册的线程拥有默认的堆栈大小,默认的优先级。并且,他们都存在于多线程空间(Multithreaded apartment)中。
ThreadPool中的Thread不能手动取消,也不用手动开始。所以ThreadPool并不适用比较长的线程。你要做的只是把一个 WaitCallback委托塞给ThreadPool,然后剩下的工作将由系统自动完成。系统会在ThreadPool的线程队列中一一启动线程。
当线程池满时,多余的线程会在队列里排队,当线程池空闲时,系统自动掉入排队的线程,以保持系统利用率。
我们的程序中使用ThreadPool来进行一些比较耗时或者需要阻塞的操作。当学要复杂的同步技术,例如事件,或需要对一个现场表调用Join方法时线程池就不能满足需求了.在以下情况中不宜使用ThreadPool而应该使用单独的Thread:
1,需要为线程指定详细的优先级
2,线程执行需要很长时间
3,需要把线程放在单独的线程apartment中
4,在线程执行中需要对线程操作,如打断,挂起等。
通常是将计算密集型的操作放在worker线程池中运行,而线程池的大小会根据当前的CPU使用量自动调整,通过下面两个方法,我们可以设置线程池的大小:
ThreadPool.SetMaxThreads(10, 200);
ThreadPool.SetMinThreads(2, 40);
两个参数分别是WorkThread和IO thread的限制。
先看一个简单的例子(运行结果不会每次都一样,这应该是ThreadPool后台处理的正常反应)
using System;
using System.Threading;
using System.Collections.Generic;
using System.Text;
namespace ThreadPoolDemo
...{
class Program
...{
static void Main(string[] args)
...{
for (int i = 0; i < 20; i++)
...{
ThreadPool.QueueUserWorkItem(
new WaitCallback(DoWork), i);
}
Console.ReadLine();
}
static void
DoWork(object state)
...{
int
threadNumber = (int)state;
Console.WriteLine("Thread {0}reporting for duty.", state);
Console.WriteLine();
}
}
}
接下来考虑如何用ThreadPool来调度一些周期性运行的工作,.NET提供了System.Threading.Timer类实现这一个功能。涉及Timer和TimerCallback。后者也是一个委托,其声明如下:
public delegate void TimerCallback(object state);
显然,他的使用方法与上面WaitCallback的完全相同,我们可以简单的将上面的例子变成周期性运行的:
Timer tm = new Timer(new TimerCallback(DoWork) , new testObject() , 0 , 2000);
后面的两个参数是启动的延迟时间和周期
Timer 的线程分配机制与当前同时进行的其它Timer的时间复杂度有关系,当定义几个Timer同时工作的时候,如果每一个操作耗时较长,而且可能同时到期的 话,线程池可能为每一个Timer操作定义不同的执行线程,而对于简单操作,有可能多个Timer被放在同一个线程中执行.
即使你没有在线程中显示的调用过ThreadPool的方法,只要你在写.NET程序,你就可能已经在使用线程池了,不信,打开一个.NET程序, 在任务管理器中看看他的线程数,你会发现有N个线程运行中,即使你可能只使用了一个线程。如果你用了异步API的话,线程数目可能会让你觉得目瞪口呆。