c++封装编写线程池

时间:2023-02-19 17:51:31

在csapp学习或者其他linux底层编程的过程中,一般都会举一些多线程或多进程的例子,配合底层同步原语、系统调用api来解释怎么创建多线程/多进程。

但是这些例子和实际项目中所用到的多线程/多进程编程知识有很大的距离(小例子很好理解,但是为了完成一个任务基本就没有什么思路了)。

我学习多线程/多进程编程由4个月了,一开始就知道有线程池实现的问题(面试现场白板撸代码的),可是怎么都实现不了,这一次说什么都要学会(那怕是从别的blog中copy的代码)。


 什么是线程池?

线程池ThreadPool 就是为了存放“线程”的对象池,它的引入就是为了:限制应用程序中同一时刻运行的线程数。根据系统的环境,可以手动或者自动设置线程数量,达到最优效果。

在程序中,如果某个创建某种对象所需要的代价太高,同时这个对象又可以反复使用,那么我们往往就会准备一个容器,用来保存一批这样的对象。于是乎,我们想要用这种对象时,就不需要每次去创建一个,而直接从容器中取出一个现成的对象就可以了。由于节省了创建对象的开销,程序性能自然就上升了。这个容器就是“池”。很容易理解的是,因为有了对象池,因此在用完对象之后必须有一个“归还”的动作,这样便可以把对象放回池中,下次需要的时候就可以再次拿出来使用了。[来自http://www.cnblogs.com/JeffreyZhao/archive/2009/07/22/thread-pool-1-the-goal-and-the-clr-thread-pool.html]

对于线程是稀缺资源,创建和销毁开销比较大。我们可以在一开始就直接创建一批线程对象放在线程池中,然后需要用到线程对象的时候从线程池中取,线程对象用完了放入线程池中。在线程池内部,任务被插入到一个task队列(任务队列,这个在线程池的实现过程中都会用到的数据结构,你可以看看网络上每一个线程池的例子都会包括这个任务队列)上,线程池的线程会去取这个队列的任务。当一个新任务插入到队列时,一个空闲线程就会从任务队列上取job。这是最直观的线程池工作过程。

线程池进程用在多线程服务器上,每个通过网络到达服务器的链接都被包装成一个任务并且传递给线程池。线程池的线程会并发的处理连接上的请求。

线程池的作用:

合理利用线程池能够带来三个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

线程池实现了系统中线程的数量。
根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

线程池的实现:

我们需要解决的是这些线程对象是由谁来创建的,这些线程对象是在什么创建的?

Java中jdk1.5之后加入了java.util.concurrent包,封装好了线程池。里面有很多参数和属性,可以编写并发程序。

但是c++中,自己封装同步原语,自己调 系统调用,造个*。

使用的同步原语有
pthread_mutex_t mutex_l;//互斥锁
pthread_cond_t condtion_l;//条件变量
使用的系统调用有
pthread_mutex_init();
pthread_cond_init();
pthread_create(&thread_[i],NULL,threadFunc,this)
pthread_mutex_lock()
pthread_mutex_unlock()
pthread_cond_signal()
pthread_cond_wait()
pthread_cond_broadcast();
pthread_join()
pthread_mutex_destory()
pthread_cond_destory()

上面函数、变量的意思可以在这里找到答案[ http://www.cnblogs.com/li-daphne/p/5558435.html ]。

实现的思路--->就是怎么组装这些函数和原语:

c++封装编写线程池

在ThreadPool中封装基础原语和任务队列,利用面向对象的方法,具体的Mytask任务派生自Task,可以利用多态的机制向ThreadPool中添加一些任务。


具体的代码在这里

#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include "ThreadPool.h" using namespace std;
class Mytask : public lz::Task{
public:
Mytask(){}
virtual int run(){
printf("thread[%lu] : %s\n",pthread_self(),(char*)this->arg_);
sleep();
return ;
}
}; int main()
{
cout << "begin" << endl;
char szTmp[] = "hello world";
Mytask taskobj;
taskobj.setArg((void*)szTmp); lz::ThreadPool threadPool();
threadPool.start(); for(int i = ;i<;i++){
threadPool.addTask(&taskobj);
} while(){
printf("there are still %d tasks need to process\n",threadPool.size());
if(threadPool.size()==){
threadPool.stop();
printf("now i will exit from main\n");
exit();
}
sleep();
}
cout<<"end"<<endl;
return ;
}

----------

#ifndef THREADPOOL_H_INCLUDED
#define THREADPOOL_H_INCLUDED /**********************************************
*Project:TinyThreadPool
*
*Author:lizhen
*email:lizhen_ok@aliyun.com
*
***********************************************/ #include <deque>
#include <string>
#include <pthread.h>
#include <string.h>
#include <stdlib.h> // 使用C++98 语言规范实现的线程池: 面向对象做法,每一个job都是Task继承类的对象
namespace lz
{
class Task
{
public:
Task(void* arg = NULL, const std::string taskName = "")
: arg_(arg)
, taskName_(taskName)
{
}
virtual ~Task()
{
}
void setArg(void* arg)
{
arg_ = arg;
} virtual int run() = ; protected:
void* arg_;
std::string taskName_;
}; class ThreadPool
{
public:
ThreadPool(int threadNum = );
~ThreadPool(); public:
size_t addTask(Task *task);
void stop();
int size();
void start();
Task* take(); private:
int createThreads();
static void* threadFunc(void * threadData); private:
ThreadPool& operator=(const ThreadPool&);
ThreadPool(const ThreadPool&); private:
volatile bool isRunning_;
int threadsNum_;
pthread_t* threads_; std::deque<Task*> taskQueue_;
pthread_mutex_t mutex_;
pthread_cond_t condition_;
};
}
#endif // THREADPOOL_H_INCLUDED

------

/**********************************************
*Project:TinyThreadPool
*
*Author:lizhen
*email:lizhen_ok@aliyun.com
*
***********************************************/
#include "ThreadPool.h"
#include <stdio.h>
#include <assert.h> namespace lz
{
ThreadPool::ThreadPool(int threadNum)
{
threadsNum_ = threadNum;
//isRunning_ = true;
} void ThreadPool::start(){
createThreads();
isRunning_ = true;
} ThreadPool::~ThreadPool()
{
stop();
for(std::deque<Task*>::iterator it = taskQueue_.begin(); it != taskQueue_.end(); ++it)
{
delete *it;
}
taskQueue_.clear();
} int ThreadPool::createThreads()
{
pthread_mutex_init(&mutex_, NULL);
pthread_cond_init(&condition_, NULL);
threads_ = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum_);
for (int i = ; i < threadsNum_; i++)
{
pthread_create(&threads_[i], NULL, threadFunc, this);
}
return ;
} size_t ThreadPool::addTask(Task *task)
{
pthread_mutex_lock(&mutex_);
taskQueue_.push_back(task);
int size = taskQueue_.size();
pthread_mutex_unlock(&mutex_);
pthread_cond_signal(&condition_);
return size;
} void ThreadPool::stop()
{
if (!isRunning_)
{
return;
} isRunning_ = false;
pthread_cond_broadcast(&condition_); for (int i = ; i < threadsNum_; i++)
{
pthread_join(threads_[i], NULL);
} free(threads_);
threads_ = NULL; pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&condition_);
} int ThreadPool::size()
{
pthread_mutex_lock(&mutex_);
int size = taskQueue_.size();
pthread_mutex_unlock(&mutex_);
return size;
} Task* ThreadPool::take()
{
Task* task = NULL;
while (!task)
{
pthread_mutex_lock(&mutex_);
while (taskQueue_.empty() && isRunning_)
{
pthread_cond_wait(&condition_, &mutex_);
} if (!isRunning_)
{
pthread_mutex_unlock(&mutex_); break;
}
else if (taskQueue_.empty())
{
pthread_mutex_unlock(&mutex_);
continue;
} assert(!taskQueue_.empty());
task = taskQueue_.front();
taskQueue_.pop_front();
pthread_mutex_unlock(&mutex_);
}
return task;
} void* ThreadPool::threadFunc(void* arg)
{
pthread_t tid = pthread_self();
ThreadPool* pool = static_cast<ThreadPool*>(arg);
while (pool->isRunning_)
{
Task* task = pool->take(); if (!task)
{
printf("thread %lu will exit\n", tid);
break;
} assert(task);
task->run();
}
return ;
}
//bool ThreadPool::getisRunning_(){return isRunning_;} }

c++封装编写线程池的更多相关文章

  1. 浅谈线程池(中):独立线程池的作用及IO线程池

    原文地址:http://blog.zhaojie.me/2009/07/thread-pool-2-dedicate-pool-and-io-pool.html 在上一篇文章中,我们简单讨论了线程池的 ...

  2. java 中的线程池

    1.实现下面的一个需求,控制一个执行函数只能被五个线程访问 package www.weiyuan.test; public class Test { public static void main( ...

  3. 《Android开发艺术探索》读书笔记 &lpar;11&rpar; 第11章 Android的线程和线程池

    第11章 Android的线程和线程池 11.1 主线程和子线程 (1)在Java中默认情况下一个进程只有一个线程,也就是主线程,其他线程都是子线程,也叫工作线程.Android中的主线程主要处理和界 ...

  4. java线程池分析和应用

    比较 在前面的一些文章里,我们已经讨论了手工创建和管理线程.在实际应用中我们有的时候也会经常听到线程池这个概念.在这里,我们可以先针对手工创建管理线程和通过线程池来管理做一个比较.通常,我们如果手工创 ...

  5. 《android开发艺术探索》读书笔记(十一)--Android的线程和线程池

    接上篇<android开发艺术探索>读书笔记(十)--Android的消息机制 No1: 在Android中可以扮演线程角色的有很多,比如AsyncTask.IntentService.H ...

  6. Python并发编程之线程池&amp&semi;进程池

    引用 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我 ...

  7. Python并发复习4- concurrent&period;futures模块&lpar;线程池和进程池&rpar;

    Python标准库为我们提供了threading(多线程模块)和multiprocessing(多进程模块).从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提 ...

  8. &lbrack;python&rsqb; ThreadPoolExecutor线程池 python 线程池

    初识 Python中已经有了threading模块,为什么还需要线程池呢,线程池又是什么东西呢?在介绍线程同步的信号量机制的时候,举得例子是爬虫的例子,需要控制同时爬取的线程数,例子中创建了20个线程 ...

  9. Python3 里面的线程池

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time def task(i): print ...

随机推荐

  1. WEB – Architecture

  2. python 学习(二)--关于类

    1.没有权限控制,在类方法或变量前加 "__" 两下划线,则变为"私有"变量(实际通过_<类名>__<变量或方法名> 可以访问) 2.类 ...

  3. eclipse&colon; workspace出错导致无法启用的解决

    通常我们会在eclipse中创建多个workspace,比如一个用于学习,一个用于工作... ,因为种种原因,时不时会发现eclipse切换workspace后启动失败,提示让你去看workspace ...

  4. iOS开发--浅谈CocoaAsyncSocket编程

    Socket就是一种特殊的文件.它是一个连接了两个用户的文件,任何一个用户向Socket里写数据,另一个用户都能看得到,不管这两个用户分布在世界上相距多么遥远的角落,感觉就像坐在一起传纸条一样. 这么 ...

  5. 将文件从数据库(MySQL)中进行读取

    package com.play; import java.io.FileOutputStream; import java.io.OutputStream; import java.sql.Blob ...

  6. 可存放任意类型变量的动态数组--C语言实现

    之前在训练营的时候被要求用C语言实现一个可以存放任意类型数据的栈.现在尝试实现一个数组版本. 首先用到的结构体如下(接触了Win32编程所以长得有点像里面的那些类型): typedef struct ...

  7. JUnit 3&period;8&period;1 源码学习

    JUnit 3.8.1 源码学习 环境搭建(源码加载配置) 由于IDE自身含有JUint插件,因此通过正常途径是没有源码加载入口的,因此需通过手动加载扩展JAR,然后再添加对应源码JAR,如图:项目右 ...

  8. gulp的使用&lpar;二&rpar;之gulpfile&period;js文件的配置

    Gulpfile.js是什么文件: gulp是前端开发过程中对代码进行构建的工具,是自动化项目的构建利器:她不仅能对网站资源进行优化,而且在开发过程中很多重复的任务能够使用正确的工具自动完成:使用她, ...

  9. VC&period;文件时间

    1. #include <stdio.h> #include <windows.h> void GetFileTimeZ(char *_pcFullFileName, FILE ...

  10. &lbrack;LeetCode&amp&semi;Python&rsqb; Problem 728&period; Self Dividing Numbers

    A self-dividing number is a number that is divisible by every digit it contains. For example, 128 is ...