using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace ThreadPoolImp
{
public class MyThreadExcutor
{
//创建
private static volatile bool RUNNING = true;
//所有任务都放队列中,让工作线程来消费
private static ConcurrentQueue<Action> queue = null;
private static HashSet<Worker> workers = new HashSet<Worker>();
private static List<Thread> threadList = new List<Thread>();
//工作线程数
int poolSize = 0;
//核心线程数(创建了多少个工作线程)
int coreSize = 0;
static bool shutdown = false;
public MyThreadExcutor(int poolSize)
{
this.poolSize = poolSize;
queue = new ConcurrentQueue<Action>();
}
public void Exec(Action action)
{
if (action == null) { throw new ArgumentNullException(); }
if (coreSize < poolSize)
{
addThread(action);
}
else
{
try
{
queue.Enqueue(action);
}
catch (Exception ex)
{
throw ex;
}
}
}
private void addThread(Action action)
{
coreSize++;
Worker worker = new Worker(action);
workers.Add(worker);
Thread t = new Thread(worker.Run);
threadList.Add(t);
try
{
t.Start();
}
catch (Exception ex)
{
throw ex;
}
}
public void ShutDown()
{
RUNNING = false;
if (workers.Count > 0)
{
foreach (Worker item in workers)
{
item.InterruptAll();
}
}
shutdown = true;
Thread.CurrentThread.Interrupt();
}
class Worker
{
private readonly static object lockObj = new object();
public Worker(Action action)
{
queue.Enqueue(action);
}
public void Run()
{
while (RUNNING)
{
if (shutdown==true)
{
Thread.CurrentThread.Interrupt();
}
Action task = null;
try
{
task = GetTask();
if (task != null)
{
task();
}
else
{
Thread.CurrentThread.Interrupt();
break;
}
}
catch (Exception ex)
{
throw ex;
}
}
}
public Action GetTask()
{
Action result;
queue.TryDequeue(out result);
return result;
}
public void InterruptAll()
{
for (int i = 0; i < threadList.Count; i++)
{
threadList[i].Interrupt();
}
}
}
}
}