任务时间较长,且单个任务执行时间不一致的情况下多线程分配

时间:2022-12-29 19:25:02

对于开发人员来说,遇到多线程问题是十分常见的。一般的处理方式简单粗暴:直接开N个线程,然后把任务分为N组。这种处理方式有一个明显的弊端:每个分组的执行时间不一定一致,先执行完的线程会处于空闲状态,而其他线程仍在忙碌。

下文中很多地方提到的“线程”不是指技术意义上的线程,而是逻辑上的线程。

第一步:

在构造器中初始化信息:

_reqsBusy:忙碌信息,_workingSignals:工作线程信息,_reqCount:线程数,list:待执行列表,cList:执行完成列表

 

 /// <summary>
 /// Holds a list of pending items together with the per-slot bookkeeping arrays
 /// that are sized to the configured number of logical worker slots.
 /// </summary>
 public class ExtensionTaskFactory<T>
{
        // One flag per slot; all entries are false right after construction.
        private bool[] _reqsBusy = null;
        // One flag per slot; all entries are true right after construction.
        private bool[] _workingSignals = null;
        // Number of slots; stays at 4 unless the two-argument constructor overrides it.
        private int _reqCount = 4;
        // Items waiting to be executed (the caller's list instance, not a copy).
        private List<T> list;
        // Result list; empty at construction.
        private List<T> cList = new List<T>();
        private Timer _checkTimer;
        public bool IsFinish;
        // Local time captured when the instance was constructed.
        private DateTime _createTime;

        /// <summary>Creates a factory over <paramref name="l"/> with the default slot count.</summary>
        public ExtensionTaskFactory(List<T> l)
        {
            list = l;
            PrepareSlots();
        }

        /// <summary>Creates a factory over <paramref name="l"/> with <paramref name="count"/> slots.</summary>
        public ExtensionTaskFactory(List<T> l, int count)
        {
            list = l;
            _reqCount = count;
            PrepareSlots();
        }

        // Allocates both flag arrays for the current slot count and stamps the creation time.
        private void PrepareSlots()
        {
            _reqsBusy = new bool[_reqCount];
            _workingSignals = Enumerable.Repeat(true, _reqCount).ToArray();
            _createTime = DateTime.Now;
        }
}    

 第二步:

开始任务(Start())-> 对空闲线程派发任务(DispatchWork()) -> 任务派发后修改当前线程状态,并且执行相关任务逻辑 (Process(int i)) -> 任务执行完成后再次请求派发任务

       /// <summary>
       /// Runs one item on slot <paramref name="i"/>: takes the head of the pending list,
       /// records it in the result list, awaits the work, then asks for another dispatch pass.
       /// When the pending list is empty the slot is marked as no longer working.
       /// </summary>
       /// <param name="i">Index of the slot in the flag arrays.</param>
       private async void Process(int i)
        {
            // NOTE(review): the list and flag arrays are accessed without synchronization here;
            // confirm whether continuations can run concurrently in the hosting environment.
            if (list.Count == 0)
            {
                // Nothing left: clear the working signal so the finish check can complete.
                _workingSignals[i] = false;
                _reqsBusy[i] = false;
                return;
            }

            _reqsBusy[i] = true;
            // Re-arm the working signal: a slot that stopped earlier may pick up items that were
            // added later, and the finish check must not report completion while it is running.
            _workingSignals[i] = true;
            Console.WriteLine("Remaining {0} data need to be executed.", list.Count);

            // Take the head by index; Remove(model) would search by equality instead.
            T model = list[0];
            list.RemoveAt(0);
            cList.Add(model);

            try
            {
                await Dotaskfunction(model);
            }
            catch (Exception ex)
            {
                // This method is async void, so an escaping exception would be unobservable by
                // callers and would tear down the process; report it and keep the slot alive.
                Console.Error.WriteLine("Task failed: {0}", ex);
            }
            finally
            {
                // Always release the slot, otherwise it would never be dispatched to again.
                _reqsBusy[i] = false;
            }

            DispatchWork();
        }
        /// <summary>
        /// Walks all slots in index order and starts <c>Process</c> on each one
        /// whose busy flag is not set.
        /// </summary>
        private void DispatchWork()
        {
            int slot = 0;
            while (slot < _reqCount)
            {
                bool idle = !_reqsBusy[slot];
                if (idle)
                {
                    Process(slot);
                }

                slot++;
            }
        }

        /// <summary>
        /// Runs the per-item work through <c>Task.Run</c> and completes when that work has finished.
        /// The work itself is still a placeholder at this step.
        /// </summary>
        /// <param name="model">The item being processed (unused by the placeholder).</param>
        private async Task Dotaskfunction(T model)
        {
            Action work = () =>
            {
                // Placeholder: the per-item work goes here.
            };

            await Task.Run(work);
        }

        /// <summary>
        /// Starts processing by making the first dispatch pass over the worker slots.
        /// </summary>
        public void Start()
        {
            DispatchWork();
        }

 

第三步:将任务逻辑以事件形式注入到 Dotaskfunction 中

  1. 定义事件
    /// <summary>
    /// Event through which the per-item work is injected; subscribers are handed
    /// the item and a <see cref="DateTime"/> value.
    /// </summary>
    public event TaskEventHandler<T> TaskEvent;
  2. 修改 Dotaskfunction
    /// <summary>
    /// Runs the subscribed <c>TaskEvent</c> handlers for one item through <c>Task.Run</c>
    /// and completes when they have returned. Does nothing for the item when no handler is subscribed.
    /// </summary>
    /// <param name="model">The item passed to the handlers, together with the creation time.</param>
    private async Task Dotaskfunction(T model)
            {
                await Task.Run(() =>
                {
                    // Read the event field once: checking and then invoking the field separately
                    // can hit null if the last handler unsubscribes in between.
                    TaskEventHandler<T> handler = TaskEvent;
                    if (handler != null)
                    {
                        handler(model, _createTime);
                    }
                });
            }

第四步:

监测任务完成情况

  1. 监测任务线程是否都已经闲置
    /// <summary>
    /// Timer callback: once every slot has stopped working, stops the timer,
    /// sets <c>IsFinish</c> and writes the end-of-run banner to the console.
    /// </summary>
    /// <param name="param">Timer state object; not used.</param>
    private void CheckFinish(object param)
            {
                if (!IsFinished())
                {
                    return;
                }

                // Take ownership of the timer atomically. Timer callbacks are queued to pool threads
                // and may overlap or arrive after Dispose, so a second callback must not dereference
                // a field that the first one already cleared.
                Timer timer = Interlocked.Exchange<Timer>(ref _checkTimer, null);
                if (timer == null)
                {
                    return;
                }

                timer.Dispose(); //Stop Timer
                IsFinish = true;
                Console.WriteLine("Async Task End !");
                Console.WriteLine("-------------------" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "-------------------");
            }
            /// <summary>
            /// Reports whether the run is over: true only when there is at least one slot
            /// and none of the working signals is still set.
            /// </summary>
            private bool IsFinished()
            {
                return _workingSignals.Any() && _workingSignals.All(working => !working);
            }
  2. 修改 Start() 方法,注册Timer定时检测任务是否完成
    /// <summary>
    /// Starts the periodic completion check and then makes the first dispatch pass.
    /// </summary>
    public void Start()
            {
                TimerCallback onTick = CheckFinish;

                // First check fires immediately (due time 0), then every 3000 ms.
                _checkTimer = new Timer(onTick, null, 0, 3000);

                DispatchWork();
            }

     

第五步:辅助功能

  1. 添加任务
    /// <summary>
    /// Appends the given items to the end of the pending list.
    /// This method only queues the items; it does not start a dispatch pass itself.
    /// </summary>
    /// <param name="addlist">Items to append.</param>
    public void AddList(List<T> addlist)
            {
                list.AddRange(addlist);
            }
  2. 重新启动任务
    /// <summary>
    /// Re-arms the dispatcher after a finished run: marks every slot as working again,
    /// clears <c>IsFinish</c>, restarts the completion timer if it was stopped and
    /// makes a new dispatch pass.
    /// </summary>
    public void Restart()
            {
                // Re-arm the signals BEFORE the timer exists: the timer's first tick is due
                // immediately and would otherwise see the all-stopped state of the previous run
                // and declare the new run finished straight away.
                for (int i = 0; i < _workingSignals.Length; i++)
                {
                    _workingSignals[i] = true;
                }

                // The previous run left this set to true; a new run is not finished yet.
                IsFinish = false;

                if (_checkTimer == null)
                {
                    _checkTimer = new Timer(new TimerCallback(CheckFinish), null, 0, 3000);
                }

                DispatchWork();
            }
  3. 获取已完成任务
    /// <summary>
    /// Returns the internal result list.
    /// </summary>
    /// <returns>The <c>cList</c> instance itself, not a copy.</returns>
    public List<T> GetResult()
            {
                return cList;
            }

     

第六步:调用示例

  1. 调用
    // The list must be assigned before it is handed to the factory; fill it with the items to process.
    List<Test> tasks = new List<Test>();
    // Four worker slots share the list.
    ExtensionTaskFactory<Test> task = new ExtensionTaskFactory<Test>(tasks, 4);
    // Subscribe the per-item handler before starting so that no item is processed without it.
    task.TaskEvent += Task_RunQueryEvent;
    task.Start();
  2. 注册任务处理逻辑事件
    /// <summary>
    /// Per-item handler subscribed to <c>TaskEvent</c>. Its first parameter must be the
    /// factory's item type (<c>Test</c> in this example) to match <c>TaskEventHandler&lt;Test&gt;</c>.
    /// </summary>
    /// <param name="model">The item to process.</param>
    /// <param name="time">The time value supplied by the factory.</param>
    private void Task_RunQueryEvent(Test model, DateTime time)
    {
        //Do something
    }

最后附上源码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace NewsletterAlert.Core.Common
{
    /// <summary>
    /// Signature of the per-item work callback used by <c>ExtensionTaskFactory&lt;T&gt;</c>.
    /// </summary>
    /// <typeparam name="T">Type of the work item.</typeparam>
    /// <param name="model">The item to process.</param>
    /// <param name="time">The creation time of the factory that raised the event.</param>
    public delegate void TaskEventHandler<T>(T model, DateTime time);
    /// <summary>
    /// Distributes the items of a list over a fixed number of logical worker slots.
    /// Each slot takes the next pending item as soon as its previous item has finished,
    /// so slots do not sit idle while items with uneven run times are still pending.
    /// The work for an item is supplied by subscribing to <see cref="TaskEvent"/>.
    /// </summary>
    /// <typeparam name="T">Type of the work items.</typeparam>
    public class ExtensionTaskFactory<T>
    {
        private const int DefaultWorkerCount = 4;
        private const int CheckIntervalMilliseconds = 3000;

        // Guards the pending/result lists, both flag arrays and the timer reference.
        // Continuations of finished items run on pool threads and may reach this state concurrently.
        private readonly object _sync = new object();

        // _reqsBusy[i]: slot i is currently executing an item.
        private readonly bool[] _reqsBusy;
        // _workingSignals[i]: slot i has not yet observed an empty pending list.
        private readonly bool[] _workingSignals;
        private readonly int _reqCount;
        // Pending items (the caller's list instance; items are removed as they are dispatched).
        private readonly List<T> list;
        // Items that have been handed to a slot, in dispatch order.
        private readonly List<T> cList = new List<T>();
        private Timer _checkTimer;

        /// <summary>
        /// True once the completion check has found every slot stopped.
        /// Volatile because it is written by a timer thread and typically polled by another thread.
        /// </summary>
        public volatile bool IsFinish;

        private readonly DateTime _createTime;

        /// <summary>
        /// Raised once per item on a pool thread; subscribers receive the item and the
        /// creation time of this factory.
        /// </summary>
        public event TaskEventHandler<T> TaskEvent;

        /// <summary>Creates a factory over <paramref name="l"/> with the default of 4 slots.</summary>
        /// <param name="l">Pending items; this list is consumed in place.</param>
        /// <exception cref="ArgumentNullException"><paramref name="l"/> is null.</exception>
        public ExtensionTaskFactory(List<T> l)
            : this(l, DefaultWorkerCount)
        {
        }

        /// <summary>Creates a factory over <paramref name="l"/> with <paramref name="count"/> slots.</summary>
        /// <param name="l">Pending items; this list is consumed in place.</param>
        /// <param name="count">Number of logical worker slots; must be at least 1.</param>
        /// <exception cref="ArgumentNullException"><paramref name="l"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than 1.</exception>
        public ExtensionTaskFactory(List<T> l, int count)
        {
            if (l == null)
            {
                throw new ArgumentNullException("l");
            }

            // With zero slots nothing would ever be dispatched and the run could never finish.
            if (count < 1)
            {
                throw new ArgumentOutOfRangeException("count", count, "At least one worker slot is required.");
            }

            list = l;
            _reqCount = count;
            _reqsBusy = new bool[count];
            _workingSignals = new bool[count];
            ArmWorkingSignals();
            _createTime = DateTime.Now;
        }

        /// <summary>
        /// Runs one item on slot <paramref name="i"/> and then requests another dispatch pass.
        /// If the pending list is empty the slot is marked as stopped instead.
        /// </summary>
        private async void Process(int i)
        {
            T model;
            int remaining;

            lock (_sync)
            {
                // Another thread may have claimed this slot between the dispatcher's pass and now.
                if (_reqsBusy[i])
                {
                    return;
                }

                if (list.Count == 0)
                {
                    _workingSignals[i] = false;
                    return;
                }

                _reqsBusy[i] = true;
                // Re-arm the signal: a slot that stopped earlier may pick up items added later,
                // and the completion check must not fire while it is running.
                _workingSignals[i] = true;

                remaining = list.Count;
                // Take the head by index; Remove(model) would search by equality instead.
                model = list[0];
                list.RemoveAt(0);
                cList.Add(model);
            }

            Console.WriteLine("Remaining {0} data need to be executed.", remaining);

            try
            {
                await Dotaskfunction(model);
            }
            catch (Exception ex)
            {
                // This method is async void: an escaping exception could not be observed by any
                // caller and would terminate the process. Report it and keep the slot usable.
                Console.Error.WriteLine("Task failed: {0}", ex);
            }
            finally
            {
                lock (_sync)
                {
                    _reqsBusy[i] = false;
                }
            }

            DispatchWork();
        }

        /// <summary>Offers work to every slot; busy slots decline inside <see cref="Process"/>.</summary>
        private void DispatchWork()
        {
            for (int i = 0; i < _reqCount; i++)
            {
                Process(i);
            }
        }

        /// <summary>
        /// Invokes the <see cref="TaskEvent"/> subscribers for one item through <c>Task.Run</c>
        /// and completes when they have returned.
        /// </summary>
        private async Task Dotaskfunction(T model)
        {
            await Task.Run(() =>
            {
                // Read the event field once so a concurrent unsubscribe cannot null it
                // between the check and the call.
                TaskEventHandler<T> handler = TaskEvent;
                if (handler != null)
                {
                    handler(model, _createTime);
                }
            });
        }

        /// <summary>Starts the completion check and dispatches the pending items.</summary>
        public void Start()
        {
            BeginRun();
        }

        /// <summary>
        /// Timer callback: when every slot has stopped, stops the timer, sets
        /// <see cref="IsFinish"/> and writes the end-of-run banner.
        /// </summary>
        private void CheckFinish(object param)
        {
            Timer finishedTimer;

            lock (_sync)
            {
                // A null timer means another (overlapping or late) callback already finished the run.
                if (_checkTimer == null || !IsFinished())
                {
                    return;
                }

                finishedTimer = _checkTimer;
                _checkTimer = null;
                IsFinish = true;
            }

            finishedTimer.Dispose(); //Stop Timer
            Console.WriteLine("Async Task End !");
            Console.WriteLine("-------------------" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "-------------------");
        }

        /// <summary>True when no slot is executing an item and every slot has seen an empty list.</summary>
        private bool IsFinished()
        {
            lock (_sync)
            {
                for (int i = 0; i < _workingSignals.Length; i++)
                {
                    if (_workingSignals[i] || _reqsBusy[i])
                    {
                        return false;
                    }
                }

                return true;
            }
        }

        /// <summary>
        /// Appends items to the pending list. This only queues them; call
        /// <see cref="Restart"/> to dispatch them if the run has already finished.
        /// </summary>
        /// <param name="addlist">Items to append.</param>
        /// <exception cref="ArgumentNullException"><paramref name="addlist"/> is null.</exception>
        public void AddList(List<T> addlist)
        {
            if (addlist == null)
            {
                throw new ArgumentNullException("addlist");
            }

            lock (_sync)
            {
                list.AddRange(addlist);
            }
        }

        /// <summary>
        /// Starts a new run after items were added: clears <see cref="IsFinish"/>, restarts the
        /// completion check if it was stopped and dispatches the pending items.
        /// </summary>
        public void Restart()
        {
            BeginRun();
        }

        /// <summary>
        /// Returns the internal result list (the live instance, not a copy). It is modified
        /// while items are being dispatched, so read it after <see cref="IsFinish"/> is true.
        /// </summary>
        public List<T> GetResult()
        {
            return cList;
        }

        // Shared entry point for Start/Restart. The signals are re-armed and the flag cleared
        // before the timer is created, and the timer is created under the lock, so its immediate
        // first tick can neither see the previous run's stopped state nor a half-assigned field.
        private void BeginRun()
        {
            lock (_sync)
            {
                ArmWorkingSignals();
                IsFinish = false;
                if (_checkTimer == null)
                {
                    _checkTimer = new Timer(new TimerCallback(CheckFinish), null, 0, CheckIntervalMilliseconds);
                }
            }

            DispatchWork();
        }

        // Marks every slot as still working.
        private void ArmWorkingSignals()
        {
            for (int i = 0; i < _workingSignals.Length; i++)
            {
                _workingSignals[i] = true;
            }
        }
    }
}