目录
一、线程池概念
二、线程的封装及线程池类成员变量的介绍
三、单例模式
饿汉方式(Eager Initialization)
懒汉方式(Lazy Initialization)
四、RAII类型的互斥锁
五、日志类的实现
六、简单的任务类创建
七、线程池的创建
一、线程池概念
线程池(Thread Pool)是一种基于池化技术的线程使用模式,它创建了一个线程的集合,这些线程可以被多个任务重复使用。线程池的主要目的是减少在创建和销毁线程时所产生的性能开销。
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的一些关键特性:
-
资源管理:线程池通过限制系统中执行线程的数量,可以有效地管理资源。这样可以避免因为创建过多线程而导致的资源耗尽问题。
-
性能提升:线程池中的线程可以被多个任务重复使用,这减少了线程创建和销毁的开销,从而提高了程序的性能。
-
任务调度:线程池通常包含一个任务队列,用于存放待执行的任务。线程池中的线程可以从队列中获取任务并执行。
-
线程复用:线程池中的线程执行完一个任务后,不会销毁,而是可以被重新分配去执行其他任务。
-
控制并发:线程池可以控制同时执行的线程数量,从而控制并发级别。
-
提高响应速度:线程池中的线程通常处于就绪状态或运行状态,因此可以快速响应新的任务请求。
-
灵活性:线程池可以根据需要进行配置,例如设置线程数量、任务队列的大小等。
二、线程的封装及线程池类成员变量的介绍
【声明】:本篇代码中所使用的 LOG() 函数为记录日志信息的函数,在后续章节中我们会单独讲解。在本篇文章中,大家将其看作类似于 printf 的打印信息操作即可。
对原生线程的封装头文件 Thread.hpp:
#include <pthread.h>
#include <string>
#include <iostream>
#include <functional>
#include "Log.hpp"
class Thread;//声明类
using FuncType = std::function<void(Thread* thread)>;//包装器
//线程类的封装
class Thread
{
private:
pthread_t _tid; //线程ID
std::string _thread_name; //线程名
FuncType _func; //线程的执行函数
bool _is_running; //线程的状态
private:
//执行任务函数
void Excute()
{
_is_running = true;
_func(this);
_is_running = false;
}
//类中的函数参数包含this指针,使用static修饰
static void* ThreadRoute(void* arg)
{
Thread* self = static_cast<Thread*>(arg);
self->Excute();
return (void*)0;
}
public:
Thread(const std::string& thread_name, FuncType func)
:_thread_name(thread_name), _func(func)
{
_is_running = false;
}
//线程启动
bool Start(){
int ret = pthread_create(&_tid, NULL, ThreadRoute, (void*)this);
if (ret != 0){
return false;
}
LOG(DEBUG, "Threads Start!!!\n");
return true;
}
//线程取消
bool Stop()
{
if(_is_running){
int ret = pthread_cancel(_tid);
if (ret != 0){
return false;
}
LOG(DEBUG, "Threads Stop!!!\n");
_is_running = false;
}
return true;
}
//回收线程
bool Join()
{
if(!_is_running){
int ret = pthread_join(_tid, NULL);//不关心线程返回值,设置为NULL
if (ret != 0){
return false;
}
}
LOG(DEBUG, "Threads Are Joined!!!\n");
return true;
}
const std::string get_name() const{
return _thread_name;
}
};
线程池类的成员变量:
template <typename T>
class ThreadPool
{
private:
bool _is_running; // 线程池的状态:运行或终止
int _threads_sleep_num; // 正在休眠的线程个数
std::queue<T> _task_queue; // 临界资源,用于储存线程池需要执行的任务
pthread_mutex_t _mutex; // 保护临界资源
pthread_cond_t _cond; // 条件变量,当线程无任务可执行时,在条件变量下等待休眠,直到被唤醒
std::vector<Thread> _threads_pool; // 线程池:用数组组织起来
int _max_capacity; // 线程池中线程的最大个数
};
在本节线程池的代码当中,我们将采用懒汉方式实现的单例模式来设计线程池。
三、单例模式
饿汉方式(Eager Initialization)
饿汉方式在程序启动时就立即初始化单例对象。这种方式简单且线程安全,因为它避免了延迟初始化可能带来的线程同步问题。
Linux C++实现:
#include <pthread.h>
class Singleton {
private:
Singleton() {}
Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;
static Singleton* instance; // 不需要 mutex
public:
static Singleton* getInstance() {
return instance; // 直接返回已经创建的实例
}
};
// 饿汉方式:在类加载时创建实例
Singleton* Singleton::instance = new Singleton(); // 直接在这里实例化
懒汉方式(Lazy Initialization)
懒汉方式在第一次使用时才创建单例对象。这种方式可以延迟对象的创建,节省资源,但需要处理线程同步问题。
Linux C++实现:
#include <pthread.h>
class Singleton {
private:
static Singleton* instance;
static pthread_mutex_t mutex;
Singleton() {}
Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;
public:
static Singleton* getInstance() {
if (instance == nullptr) {
pthread_mutex_lock(&mutex);
if (instance == nullptr) {
instance = new Singleton();
}
pthread_mutex_unlock(&mutex);
}
return instance;
}
};
// 初始化静态成员变量
Singleton* Singleton::instance = nullptr;
pthread_mutex_t Singleton::mutex = PTHREAD_MUTEX_INITIALIZER;
在懒汉方式中,我们使用了双重检查锁定(Double-Check Locking)模式来确保线程安全。首先检查实例是否已经创建,如果未创建,则进入锁定区域并再次检查。这样可以减少锁的竞争所带来 的开销,因为一旦实例被创建,后续的调用就不需要锁了。
注意: 在Linux系统中,使用pthread_mutex_t
来实现线程同步。在饿汉方式中,我们只在初始化实例时加锁,而在懒汉方式中,我们使用双重检查锁定模式来确保线程安全。
总结:
- 饿汉方式:类加载时就创建实例,线程安全,但可能造成资源浪费。
- 懒汉方式:第一次使用时才创建实例,节省资源,但需要处理线程同步问题。
在实际应用中,选择哪种方式取决于具体需求和场景。如果对性能要求较高,可以考虑使用懒汉方式,并结合双重检查锁定来优化。如果对线程安全要求更高,可以选择饿汉方式。
为什么要使用互斥锁来保证单例模式创建线程池对象的原子性?
防止多线程同时初始化实例、确保实例的唯一性: 在多线程环境中,如果多个线程同时访问单例类的getInstance()
方法,并且实例尚未创建,那么每个线程都可能尝试创建一个新的实例。这将违反单例模式的原则,即只创建一个实例。 互斥锁确保只有一个线程可以执行实例化代码,即使在高并发的情况下。这样可以保证无论有多少线程尝试创建实例,都只会有一个实例被创建。
四、RAII类型的互斥锁
RAII(Resource Acquisition Is Initialization,资源获取即初始化)是一种管理资源的编程技术,它通过将资源的生命周期绑定到对象的生命周期来确保资源的正确管理。对于互斥锁而言,RAII意味着在对象创建时获取锁,在对象销毁时释放锁。
#pragma once
#include <pthread.h>
class LockGuard
{
private:
pthread_mutex_t* _mutex;
public:
LockGuard(pthread_mutex_t* mutex)
: _mutex(mutex)
{
if (_mutex) {
pthread_mutex_lock(_mutex); // 加锁
}
}
~LockGuard()
{
if (_mutex) {
pthread_mutex_unlock(_mutex); // 解锁
}
}
};
五、日志类的实现
#pragma once
#include <iostream>
#include <fstream>
#include <vector>
#include <string>
#include "LockGuard.hpp"
#include <unistd.h>
#include <sys/types.h>
#include <ctime>
#include <cstdarg>
#include <cstring>
#include "LockGuard.hpp"
#include <pthread.h>
// 日志信息等级
// "DEBUG" /* 调试信息 */, "INFO" /* 正常信息 */, "WARNING" /* 警告信息 */, "ERROR" /* 错误信息 */, "FATAL" /* 致命信息 */
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
struct Log_Message
{
std::string _level;
pid_t _pid;
std::string _file_name;
int _file_number;
std::string _log_info;
std::string _current_time;
};
#define TO_SCREEN 1 // 向屏幕打印选项
#define TO_FILE 2 // 向文件打印选项
#define TIME_BUFF 128
#define INFO_BUFF 1024
class Log
{
private:
Log_Message log_message; // 日志信息
int _print_choice; // 打印选项:屏幕/文件
std::string _print_file_name; // 如果向文件打印,文件的名称
pthread_mutex_t _log_mutex; // 保证多线程操作时,日志打印的原子性
public:
Log(int print_choice = TO_SCREEN, const std::string &print_file_name = "./log.txt")
: _print_choice(print_choice), _print_file_name(print_file_name)
{
pthread_mutex_init(&_log_mutex, nullptr);
}
// struct tm
// {
// int tm_sec; /* Seconds (0-60) */
// int tm_min; /* Minutes (0-59) */
// int tm_hour; /* Hours (0-23) */
// int tm_mday; /* Day of the month (1-31) */
// int tm_mon; /* Month (0-11) */
// int tm_year; /* Year - 1900 */
// int tm_wday; /* Day of the week (0-6, Sunday = 0) */
// int tm_yday; /* Day in the year (0-365, 1 Jan = 0) */
// int tm_isdst; /* Daylight saving time */
// };
std::string get_current_time()
{
time_t now = time(nullptr); // 使用time函数获取时间戳
struct tm *time = localtime(&now); // 将时间戳作为参数传给localtime,获取当前时间。函数返回值为指向结构体的指针。
char time_buff[TIME_BUFF];
snprintf(time_buff, TIME_BUFF, "%d/%02d/%02d %02d:%02d:%02d", time->tm_year + 1900, time->tm_mon + 1,
time->tm_mday, time->tm_hour, time->tm_min, time->tm_sec);
return time_buff;
}
void CreateLogMessage(int level, std::string filename, int filenumber, const char *format, ...)
{
log_message._level = LevelToString(level);
log_message._file_name = filename;
log_message._file_number = filenumber;
log_message._pid = getpid();
log_message._current_time = get_current_time();
// 可变参数的使用
// 1、定义可变参数变量
va_list arg;
// 2、初始化可变参数
va_start(arg, format); // ap 是 va_list 类型的变量。last_fixed_parameter 是函数中最后一个固定参数的名称,va_start 使用它来确定可变参数的起始位置。
// 3、使用vsnprintf函数将可变参数写入数组中
char info_buff[INFO_BUFF];
vsnprintf(info_buff, INFO_BUFF, format, arg);
log_message._log_info = info_buff;
// 4、销毁可变参数变量
va_end(arg);
FlushLog(log_message);
}
void FlushLogToScreen(const Log_Message &lg)
{
printf("[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._pid,
lg._file_name.c_str(),
lg._file_number,
lg._current_time.c_str(),
lg._log_info.c_str());
}
void FlushLogToFile(const Log_Message &lg)
{
/*std::ios::app 是一个打开模式(open mode),表示以“追加”(append)方式打开文件。
当以 std::ios::app 模式打开文件时,所有写入的内容将被添加到文件的末尾,而不是覆盖文件已有的内容。
如果文件不存在,在以追加模式打开时会创建一个新文件。*/
std::ofstream out(_print_file_name, std::ios::app);
char info[INFO_BUFF];
snprintf(info, sizeof(info), "[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._pid,
lg._file_name.c_str(),
lg._file_number,
lg._current_time.c_str(),
lg._log_info.c_str());
out.write(info, strlen(info));
out.close();
}
void FlushLog(const Log_Message &lg)
{
// 加过滤逻辑 --- 例如:某些等级不需要写入日志当中,直接过滤掉即可
LockGuard lockguard(&_log_mutex); // 加锁,保证多线程写入日志时的操作的原子性
switch (_print_choice)
{
case TO_SCREEN:
FlushLogToScreen(lg);
break;
case TO_FILE:
FlushLogToFile(lg);
break;
}
}
// 改变日志打印路径
void ChangePrintChoice(int new_choice)
{
_print_choice = new_choice;
}
~Log()
{
pthread_mutex_destroy(&_log_mutex);
}
};
/* 各种预定义宏 */
// __LINE__:在源代码中插入当前源代码行号;
// __FILE__:在源文件中插入当前源文件名;
// __DATE__:在源文件中插入当前的编译日期
// __TIME__:在源文件中插入当前编译时间;
// __VA_ARGS__ 就是一个可变参数的宏,替代上面的"..."
// ##__VA_ARGS__ 就是当可变参数个数为0时,将参数列表中前面多余的,(逗号)去掉.
Log lg; // 当包含该头文件时,自动创建一个日志对象
// 如此,使用LOG(...)即可写入日志,简化操作
#define LOG(LEVEL, FORMAT, ...) \
do \
{ \
lg.CreateLogMessage(LEVEL, __FILE__ /*使用预定义宏自动填充参数*/, __LINE__, FORMAT, ##__VA_ARGS__); \
} while (0)
#define EnableScreen() \
do \
{ \
lg.Enable(SCREEN_TYPE); \
} while (0)
#define EnableFILE() \
do \
{ \
lg.Enable(FILE_TYPE); \
} while (0)
六、简单的任务类创建
#pragma once
#include<iostream>
#include<functional>
// 要做加法
class Task
{
public:
Task()
{}
Task(int x, int y) : _x(x), _y(y)
{}
void Excute()
{
_result = _x + _y;
}
void operator ()()
{
Excute();
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "= ?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
七、线程池的创建
线程池的创建和管理通常涉及以下几个步骤:
- 初始化:创建线程池,设置线程池的大小、任务队列等参数。
- 启动线程池:为线程池添加创建的线程,并使线程中的线程去执行任务
- 任务提交:将任务提交给线程池执行。
- 任务执行:线程池中的线程从任务队列中取出任务并执行。
- 线程复用:执行完任务的线程可以被重新用于执行其他任务。
- 关闭:当线程池不再需要时,可以关闭线程池,释放资源。
#include <iostream>
#include <string>
#include <pthread.h>
#include <queue>
#include <vector>
#include "Thread.hpp"
#include <functional>
#include "Log.hpp"
#include "LockGuard.hpp"
static const int NUMBER = 5;
template <typename T>
class ThreadPool
{
private:
bool _is_running; // 线程池的状态:运行或终止
int _threads_sleep_num; // 正在休眠的线程个数
std::queue<T> _task_queue; // 临界资源,用于储存线程池需要执行的任务
pthread_mutex_t _mutex; // 保护临界资源
pthread_cond_t _cond; // 条件变量,当线程无任务可执行时,在条件变量下等待休眠,直到被唤醒
std::vector<Thread> _threads_pool; // 线程池:用数组组织起来
int _max_capacity; // 线程池中线程的最大个数
static ThreadPool<T>* _single_instance_ptr; // 单例模式,对象指针,类外初始化
static pthread_mutex_t _instance_mutex; // 保证创建单例模式时的线程安全
// 构造
ThreadPool(const int max_num = NUMBER) // 提供需要创建的线程池中线程的最大数量
: _max_capacity(max_num), _threads_sleep_num(0)
{
// 线程池中的线程数不能超过NUMBER个
if (max_num > NUMBER)
{
_max_capacity = NUMBER;
}
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
// 提供线程执行函数
// 由于初始化Thread类对象需要提供返回值为void,参数为Thread* 的函数,而我们想要线程创建完成后直接执行线程池内部的逻辑
// 但由于类域中的函数有一个隐形的this指针参数,所以我们使用bind绑定HandlerTask的第一个参数为this指针,再留取一个占位符来自行传入Thread*类型的参数
FuncType task = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
// 创建线程池,初始化线程池中的线程
for (int i = 1; i <= _max_capacity; i++)
{
std::string thread_name = "Thread - " + std::to_string(i);
_threads_pool.emplace_back(thread_name, task);
}
LOG(DEBUG, "ThreadPool Is Inited!!!\n");
}
// 启动线程池
void Start()
{
_is_running = true;
for (auto &t : _threads_pool)
{
if (t.Start() == false)
{
perror("Thread Start False!!!");
exit(-1);
}
}
LOG(DEBUG, "ThreadPool Is Started!!!\n");
}
// 线程执行函数
void HandlerTask(const Thread *thread)
{
// 保证线程池中的各个线程不退出,一直休眠/处理任务
while (true)
{
//----------进入临界区,先竞争互斥锁-----------
LockQueue();
// 判断任务队列是否为空,为空则线程在条件变量下等待任务
// 此时线程池需处于运行态
while (IsEmpty() && _is_running)
{
// 如果任务队列为空,此时该线程无任务处理,需要休眠,线程休眠个数+1
_threads_sleep_num++;
Sleep();
// 此时线程被唤醒,线程池线程休眠个数-1
_threads_sleep_num--;
}
if (IsEmpty() && !_is_running) // 如果线程池任务队列为空并且线程池已经停止运行,此时需要终止线程,释放互斥锁
{
std::cout << thread->get_name() << " quit..." << std::endl;
// 释放互斥锁
UnLockQueue();
// 终止循环
break;
}
//如果任务队列【不为空】,就要继续执行完任务队列中的任务:
// 1、线程池在【运行态】时,继续执行任务。
// 2、线程池处于终止态时,需要先执行完任务,直到符合 【队列为空】 且 【线程池处于终止态】 条件后退出执行流。
// 从任务队列中提取任务
T thread_task = _task_queue.front();
// 从任务队列中删除已经提取的任务
_task_queue.pop();
UnLockQueue();
//----------退出临界区,释放互斥锁--------------
// 执行任务
thread_task();
LOG(DEBUG, "%s : %s\n", thread->get_name().c_str(), thread_task.result().c_str());
// ps:为什么要把执行任务放在临界区之外呢?因为当执行任务时,就代表该线程对临界资源的操作已经执行完毕。
// 此时继续占用保护临界区的互斥锁会导致其他线程在该线程执行任务期间无法从任务队列获取任务
// 同时,线程池的目的是为了提高执行任务的效率,增加执行的并发与并行性。所以我们需要减少线程对锁的占用时间
// 如果将执行任务这一动作放入临界区中,会导致各个线程的操作转变为串行,大大降低了执行效率
}
}
ThreadPool(const ThreadPool<T>&) = delete;
ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;
public:
static ThreadPool<T>* GetInstance(int capacity = NUMBER)
{
// 在单例的获取过程中,双检测锁会先检查实例是否存在,如果不存在,再进入临界区(使用同步机制如互斥锁)来确保多线程环境中单例的创建是线程安全的。
// 一旦实例创建完成,后续的获取操作就不需要进入临界区,从而减少了线程同步的开销。
if(_single_instance_ptr == nullptr)
{
LockGuard lm(&_instance_mutex);
if(_single_instance_ptr == nullptr)// 双重判定空指针, 降低锁冲突的概率, 提高性能.
{
LOG(DEBUG, "Single Instance ThreadPool Is Created\n");
_single_instance_ptr = new ThreadPool<T>(capacity);// 使用互斥锁, 保证多线程情况下也只调用一次 new.
_single_instance_ptr->Start();
}
else
{
LOG(DEBUG, "Single Instance ThreadPool Has Been Created\n");
}
}
return _single_instance_ptr;
}
// 线程在条件变量下等待
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mutex);
}
// 向线程池(的任务队列)中推送任务
void Push(const T &in)
{
//----------进入临界区,先竞争互斥锁-----------
LockQueue();
if (_is_running) // 只有当线程池处于运行状态时,才可以推送任务
{
_task_queue.push(in);
if (_threads_sleep_num > 0) // 只要有正在休眠的线程,就唤醒一个来处理任务
{
WakeUpOne(); // 使用signal函数唤醒一个线程
}
}
UnLockQueue();
//----------退出临界区,释放互斥锁--------------
}
// 唤醒一个线程
void WakeUpOne()
{
pthread_cond_signal(&_cond);
}
// 唤醒所有线程
void WakeUpAll()
{
pthread_cond_broadcast(&_cond);
}
// 判断任务队列是否为空
bool IsEmpty()
{
return _task_queue.empty();
}
// 终止线程池
void Stop()
{
// 加锁:保证对线程池状态修改操作的原子性,确保各个线程所看到线程池状态的一致性
LockQueue();
_is_running = false; // 将线程池运行状态设置为终止,此时无法再向任务队列中推送任务
// 如果有线程在进行休眠,直接唤醒所有休眠的线程。
// 让线程池中的线程处理完任务队列中的任务后再进行退出
WakeUpAll();
//进行线程等待,回收线程资源
for(auto& t : _threads_pool)
{
t.Join();
}
//解锁
UnLockQueue();
LOG(DEBUG, "ThreadPool Has Been Stopped!!!\n");
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
};
//静态变量类外初始化
template <typename T>
ThreadPool<T>* ThreadPool<T>::_single_instance_ptr = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_instance_mutex = PTHREAD_MUTEX_INITIALIZER;
主函数测试逻辑:
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <unistd.h>
#include "Log.hpp"
int main()
{
srand(time(nullptr));
int c = 6;
while(c--)
{
int x = rand() % 5;
int y = rand() % 6;
Task t(x, y);
ThreadPool<Task>::GetInstance(3)->Push(t);
sleep(1);
}
ThreadPool<Task>::GetInstance(3)->Stop();
return 0;
}