对于计算密集型的业务通常使用线程池进行多线程并发处理,线程池在一定程度上可以有效的提高CPU的利用率,降低无用消耗。Ceph中实现了两种类型的线程池类(本质相同),并实现消息队列处理类。其中线程池类负责具体线程创建,维护和销毁。消息队列处理类则负责队列的管理,包括入队,出队等,消息队列类为一个模板类,保证其处理数据类型的通用性。
线程池(ThreadPool),在Ceph的代码中广泛用到:
Ceph中线程池的实现也比较复杂:
如上图所示:左边为成员函数的实现,右边为成员变量,包含一些比较重要的数据成员:
工作线程集合_threads。
等待Join操作的旧线程集合_old_threads。
work_queues变量则保存了由该线程池中线程处理的消息队列。一般情况下,一个工作队列对应一个类型的处理任务,一个线程池对应一个工作队列,专门用于处理该类型的任务。如果是后台任务,又不紧急,就可以将多个工作队列放置到一个线程池里,该线程池可以处理不同类型的任务。
线程池创建后可以创建指定数量(以参数而定)的线程,而线程会调用其工作函数进行具体的工作。在ThreadPool线程池中,线程的工作函数会调用线程池的worker成员函数进行具体的工作。
线程池的worker成员函数会遍历work_queues,从其中找出每一个消息队列实例,并调用该实例的函数遍历其中保存的消息,并进行消息的具体处理工作。
线程池可以对多种类型的消息队列进行处理,其实现方式是利用了面向对象多态的特性。线程池成员work_queues本身是一个消息队列超类,而具体使用时的消息队列都有各自的实现。
线程池的实现主要包括:线程池的启动过程,线程池对应的工作队列的管理,线程池对应的执行函数如何执行任务。
1.1 线程池的启动
函数ThreadPool::start() 用来启动线程池,其在加锁的情况下,调用函数start_threads,该函数检查当前线程数,如果小于配置的线程池,就创建新的工作线程。
void ThreadPool::start()
{
ldout(cct,10)<< "start" <<dendl;
if(_thread_num_option.length()) {
ldout(cct, 10) << "registering config observer on " << _thread_num_option << dendl;
cct->_conf->add_observer(this);
}
_lock.Lock();
start_threads();
_lock.Unlock();
ldout(cct,15)<< "started" <<dendl;
}
1.2 工作队列
工作队列(WorkQueue)定义了线程池要处理的任务。
WorkQueue实现了一部分功能:进队列和出队列,以及加锁,并用通过条件变量通知相应的处理线程。
1.3 线程池的执行函数
函数worker为线程池的执行函数:
voidThreadPool::worker(WorkThread *wt)
其处理过程如下:
1)首先检查_stop标志,确保线程池没有关闭。
2)调用函数join_old_threads把旧的工作线程释放掉。检查如果线程数量大于配置的数量_num_threads,就把当前线程从线程集合中删除,并加入_old_threads队列中,并退出循环。
3)如果线程池没有暂时中止,并且work_queues不为空,就从last_work_queue开始,遍历每一个工作队列,如果工作队列不为空,就取出一个item,调用工作队列的处理函数做处理。
1.4 超时检查
TPHandle是一个有意思的事情。
class TPHandle{
friendclass ThreadPool;
CephContext *cct;
heartbeat_handle_d *hb;
time_t grace;
time_t suicide_grace;
public:
TPHandle(
CephContext *cct,
heartbeat_handle_d *hb, //心跳
time_t grace,
time_t suicide_grace) //自杀的超时时间
: cct(cct),hb(hb),grace(grace),suicide_grace(suicide_grace) {}
voidreset_tp_timeout();
voidsuspend_tp_timeout();
};
每次线程函数执行时,都会设置一个grace超时时间,当线程执行超过该时间,就认为是unhealthy的状态。当执行时间超过suicide_grace时,OSD就会产生断言而导致自杀。
结构heartbeat_handle_d记录了相关信息,并把该结构添加到HeartbeatMap的系统链表中保存。OSD会有一个定时器,定时检查是否超时。
1.5 ShardedThreadPool
Ceph中实现的另一个线程池类是SharedThreadPool,该类的实现思路与ThreadPool类似。两者的不同之处在于对消息队列的处理方式。
ThreadPool实现对多(种)队列的处理,每个线程都有机会处理工作队列的任意一个任务。这就会导致一个问题,如果任务之间有互斥性,那么正在处理该任务的两个线程有一个必须等待另一个处理完成后才能处理,从而导致线程的阻塞,性能下降。
而SharedThreadPool则实现了一种多线程共享队列的处理方式,也即只有一个消息队列,多个线程同时对该队列进行处理
例2-1 如表2-1所示,线程Thread1和Thread2分别正在处理Job1和Job2。
由于Job1和Job2的关联性,二者不能并发执行,只能顺序执行,二者之间用一个互斥锁来控制。如果Thread1先获得互斥锁就先执行,Thread2必须等待,直到Thread1执行完Job1后释放了该互斥锁,Thread2获得该互斥锁后才能执行Job2。显然,这种任务的调度方式应对这种不能完全并行的任务是有缺陷的。实际上Thread2可以去执行其他任务,比如Job5。Job1和Job2既然是顺序的,就都可以交给Thread1执行。
表2-1 ThreadPool的处理模型示列
因此,引入了Sharded ThreadPool进行管理。ShardedThreadPool对上述的任务调度方式做了改进,其在线程的执行函数里,添加了表示线程的thread_index:
voidshardedthreadpool_worker(uint32_t thread_index);
具体如何实现Shard方式,还需要使用者自己去实现。其基本的思想就是:每个线程对应一个任务队列,所有需要顺序执行的任务都放在同一个线程的任务队列里,全部由该线程执行。
整体摘自《Ceph源码分析》