前言
本文配套代码:https://github.com/TTGuoying/ThreadPool
先看看几个概念:
- 线程:进程中负责执行的执行单元。一个进程中至少有一个线程。
- 多线程:一个进程中有多个线程同时运行,根据cpu切换轮流工作,在多核cpu上可以几个线程同时在不同的核心上同时运行。
- 线程池:基本思想还是一种对象池思想,开辟一块内存空间,里面存放一些休眠(挂起Suspend)的线程。当有任务要执行时,从池中取一个空闲的线程执行任务,执行完成后线程休眠放回池中。这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。
我们为什么要使用线程池呢?
简单来说就是线程本身存在开销,我们利用多线程来进行任务处理,单线程也不能滥用,无止禁的开新线程会给系统产生大量消耗,而线程本来就是可重用的资源,不需要每次使用时都进行初始化,因此可以采用有限的线程个数处理无限的任务。
代码实现
本文的线程池是在Windows上实现的。主要思路是维护一个空闲线程队列、一个忙碌线程队列和一个任务队列,一开始建立一定数量的空闲线程放进空闲线程队列,当有任务进入任务队列时,从空闲线程队列中去一个线程执行任务,线程变为忙碌线程移入忙碌线程队列,任务执行完成后,线程到任务队列中取任务继续执行,如果任务队列中没有任务线程休眠后从忙碌线程队列回到空闲线程队列。下面是线程池的工作原理图:
本线程池类实现了自动调节池中线程数。
废话少说,直接上代码:
/*
==========================================================================
* 类ThreadPool是本代码的核心类,类中自动维护线程池的创建和任务队列的派送 * 其中的TaskFun是任务函数
* 其中的TaskCallbackFun是回调函数 *用法:定义一个ThreadPool变量,TaskFun函数和TaskCallbackFun回调函数,然后调用ThreadPool的QueueTaskItem()函数即可 Author: TTGuoying Date: 2018/02/19 23:15 ==========================================================================
*/
#pragma once
#include <Windows.h>
#include <list>
#include <queue>
#include <memory> using std::list;
using std::queue;
using std::shared_ptr; #define THRESHOLE_OF_WAIT_TASK 20 typedef int(*TaskFun)(PVOID param); // 任务函数
typedef void(*TaskCallbackFun)(int result); // 回调函数 class ThreadPool
{
private:
// 线程类(内部类)
class Thread
{
public:
Thread(ThreadPool *threadPool);
~Thread(); BOOL isBusy(); // 是否有任务在执行
void ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback); // 执行任务 private:
ThreadPool *threadPool; // 所属线程池
BOOL busy; // 是否有任务在执行
BOOL exit; // 是否退出
HANDLE thread; // 线程句柄
TaskFun task; // 要执行的任务
PVOID param; // 任务参数
TaskCallbackFun taskCb; // 回调的任务
static unsigned int __stdcall ThreadProc(PVOID pM); // 线程函数
}; // IOCP的通知种类
enum WAIT_OPERATION_TYPE
{
GET_TASK,
EXIT
}; // 待执行的任务类
class WaitTask
{
public:
WaitTask(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL bLong)
{
this->task = task;
this->param = param;
this->taskCb = taskCb;
this->bLong = bLong;
}
~WaitTask() { task = NULL; taskCb = NULL; bLong = FALSE; param = NULL; } TaskFun task; // 要执行的任务
PVOID param; // 任务参数
TaskCallbackFun taskCb; // 回调的任务
BOOL bLong; // 是否时长任务
}; // 从任务列表取任务的线程函数
static unsigned int __stdcall GetTaskThreadProc(PVOID pM)
{
ThreadPool *threadPool = (ThreadPool *)pM;
BOOL bRet = FALSE;
DWORD dwBytes = ;
WAIT_OPERATION_TYPE opType;
OVERLAPPED *ol;
while (WAIT_OBJECT_0 != WaitForSingleObject(threadPool->stopEvent, ))
{
BOOL bRet = GetQueuedCompletionStatus(threadPool->completionPort, &dwBytes, (PULONG_PTR)&opType, &ol, INFINITE);
// 收到退出标志
if (EXIT == (DWORD)opType)
{
break;
}
else if (GET_TASK == (DWORD)opType)
{
threadPool->GetTaskExcute();
}
}
return ;
} //线程临界区锁
class CriticalSectionLock
{
private:
CRITICAL_SECTION cs;//临界区
public:
CriticalSectionLock() { InitializeCriticalSection(&cs); }
~CriticalSectionLock() { DeleteCriticalSection(&cs); }
void Lock() { EnterCriticalSection(&cs); }
void UnLock() { LeaveCriticalSection(&cs); }
}; public:
ThreadPool(size_t minNumOfThread = , size_t maxNumOfThread = );
~ThreadPool(); BOOL QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb = NULL, BOOL longFun = FALSE); // 任务入队 private:
size_t getCurNumOfThread() { return getIdleThreadNum() + getBusyThreadNum(); } // 获取线程池中的当前线程数
size_t GetMaxNumOfThread() { return maxNumOfThread - numOfLongFun; } // 获取线程池中的最大线程数
void SetMaxNumOfThread(size_t size) // 设置线程池中的最大线程数
{
if (size < numOfLongFun)
{
maxNumOfThread = size + numOfLongFun;
}
else
maxNumOfThread = size;
}
size_t GetMinNumOfThread() { return minNumOfThread; } // 获取线程池中的最小线程数
void SetMinNumOfThread(size_t size) { minNumOfThread = size; } // 设置线程池中的最小线程数 size_t getIdleThreadNum() { return idleThreadList.size(); } // 获取线程池中的线程数
size_t getBusyThreadNum() { return busyThreadList.size(); } // 获取线程池中的线程数
void CreateIdleThread(size_t size); // 创建空闲线程
void DeleteIdleThread(size_t size); // 删除空闲线程
Thread *GetIdleThread(); // 获取空闲线程
void MoveBusyThreadToIdleList(Thread *busyThread); // 忙碌线程加入空闲列表
void MoveThreadToBusyList(Thread *thread); // 线程加入忙碌列表
void GetTaskExcute(); // 从任务队列中取任务执行
WaitTask *GetTask(); // 从任务队列中取任务 CriticalSectionLock idleThreadLock; // 空闲线程列表锁
list<Thread *> idleThreadList; // 空闲线程列表
CriticalSectionLock busyThreadLock; // 忙碌线程列表锁
list<Thread *> busyThreadList; // 忙碌线程列表 CriticalSectionLock waitTaskLock;
list<WaitTask *> waitTaskList; // 任务列表 HANDLE dispatchThrad; // 分发任务线程
HANDLE stopEvent; // 通知线程退出的时间
HANDLE completionPort; // 完成端口
size_t maxNumOfThread; // 线程池中最大的线程数
size_t minNumOfThread; // 线程池中最小的线程数
size_t numOfLongFun; // 线程池中最小的线程数
};
#include "stdafx.h"
#include "ThreadPool.h"
#include <process.h> ThreadPool::ThreadPool(size_t minNumOfThread, size_t maxNumOfThread)
{
if (minNumOfThread < )
this->minNumOfThread = ;
else
this->minNumOfThread = minNumOfThread;
if (maxNumOfThread < this->minNumOfThread * )
this->maxNumOfThread = this->minNumOfThread * ;
else
this->maxNumOfThread = maxNumOfThread;
stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, , ); idleThreadList.clear();
CreateIdleThread(this->minNumOfThread);
busyThreadList.clear(); dispatchThrad = (HANDLE)_beginthreadex(, , GetTaskThreadProc, this, , );
numOfLongFun = ;
} ThreadPool::~ThreadPool()
{
SetEvent(stopEvent);
PostQueuedCompletionStatus(completionPort, , (DWORD)EXIT, NULL); CloseHandle(stopEvent);
} BOOL ThreadPool::QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL longFun)
{
waitTaskLock.Lock();
WaitTask *waitTask = new WaitTask(task, param, taskCb, longFun);
waitTaskList.push_back(waitTask);
waitTaskLock.UnLock();
PostQueuedCompletionStatus(completionPort, , (DWORD)GET_TASK, NULL);
return TRUE;
} void ThreadPool::CreateIdleThread(size_t size)
{
idleThreadLock.Lock();
for (size_t i = ; i < size; i++)
{
idleThreadList.push_back(new Thread(this));
}
idleThreadLock.UnLock();
} void ThreadPool::DeleteIdleThread(size_t size)
{
idleThreadLock.Lock();
size_t t = idleThreadList.size();
if (t >= size)
{
for (size_t i = ; i < size; i++)
{
auto thread = idleThreadList.back();
delete thread;
idleThreadList.pop_back();
}
}
else
{
for (size_t i = ; i < t; i++)
{
auto thread = idleThreadList.back();
delete thread;
idleThreadList.pop_back();
}
}
idleThreadLock.UnLock();
} ThreadPool::Thread *ThreadPool::GetIdleThread()
{
Thread *thread = NULL;
idleThreadLock.Lock();
if (idleThreadList.size() > )
{
thread = idleThreadList.front();
idleThreadList.pop_front();
}
idleThreadLock.UnLock(); if (thread == NULL && getCurNumOfThread() < maxNumOfThread)
{
thread = new Thread(this);
} if (thread == NULL && waitTaskList.size() > THRESHOLE_OF_WAIT_TASK)
{
thread = new Thread(this);
InterlockedIncrement(&maxNumOfThread);
}
return thread;
} void ThreadPool::MoveBusyThreadToIdleList(Thread * busyThread)
{
idleThreadLock.Lock();
idleThreadList.push_back(busyThread);
idleThreadLock.UnLock(); busyThreadLock.Lock();
for (auto it = busyThreadList.begin(); it != busyThreadList.end(); it++)
{
if (*it == busyThread)
{
busyThreadList.erase(it);
break;
}
}
busyThreadLock.UnLock(); if (maxNumOfThread != && idleThreadList.size() > maxNumOfThread * 0.8)
{
DeleteIdleThread(idleThreadList.size() / );
} PostQueuedCompletionStatus(completionPort, , (DWORD)GET_TASK, NULL);
} void ThreadPool::MoveThreadToBusyList(Thread * thread)
{
busyThreadLock.Lock();
busyThreadList.push_back(thread);
busyThreadLock.UnLock();
} void ThreadPool::GetTaskExcute()
{
Thread *thread = NULL;
WaitTask *waitTask = NULL; waitTask = GetTask();
if (waitTask == NULL)
{
return;
} if (waitTask->bLong)
{
if (idleThreadList.size() > minNumOfThread)
{
thread = GetIdleThread();
}
else
{
thread = new Thread(this);
InterlockedIncrement(&numOfLongFun);
InterlockedIncrement(&maxNumOfThread);
}
}
else
{
thread = GetIdleThread();
} if (thread != NULL)
{
thread->ExecuteTask(waitTask->task, waitTask->param, waitTask->taskCb);
delete waitTask;
MoveThreadToBusyList(thread);
}
else
{
waitTaskLock.Lock();
waitTaskList.push_front(waitTask);
waitTaskLock.UnLock();
} } ThreadPool::WaitTask *ThreadPool::GetTask()
{
WaitTask *waitTask = NULL;
waitTaskLock.Lock();
if (waitTaskList.size() > )
{
waitTask = waitTaskList.front();
waitTaskList.pop_front();
}
waitTaskLock.UnLock();
return waitTask;
} ThreadPool::Thread::Thread(ThreadPool *threadPool) :
busy(FALSE),
thread(INVALID_HANDLE_VALUE),
task(NULL),
taskCb(NULL),
exit(FALSE),
threadPool(threadPool)
{
thread = (HANDLE)_beginthreadex(, , ThreadProc, this, CREATE_SUSPENDED, );
} ThreadPool::Thread::~Thread()
{
exit = TRUE;
task = NULL;
taskCb = NULL;
ResumeThread(thread);
WaitForSingleObject(thread, INFINITE);
CloseHandle(thread);
} BOOL ThreadPool::Thread::isBusy()
{
return busy;
} void ThreadPool::Thread::ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback)
{
busy = TRUE;
this->task = task;
this->param = param;
this->taskCb = taskCallback;
ResumeThread(thread);
} unsigned int ThreadPool::Thread::ThreadProc(PVOID pM)
{
Thread *pThread = (Thread*)pM; while (true)
{
if (pThread->exit)
break; //线程退出 if (pThread->task == NULL && pThread->taskCb == NULL)
{
pThread->busy = FALSE;
pThread->threadPool->MoveBusyThreadToIdleList(pThread);
SuspendThread(pThread->thread);
continue;
} int resulst = pThread->task(pThread->param);
if(pThread->taskCb)
pThread->taskCb(resulst);
WaitTask *waitTask = pThread->threadPool->GetTask();
if (waitTask != NULL)
{
pThread->task = waitTask->task;
pThread->taskCb = waitTask->taskCb;
delete waitTask;
continue;
}
else
{
pThread->task = NULL;
pThread->param = NULL;
pThread->taskCb = NULL;
pThread->busy = FALSE;
pThread->threadPool->MoveBusyThreadToIdleList(pThread);
SuspendThread(pThread->thread);
}
} return ;
}
// ThreadPool.cpp: 定义控制台应用程序的入口点。
// #include "stdafx.h"
#include "ThreadPool.h"
#include <stdio.h> class Task
{
public:
static int Task1(PVOID p)
{
int i = ;
while (i >= )
{
printf("%d\n", i);
Sleep();
i--;
}
return i;
}
}; class TaskCallback
{
public:
static void TaskCallback1(int result)
{
printf(" %d\n", result);
}
}; int main()
{
ThreadPool threadPool(, );
for (size_t i = ; i < ; i++)
{
threadPool.QueueTaskItem(Task::Task1, NULL, TaskCallback::TaskCallback1);
}
threadPool.QueueTaskItem(Task::Task1, NULL, TaskCallback::TaskCallback1, TRUE); getchar(); return ;
}