C# 线程池实现(仅为原理性实现,细节内容已忽略)

时间:2021-01-17 17:07:22

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace ThreadPoolImp
{
    public class MyThreadExcutor
    {
        //创建
        private static volatile bool RUNNING = true;

        //所有任务都放队列中,让工作线程来消费
        private static ConcurrentQueue<Action> queue = null;

        private static HashSet<Worker> workers = new HashSet<Worker>();

        private static List<Thread> threadList = new List<Thread>();
        //工作线程数
        int poolSize = 0;
        //核心线程数(创建了多少个工作线程)
        int coreSize = 0;

        static bool shutdown = false;
        public MyThreadExcutor(int poolSize)
        {
            this.poolSize = poolSize;
            queue = new ConcurrentQueue<Action>();
        }
        public void Exec(Action action)
        {
            if (action == null) { throw new ArgumentNullException(); }
            if (coreSize < poolSize)
            {
                addThread(action);
            }
            else
            {
                try
                {
                    queue.Enqueue(action);
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
        }

        private void addThread(Action action)
        {
            coreSize++;
            Worker worker = new Worker(action);
            workers.Add(worker);
            Thread t = new Thread(worker.Run);
            threadList.Add(t);
            try
            {
                t.Start();
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public void ShutDown()
        {
            RUNNING = false;
            if (workers.Count > 0)
            {
                foreach (Worker item in workers)
                {
                    item.InterruptAll();
                }
            }
            shutdown = true;
            Thread.CurrentThread.Interrupt();
        }
        class Worker
        {
            private readonly static object lockObj = new object();
            public Worker(Action action)
            {
                queue.Enqueue(action);
            }

            public void Run()
            {
                while (RUNNING)
                {
                    if (shutdown==true)
                    {
                        Thread.CurrentThread.Interrupt();
                    }
                    Action task = null;
                    try
                    {
                        task = GetTask();
                        if (task != null)
                        {
                            task();
                        }
                        else
                        {
                            Thread.CurrentThread.Interrupt();
                            break;
                        }
                    }
                    catch (Exception ex)
                    {
                        throw ex;
                    }
                }
            }

            public Action GetTask()
            {
                Action result;
                queue.TryDequeue(out result);
                return result;
            }
            public void InterruptAll()
            {
                for (int i = 0; i < threadList.Count; i++)
                {
                    threadList[i].Interrupt();
                }
            }
        }

    }
}