目录
一、项目介绍
本项目实现的是一个高并发的内存池,其原型是Google的开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存malloc,实现了高效的多线程内存管理,用于替换系统的内存分配相关函数malloc和free
tcmalloc的知名度也是非常高的,不少公司都在用它,比如Go语言就直接用它做了内存分配器
本项目是将tcmalloc最核心的框架简化后拿出来,模拟实现出一个高并发内存池,目的是为了学习tcamlloc的精华
该项目主要涉及C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的技术
二、内存池的初步认识
2.1 池化技术
池化技术,就是程序先向系统申请过量的资源,然后自行进行管理,以备不时之需
之所以要申请过量的资源,是因为申请和释放资源都有较大的开销,不如提前申请一些资源放入"池"中,当需要资源时则可以直接从"池"中获取,不需要时就将该资源重新放回"池"中即可。这样使用时就会变得较为快捷,可以达到提高程序的运行效率的目的
在计算机中,有很多地方都使用了"池"这种技术,如连接池、线程池、对象池等。以服务器上的线程池为例,其主要思想就是:先启动若干数量的线程,让它们处于睡眠状态。当接收到客户端的请求时,再唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求后,线程又进入睡眠状态
2.2 内存池
内存池是指程序预先向操作系统申请一块足够大的内存.此后,当程序中需要申请内存的时候,不需直接向操作系统申请,而是直接从内存池中获取;同理,当释放内存的时候,并不是真正将内存返回给操作系统,而是将内存返回给内存池。当程序退出时(或某个特定时间),内存池才将之前申请的内存真正释放
内存池所解决问题
内存池主要解决的就是效率的问题,其能够避免程序频繁的向系统申请和释放内存。其次,内存池作为系统的内存分配器,还需要尝试解决内存碎片的问题
内存碎片分为内部碎片和外部碎片:
外部碎片是一些空闲的小块内存区域,由于这些内存空间不连续,以至于合计的内存足够,但是不能满足一些内存分配申请需求
内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用
注意: 内存池尝试解决的是外部碎片的问题,同时也尽可能的减少内部碎片的产生
2.3 malloc
C语言中动态申请内存并不是直接向堆申请的,而是通过malloc函数去申请的;C++中的new实际上也是封装了malloc函数
申请内存块时是先调用malloc,malloc再去向操作系统申请内存。malloc实际就是一个内存池,malloc相当于向操作系统"批发"了一块较大的内存空间,然后"零售"给程序用,当全部"售完"或程序有大量的内存需求时,再根据实际需求向操作系统"进货"
malloc的实现方式有很多种,一般情况下不同编译器平台用的是不同的。比如Windows的VS系列中的malloc就是微软自行实现的,而Linux下的gcc用的是glibc中的ptmalloc
三、定长内存池
malloc其实是一个通用的内存池,在什么场景下都适用,但也意味着malloc在什么场景下都不会具有很高的性能,因为malloc并不是针对某种场景专门设计的
定长内存池则是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此性能可以达到极致,并且在实现定长内存池时不需要考虑内存碎片等问题,因为申请/释放的都是固定大小的内存块
通过实现定长内存池可以熟悉对简单内存池的控制,其次,这个定长内存池也会在后面会作为高并发内存池的一个基础组件(代替new操作符)
实现定长
在实现定长内存池时要做到"定长"有许多方式,比如可以使用非类型模板参数,使得在该内存池中申请到的对象的大小都是N
template<size_t N>
class ObjectPool
{};
定长内存池也可以被称为"对象池"。在创建对象池时,对象池可以根据传入的对象类型的大小来实现"定长",比如创建定长内存池时传入对象类型int,那么该内存池就只支持 sizeof(int) 字节大小内存的申请和释放
template<class T>
class ObjectPool
{};
向堆区申请内存
既然是内存池,那么首先得向系统申请一块内存空间,然后对其进行管理。要想直接向堆申请内存空间,在Windows下可以调用VirtualAlloc()系统接口;在Linux下可以调用brk()或mmap()系统接口
#ifdef WIN32
#include <windows.h>
#else
#include <sys/mman.h>
#include <unistd.h>
#endif
inline static void* SystemAlloc(size_t kpage)//按页申请内存
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
//Linux下brk mmap等
#endif
if (ptr == nullptr) throw std::bad_alloc();
return ptr;
}
通过条件编译将对应平台下向堆区申请内存的函数进行封装,此后就不必再关心当前所在平台,当需要直接向堆区申请内存时直接调用封装后的SystemAlloc()函数即可
定长内存池中的成员变量
对于向堆区申请到的大块内存,可以用一个指针来对其进行管理,但仅用一个指针肯定不够,还需要一个变量来记录这块内存的长度
由于此后需要将这块内存进行切分,为了方便切分操作,指向这块内存的指针最好是字符指针,因为指针的类型决定了指针的步长以及查看数据的大小。对于字符指针而言,当需要向后移动n个字节时,直接对字符指针进行加n操作即可
使用完后释放回来的定长内存块也需被管理,可以将这些释放回来的定长内存块链接成一个链表。管理释放回来的内存块的链表被称为*链表,为了能找到这个*链表,因此还需要一个指向*链表的指针
char* _memory = nullptr;//char*便于切割分配内存 指向大块内存
void* _freeList = nullptr;//*链表 管理归还的内存块
size_t _remainBytes = 0;//记录剩余字节数
管理被释放内存块的具体方案
对于回收的定长内存块,可以使用*链表将其链接起来,但并不需要为其专门定义链式结构,可以让内存块的前4个字节(32位平台)或8个字节(64位平台)存储后面内存块的起始地址
指针在32位平台上占用4个字节,在64位平台上占用8个字节,那么如何写出既适应32位平台也适应64位平台的代码呢?
当需要访问一个内存块的前4/8个字节时,可以先该内存块的首地址强转为二级指针,由于二级指针存储的是一级指针的地址,二级指针解引用能向后访问一个指针的大小(在32位下为4个字节、64位平台为8个字节,自动适应了环境),此时就访问到了该内存块的前4/8个字节,即下一个内存块的首地址
void*& NextObj(void* ptr) { return *(void**)ptr; }
申请对象
申请对象时,内存池应该优先把还回来的内存块对象再次重复利用,因此若*链表中有内存块的话,就直接从*链表中头删一个内存块直接返回即可
若*链表中没有空闲内存块,那么就在大块内存中切出定长的内存块进行返回。当内存块切出后及时更行 _memory 指针的指向,以及 _remainBytes 的值即可
若大块内存已经不足以切分出一个对象时,就应该调用封装的SystemAlloc()函数,再次向堆申请一块内存空间,此时也要及时更新_memory指针的指向,以及_remainBytes的值(可能存在浪费内存,即所剩内存不足以切出一个对象但_memory却有了新的指向)
由于当内存块释放时需要将内存块链接到*链表当中,因此必须保证切出来的对象至少能够存储得下一个地址,所以当对象的大小小于当前所在平台指针的大小时,需要按指针的大小进行内存块的切分,即需向上对齐
T* New()
{
T * obj = nullptr;
if (_freeList != nullptr)//优先使用分配过的内存
{
obj = (T*)_freeList;
_freeList = *(void**)_freeList;//强转为void**后解引用为void*,即在32位系统下可以看到4个字节,64位系统下可以看到8个字节
}
else
{
size_t objSize = sizeof(T) > sizeof(void*) ? sizeof(T) : sizeof(void*);//确保至少能存储一个指针大小
if (_remainBytes < objSize)//大块内存空间不足
{
_remainBytes = 128 * 1024;
_memory = (char*)SystemAlloc(_remainBytes >> 13);
}
obj = (T*)_memory;
_memory += objSize;
_remainBytes -= objSize;
}
new(obj)T;//定位new 调用对象构造函数初始化
return obj;
}
注意:这是一个定长对象内存池,当内存块切分出来后,应使用定位new,显示调用该对象的构造函数对其进行初始化
释放对象
注意:在释放对象时,应该显示调用该对象的析构函数清理该对象,因为该对象可能还管理着其他某些资源,若不对其进行清理那么这些资源将无法被释放,就会导致内存泄漏
析构后将该内存块头插入_freeList中即可
完整代码
#pragma once
#include <iostream>
using std::cout;
using std::endl;
#ifdef WIN32
#include <windows.h>
#else
#include <sys/mman.h>
#include <unistd.h>
#endif
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr) throw std::bad_alloc();
return ptr;
}
template <class T>
class ObjectPool
{
public:
T* New()
{
T * obj = nullptr;
if (_freeList != nullptr)//优先使用分配过的内存
{
obj = (T*)_freeList;
_freeList = *(void**)_freeList;//强转为void**后解引用为void*,即在32位系统下可以看到4个字节,64位系统下可以看到8个字节
}
else
{
size_t objSize = sizeof(T) > sizeof(void*) ? sizeof(T) : sizeof(void*);//确保至少能存储一个指针大小
if (_remainBytes < objSize)//大块内存空间不足
{
_remainBytes = 128 * 1024;
_memory = (char*)SystemAlloc(_remainBytes >> 13);
}
obj = (T*)_memory;
_memory += objSize;
_remainBytes -= objSize;
}
new(obj)T;//定位new 调用对象构造函数初始化
return obj;
}
void Delete(T* obj)
{
obj->~T();//调用对象析构函数
*(void**)obj = _freeList;//头插
_freeList = obj;
}
private:
char* _memory = nullptr;//char*便于切割分配内存 指向大块内存
void* _freeList = nullptr;//*链表 管理归还的内存块
size_t _remainBytes = 0;//记录剩余字节数
};
性能对比
在只创建定长对象的情况下,使用下面的代码对new/delete和定长内存池进行性能对比
#include "Object_pool.h"
#include <vector>
#include <ctime>
using std::vector;
struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode() :_val(0), _left(nullptr), _right(nullptr) {}
};
void TestObjectPool()
{
const size_t Rounds = 3;// 申请释放的轮次
const size_t N = 1000000;// 每轮申请释放多少次
std::vector<TreeNode*> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i) v1.push_back(new TreeNode);
for (int i = 0; i < N; ++i) delete v1[i];
v1.clear();
}
size_t end1 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
ObjectPool<TreeNode> TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i) v2.push_back(TNPool.New());
for (int i = 0; i < N; ++i) TNPool.Delete(v2[i]);
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
int main()
{
TestObjectPool();
return 0;
}
不难发现,定长内存池消耗的时间比malloc/free消耗的时间要短。这是因为malloc是一个通用的内存池,而定长内存池是专门针对申请定长对象而设计的,因此在这种特殊场景下定长内存池的效率更高
四、整体框架设计介绍
如今很多的开发环境都是多核多线程,因此在申请内存的时,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀了,但是在并发场景下可能会因为频繁的加锁和解锁导致效率有所降低,而该项目的原型tcmalloc实现的就是一种在多线程高并发场景下更胜一筹的内存池
在实现内存池时一般需要考虑到效率问题和内存碎片的问题,但对于高并发内存池来说,还需要考虑在多线程环境下的锁竞争问题
- thread_cache: 线程缓存是每个线程独有的,用于小于等于256KB的内存分配,每个线程独享一个thread_cache
- central_cache: 中心缓存是所有线程所共享的,当thread_cache需要内存时会按需从central _cache中获取内存,而当thread_cache中的内存满足一定条件时,central_cache也会在合适的时机对其进行回收
- page_cache: 页缓存中存储的内存是以页为单位进行存储及分配的,当central_cache需要内存时,page_cache会分配出一定数量的页分配给central_cache,而当central_cache中的内存满足一定条件时,page_cache也会在合适的时机对其进行回收,并将回收的内存尽可能的进行合并,组成更大的连续内存块,缓解内存碎片的问题
解释:
- 每个线程都有一个属于自己的thread cache,也就意味着线程在thread_cache申请内存时是不需加锁的,而一次性申请大于256KB内存的情况是很少的,因此大部分情况下申请内存时都是无锁的,这也是高并发内存池高效的地方
- 每个线程的thread cache会根据自己的情况向central cache申请或归还内存,这避免了出现单个线程的thread cache占用太多内存,而其余thread cache出现内存吃紧的问题
- 多线程的thread cache可能会同时找central cache申请内存,此时就会涉及线程安全的问题,因此在访问central cache时是需要加锁的,但central cache实际上是一个哈希桶的结构,只有当多个线程同时访问同一个桶时才需要加锁,所以这里的锁竞争也不会很激烈
各个模块的作用
thread_cache主要解决锁竞争的问题,每个线程独享thread_cache,当自己的thread_cache中有内存时该线程不会去和其他线程进行竞争,每个线程只要在各自的thread_cache申请内存就行了
central_cache主要起到一个居中调度的作用,每个线程的thread_cache需要内存时从central _cache获取,而当thread_cache的内存多了就会将内存还给central_cache,其作用类似于一个中枢,因此取名为中心缓存
page_cache就负责提供以页为单位的大块内存,当central_cache需要内存时就会去向page_cache申请,而当page_cache没有内存了就会直接去找系统,也就是直接去堆上按页申请内存块
五、申请内存
5.1 ThreadCache
5.1.1 ThreadCache整体设计
定长内存池只需支持固定大小内存块的申请释放,因此定长内存池中只需一个*链表管理释放回来的内存块。现在要支持申请和释放不同大小的内存块,那么就需要多个*链表来管理释放回来的内存块。ThreadCache实际上是一个哈希桶结构,每个桶中存放的都是一个*链表
ThreadCache支持小于等于256KB内存的申请,若将每种字节数的内存块都用一个*链表进行管理的话,那么就需要20多万个(256*1024)*链表,光是存储这些*链表的头指针就需要消耗大量内存,这显然是得不偿失的
此时可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐,例如让这些字节数都按照8字节进行向上对齐。譬如当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推
因此当线程要申请某一大小的内存块时,就需要经过对齐规则计算得到对齐后的字节数,进而找到对应的哈希桶,若该哈希桶中的*链表中有内存块,那就从*链表中头删一个内存块进行返回;若该*链表已经为空了,那么就需要向下一层的CentralCache进行获取
但由于对齐的原因,就会产生一些碎片化的内存无法被利用,比如线程只申请了6Byte的内存,而ThreadCache却直接给了8Byte的内存,多给出的2Byte就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内碎片问题
5.1.2 ThreadCache哈希桶映射与对齐规则
内存块是会被链接到*链表上的,因此一开始肯定是按8字节进行对齐是最合适的,因为必须保证这些内存块,无论是在32位平台下还是64位平台下,都至少能够存储得下一个指针
但若所有的字节数都按照8字节进行对齐的话,那么就需要建立256 * 1024 ÷ 8 = 32768 个桶,这个数量还是比较多的,实际上可以让不同范围的字节数按照不同的对齐数进行对齐
虽然对齐产生的内碎片会引起一定程度上的空间浪费,但按照上面的对齐规则,可以将浪费率控制到百分之十左右。
需要说明的是,1~128这个区间不做讨论,因为1字节就算是对齐到2字节也有百分之五十的浪费率,并且小区间就算浪费率较高也并不会产生太大的浪费,这里从第二个区间开始进行计算
根据上面的公式,要得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。
比如 129~1024 这个区间的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,即144。那么该区间的最大浪费率也就是15 ÷ 144 ≈ 10.42%。同样的道理,后面两个区间的最大浪费率分别是127 ÷ 1152 ≈ 11.02% 和1023 ÷ 9216 ≈ 11.10%。
对齐函数的编写
关于这个函数可以封装到一个DataHandleRules类中,但当中的成员函数最好设置为静态成员函数,否则在调用这些函数时就需要通过对象去调用。并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数
在获取某一字节数向上对齐后的字节数时,可以先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理
static inline size_t AlignUp(size_t size)
{
if (size < 128) return _AlignUp(size, 8);
else if (size < 1024) return _AlignUp(size, 16);
else if (size < 8 * 1024) return _AlignUp(size, 128);
else if (size < 64 * 1024) return _AlignUp(size, 1024);
else if (size < 256 * 1024) return _AlignUp(size, 8 * 1024);
else {
assert(false);
return -1;
}
}
此时就需要编写子函数,该子函数需要通过对齐数计算出某一字节数对齐后的字节数
//一般写法
static inline size_t _AlignUp(size_t bytes, size_t alignNum)
{
size_t alignSize = 0;
if (bytes%alignNum != 0) {
alignSize = (bytes / alignNum + 1)*alignNum;
}
else {
alignSize = bytes;
}
return alignSize;
}
除了上述写法还可以通过位运算的方式来进行计算,虽然位运算并不容易理解,但计算机执行位运算的速度是比执行乘法和除法更快的
static inline size_t _AlignUp(size_t bytes, size_t alignNum) {
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
对于上述位运算,以10字节按8字节对齐为例进行分析。8 − 1 = 7,7就是一个低三位为1其余位为0的二进制序列,将10与7相加,相当于将10字节当中不够8字节的剩余字节数补上了
然后再将该值与7按位取反后的值(11000)进行与运算,而7按位取反后是一个低三位为0其余位为1的二进制序列,该操作进行后相当于屏蔽了该值的低三位而该值的其余位保持不变,此时得到的值就是10字节按8字节对齐后的值,即16字节
映射函数的编写
在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用子函数进一步处理
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128) {
return _Index(bytes, 3);//8 等于 2的3次方
}
else if (bytes <= 1024) {
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024) {
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024) {
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024) {
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else {
assert(false);
return -1;
}
}
为了提高效率同样使用位运算来解决,但是此时传入的并不是该字节数的对齐数,而是将对齐数写成2的n次方的形式后,将这个n值进行传入。比如对齐数是8,传入的就是3
static inline size_t _Index(size_t bytes, size_t align_shift) {
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
以10字节按8字节对齐为例进行分析。此时传入的alignShift就是3,将1左移3位后得到的实际上就是对齐数8,8 − 1 = 7 ,即还是让10与7相加。
之后再将该值向右移3位,实际上就是让17除以8,此时相当于屏蔽了该值二进制的低三位,因为除以8得到的值与其二进制的低三位无关,所以我们可以说是将10对齐后的字节数除以了8,此时得到了2,而最后还需要减一是因为数组的下标是从0开始的
5.1.3 TSL无锁访问
每个线程都有一个各自独享的ThreadCache,那应该如何创建这个ThreadCache?显然不能将这个ThreadCache创建为全局属性,因为全局变量是所有线程共享的,这样就不可避免的需要使用锁来进行控制,会增加了控制成本和代码复杂度,并且效率也会有所降低
要实现每个线程无锁的访问属于独自的ThreadCache,可以使用线程局部存储TLS(Thread Local Storage)。这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性
// TLS Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
通过TLS,每个线程可以无锁的获取各自专属的ThreadCache对象
if (pTLSThreadCache == nullptr) {
pTLSThreadCache = new ThreadCache;
}
5.1.4 ThreadCache核心设计
按照上述的映射对齐规则,ThreadCache中桶的个数即*链表的个数是208,以及ThreadCache允许申请的最大内存大小256KB,可以将这些数据按照如下方式进行定义:
static const size_t MAX_BYTES = 256 * 1024;//能在threadcache申请的最大字节数
static const size_t NFREELIST = 208;//thread_cache && central_cache 桶数
ThreadCache本质是一个存储208个*链表的数组,目前ThreadCache就先提供一个Allocate()函数用于申请对即可,后面随着不断编写增加即可
class ThreadCache
{
public:
void* Allocate(size_t size);
private:
FreeList _freeLists[NFREELIST];
};
在ThreadCache申请对象时,通过所给字节数计算出对应的哈希桶下标。若桶中*链表不为空,则从该*链表中取出一个对象进行返回;但若此时*链表为空,那么就需从CentralCache获取,FetchFromCentralCache()函数就是ThreadCache类中的一个成员函数
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = DataHandleRules::AlignUp(size);
size_t bucketIndex = DataHandleRules::Index(size);
if (!_freeLists[bucketIndex].IsEmpty()){
return _freeLists[bucketIndex].Pop();
}
else {
return FetchFromCentralCache(bucketIndex, alignSize);
}
}
慢开始反馈调节算法
当ThreadCache向CentralCache申请内存时,应该向CentralCach申请多少个小内存块呢?若申请的太少,那么ThreadCache在短时间内用完了又需要申请;但若一次性申请的太多,可能用不完就浪费了
鉴于此,这里采用慢开始反馈调节算法。当ThreadCache向CentralCache申请内存时,若申请的是较小的对象,那么可以多给一点,但若申请的是较大的对象,就可以少给一点
通过下面这个函数,就可以根据所需申请的内存块的大小计算出具体给出的内存块个数的上限值,并且将该上限值控制到2~512个之间。就算ThreadCache要申请的对象再小,最多CentralCache一次性给出512个内存块
static size_t MoveSize(size_t size)
{
assert(size > 0);
// [2, 512] 一次批量移动多少个对象的(慢启动)上限值
int num = MAX_BYTES / size;
if (num < 2) num = 2; //大对象一次批量上限低
if (num > 512) num = 512; //小对象一次批量上限高
return num;
}
既然计算的是上限值,那么具体该给出多少呢?
在FreeList结构中增加一个叫做_maxSize的成员变量,该变量的初始值设置为1,并且提供一个公有成员函数MaxSize()用于获取这个变量。即现在ThreadCache中的每个*链表都会有一个各自的_maxSize
class FreeList//*链表:用于管理切分过的小块内存
{
public:
void Push(void* obj)
{
assert(obj != nullptr);
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
void* Pop()
{
assert(_freeList != nullptr);
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
void PushRange(void* start, void* end, size_t n)//头插一段内存块
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
void PopRange(void*& start, void*& end, size_t n)
{
assert(n <= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; ++i) {
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
bool IsEmpty() { return _freeList == nullptr; }
size_t& MaxSize() { return _maxSize; }
size_t Size() { return _size; }
private:
void* _freeList = nullptr;
size_t _maxSize = 1;
size_t _size = 0;
};
此时当ThreadCache申请对象时,会比较_maxSize和计算得出的值,取出其中的较小值作为本次申请对象的个数。若本次采用的是_maxSize的值,那么会将ThreadCache中该*链表的_maxSize的值增加
ThreadCache第一次向CentralCache申请某大小的内存块时,申请到的都是一个,但下一次申请同样大小的对象时,因为该*链表中的_maxSize增加了,就会申请到三个。直到该*链表中_maxSize的值,超过上限值后就不会继续增长了,此后申请到的内存块数都是计算出的上限值
//慢开始反馈调节算法
//并不会一开始一批量向central_cache索要太多,可能使用不完
size_t batchNum = min(_freeLists[index].MaxSize(), DataHandleRules::MoveSize(size));
if (batchNum == _freeLists[index].MaxSize()) {
_freeLists[index].MaxSize() += 2;//若不断需要size大小的内存,那么batchNum就会不断增长直至上限
}
ThreadCache向CentralCache申请内存块
每次ThreadCache向CentralCache申请对象时,先通过慢开始反馈调节算法计算出本次应申请的小内存块的个数,然后再向CentralCache进行申请
若ThreadCache最终申请到小内存块的个数为1,那么直接将该内存块返回即可。为什么需要返回一个申请到的内存呢?因为ThreadCache要向CentralCache申请内存块,其实是由于某个线程向ThreadCache申请但ThreadCache当中没有,才导致ThreadCache向CentralCache申请内存块。因此CentralCache将内存块返回给ThreadCache后,ThreadCache会将该内存块返回给申请对象的线程
但若ThreadCache最终申请到多个内存块,那么除了将第一个内存块返回之外,还需要将剩下的内存块挂入ThreadCache对应的哈希桶中
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//慢开始反馈调节算法
//并不会一开始一批量向central_cache索要太多,可能使用不完
size_t batchNum = min(_freeLists[index].MaxSize(), DataHandleRules::MoveSize(size));
if (batchNum == _freeLists[index].MaxSize()) {
_freeLists[index].MaxSize() += 2;//若不断需要size大小的内存,那么batchNum就会不断增长直至上限
}
void* start = nullptr, * end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchMemoryBlock(start, end, batchNum, size);
assert(actualNum > 0);//至少分配一个
if (actualNum == 1) {
assert(start == end);
return start;
}
else{
_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);//将后面的内存头插thread_cache*链表中
return start;//将第一个内存块返回给外面使用
}
}
5.2 CentralCache
5.2.1 CentralCache整体设计
CentralCache与ThreadCache的相同之处
CentralCache结构与ThreadCache是基本类似的,都为哈希桶结构,并且遵循的对齐映射规则相同。这样的好处就是:当ThreadCache的某个桶中没有内存了,就可以直接到CentralCache中相同位置的哈希桶里去索取内存
CentralCache与ThreadCache的不同之处
CentralCache与ThreadCache有两个明显不同的地方:
- ThreadCache是每个线程独享的,但是CentralCache是所有线程共享的。每个线程的Thread Cache没有内存了都会去找CentralCache,因此在访问CentralCache时是需要加锁的。但在加锁时并不是将整个CentralCache全部锁上,而是使用桶锁,即每个桶都有一个锁。只有当多个线程同时访问CentralCache的同一个桶时才会存在锁竞争,若是多个线程同时访问CentralCache的不同桶就不会存在锁竞争,这也使得锁竞争并不是十分激烈
- ThreadCache的每个桶中挂的是一个个切好的内存块,而CentralCache的每个桶中挂的是一个个的Span(跨度)
每个span管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的形式链接起来的,并且每个span里面还有一个*链表,这个*链表里面挂的就是一个个切好了的内存块,根据其所在的哈希桶被切成了对应的大小
5.2.2 CentralCache结构设计
页号的类型
每个程序运行起来后都有自己的进程地址空间,在32位平台下,进程地址空间的大小是字节;而在64位平台下,进程地址空间的大小就是字节
页的大小一般是4K或者8K,以8K为例。在32位平台下,进程地址空间就可以被分成 ÷= 个页;在64位平台下,进程地址空间就可以被分成÷=个页。页号本质与地址一样,都是一个编号,只不过地址是以一个字节为一个单位,而页是以多个字节为一个单位
由于页号在64位平台下的取值范围是[0,) ,因此不能简单的用一个无符号整型来存储页号(只使用32位环境下),这时需要借助条件编译来解决该问题
//Win64环境下_WIN64和_WIN32都存在,Win32环境下只存在_WIN32
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else//Linux
//...
#endif
Span的结构
CentralCache的每个桶里挂的是一个个的Span,Span是管理以页为单位的大块内存的,其结构如下:
//管理多个页的跨度结构
struct Span
{
Span* _prev = nullptr;//双向链表中的结构
Span* _next = nullptr;
PAGE_ID _pageId = 0;//页号
size_t _num = 0;//页的数量
void* _freeList = nullptr;//*链表
size_t _use_count = 0;//记录已分配给ThreadCache的小块内存的数量
};
对于Span管理的以页为单位的大块内存,需要知道这块内存具体在哪一个位置,以便于之后PageCache进行前后页的合并缓解内存碎片问题,因此Span结构当中会记录所管理大块内存起始页的页号 (具体如何合并在后面讲解)
每一个Span管理多少页并不是固定的,由后面的算法来控制,因此span结构中有一个_num成员来代表着该Span管理的页的数量
每个Span管理的大块内存,都会被切成小内存块挂到当前Span的*链表中,比如8Byte哈希桶中的Span,会被切成一个个8Byte大小的内存块挂到当前Span的*链表中,因此Span结构中需要*链表_freeList来存储小块内存块
Span结构中的_use_count成员记录的是,当前Span中已经分配给TreadCache的小块内存块,当某个Span的_use_count计数变为0时,代表当前Span分配出去的小内存块已经全部还回来了,此CentralCache就可以将这个Span再还给PageCache
每个桶当中的Span是以双链表的形式组织起来的,当需要将某个Span归还给PageCache时,就可以很方便的将该Span从双链表结构中移出。若用单链表结构的话则较为麻烦,因为单链表在删除时需要知道当前结点的前一个结点
双链表结构
CentralCache的每个哈希桶中存储的都是一个双链表结构,对于该双链表结构可以进行封装:
class SpanList
{
public:
SpanList()
{
_head = new Span;
assert(_head != nullptr);
_head->_next = _head;
_head->_prev = _head;
}
Span* Begin() { return _head->_next; }
Span* End() { return _head; }
bool IsEmpty() { return _head == _head->_next; }
void PushFront(Span* span) { Insert(Begin(), span); }
Span* PopFront() {
Span* front = _head->_next;
Erase(front);
return front;
}
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
newSpan->_next = pos;
prev->_next = newSpan;
newSpan->_prev = prev;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos != nullptr);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head = nullptr;
public:
std::mutex _mtx;//桶锁
};
注意:从双链表删除的Span会还给PageCache,相当于只是把这个Span从双链表中移除,不需要对删除的Span进行delete操作
5.2.3 CentralCaChe核心设计
所有线程使用的都是同一个CentralCache,即在整个项目中只需要有一个CentralCache对象即可,那么可以使用单例模式进行CentralCache的编写
单例模式可以保证项目中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。单例模式又分为饿汉模式和懒汉模式,懒汉模式较为复杂,这里使用饿汉模式即可
//饿汉单例模式
class CentralCache
{
public:
static CentralCache* GetInstance();
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache() {}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;//声明
};
CentralCache CentralCache::_sInst;
CentralCache* CentralCache::GetInstance() { return &_sInst; }
CentralCache向ThreadCache提供内存块
要从CentralCache获取batchNum个指定大小的内存块,这些内存块肯定都是从CentralCache对应哈希桶中的某个Span中取出来的,因此取出来的这batchNum个内存块是链接在一起的,只需要得到这段链表的头和尾即可,可以采用输出型参数进行获取
size_t CentralCache::FetchMemoryBlock(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = DataHandleRules::Index(size);
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
//从span中获取batchNum块内存块,若不够则有多少获取多少
start = span->_freeList;
end = span->_freeList;
size_t count = 0, actualNum = 1;
while (NextObj(end) != nullptr && count < batchNum - 1) {
end = NextObj(end);
++count;
++actualNum;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
span->_use_count += actualNum;
_spanLists[index]._mtx.unlock();
return actualNum;
}
由于CentralCache是所有线程共享的,所以在访问CentralCache中的哈希桶时,需要先给对应的哈希桶加上桶锁,在获取到对象后再将桶锁解掉
在向CentralCache获取内存块时,先是在CentralCache对应的哈希桶中获取到一个非空的Span,然后从这个Span的*链表中取出batchNum个对象即可,但可能这个非空的span的*链表当中对象的个数不足batchNum个,这时该*链表当中有多少个对象就给多少即可
ThreadCache实际从CentralCache获得的对象的个数可能与传入的batchNum值是不一样的,因此需要统计本次过程中实际ThreadCache获取到的内存块个数,并根据该值及时更新_use_count
虽然实际申请到对象的个数可能比batchNum要小,但这并不会产生任何影响。因为ThreadCache的本意就是向CentralCache申请一个内存块,之所以一次多申请一些内存块,是因为制定了这样的策略来提高效率,使得下次线程再申请相同大小的对象时就可以直接在ThreadCache中获取,而不用再向CentralCache申请对象
CentralCache从PageCache中获取一个非空的Span
ThreadCache向CentralCache申请内存块时,CentralCache需要先从对应的哈希桶中获取到一个非空的Span,然后从这个非空的Span中取出若干内存块给ThreadCache
首先遍历CentralCache对应哈希桶中的双链表,若该双链表中有非空的Span,那么直接将该Span进行返回即可。但若遍历双链表后发现双链表中没有空闲的Span,那么此时CentralCache就需要向PageCache申请内存块
但是,该向PageCache申请多大的内存块呢?可以根据具体所需内存块的大小来决定,之前就根据所需内存块的大小计算出ThreadCache一次向CentralCache申请内存块的个数上限,现在则是根据所需内存块的大小计算出CentralCache一次应该向PageCache申请几页的Span
先根据所需内存块的大小计算出ThreadCache一次向CentralCache申请内存块的个数上限,然后将这个上限值乘以单个内存块的大小,就算出了具体需要多少字节,最后再将这个算出来的字节数转换为页数。若转换后不够一页,那么就申请一页,否则转换出来是几页就申请几页。即Central Cache向PageCache申请内存时,要求申请到的内存尽量能够满足ThreadCache向CentralCache申请时的上限
//一次central_cache向page_cache获取多少页的Span
static size_t NumMovePage(size_t size)
{
size_t num = MoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0) npage = 1;
return npage;
}
代码中的PAGE_SHIFT代表页大小转换偏移。以页的大小为8K为例,PAGE_SHIFT的值为13
//页大小转换偏移 一页为2^13,即8KB
static const size_t PAGE_SHIFT = 13;
当CentralCache申请到若干页的Span后,还需要将这个Span切成一个个对应大小的小内存块挂到该Span的*链表中
找到一个Span所管理的大块内存块呢?首先需要计算出该Span的起始地址,即用这个Span的起始页号乘以一页的大小即可得到这个Span的起始地址,然后用这个Span的页数乘以一页的大小就可以得到这个Span所管理的内存块的大小,用起始地址加上内存块的大小即可得到这块内存块的结束位置
明确了这块内存的起始和结束位置后,就可以进行切分了。根据所需内存块的大小,每次从大块内存切出一块固定大小的内存块尾插到Span的*链表中即可
为什么是尾插呢?因为若是将切好的内存块尾插到*链表,这些内存块看起来是按照链式结构链接起来的,而实际其在物理空间上是连续的,这时当把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存命中率
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
//查看当前CentralCache中的spanlist中是否有还有尚未分配的span
Span* it = list.Begin();
while (it != list.End()) {
if (it->_freeList != nullptr) {
return it;
}
else {
it = it->_next;
}
}
list._mtx.unlock();//将central_cache桶锁释放,此时若其他线程释放内存回来并不会导致阻塞
//运行到此处时即没有空闲span,只能向Page_cache索取
PageCache::GetInstance()->_pageMutex.lock();
Span* span = PageCache::GetInstance()->NewSpan(DataHandleRules::NumMovePage(size));
PageCache::GetInstance()->_pageMutex.unlock();
//计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_num << PAGE_SHIFT;
char* end = start + bytes;
//将大块内存切成*链表并链接起来
span->_freeList = start;
start += size;
void* tail = span->_freeList;
while (start < end) {
NextObj(tail) = start;
tail = NextObj(tail);
start += size;
}
NextObj(tail) = nullptr;
list._mtx.lock();
list.PushFront(span);//将span挂入对应的桶中
return span;
}
在访问PageCache前,应先把CentralCache对应的桶锁解开。虽然此时CentralCache的这个桶当中是没有内存供其他ThreadCache申请的,但ThreadCache除了申请内存还会归还内存,若在访问PageCache前将CentralCache对应的桶锁解开,那么此时其他ThreadCache想要归还内存到CentralCache的这个桶时就不会发生阻塞
因此在调用NewSpan()函数之前,应先将CentralCache对应的桶锁解掉,然后再将PageCache的大锁加上,当申请到k页的Span后将PageCache的大锁解开,但此时不需要立刻加上桶锁。因为CentralCache拿到k页的Span后还需进行切分操作,此时别的线程访问不到该Span,可以在Span切好后需要将其挂入Central Cache对应的桶上时,再加上对应的桶锁
5.3 PageCache
5.3.1 PageCache整体设计
PageCache与CentralCache结构的相同之处
PageCache与CentralCache一样,都是哈希桶结构,并且PageCache的每个哈希桶中挂的也是一个个的Span,并且也是按照双链表的结构链接起来的
PageCache与CentralCache结构的不同之处
CentralCache的映射规则与ThreadCache保持一致,而PageCache的映射规则与它们都不相同PageCache的映射规则采用的是直接定址法,如1号桶挂的都是1页的Span,2号桶挂的都是2页的span,以此类推
CentralCache每个桶中的Span被切成了一个个对应大小的内存块,以供ThreadCache申请。而PageCache当中的Span是没有被进一步切小的,因为PageCache服务的是CentralCache,当CentralCache没有Span时,向PageCache申请某一固定页数的Span,而切分申请到的这个Span由CentralCache完成
PageCache中有多少个桶由编写自行决定,本博客中采用的就是最大128页的方案。为了让桶号与页号对应,将第0号桶空出,因此需要将哈希桶的个数设置为129
//page_cache的桶数+1 || page_cache的最大页数+1 (下标为0位置空出)
static const size_t NPAGES = 129;
本博客为什么采用最大128页的方案呢?因为线程申请单个内存块最大是256KB,而128页可以正好被切成4个256KB的内存块,因此是足够的。若是在采用更大页的方案也是可以的,根据具体的需求进行设置即可
PageCache类设计
当每个线程的ThreadCache没有内存时都会向CentralCache申请,此时多个线程的ThreadCache若访问的不是同一个桶,那么这些线程是可以同时进行访问的。这时CentralCache的多个桶就可能同时向PageCache申请内存的,所以PageCache是存在线程安全问题的,因此在访问PageCache时是必须要加锁的
但是在PageCache中不能使用桶锁,因为当CentralCache向PageCache申请内存时,PageCache可能会将其他桶当中大页的Span切小后再给CentralCache。此外,当CentralCache将某个Span归还给PageCache时,PageCache也会尝试将该Span与其他桶当中的Span进行合并
即在访问PageCache时,可能需要访问PageCache中的多个桶,若PageCache用桶锁就会出现大量频繁的加锁和解锁,导致程序的效率低下。因此在访问PageCache时使用一个大锁将整个Page Cache锁住
PageCache对象在整个进程中也是只能存在一个的,因此需要将其设计为单例模式
//饿汉单例模式
class PageCache
{
public:
static PageCache* GetInstance();//提供一个全局访问接口
private:
PageCache() {}
PageCache(const PageCache&) = delete;
static PageCache _sInst;//声明
SpanList _spanLists[NPAGES];
public:
std::mutex _pageMutex;//整个page_cache的锁
};
PageCache PageCache::_sInst;
PageCache* PageCache::GetInstance() { return &_sInst; }
5.3.2 PageCache获取Span
当调用上述的GetOneSpan()尝试从CentralCache的某个哈希桶获取一个非空的Span时,若遍历哈希桶中的双链表后发现双链表中没有Span,或该双链表中的Span都为空,那么CentralCache就需向PageCache申请若干页的Span了,PageCache获取一个k页的Span并提供给CentralCache呢?
PageCache是直接按照页数进行映射的,若CentralCache要获取一个k页的Span,在PageCache的第k号桶中取出一个Span返回给CentralCache即可。但若第k号桶中没有Span了,这时并不是直接转而向堆区申请一个k页的Span,而是要继续在后面更大的桶中寻找Span
直接向堆申请以页为单位的内存时,应尽量申请大块一点的内存块,因为此时申请到的内存是连续的,当线程需要内存时可以将其切小后分配给线程,而当线程将内存释放后又可以将其合并成大块的连续内存。若向堆申请内存时申请的是小块内存,而需申请多次,那么申请到的内存就不一定是连续的了
当第k号桶中没有空闲的Span时,可以继续找第k+1号桶,因为可以将k+1页的Span切分成一个k页的Span和一个1页的Span,这时可以将k页的Span返回,而将切分后1页的Span挂到1号桶中。但若后面的桶当中都没有空闲的Span,此时就只能向堆申请一个128页的内存块,并将其用一个Span结构管理,然后将128页的Span切分成k页的Span和128-k页的Span,其中k页的Span返回给CentralCache,而128-k页的Span就挂到第128-k号桶中
即每次向堆申请的都是128页大小的内存块,CentralCache中的Span实际都是由128页的Span切分而成的
Span* PageCache::NewSpan(size_t k)//获取一个k页的span
{
assert(k > 0);
//检查第k个桶中是否有可用的span
if (!_spanLists[k].IsEmpty()) {
return _spanLists[k].PopFront();
}
//检查后续的桶中是否有可用的span,若有则进行切分
for (int i = k + 1; i < NPAGES; ++i) {
if (!_spanLists[i].IsEmpty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = _spanPool.New();
kSpan->_pageId = nSpan->_pageId;
nSpan->_pageId += k;
kSpan->_num = k;
nSpan->_num -= k;
_spanLists[nSpan->_num].PushFront(nSpan);
return kSpan;
}
}
//运行此处则说明PageCache中没有可用的span,需向堆中申请128页的span
Span* newSpan = new Span;
void* address = SystemAlloc(NPAGES - 1);
newSpan->_pageId = (PAGE_ID)address >> PAGE_SHIFT;
newSpan->_num = NPAGES - 1;
_spanLists[newSpan->_num].PushFront(newSpan);
return NewSpan(k);//通过递归将大span分为小span
}
六、回收内存
6.1 ThreadCache
当某个线程申请的内存不使用了,可以将其归还给ThreadCache。ThreadCache将该内存块插入到对应哈希桶的*链表中即可
但是随着线程不断的释放,对应*链表的长度也会越来越长,这些内存堆积在一个ThreadCache中却没被使用就会浪费,可以这些内存还给CentralCache。归还后这些内存对其他线程来说也是可申请的,因此当ThreadCache某个桶当中的*链表太长时可以进行一些处理
若ThreadCache某个桶当中*链表的长度 大于等于 其一次批量向CentralCache申请的内存块上限值_maxSize,那么此时就把该*链表当中的这些内存块还给CentralCache
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr != nullptr);
assert(size <= MAX_BYTES);
size_t bucketIndex = DataHandleRules::Index(size);
_freeLists[bucketIndex].Push(ptr);
//当链表长度大于一次批量申请的内存时就归还一段*链表上的内存给CentralCache
if (_freeLists[bucketIndex].Size() == _freeLists[bucketIndex].MaxSize())
ListTooLong(_freeLists[bucketIndex], size);
}
当*链表的长度大于一次批量申请的对象时,从该*链表中取出一次批量个数的对象,然后将取出的这些对象还给CentralCache中对应的Span即可
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr, * end = nullptr;
list.PopRange(start, end, list.MaxSize());
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
6.2 CentralCache
当ThreadCache中某个*链表太长时,会将*链表当中的这些内存块还给CentralCache中的Span,但是还给CentralCache的内存块不一定都是属于同一个Span的。CentralCache中的每个哈希桶中可能都不止一个Span,因此当计算出还回的内存应该还给CentralCache的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个Span
根据小内存块的地址得到其所在的页号
某个页当中的所有地址除以页的大小都等该页的页号。比如假设一页的大小是100,那么地址0~99都属于第0页,并且除以100都等于0,而地址100~199都属于第1页,它们除以100都等于1
找到小内存块对应的Span
虽然可以通过内存块的地址得到其所在的页号,但还是不能知道这个内存块属于哪个Span。因为一个Span管理的可能是多个页
为了解决这个问题,可以建立页号和Span之间的映射。由于这个映射关系在PageCache进行Span的合并时也需要用到,因此直接将其存放到PageCache中。
所以就需要在PageCache类中添加一个映射关系了,可以用C++当中的unordered_map进行实现,并且添加一个函数接口,用于让CentralCache获取这里的映射关系。(下面代码中只展示了PageCache类当中新增的成员)
//单例模式
class PageCache
{
public:
Span* MapObjectToSpan(void* obj);//获取从对象到span的映射
private:
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};
每当PageCache分配Span给CentralCache时,都需要记录一下页号和Span之间的映射关系。此后当ThreadCache还对象给CentralCache时,才知道应该还给哪一个Span
因此当CentralCache在调用NewSpan()接口向PageCache申请k页的Span时,PageCache在返回这个k页的Span给CentralCache之前,应该建立这k个页号与其Span之间的映射关系
Span* PageCache::NewSpan(size_t k)//获取一个k页的span
{
assert(k > 0);
//检查第k个桶中是否有可用的span
if (!_spanLists[k].IsEmpty()) {
Span* kSpan = _spanLists[k].PopFront();
for (PAGE_ID i = 0; i < kSpan->_num; ++i) {
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查后续的桶中是否有可用的span,若有则进行切分
for (int i = k + 1; i < NPAGES; ++i) {
if (!_spanLists[i].IsEmpty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = _spanPool.New();
kSpan->_pageId = nSpan->_pageId;
nSpan->_pageId += k;
kSpan->_num = k;
nSpan->_num -= k;
_spanLists[nSpan->_num].PushFront(nSpan);
//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_num; ++i) {
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//运行此处则说明PageCache中没有可用的span,需向堆中申请128页的span
Span* newSpan = _spanPool.New();
void* address = SystemAlloc(NPAGES - 1);
newSpan->_pageId = (PAGE_ID)address >> PAGE_SHIFT;
newSpan->_num = NPAGES - 1;
_spanLists[newSpan->_num].PushFront(newSpan);
return NewSpan(k);//通过递归将大span分为小span
}
此时就可以通过小内存块的地址 找到其对应页号 再找到其对应的Span了,直接将该内存块的地址除以页的大小得到页号,然后在unordered_map当中找到其对应的Span即可
//获取从小内存块到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end()) {
return ret->second;
}
else {
assert(false);
return nullptr;
}
}
此时当ThreadCache还对象给CentralCache时,就可以依次遍历这些内存块,将这些内存块插入到其对应Span的*链表当中,并且及时更新该Span的_useCount计数即可
在ThreadCache还内存块给CentralCache的过程中,若CentralCache中某个Span的_useCount减到0时,说明这个Span分配出去的内存块全部都回来了,那么此时就可以将这个Span再进一步还给PageCache
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t bucketIndex = DataHandleRules::Index(size);
_spanLists[bucketIndex]._mtx.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_use_count--;
if (span->_use_count == 0) //说明该span切分的内存块都已经归还了,该span可以归还给PageCache
{
_spanLists[bucketIndex].Erase(span);
span->_freeList = nullptr;//看作整体,可使用页号转换为地址来找到内存
span->_next = nullptr;
span->_prev = nullptr;
_spanLists[bucketIndex]._mtx.unlock();
PageCache::GetInstance()->_pageMutex.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMutex.unlock();
_spanLists[bucketIndex]._mtx.lock();
}
start = next;
}
_spanLists[bucketIndex]._mtx.unlock();
}
若把某个Span还给PageCache,需先将这个Span从CentralCache对应的双链表中移除,然后再将该Span的*链表置空,因为PageCache中的Span是不需要切分成一个个的小对象的,以及该Span的前后指针也都应该置空,因为之后要将其插入到PageCache对应的双链表中。但Span当中记录的起始页号以及它管理的页数是不能清除的,否则对应内存块就找不到了
在CentralCache还Span给PageCache时也存在锁的问题,此时需要先将CentralCache中对应的桶锁解开,然后加上PageCache的大锁之后再进入PageCache进行相关操作(此时别的线程可以在桶中进行申请,并不会影响该线程归还,解开桶锁之前已经将要归还给PageCache的Span从CentralCache中的双链表中移除,其他线程无法找到该Span)
当处理完毕回到CentralCache时,除了将PageCache的大锁解开,还需立刻获得CentralCache对应的桶锁,然后将还未还完对象继续还给CentralCache中对应的Span
6.3 PageCache
若CentralCache中有某个Span的_useCount减到0了,那么CentralCache就需将这个Span还给PageCache
这个过程看似是非常简单的,PageCache只需将还回来的Span挂到对应的哈希桶上即可。但实际为了缓解内存外碎片问题,PageCache还需尝试将还回来的Span与其他空闲的Span进合并
PageCache前后页的合并
合并的过程可以分为向前合并和向后合并:
若还回来的Span的起始页号是num,该Span所管理的页数是n。那么在向前合并时,就需要判断第num-1页对应Span是否空闲,若空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止
而在向后合并时,就需要判断第num+n页对应的Span是否空闲,若空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止
因此PageCache在合并Span时需要通过页号获取到对应的Span的,这就是把页号与Span之间的映射关系存储到page cache的原因
但当通过页号找到其对应的Span时,这个Span此时可能挂在PageCache,也可能挂在Central Cache。而在只能合并挂在PageCache的Span,因为挂在CentralCache的Span中的内存块正在被其他线程使用
可是并不能通过Span结构当中的_useCount成员,来判断某个Span是在CentralCache还是在Page Cache。因为当CentralCache刚向PageCache申请到一个Span时,这个Span的_useCount就是等于0的,这时可能正在对该Span进行切分时,PageCache就把这个Span拿去进行合并了,这显然是不合理的
于是,可以在Span结构中增加一个_isUse成员,用于标记这个Span是否正在被使用,而当一个Span结构被创建时默认该Span是没有被使用的
//管理多个页的跨度结构
struct Span
{
Span* _prev = nullptr;//双向链表中的结构
Span* _next = nullptr;
PAGE_ID _pageId = 0;//页号
size_t _num = 0;//页的数量
void* _freeList = nullptr;//*链表
size_t _use_count = 0;//记录已分配给ThreadCache的小块内存的数量
bool _IsUse = false;//是否被使用
};
当CentralCache向PageCache申请到一个span时,需立即将该Span中的_isUse改为true
span->_isUse = true;
当CentralCache将某个Span还给PageCache时,也需将该Span的_isUse改为false
span->_isUse = false;
由于在合并PageCache当中的Span时,需要通过页号找到其对应的Span,因此PageCache中的Span也需要建立页号与Span之间的映射关系
与CentralCache中的Span不同的是,在PageCache中的Span只需建立Span的首尾页号与该Span之间的映射关系。因为当一个Span在尝试进行合并时,若是往前合并,那么只需要通过Span的尾页找到这个Span;若是向后合并,那么只需要通过Span的首页找到这个Span。即在进行合并时只需要用到Span与其首尾页之间的映射关系
因此获取k页的Span时,若是将n页的Span切成了一个k页的Span和一个n-k页的Span,除了需要建立k页Span中每个页与该Span之间的映射关系之外,还需要建立剩下的n-k页的span与其首尾页之间的映射关系
Span* PageCache::NewSpan(size_t k)//获取一个k页的span
{
assert(k > 0);
//检查第k个桶中是否有可用的span
if (!_spanLists[k].IsEmpty()) {
Span* kSpan = _spanLists[k].PopFront();
for (PAGE_ID i = 0; i < kSpan->_num; ++i) {
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查后续的桶中是否有可用的span,若有则进行切分
for (int i = k + 1; i < NPAGES; ++i) {
if (!_spanLists[i].IsEmpty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
kSpan->_pageId = nSpan->_pageId;
nSpan->_pageId += k;
kSpan->_num = k;
nSpan->_num -= k;
_spanLists[nSpan->_num].PushFront(nSpan);
//存储nSpan的首尾页号与nSpan建立映射关系,利于PageCache回收Central_Cache中span时进行合并页
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_num - 1] = nSpan;
//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_num; ++i) {
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//运行此处则说明PageCache中没有可用的span,需向堆中申请128页的span
Span* newSpan = _spanPool.New();
void* address = SystemAlloc(NPAGES - 1);
newSpan->_pageId = (PAGE_ID)address >> PAGE_SHIFT;
newSpan->_num = NPAGES - 1;
_spanLists[newSpan->_num].PushFront(newSpan);
return NewSpan(k);//通过递归将大span分为小span
}
此时PageCache当中的Span就都与其首尾页之间建立了映射关系,合并代码如下:
void PageCache::ReleaseSpanToPageCache(Span* span)
{
while (1)//向前合并
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
if (ret == _idSpanMap.end()) { break; }
//前面相邻页的span正在使用中
Span* prevSpan = ret->second;
if (prevSpan->_IsUse == true) break;
if (prevSpan->_num + span->_num > NPAGES - 1) break;//若合并后大于128页则无法管理
span->_pageId = prevSpan->_pageId;
span->_num += prevSpan->_num;
_spanLists[prevSpan->_num].Erase(prevSpan);
delete prevSpan;
}
while (1)//向后合并
{
PAGE_ID nextId = span->_pageId + span->_num;
auto ret = _idSpanMap.find(nextId );
if (ret == _idSpanMap.end()) { break; }
//后面相邻页的span正在使用中
Span* nextSpan = ret->second;
if (nextSpan->_IsUse == true) break;
if (nextSpan->_num + span->_num > NPAGES - 1) break;//若合并后大于128页则无法管理
span->_num += nextSpan->_num;
_spanLists[nextSpan->_num].Erase(nextSpan);
delete nextSpan;
}
_spanLists[span->_num].PushFront(span);
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_num - 1] = span
span->_IsUse = false;
}
- 若没有通过页号获取到对应的Span,说明对应到该页的内存块还未申请,此时需停止合并
- 若通过页号获取到了其对应的Span,但该Span处于被使用的状态,也必须停止合并
- 若合并后大于128页则不能进行合并,因为PageCache无法对大于128页的Span进行管理
- 在合并Span时,由于这个Span是在PageCache的某个哈希桶的双链表当中,因此在合并后需要将其从对应的双链表中移除,然后再将这个被合并了的Span结构进行delete操作
- 在合并结束后,将合并后的Span挂入PageCache对应哈希桶的双链表中,并且需要建立该Span与其首位页之间的映射关系,便于以后能合并出更大的Span
七、存在问题以及解决
7.1 大于256KB的大块内存问题
申请过程
每个线程的ThreadCache是用于申请小于等于256KB的内存的,而对于大于256KB的内存,可以考虑直接向PageCache申请,但PageCache中最大的页也只有128页,因此若是大于128页的内存申请,则只能直接向堆申请
当申请大于256KB时,若然不能向ThreadCache申请,但分配内存时仍需向上对齐。而之前实现AlignUp()函数时,对传入字节数大于256KB的情况直接做了断言处理,因此需要对RoundUp函数稍作修改
static inline size_t AlignUp(size_t size)
{
if (size < 128) return _AlignUp(size, 8);
else if (size < 1024) return _AlignUp(size, 16);
else if (size < 8 * 1024) return _AlignUp(size, 128);
else if (size < 64 * 1024) return _AlignUp(size, 1024);
else if (size < 256 * 1024) return _AlignUp(size, 8 * 1024);
else return _AlignUp(size, 1 << PAGE_SHIFT);
}
之前的申请逻辑也需要进行修改,当申请内存的大小大于256KB时,就不向ThreadCache申请了,这时先计算出按页对齐后实际需申请的页数,然后通过调用NewSpan()申请指定页数的Span即可
static void* hcmalloc(size_t size)
{
if (size > MAX_BYTES)
{
size_t alignSize = DataHandleRules::AlignUp(size);
size_t kPage = alignSize >> PAGE_SHIFT;
PageCache::GetInstance()->_pageMutex.lock();
Span* span = PageCache::GetInstance()->NewSpan(kPage);
PageCache::GetInstance()->_pageMutex.unlock();
void* address = (void*)(span->_pageId << PAGE_SHIFT);
return address;
}
else
{
static ObjectPool<ThreadCache> threadCachePool;
if (pTLSThreadCache == nullptr) pTLSThreadCache = new TreadCache;
//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
}
即申请大于256KB的内存时,会直接调用PageCache当中的NewSpan()函数进行申请
因此需要再对NewSpan()函数进行改造,当需要申请的内存页数大于128页时,就直接向堆区申请对应页数的内存块。而如果申请的内存页数是小于128页的,那就在PageCache中进行申请。因此当申请大于256KB的内存调用NewSpan()函数时也是需要加锁的,因为可能是在PageCache中进行申请的
Span* PageCache::NewSpan(size_t k)//获取一个k页的span
{
assert(k > 0);
//大于128页的需求直接向堆区申请
if (k > NPAGES - 1) {
void* address = SystemAlloc(k);
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)address >> PAGE_SHIFT;
span->_num = k;
_idSpanMap[span->_pageId] = span;//便于归还内存时,通过地址找到对应的span
return span;
}
//检查第k个桶中是否有可用的span
if (!_spanLists[k].IsEmpty()) {
Span* kSpan = _spanLists[k].PopFront();
for (PAGE_ID i = 0; i < kSpan->_num; ++i) {
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查后续的桶中是否有可用的span,若有则进行切分
for (int i = k + 1; i < NPAGES; ++i) {
if (!_spanLists[i].IsEmpty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
kSpan->_pageId = nSpan->_pageId;
nSpan->_pageId += k;
kSpan->_num = k;
nSpan->_num -= k;
_spanLists[nSpan->_num].PushFront(nSpan);
//存储nSpan的首尾页号与nSpan建立映射关系,利于PageCache回收Central_Cache中span时进行合并页
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_num - 1] = nSpan;
//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_num; ++i) {
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//运行此处则说明PageCache中没有可用的span,需向堆中申请128页的span
Span* newSpan = _spanPool.New();
void* address = SystemAlloc(NPAGES - 1);
newSpan->_pageId = (PAGE_ID)address >> PAGE_SHIFT;
newSpan->_num = NPAGES - 1;
_spanLists[newSpan->_num].PushFront(newSpan);
return NewSpan(k);//通过递归将大span分为小span
}
释放过程
当释放对象时也需要判断释放对象的大小:
当释放内存块时,需先找到该内存块对应的Span。之前在申请大于256KB的内存时,已经给申请到的内存建立Span结构,并建立了起始页号与该Span之间的映射关系。此时就可以通过内存块的起始地址计算出起始页号,进而通过页号找到该对象对应的Span
static void hcfree(void* ptr,size_t size)
{
if (size > MAX_BYTES)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
PageCache::GetInstance()->_pageMutex.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMutex.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
PageCache在回收Span时也需要进行判断,若该Span的大小是小于等于128页的,那么直接还给PageCache即可,PageCache会尝试对其进行合并。而若该Span的大小是大于128页的,那么说明该Span是直接向堆区申请的,直接将这块内存释放给堆区,然后将这个Span结构进行delete
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//大于128页的内存直接归还给系统堆区
if (span->_num > NPAGES - 1)
{
void* address = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(address);
delete span;
return;
}
//......
}
向区堆申请内存时调用的系统接口是VirtualAlloc(),则将内存释放的接口为VirtualFree(),而Linux下的brk和mmap对应的释放系统接口为sbrk和unmmap。此时可以将这些释放接口封装成一个叫做SystemFree()的接口,当需要将内存释放给堆时直接调用SystemFree()即可
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// sbrk unmmap等
#endif
}
7.2 未完全脱离使用new
tcmalloc是要在高并发场景下替代malloc进行内存申请的,因此tcmalloc在实现的时,其内部是不可能调用malloc函数的,但当前的代码中存在通过new获取到的内存,而new在底层实际上就是封装了malloc函数
为了完全脱离掉malloc函数,之前实现的定长对象内存池就该发挥作用了,代码中使用new时基本都是为Span结构的对象申请空间,而Span对象基本都是在PageCache层创建的,因此可以在PageCache类当中定义一个_spanPool,用于Span对象的申请和释放
ObjectPool<Span> _spanPool;
然后将代码中使用new的地方替换为调用定长内存池当中的New函数,将代码中使用delete的地方替换为调用定长内存池当中的Delete函数
//申请span对象
Span* span = _spanPool.New();
//释放span对象
_spanPool.Delete(span);
每个线程第一次申请内存时也都会创建其专属的ThreadCache对象,而这个ThreadCache目前也是new出来的,也需要对其进行替换
static ObjectPool<ThreadCache> threadCachePool;
static std::mutex hcMtx;
if (pTLSThreadCache == nullptr) {
hcMtx.lock();
pTLSThreadCache = threadCachePool.New();
hcMtx.unlock();
}
将用于申请ThreadCache类对象的定长内存池定义为静态的,保持全局只有一个,让所有线程创建ThreadCache时,都在该定长内存池中申请内存即可
但在该定长内存池中申请内存时需要加锁,防止多个线程申请各自的ThreadCache对象时同时访问定长内存池而导致线程安全问题
7.3 释放对象时优化为不传对象大小
当使用malloc()函数申请内存时,需要指明申请内存的大小;当使用free()函数释放内存时,只需要传入指向这块内存的指针即可
而本博客目前讲述的内存池,在释放对象时除了需要传入指向该对象的指针,还需要传入该对象的大小
若也想在释放对象时不用传入对象的大小,那么就需要建立内存块地址与对象大小之间的关系。由于现在可以通过内存块的地址找到其对应的Span,而Span的*链表中挂的都是相同大小的小内存块,因此可以在Span结构中再增加一个_objSize成员,该成员代表着这个Span管理的小内存块的大小
//管理多个页的跨度结构
struct Span
{
Span* _prev = nullptr;//双向链表中的结构
Span* _next = nullptr;
PAGE_ID _pageId = 0;//页号
size_t _num = 0;//页的数量
void* _freeList = nullptr;//*链表
size_t _use_count = 0;//记录已分配给ThreadCache的小块内存的数量
bool _IsUse = false;//是否被使用
size_t _objSize = 0;//切好的小内存块的大小
};
而所有的Span都是从PageCache中获取的,因此每当调用NewSpan()获取到一个k页的Span时,就应该将这个Span的_objSize保存下来
Span* span = PageCache::GetInstance()->NewSpan(DataHandleRules::NumMovePage(size));
span->_objSize = size;
代码中有两处:
- 在CentralCache获取非空Span时,若CentralCache对应的桶中没有非空的Span,此时会调用NewSpan()获取一个k页的span
- 另一处是当申请大于256KB内存时,会直接调用NewSpan()获取一个k页的Span
当释放小内存块时,就可以直接从内存块对应的Span中获取到该内存块的大小,准确来说获取到的是对齐以后的大小
static void hcfree(void* ptr)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
size_t size = span->_objSize;
if (size > MAX_BYTES)
{
PageCache::GetInstance()->_pageMutex.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMutex.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
7.4 读取映射关系时的加锁问题
页号与Span之间的映射关系是存储在PageCache类中的unordered_map的,当访问这个映射关系时是需要加锁的,因为STL容器是不保证线程安全的
若是在CentralCache访问这个unordered_map,或是在调用hcfree()函数释放内存时访问这个容器,那么就存在线程安全的问题。因为可能存在某个线程在PageCache中进行某些操作导致unordered_map修改,而其他线程可能也正在访问这个映射关系,因此当在PageCache外部访问这个映射关系时是需要加锁的
实际就是在调用PageCache对外提供访问映射关系的函数时需要加锁,可以考虑使用C++中的unique_lock
//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
std::unique_lock<std::mutex> lock(_pageMutex); //构造时加锁,析构时自动解锁
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end()) {
return ret->second;
}
else {
assert(false);
return nullptr;
}
}
八、性能瓶颈及其优化
8.1 多线程场景下性能对比
在多线程场景下对比malloc进行测试:
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(16));
//v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread) {
t.join();
}
cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc"
<< ntimes << "次: 花费:" << malloc_costtime << "ms" << endl;
cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次free"
<< ntimes << "次: 花费:" << free_costtime << "ms" << endl;
cout << nworks << "个线程并发 malloc && free" << nworks * rounds * ntimes << "轮次,总计花费"
<< malloc_costtime + free_costtime << "ms" << endl;
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(hcmalloc(16));
//v.push_back(hcmalloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
hcfree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次hcmalloc"
<< ntimes << "次: 花费:" << malloc_costtime << "ms" << endl;
cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次hcfree"
<< ntimes << "次: 花费:" << free_costtime << "ms" << endl;
cout << nworks << "个线程并发 hcmalloc && hcfree" << nworks * rounds * ntimes << "轮次,总计花费"
<< malloc_costtime + free_costtime << "ms" << endl;
}
int main()
{
size_t n = 1000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}
测试函数中,通过clock(0函数分别获取到每轮次申请和释放所花费的时间,然后将其对应累加到malloc_costtime和free_costtime上。最后就得到了nworks个线程跑rounds轮,每轮申请和释放ntimes次,这个过程申请所消耗的时间、释放所消耗的时间、申请和释放总共消耗的时间
创建线程时让线程执行的是lambda表达式,以值传递的方式捕捉了变量k,以引用传递的方式捕捉了其他父作用域中的变量,因此可以将各个线程消耗的时间累加到一起
将所有线程申请内存消耗的时间都累加到malloc_costtime上, 将释放内存消耗的时间都累加到free_costtime上,此时malloc_costtime和free_costtime可能被多个线程同时进行累加操作的,所以存在线程安全的问题。于是在定义这两个变量时使用了atomic类模板,这时对这两个变量的操作就是原子的了
固定大小内存的申请和释放
v.push_back(hcmalloc(16));
v.push_back(malloc(16));
由于此时申请释放的都是固定大小的对象,每个线程申请释放时访问的都是各自ThreadCache中的同一个桶,当ThreadCache的这个桶中没有内存块或内存块太多要归还时,也都会访问Central Cache的同一个桶,此时CentralCache中的桶锁的策略并没有起到作用
不同大小内存的申请和释放
v.push_back(hcmalloc((16 + i) % 8192 + 1));
v.push_back(malloc((16 + i) % 8192 + 1));
由于申请和释放内存的大小是不同的,此时CentralCache当中的桶锁就起作用了,hcmalloc的效率也有了较大增长,但总体来说还是差一点点
8.2 性能瓶颈分析
该项目此时与malloc之间还是有差距的,下面用VS中性能分析的工具来进行分析:
不难发现,性能的瓶颈点主要是MapObjectToSpan()中的锁问题
tcmalloc中针对这一点使用了基数树进行优化,使得在读取这个映射关系时可以做到不加锁
8.3 基数树优化
基数树实际上就是一个分层的哈希表,根据所分层数不同可分为单层基数树、二层基数树、三层基数树等
单层基数树
单层基数树实际采用的就是直接定址法,每一个页号对应Span的地址就存储数组中在以该页号为下标的位置
最坏的情况下需要建立所有页号与其Span之间的映射关系,因此这个数组中元素个数应该与页号的数目相同,数组中每个位置存储的就是对应Span的指针
// Single-level array
template <int BITS>
class HCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
explicit HCMalloc_PageMap1() {
size_t size = sizeof(void*) << BITS;
size_t alignSize = DataHandleRules::_AlignUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
代码中的非类型模板参数BITS表示存储页号最多需要bit位的个数。在32位下传入的是32-PAGE_SHIFT,在64位下传入的是64-PAGE_SHIFT。而其中的LENGTH成员代表的就是页号的数目,即
如32位平台下,以一页大小为8K为例,此时页的数目就是 ,因此存储页号最多需要19个比特位,此时传入非类型模板参数的值就是32 − 13 = 19 。由于32位平台下指针的大小是4字节,因此该数组的大小就是 * 4 = =2M,内存消耗不大,是可行的。但若是在64位平台下,此时该数组的大小是 * 8 = = G,明显是不可行的,对于64位的平台需使用三层基数树
二层基数树
以32位平台下,一页的大小为8K为例来说明,此时存储页号最多需要19个bit位
而二层基数树实际上就是把这19个比特位分为两次进行映射。如用前5个bit位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的bit位在基数树的第二层进行映射,映射后最终得到该页号对应的Span指针
在二层基数树中,第一层的数组占用 * 4 = Byte空间,第二层的数组最多占用 * * 4 = = 2M。二层基数树相比一层基数树的好处就是,一层基数树必须一开始就把2M的数组开辟出来,而二层基数树一开始时只需将第一层的数组开辟出来,当需要进行某一页号映射时再开辟对应的第二层的数组即可
// Two-level radix tree
template <int BITS>
class HCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
explicit HCMalloc_PageMap2() {
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
assert(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
static ObjectPool<Leaf> leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
因此在二层基数树中有一个Ensure(0函数,当需要建立某一页号与其Span之间的映射关系时,需先调用该Ensure()函数确保用于映射该页号的空间是开辟了的,若没有开辟则会立即开辟。
而在32位平台下,就算将二层基数树第二层的数组全部开辟出来也就消耗了2M的空间,内存消耗也不算太多,因此可以在构造二层基数树时就把第二层的数组全部开辟出来
三层基数树
上述一层基数树和二层基数树都适用于32位平台,而对于64位的平台就需要用三层基数树
三层基数树与二层基数树类似,三层基数树实际上就是把存储页号的若干bit位分为三次进行映射
只有当要建立某一页号的映射关系时再开辟对应的数组空间,而没有建立映射的页号就可以不用开辟其对应的数组空间,可以在一定程度上节省内存空间,因此也存在Ensure()函数
//Three-level radix tree
template <int BITS>
class HCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit HCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() { }
};
8.4 代码实现
用基数树对代码进行优化,此时将PageCache类当中的unorder_map用基数树进行替换即可,由于当前是32位平台,因此这里用几层基数树都可以(本博客以两层基数树为例)
//单例模式
class PageCache
{
public:
//...
private:
//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
HCMalloc_PageMap2<32 - PAGE_SHIFT> _idSpanMap;
};
需要建立页号与span的映射时,调用基数树中的set函数
_idSpanMap.set(span->_pageId, span);
需要读取某一页号对应的Span时,调用基数树中的get函数
Span* ret = (Span*)_idSpanMap.get(id);
用于读取映射关系的MapObjectToSpan函数内部就不需要加锁了
//获取从小内存块到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
auto ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
}
为什么读取基数树映射关系时不需要加锁?
当某个线程在读取映射关系时,可能另外一个线程正在建立其他页号的映射关系,而此时无论我们用的是C++当中的map还是unordered_map,在读取映射关系时都是需要加锁的
因为C++中map的底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,而无论是红黑树还是哈希表,当在插入数据时其底层的结构都有可能会发生变化。如红黑树在插入数据时可能会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此在读取映射关系的时候是需要加锁的
而对于基数树来说就不一样了,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且不会同时对同一个页进行读取映射和建立映射的操作,因为只有在释放对象时才需要读取映射,而建立映射的操作都是在PageCache中进行的。即读取映射时读取的都是对应Span的_useCount不等于0的页,而建立映射时建立的都是对应Span的_useCount等于0的页,所以不会同时对同一个页进行读取映射和建立映射的操作
优化后对比
固定大小内存的申请和释放
申请释放不同大小的内存