万字详解C++内存池:提高内存分配效率的利器

发布时间 2023-11-02 09:46:17作者: 冰山奇迹

内存池(Memory Pool)是一种内存分配方式。通常我们习惯直接使用new、malloc等API申请分配内存,这样做的缺点在于:由于所申请内存块的大小不定,当频繁使用时会造成大量的内存碎片并进而降低性能。

内存池则是在真正使用内存之前,先申请分配一定数量的、大小相等(一般情况下)的内存块留作备用。当有新的内存需求时,就从内存池中分出一部分内存块,若内存块不够再继续申请新的内存。这样做的一个显著优点是尽量避免了内存碎片,使得内存分配效率得到提升。

一、什么是内存池

1.1池化技术

池化技术是计算机视觉和深度学习领域中常用的一种特征降维方法。它的作用是在保留图像或特征图中最显著信息的同时减少数据量,从而提高计算效率并防止过拟合。

其内涵在于:将程序中需要经常使用的核心资源先申请出来,放到一个池内,有程序自管理,这样可以提高资源的利用率,也可以保证本程序占有的资源数量,经常使用的池化技术包括内存池,线程池,和连接池等,其中尤以内存池和线程池使用最多。池化操作通常应用于卷积神经网络(CNN)中,对输入的特征图进行处理。

常见的池化方式有最大池化(Max Pooling)和平均池化(Average Pooling)两种:

最大池化:在每个区域内选择最大值作为输出值。这样可以保留局部区域内的最显著特征,丢弃次要信息。它具有平移不变性,在一定程度上提高了模型对位置变化的鲁棒性。

平均池化:在每个区域内计算平均值作为输出值。这种方式可以对整个区域进行平均,从而得到更加全局的统计信息,适用于一些需要考虑整体趋势而不仅仅关注局部细节的任务。

通过不断地堆叠卷积层和池化层,可以有效地减小输入数据尺寸、降低复杂度,并增强网络对平移、缩放等形变操作的鲁棒性。池化技术在卷积神经网络中被广泛应用,有助于提取更具有抽象特征的表示,进而提高模型的准确性和泛化能力。

1.2内存池

内存池是一种用于管理和分配内存的技术,它通过预先申请一块固定大小的内存空间,并将其划分为多个小块,提供给程序按需分配和释放。下面我会详细解释内存池的工作原理和优势。

工作原理:

    • 初始化阶段:在程序启动时,内存池会申请一块连续的大块内存空间。

    • 分配阶段:当程序需要申请内存时,内存池会从已经分配好的空闲块中找到合适大小的小块来分配给程序。

    • 释放阶段:当程序不再需要某个内存块时,可以将其返回给内存池进行复用。

    • 扩展阶段:如果当前内存池中没有足够的可用空间了,可以根据需要扩展内存池的大小。

优势:

    • 减少碎片化:由于内存池事先划分好了固定大小的小块,避免了频繁的系统调用和堆碎片产生。

    • 提高性能:由于减少了对系统调用和堆管理操作的依赖,提高了程序运行效率。

    • 控制资源消耗:通过限制总共使用的内存量,避免了程序过度申请内存的问题,更好地管理系统资源。

需要注意的是,内存池并不适用于所有场景。在某些情况下,例如对于大块连续内存的分配需求或者动态变化的内存需求,使用内存池可能不太合适。

1.3为什么需要内存池

内存池是为了提高程序运行效率和减少内存碎片而设计的一种技术。以下是使用内存池的主要原因:

  1. 减少内存分配和释放的开销:传统的动态内存分配(如malloc、free)会涉及到操作系统的系统调用,这样会产生较大的开销。而使用内存池可以预先分配一块连续的内存空间,并在需要时直接从中取出或回收,避免频繁进行系统调用,从而提高了程序性能。

  2. 减少内存碎片:频繁地进行动态内存分配和释放容易导致堆中产生碎片化的空间。而使用内存池可以固定大小的块来管理内存,避免了不同大小的对象交错排布在堆上导致的碎片问题。

  3. 提高缓存命中率:将对象预先分配到连续的块中,可以利用局部性原理提高缓存命中率。相邻对象在物理上也相邻,利于CPU缓存预取等优化。

1)内存碎片问题

内存碎片是指已分配的内存空间中存在的一些零散、不连续的小块未被使用的内存。这会导致内存利用率下降,可能影响程序的性能和稳定性。造成堆利用率很低的一个主要原因就是内存碎片化。如果有未使用的存储器,但是这块存储器不能用来满足分配的请求,这时候就会产生内存碎片化问题。

内存碎片主要分为两种类型:外部碎片和内部碎片。

  1. 外部碎片:即由于已分配内存块的释放而造成的未被充分利用的空闲内存块。这些空闲块虽然总大小足够,但由于彼此之间存在占用的块,无法组合成足够大的连续空闲区域。

  2. 内部碎片:即已分配给进程或应用程序的内存中,实际使用但没有被完全填满的部分。例如,某个进程申请了100个字节的内存,但只实际使用了80个字节,那么剩下的20个字节就构成了一个内部碎片。

解决内存碎片问题可以采取以下几种方法:

  1. 碎片整理(Defragmentation):通过将已分配和未分配的内存重新组织排列来减少外部碎片。这通常需要暂停程序运行并进行较大规模数据搬移操作。

  2. 分区策略优化:根据程序的内存使用特点,合理划分内存分区,减少外部碎片产生的可能性。例如采用动态分区或伙伴系统等算法。

  3. 内存池管理:提前申请一块较大的内存空间,并自行管理内部碎片和对象的分配与释放,减少频繁的内存申请和释放操作。

  4. 垃圾回收机制:对于某些编程语言如Java、Python等,利用垃圾回收器自动清理不再使用的对象,可以有效减少内部碎片。

申请效率问题

例如:我们上学家里给生活费一样,假设一学期的生活费是6000块。

方式1:开学时6000块直接给你,自己保管,自己分配如何花。

方式2:每次要花钱时,联系父母,父母转钱。

同样是6000块钱,第一种方式的效率肯定更高,因为第二种方式跟父母的沟通交互成本太高了。

同样的道理,程序就像是上学的我们,操作系统就像父母,频繁申请内存的场景下,每次需要内存,都像系统申请效率必然有影响。

1.4内存池的演变

  • 最简单的内存分配器
    做一个链表指向空闲内存,分配就是取出一块来,改写链表,返回,释放就是放回到链表里面,并做好归并。注意做好标记和保护,避免二次释放,还可以花点力气在如何查找最适合大小的内存快的搜索上,减少内存碎片,有空你了还可以把链表换成伙伴算法。
    优点: 实现简单
    缺点: 分配时搜索合适的内存块效率低,释放回归内存后归并消耗大,实际中不实用。

  • 定长内存分配器
    即实现一个 FreeList,每个 FreeList 用于分配固定大小的内存块,比如用于分配 32字节对象的固定内存分配器,之类的。每个固定内存分配器里面有两个链表,OpenList 用于存储未分配的空闲对象,CloseList用于存储已分配的内存对象,那么所谓的分配就是从 OpenList 中取出一个对象放到 CloseList 里并且返回给用户,释放又是从 CloseList 移回到 OpenList。分配时如果不够,那么就需要增长 OpenList:申请一个大一点的内存块,切割成比如 64 个相同大小的对象添加到 OpenList中。这个固定内存分配器回收的时候,统一把先前向系统申请的内存块全部还给系统。
    优点: 简单粗暴,分配和释放的效率高,解决实际中特定场景下的问题有效。
    缺点: 功能单一,只能解决定长的内存需求,另外占着内存没有释放。

1.5哈希映射的FreeList池

在定长分配器的基础上,按照不同对象大小(8,16,32,64,128,256,512,1k…64K),构造十多个固定内存分配器,分配内存时根据要申请内存大小进行对齐然后查H表,决定到底由哪个分配器负责,分配后要在内存头部的 header 处写上 cookie,表示由该块内存哪一个分配器分配的,这样释放时候你才能正确归还。如果大于64K,则直接用系统的 malloc作为分配,如此以浪费内存为代价你得到了一个分配时间近似O(1)的内存分配器。这种内存池的缺点是假设某个 FreeList 如果高峰期占用了大量内存即使后面不用,也无法支援到其他内存不够的 FreeList,达不到分配均衡的效果。

优点:这个本质是定长内存池的改进,分配和释放的效率高。可以解决一定长度内的问题。

缺点:存在内碎片的问题,且将一块大内存切小以后,申请大内存无法使用。多线程并发场景下,锁竞争激烈,效率降低。

范例:sgi stl 六大组件中的空间配置器就是这种设计实现的。

二、内存池设计

在设计一个内存池时,首先要对内存池的存储数据部分的构建做一个大概的规划。

因为是动态申请内存,没有办法预计将来正在运行的程序究竟会需要多大的内存空间,因此在内存池的设计上要预留空间,未来防止盲目的使用过大空间,采用的方法就是用多个内存块组成一个内存池,第一次分配的时时,先申请一个内存块,当程序不够用的时候,再向系统申请一块内存,然后将新内存与前面已获得的内存块组织成链表,以后一旦发现内存不够,那么就如此继续申请后续内存块,从而组成一个多内存块的内存池。内存池中内存块的长度可以相等,也可以不相等。

因为 STL容器中的元素大小都是相等的,故在此将内存块的存储区域划分为一个一个大小为 nunitsize 的存储单位。

程序一旦由系统获得内存池所需内存之后,在程序把内存池归还系统之前,系统绝不会再对这些内存进行任何管理工作,要么由程序委派给其它部件,在 STL中,由于空间配置器的存在,那么内存池的管理工作就是由它来执行。

首先要管理的就是内存块的信息表:

struct memoryblock
{
  int nsize; // 该内存的大小
  int nfree; //该内存还有多少可分配的单位
  int nfirst; // 当前可分配的第一个单位序号
  memoryblock *pnext; // 指向下一个内存块
  char adata[1] //用于标记分配单元开始的位置
}

本例将内存块的信息表 Memoryblock 安排在前面,即在一个内存块中,前面的部分为内存块信息表,紧接着表的便是内存块的数据区域,未来能够获得数据存储去的首地址,在信息表 memoryblock 的最后位置安排了 域 adata[1],这里 adata 就是数据存储去的首地址。

对于内存块中空闲存储单位的管理就是内存池的主要管理工作。本例在对这些空间存储区域进行管理时,采用了一个小技巧,即把它们组织成链表,而链表中那些用于指向下一节点的 “指针” 就保存在各个空闲单位的前两个字节,从而免去了为这些“ 指针” 按照存储空间消耗。 至于空闲存储单位链表的头指针则保存于 表 memoryblock 的nfirst 域。需要注意的是,这里所说的空闲存储单位链表各个节点的指针实在内存块初始化时为这些节点安排的序号而不是真正的地址,当然这个序号可以看成是内存块中的内部地址。

链表头memorypool的信息如下:

struct memorypool
{
int ninitsize;///首块长度
int ngrowsize;///后续块长度
int nunitsize;/// 定义存储单位大小
memoryblock *pblock; //指向内存块链表的指针
}

从 adata 开始的区域为内存块用于存储数据元素的空间,这里将它称为数据空间。该数据空间以数据的大小 nunitsize 划分为存储单位,每个存储单位可以存储一个数据。为了对尚未被分配使用的空闲单位进行识别,需要对他们编织序号。在内存块初始化时要编制序号,当内存块被被释放回内存池时也要为他们编制序号,总之内存池管理系统是按照序号来识别和管理空闲存储单位的,至于那些被应用程序占用的单位,则其序号自然消失和失效,直至被程序释放回内存时再由回收函数重新为其编号。

显然对于 memoryblock 构造函数来讲,除了对 memoryblock 的各个域进行初始化之外,为空闲单位编制序号,即进行空闲单位存储链表的构建就是它的任务。

综上, memoryblock 的构造函数如下:

memoryblock(int nunitsize,int nunitamount):nsize(nunitsize * nunitamount),nfree(nunitamount-1),nfirst(1),pnext(NULL)
// nunitsize 存储单位的大小 即 sizeof(T) nunitamount 内存块规模
// nfree 空闲存储单位为 nunitamount-1 nfirst 初值为1 pnext指向下一个内存块的指针为NULL
{
  //为空闲区编号
  char *ppdata=adata;
  cout<<"存储区首指针 ppdate ="<<(int*)ppdata<<endl;
  for(int i=1;i<nunitamount;i++)
  {
    //当前存储单位存储下一个可分配单位的序号
    (*(unsigned short *)ppdata)=i;
    cout<<"下一个可分配单位序号: "<<(int)*ppdata<<endl;
    //移动指针使之指向下一个单位
    ppdata+=nunitsize;
  }
  cout<<"-------------调用内存块的构造函数-------------------"<<endl;
}

因为构造函数的调用发生在用户第一次向内存池请求数据存储单位时,故 存储区域中的第一个存储单位可视为已被分配而不是空闲单位,故需要真正编制序号的是 nunitamount-1个,并且那个用于存储空闲单位链表头部序号的nfirst 也被初始化为1

按照面向对象的方法,用于向系统申请内存的函数 operator new() 也因该重载于类 memoryblock 中,因为c++ 提供的全局 operator new 不再适合本例。

void *operator new(size_t,int nunitsize,int nunitamount)
{
  cout<<"分配内存并创建memoryblock 对象"<<endl;
  return ::operator new(sizeof(memoryblock)+nunitsize*nunitamount);// 内存空间的长度为 memoryblock 对象加上 存储空间长度总和
}

其析构函数为:

void operator delete(void *pblock)  //重载 delete 作为内存块的析构函数
{
  ::operator delete(pblock);
  cout<<"---------------调用内存块的析构函数----------------"<<endl;
}

memorypool 的设计 相比于 memoryblock 的设计就相对简单一点 主要是对其域初始化其代码如下:

memorypool(int _ngrowsize=10,int _ninitsize=3)
{
cout<<"-----------------调用内存池的构造函数---------------"<<endl;
ninitsize=_ninitsize; // 首块长度
ngrowsize=_ngrowsize; //后续块长度
pblock=NULL; // 链表指针置为 null
nunitsize=sizeof(T); // 定义存储单位大小
if(sizeof(T)>4)///调整存储单位的大小
nunitsize=(sizeof(T)+(MEMPOOL_ALIGNMENT-1)) & ~(MEMPOOL_ALIGNMENT-1);// 返回值为8的倍数 所谓的内存对齐
else if(sizeof(T)<2)
nunitsize=2;
else
nunitsize=4;
}

memorypool 的析构函数如下:

~memorypool()
{
memoryblock<T>*pmyblock=pblock;
while(pmyblock!=NULL)
{
pmyblock=pmyblock->pnext;
delete(pmyblock);
}
cout<<"--------------------调用内存池的析构函数-------------------"<<endl;
}

在 memorypool中 主要操作就是用于向内存池请求存储单位的函数 allocate() ,这也是内存池向用户提供的主要服务。从程序的执行任务看,该函数的工作就是遍历内存块,找到 nfree 大于0 ,即有空闲单位的内存块,并从其上分配存储单位。然后将内存块 memoryblock 表中域 nfree 减一,并修改空闲单位链表头指针 nfirst 的值。

其代码如下:

void *allocate(size_t num)
{
for(int i=0;i<num;++i)
{
if(NULL==pblock)
{
///创建首内存块
pblock=(memoryblock<T>*)new (nunitsize,ninitsize)
memoryblock<T>(nunitsize,ninitsize);
return (void*)pblock->adata;///返回内存块中数据元素存储区指针
}
///为内存寻找符合条件的内存块
memoryblock<T>*pmyblock=pblock;
while(pmyblock!=NULL&&0==pmyblock->nfree)
pmyblock=pmyblock->pnext;
if(pmyblock!=NULL)
{
cout<<"找到内存空间 first= "<<pmyblock->nfirst<<endl;
///找到后进行内存分配
char *pfree=pmyblock->adata+pmyblock->nfirst*nunitsize;
pmyblock->nfirst=*((unsigned short*)pfree);
pmyblock->nfree--;
///返回找到的存储单位指针
return (void*)pfree;
}
else
{
///没有找到 说明当前内存块已用完
if(0==ngrowsize)return NULL;
cout<<" 否则分配新内存块"<<endl;
//分配一个后续内存块
pmyblock=(memoryblock<T>*)new(nunitsize,ngrowsize)
memoryblock<T>(nunitsize,ngrowsize);
if(NULL==pmyblock)
return NULL;///失败
pmyblock->pnext=pblock;
pblock=pmyblock;
//返回新内存的的存储区指针
return (void*)pmyblock->adata;
}
}
}

memorypool 的另一个重要函数就是用于释放内存的 free() ,该函数根据 pfree 的值,找到他所在的内存块,然后将他的序号 nfirst 的值(因为他绝对空闲) ,在pfree 的头两个字节写入原来nfirst的值,然后判断 该 块的是否全为 free 方法是检测 nfree*nunitsize==nsize。

若是,则向系统释放内存,若不是,则将该 block放到链表的头部,因为该block上含有空闲的内存单元,这样可以减少分配时遍历链表所消耗的时间。

void free(void *pfree)
{
//找到p所在的块
cout<<"释放存储单位内存空间"<<endl;
memoryblock<T>*pmyblock=pblock;
memoryblock<T>*preblock=NULL;
while(pmyblock!=NULL&&(pblock->adata>pfree||pmyblock->adata+pmyblock->nsize))
{
preblock=pmyblock;
pmyblock=pmyblock->pnext;
}
//该内存块在被内存池pmyblock 所指向的内存块中
if(NULL!=pmyblock)
{
//1 修改数组链表
*((unsigned short*)pfree)=pmyblock->nfirst;
pmyblock->nfirst=(unsigned short)((unsigned long)pfree-(unsigned long)pmyblock->adata)/nunitsize;
pmyblock->nfree++;
// 2 判断是否需要向系统释放内存
if(pmyblock->nsize==pmyblock->nfree*nunitsize)
{
//在链表中删除
delete(pmyblock);
}
else
{
//将该block 插入队首
preblock=pmyblock->pnext;
pmyblock->pnext=pblock;
pblock=pmyblock;
}
}
}

将以上代码组装,并自己设计累 user 来测试这个内存池:

#include<iostream>
#define MEMPOOL_ALIGNMENT 8 // 对齐长度
using namespace std;
///内存块及其信息表 memory block-----------------------
template<typename T>
struct memoryblock
{
int nsize;//该内存块的大小
int nfree;// 该内存还有多少可分配单位
int nfirst;//当前可分配的第一个单位序号
memoryblock *pnext;// 指向下一个内存块
char adata[1];// 用于标记分配单元开始的位置
memoryblock(int nunitsize,int nunitamount):nsize(nunitsize * nunitamount),nfree(nunitamount-1),nfirst(1),pnext(NULL)
// nunitsize 存储单位的大小 即 sizeof(T) nunitamount 内存块规模
// nfree 空闲存储单位为 nunitamount-1 nfirst 初值为1 pnext指向下一个内存块的指针为NULL
{
//为空闲区编号
char *ppdata=adata;
cout<<"存储区首指针 ppdate ="<<(int*)ppdata<<endl;
for(int i=1;i<nunitamount;i++)
{
//当前存储单位存储下一个可分配单位的序号
(*(unsigned short *)ppdata)=i;
cout<<"下一个可分配单位序号: "<<(int)*ppdata<<endl;
//移动指针使之指向下一个单位
ppdata+=nunitsize;
}
cout<<"-------------调用内存块的构造函数-------------------"<<endl;
}
void *operator new(size_t,int nunitsize,int nunitamount)
{
cout<<"分配内存并创建memoryblock 对象"<<endl;
return ::operator new(sizeof(memoryblock)+nunitsize*nunitamount);
}
void operator delete(void *pblock)
{
::operator delete(pblock);
cout<<"---------------调用内存块的析构函数----------------"<<endl;
}
};
///内存池信息表
template<typename T>
struct memorypool
{
int ninitsize;///首块长度
int ngrowsize;///后续块长度
int nunitsize;/// 定义存储单位大小
memoryblock<T>*pblock;
memorypool(int _ngrowsize=10,int _ninitsize=3)
{
cout<<"-----------------调用内存池的构造函数---------------"<<endl;
ninitsize=_ninitsize;
ngrowsize=_ngrowsize;
pblock=NULL;
nunitsize=sizeof(T);
if(sizeof(T)>4)///调整存储单位的大小
nunitsize=(sizeof(T)+(MEMPOOL_ALIGNMENT-1)) & ~(MEMPOOL_ALIGNMENT-1);// 返回值为8的倍数 所谓的内存对齐
else if(sizeof(T)<2)
nunitsize=2;
else
nunitsize=4;
}
~memorypool()
{
memoryblock<T>*pmyblock=pblock;
while(pmyblock!=NULL)
{
pmyblock=pmyblock->pnext;
delete(pmyblock);
}
cout<<"--------------------调用内存池的析构函数-------------------"<<endl;
}
///用于向内存池请求存储单位的函数,也是内存池向用户提供的主要服务
///遍历内存块链表,找到nfree 大于0 即有空闲的内存块 并从其上分配存储单位
///然后将memoryblock 中nfree 减一 并修改空闲存储单位链表头指针 nfirst 的值
void *allocate(size_t num)
{
for(int i=0;i<num;++i)
{
if(NULL==pblock)
{
///创建首内存块
pblock=(memoryblock<T>*)new (nunitsize,ninitsize)
memoryblock<T>(nunitsize,ninitsize);
return (void*)pblock->adata;///返回内存块中数据元素存储区指针
}
///为内存寻找符合条件的内存块
memoryblock<T>*pmyblock=pblock;
while(pmyblock!=NULL&&0==pmyblock->nfree)
pmyblock=pmyblock->pnext;
if(pmyblock!=NULL)
{
cout<<"找到内存空间 first= "<<pmyblock->nfirst<<endl;
///找到后进行内存分配
char *pfree=pmyblock->adata+pmyblock->nfirst*nunitsize;
pmyblock->nfirst=*((unsigned short*)pfree);
pmyblock->nfree--;
///返回找到的存储单位指针
return (void*)pfree;
}
else
{
///没有找到 说明当前内存块已用完
if(0==ngrowsize)return NULL;
cout<<" 否则分配新内存块"<<endl;
//分配一个后续内存块
pmyblock=(memoryblock<T>*)new(nunitsize,ngrowsize)
memoryblock<T>(nunitsize,ngrowsize);
if(NULL==pmyblock)
return NULL;///失败
pmyblock->pnext=pblock;
pblock=pmyblock;
//返回新内存的的存储区指针
return (void*)pmyblock->adata;
}
}
}
//free 函数用于释放内存块 该函数根据pfree 的值找到他所在的内存块 然后将它的序号作为nfirst的值(因为他绝对空闲)
//在pfree 的头两个字节内写入原来nfirst 的值。然后要判断 该block 是否全部为free 方法是检测nfree*nunitsize==size
// 若是 则向系统释放内存 若不是 则将该block 放到链表的头部 因为该block上一定含有内存单元 这样可以减少分配是遍历
//链表所消耗的时间
void free(void *pfree)
{
//找到p所在的块
cout<<"释放存储单位内存空间"<<endl;
memoryblock<T>*pmyblock=pblock;
memoryblock<T>*preblock=NULL;
while(pmyblock!=NULL&&(pblock->adata>pfree||pmyblock->adata+pmyblock->nsize))
{
preblock=pmyblock;
pmyblock=pmyblock->pnext;
}
//该内存块在被内存池pmyblock 所指向的内存块中
if(NULL!=pmyblock)
{
//1 修改数组链表
*((unsigned short*)pfree)=pmyblock->nfirst;
pmyblock->nfirst=(unsigned short)((unsigned long)pfree-(unsigned long)pmyblock->adata)/nunitsize;
pmyblock->nfree++;
// 2 判断是否需要向系统释放内存
if(pmyblock->nsize==pmyblock->nfree*nunitsize)
{
//在链表中删除
delete(pmyblock);
}
else
{
//将该block 插入队首
preblock=pmyblock->pnext;
pmyblock->pnext=pblock;
pblock=pmyblock;
}
}
}
};
class user
{
int s;
double s1;
double s3;
public:
user(int x):s(x)
{
cout<<"-------------------调用 user 的析构函数----------------"<<endl;
}
int get()
{
return s;
}
~user()
{
cout<<"------------------调用user 的析构函数------------------"<<endl;
}
};

int main()
{
memorypool<user>m_pool;
user*dp1=(user*)m_pool.allocate(1);
cout<<"dp1= "<<dp1<<endl;
new(dp1)user(1111);
cout<<" 对象中的数据值为 "<<dp1->get()<<endl;
user*dp2=(user*)m_pool.allocate(1);
cout<<"dp2= "<<dp2<<endl;
new(dp2)user(2222);
cout<<"对象中的数据值为 "<<dp2->get()<<endl;
user*dp3=(user*)m_pool.allocate(1);
cout<<"dp3= "<<dp3<<endl;
new(dp3)user(3333);
cout<<"对象中的数据值为 "<<dp3->get()<<endl;
user*dp4=(user*)m_pool.allocate(1);
cout<<"dp4= "<<dp4<<endl;
new(dp4)user(4444);
cout<<"对象中的数据值为 "<<dp4->get()<<endl;
user*dp5=(user*)m_pool.allocate(1);
cout<<"dp5= "<<dp5<<endl;
new(dp5)user(5555);
cout<<"对象中的数据值为 "<<dp5->get()<<endl;
user*dp6=(user*)m_pool.allocate(1);
cout<<"dp6= "<<dp6<<endl;
new(dp6)user(6666);
cout<<" 对象中的数据值为 "<<dp6->get()<<endl;

user*dp7=(user*)m_pool.allocate(1);
cout<<"dp7= "<<dp7<<endl;
new(dp7)user(7777);
cout<<" 对象中的数据值为 "<<dp7->get()<<endl;

user*dp8=(user*)m_pool.allocate(1);
cout<<"dp8= "<<dp8<<endl;
new(dp8)user(8888);
cout<<" 对象中的数据值为 "<<dp8->get()<<endl;

user*dp9=(user*)m_pool.allocate(1);
cout<<"dp9= "<<dp9<<endl;
new(dp9)user(9999);
cout<<" 对象中的数据值为 "<<dp9->get()<<endl;

user*dp10=(user*)m_pool.allocate(1);
cout<<"dp10= "<<dp10<<endl;
new(dp10)user(11111);
cout<<" 对象中的数据值为 "<<dp10->get()<<endl;

user*dp11=(user*)m_pool.allocate(1);
cout<<"dp11= "<<dp11<<endl;
new(dp11)user(22222);
cout<<" 对象中的数据值为 "<<dp11->get()<<endl;

user*dp12=(user*)m_pool.allocate(1);
cout<<"dp12= "<<dp12<<endl;
new(dp12)user(33333);
cout<<" 对象中的数据值为 "<<dp12->get()<<endl;

user*dp13=(user*)m_pool.allocate(1);
cout<<"dp13= "<<dp13<<endl;
new(dp13)user(44444);
cout<<" 对象中的数据值为 "<<dp13->get()<<endl;

user*dp14=(user*)m_pool.allocate(1);
cout<<"dp14= "<<dp14<<endl;
new(dp14)user(55555);
cout<<" 对象中的数据值为 "<<dp14->get()<<endl;

user*dp15=(user*)m_pool.allocate(1);
cout<<"dp15= "<<dp15<<endl;
new(dp15)user(66666);
cout<<" 对象中的数据值为 "<<dp15->get()<<endl;
return 0;
}

案例题目:频繁的内存操作new,delete是比较耗时的操作,为了减少这些操作,一般都会设计自己的内存分配器。一个程序需要频繁使用大小在512Byte到200KByte不定长的内存,请您设计一个高效的内存分配器?

考虑设计的重要关注点和大致思路,空间使用率等。

1)结构:

  • 首先假设内存分配器的最小内存分配单元为mem_unit,需要确定最小分配单元的大小。如果设置太小,将使得内存单元过于琐碎,过大则造成空间浪费。基于这个考虑,设置多个大小类别的mem_unit。申请内存单元时,将分配能够满足该大小的最小内存单元。

  • 由于同一类别的mem_unit是随机申请的,空间不连续,所以采用单向链表结构管理同一类别的mem_unit。

  • 为了能够快速定位某一个类别的men_unit_list, 通过对men_unit的size进行hash建立索引。

基于以上三点考虑,建立的内存分配器结构如下图:

2)流程

  • 申请内存:根据申请内存的大小选择合适的mem_unit, 若相应的mem_unit_list为NULL,则向操作系统申请内存空间,然后构建mem_unit进行分配;若mem_unit_list不为NULL,则从中取出mem_unit进行分配。

  • 释放内存:将men_unit添加到相应的mem_list中。

3)总结

  • 申请内存时间复杂度为O(1),释放内存时间复杂度为O(1);

  • 存在空间浪费,单个mem_unit最大浪费4KB;

  • 只申请内存不释放,因此对于内存波动大的应用空间浪费会比较严重;

  • 多线程应用时,每个mem_unit_list必须是线程安全的。

三、并发内存池

该项目主要模拟实现tcmalloc最核心的框架。高并发内存池主要解决的是内存碎片问题,而内存碎片又分为内碎片和外碎片。在32位平台下以外碎片为例,如下图:

假设已申请3G的内存,现在想再申请950M的内存,虽然合计剩余内存大于950M,但是由于空间不连续,因此无法申请到950M的内存空间,这就是由于外碎片问题导致申请内存失败。

内存池是程序在向操作系统申请内存时,申请过量内存,由进程自己将这些过量内存组织起来,待到需要使用内存的时候,直接分配,而不需要频繁地向操作系统申请内存,频繁向操作系统申请资源也是一种性能消耗。当上层释放内存时,将内存重新在内存池中组织起来,等待下次使用,而不是直接让操作系统回收。

3.1整体结构

首先要讨论一下如何管理内存?

上层将要申请的内存大小无法预测,但又不能直接将整块申请来的过量内存交给上层,因此需要对大块内存进行切分。这些内存块无疑是要被容器管理起来的,并且要保证当我们要申请使用的时候,能快速找到适合的内存块大小,哈希表就十分契合这些需求,将切分后的内存块大小与其对应内存建立 大小-内存指针 的kv映射。

高并发内存池主要由三个模块构成:

注意:以下反复虽然提到哈希的key->value的映射关系,但实际实现的时候,大部分不需要真的借助unordered_map,而是通过其他规律就能达到和key->value映射的效果,为了方便,以下直接使用key->value映射表述

ThreadCache: 线程缓存。直接与上层交互,为上层提供内存并将释放的内存以类似哈希桶的思想组织起来==(kb -> 内存块地址)==,等待下次申请。线程缓是每个线程独有的,因此线程缓存是无锁的。多线程同时使用线程缓存申请内存时互不影响,这也是并发内存池效率高的一个重要原因。

CentralCache: 中心缓存。中心缓存和线程缓存的整体结构是相似的,key值也是kb。只不过内存划分单位不同,中心缓存中的内存是以页为单位构建Span管理页内存的。中心缓存是所有线程所共享的,当线程缓存中没有适合分配给用户的内存块时,就会向中心缓存申请对应大小(mem_size)的内存块。中心缓存从 mem_size 所对应的Span链中提取Span,并将该Span页切割成多个大小为 mem_size 的内存块,然后返回部分内存块给线程缓存。因此中心缓存的kv映射是 kb -> Span。

由于中心缓存是所有线程共享,因此需要加锁。但中心缓存的锁是桶锁,意味着只要不是要申请的内存块大小相同,一般不会有竞争锁的问题,这使得CentralCache虽然有锁,但是效率相比无锁不会下降很多。

PageCache: 页缓存。页缓存中的内存划分也是以页为单位。当中心缓存中对应内存大小上的Span链中没有可用内存了,就要来向

页缓存申请页内存。它和中心缓存的最大区别在于它不切割页,而是直接就按页分配向上交付,也因此它的kv映射是 page -> 页内存。通过算法由申请的内存大小,计算出分配多少页给CentralCache较为合适,然后到页大小key对应的桶链上拿页内存。

如果该桶链上没有页内存,就从当前桶开始,不断往下继续搜寻(类比线性探测法),因为比当前桶key值(页大小)大的桶,如果有页内存,则该内存页大小一定比我们需要的页大小大。

把向下搜寻到的该页内存切成两部分:上层所需页内存大小 + 切割后剩余页内存大小。然后把上层所需页内存大小返回给上层,将剩余页内存大小按照key值插入到对应桶中。

或者往下探测直到最后一个桶:128页大小,都没有找到有一个桶中有页内存,则需要直接进行系统调用,向系统堆申请页内存,然后切割并分配。

规定:

  • ThreadCache:最多一次性只能分配256kb的内存,即最大划分内存块为256kb。若单次申请内存大于256kb,则直接向PageCache申请页

  • CentrolCache:显然,因为结构与ThreadCache基本相同,所以CentralCache也是最多一次性只能分配256kb的内存,意味着Span页切割小块内存最大把每块小块内存切成256kb。

  • PageCache:最多一次性分配128页内存。若单次申请内存大于128页,则需要直接使用系统调用向系统堆申请内存。

3.2ThreadCache模块

如何切分内存?

如果我们以kb为内存划分的最小单位,能否将内存按1kb、2kb、3kb…直到256K这样切分?

这样管理起来,整个数据结构太过庞大,不可取。因此,我们需要设定对齐数,在可接受的范围内,既保证用户申请的内存大小和我们划分的内存块大小尽量接近,又要保证不使整个结构过于庞大。

对齐数设置

参考tcmalloc,我们设置对齐数如下:

整体控制在最多10%左右的内碎片浪费

内存大小 对齐数 桶下标

[1,128] 8byte对齐 freelist[0,16)
[128+1,1024] 16byte对齐 freelist[16,72)
[1024+1,8*1024] 128byte对齐   freelist[72,128)
[8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
[64*1024+1,256*1024] 8*1024byte对齐   freelist[184,208)

以对齐数8为例:

当用户申请内存大小:1、2、3、4、5、6、7、8字节时,满足[1,128]的区间,按8字节对齐,分配 8 * 1的字节,即都是分配8字节

当用户申请内存大小:9、10、11、12、13、14、15字节时,满足[1,128]的区间,按8字节对齐,分配 8 * 2的字节,即都是分配16字节

依此类推

以8字节对齐的内存,假设用户申请1B,分配8B,则浪费7B内存。像这样,因分配的内存大于实际申请的内存,导致部分内存被浪费,

我们称被浪费的这部分内存为内碎片。第一个区间[1,128]内,由于用户申请的内存比较小,容易产生较大的内碎片浪费率,但是因为本身最多也就分配128B,所以第一个区间的浪费率我们忽略不计。

不考虑第一个区间:

以16B对齐,则内碎片最大为 16 − 1 = 15 16 - 1 = 1516−1=15,最小为0(刚好申请内存大小为对齐数的整数倍)

若申请129B,则内碎片浪费率为 15 / 144 ≈ 10 % 15 / 144 ≈ 10\%15/144≈10%。同理,申请145B,内碎片浪费率为 15 / 160 ≈ 10 % 15 / 160 ≈ 10\%15/160≈10%

以128B对齐,则内碎片最大为 128 − 1 = 127 128 - 1 = 127128−1=127,最小为0(刚好申请内存大小为对齐数的整数倍)

若申请1025B,则内碎片浪费率为 127 / 1152 ≈ 11 % 127 / 1152 ≈ 11\%127/1152≈11%。

以1024B对齐,则内碎片最大为 1024 − 1 = 1023 1024 - 1 = 10231024−1=1023,最小为0(刚好申请内存大小为对齐数的整数倍)

若申请(8*1024+1)B,则内碎片浪费率为 1023 / 9216 ≈ 11 % 1023 / 9216 ≈ 11\%1023/9216≈11%。

以8*1024B对齐,则内碎片最大为 8 ∗ 1024 − 1 = 8191 8*1024 - 1 = 81918∗1024−1=8191,最小为0(刚好申请内存大小为对齐数的整数倍)

若申请(64*1024+1)B,则内碎片浪费率为 8191 / 73728 ≈ 11 % 8191 / 73728 ≈ 11\%8191/73728≈11%。(8*1024+1)B,则内碎片浪费率为 1023 / 9216 ≈ 11 % 1023 / 9216 ≈ 11\%1023/9216≈11%。

综上,ThreadCache的结构图如下:

 

实现过程中的部分注意点:

//向CentralCache申请内存
void* ThreadCache::FetchFromCentralCache(size_t MemSize)
{
//上层只申请了一块内存,ThreadCache也只向中心缓存要一块来供给上层,合情合理
//但是ThreadCache也可以多申请几块(补充库存)
//这样下次上层再向ThreadCache申请内存的时候,就能直接给上层分配,不用频繁向中心缓存申请了。

//申请策略?慢增长
size_t requestMemNum = min(_freeList[index].GetMaxUseNum(), SizeClass::RequestWay(MemSize));

}

 //申请策略。计算实际向下层申请的内存数量

//参数:申请的内存大小
static size_t RequestWay(size_t MemSize)
{
assert(MemSize > 0);

//算法:固定值 / 申请的内存大小
//分母越大,结果越小
//申请的内存越小,一次性可以多要一些
//申请的内存越大,一次性要少点,否则对系统负担大
size_t ret = MaxMemSize / MemSize;

if (ret < 2) //下限
{
ret = 2;
}

if (ret > 512) //上限
{
ret = 512;
}
return ret;
}
 
//释放内存
//ThreadCache桶链太长的话,应当回收一部分给CentralCache
//太长?长短的标准是什么?
//定义使用频数成员变量,当长度 > 使用频数的时候,就可以考虑回收一部分了:回收使用频数个

3.3CentralCache模块

kb -> Span

CentralCache与Thread结构相似,不同点在于kb映射的是Span页,Span页中是一个个切割的小块内存,以及每个桶链中都有桶锁:

 

为什么CentralCache的桶链中的数据结构是带头双向循环链表,而ThreadCache的桶链结构是单链表?

因为CentralCache中,内存被Span结构体管理着,Span是一个结构体,要形成双向循环链表无非就是在结构体中添加分别指向前驱Span和后继SPan的指针。

ThreadCache中,管理的是原生内存,没有经过任何封装,因此无法像结构体那样增加前驱和后继指针,形成带头双向循环链表。

既然ThreadCache中的内存没有经过任何封装,没有添加指针,那它是怎么将内存链起来的?

内存本身天然就是数据的容器!!! 数据是写在内存上的,换句话说,内存上是可以写数据的!!! 只要我们能把下一段内存的地址写入当前内存中,不就好像把他们链起来了吗?!!

 

32位平台下指针地址占4Byte,64位平台下指针地址占8Byte。所以我们也可以大胆猜测一下为什么要设置对齐数从8Byt开始,

或许就是为了让内存池无论是在32位平台还是64位平台下,都至少能把地址写进去,都能够按照这套逻辑进行,保证程序的移植性。

如果内存再大一点,比如16Byte,应该就能想办法做到前驱内存地址,比如:前8Byte用来记录后继内存地址,后8Byte用来记录前驱内存地址。只是这样做,代码可能会变得更复杂,而且8Byte内存无法这样做,无法统一逻辑。或许出于种种考虑,因此不这么做了。

如何正确地修改内存,让它保存下一个内存的地址?

假设有一个指针mem指向一段内存:

void* mem;

如果我们要改变mem指针所指向内存的内容,需要访问这段并修改,回顾一下指针类型的意义:

  • 数据是存储在内存中的,指针类型决定了指针如何看待所指内存,看待内存的角度不同,取出来的数据值也不同

  • 指针类型决定了指针访问所指内存的字节数

int main()
{
//00000001 00001001 00001001 00001001
//小端: 09 09 09 01
unsigned int i = 0x01090909;
cout << i << endl << endl;

cout << 0x09 << endl;
char* p = (char*)&i;
cout << "*p = " << (int)* p << endl << endl; //9;
//这里需要在打印的时候再把值强转成int类型,否则ASCII中,9对应的字符是'\t'
//char类型把他当'\t'打印了,运行结果显示什么也没有。

cout << 0x0909 << endl;
wchar_t* q = (wchar_t*)&i;
cout << "*q = " << (int)*q << endl << endl;

unsigned int* r = &i;
cout << "*r = " << *r << endl;
}

这段代码可以很充分地帮我们验证指针类型的意义。

综上述,如果要修改内存中的内容,只要正确使用指针即可。

能否这样写:

//已知void* m1指向一段内存,void* m2指向另一段内存
*(int*)(m1) = m2;

逐步分析:

  • 强转int*:将m1所指内存当做当做一个存储int数据类型的内存,即将来解引用时,访问4个字节。

  • 解引用并赋值:将m1内存中的前4个字节的内容写入m2(另一段内存的地址)

像以上这样的操作,在32位平台下,是没有问题的,但是在64位平台下,就有问题了。

无论是32位平台还是64位平台,int都是占4个字节!*(int*)表示访问内存的4个字节,并记录另一段内存的地址。

但是在64位平台下,指针(地址)占8个字节,只用4个字节记录,是无法正确记录另一段内存的地址的。 如果上面那样去修改内存内容,在64位平台下会出错,代码的可移植性差。

需要一个类型,在32位平台下是4个字节,64位平台下是8个字节,让我们能够正确强转。 实际上,问题的答案在一开始就有了,指针在32位平台下4个字节,64位平台下8个字节。

但是我们最后是要解引用的,一级指针解引用后就不是指针了,所以强转的指针类型必须是2级指针

*(void**)(m1) = m2;

实现过程中的部分注意点:

//从CentralCache的自由链表中取出一个Span
Span* CentralCache::GetOneSpan(SpanList&list, size_t MemSize)
{
//....

//若桶链中没有Span或所有Span都没有剩余内存
//向PageCache申请Span
//考虑到ThreadCache向CentralCache要内存并不是需要多少拿多少,而是多拿
//如果原本CentralCache中有Span可用,有多少给多少,不一定够,尽量满足也就罢了
//既然调到了这个函数,就说明CentralCache中没有Span,要再找PageCache申请Span
//那既然要申请,那就理所当然申请一定能够满足ThreadCache的内存数
//那就去计算这样大小的内存,上层实际会申请会申请多少个,转换成页是多少页?
//再按照这样的页数报给PageCache去申请
size_t pageNum = SizeClass::NumberOfPages(MemSize);

//PageCache使用的是大锁,锁住整个PageCache
PageCache::GetPageCache()->PageCacheLock();
Span* newSpan = PageCache::GetPageCache()->GetSpanToUpper(pageNum);
PageCache::GetPageCache()->PageCacheUnlock();
assert(newSpan);

// 为什么申请完Span就解锁了?切割不需要加锁吗?
// 切割不需要加锁,因为切割是把新申请的Span切割,这个newSpan指针是只属于当前线程的局部变量
// 也就是说只有当前线程能够看到这个Span。不存在线程安全问题,因此切割的时候不需要加锁。
// 这样就能提高效率,如果有其他的CentralCache桶中无Span,也需要申请,但是PageCache只有一把大锁
// 如果切割的时候没用到却没有释放掉,其他线程就只能阻塞了。
}

3.4PageCache模块

Page -> Span

 

虽然PageCache和CentralCache的桶链结构都是Span,但是他们的区别还是很大的,Central中的Span页内存,终将切割成为一块块与key值对应的小内存块,而PageCache中的Span与它的key值(页大小)直接对应,key值多大,Span中管理的页内存就是多大,不会进行切割。

实现过程中的部分注意点:

//参数:上层要拿k页
Span* PageCache::GetSpanToUpper(size_t k)
{
assert(k > 0);

//申请页数 > 128,直接向堆申请
if (k > NPage - 1)
{
//Span* p = (Span*)SystemAlloc(k); //不能这么写
//栈上的局部变量,出了作用域就销毁了
//但是我们在释放的时候,还要用Span来释放,所以必须在堆上开Span避免被自动销毁

void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
span->_isUse = true;

_pageMapSpan[span->_pageId] = span;

return span;
}

else
{

//页数 = 桶下标
//到对应桶中拿Span
if (!_pclist[k].Empty())
{
Span* kSpan = _pclist[k].PopFront();
kSpan->_isUse = true;

for (int i = 0; i < kSpan->_n; ++i)
{
_pageMapSpan[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}

//走到这里说明对应页的桶中无Span,往下面的桶里找
size_t n = k + 1;
while (n < NPage)
{
//往下找,找到了非空页桶,取出其中的span拿来切割成pages页 和 i - pages页
if (!_pclist[n].Empty())
{
Span* nPages = nPages = _pclist[n].PopFront();
//Span* kPages = new Span;

Span* kPages = _spanPool.New();


//切割成两部分
//前一部分的页号就是原本的页号 不需要改动
kPages->_pageId = nPages->_pageId;
kPages->_n = k;

nPages->_pageId += k; //后一部分的起始页号 = 原本的起始页号 + 前一部分的页数
nPages->_n -= k; //切割后的页数要改一下



//建立id和Span的映射,方便查找
for (PAGE_ID i = 0; i < kPages->_n; ++i)
{
_pageMapSpan[kPages->_pageId + i] = kPages;
}


//没有向上交付的页,即nPages的首尾页映射一下即可

//kPage要把Span中的每一页都映射是因为CentralCache中没有页的概念
//对于CentralCache而言,它拿到的就是一个Span,然后把这个Span不断切割成等量小块内存
//因此中间页的内存也会被切割,和大Span在逻辑上分离。
// 例如: 某Span有 100 101 102 三个页
//假设我们CentralCache层也只映射首尾页,当某个内存需要回收,那它首先需要根据地址找到它所在的页
//如果它所在的页是101页,当我们拿着这个页去映射它对应的Span时,就会发现映射不到Span
//因为我们只映射了首尾:100 102

//但nPages不同,nPages映射的目的是为了在回收内存的时候去合并更大的页,
//因此就要找某页的前后页,如果它的前后页也没有被使用,那么它们就能合并
//例如: 页号为200的Span被回收,并尝试合并
//它会向前找页号199的Span(Span中可能有多页,199是该Span的最后一页), 向后找页号201的Span(201是该Span的第一页),看它们是否被使用
//但首尾位置是相对的,我们无法预知
//就像如果199所在的Span有3页:197 198 199
//对于200来说,它通过且只能通过199页去找所在的它所在Span是否被使用,
//因为程序无法确定198是不是和199在同一个Span,如果不是,那么200要合并的是199所在的Span
//而不是198所在的Span,因为200的Span只和199所在的Span连续

//但对于196来说,他需要的却是197而不是199
//因此相对位置无法确定,首尾页都必须映射。

//并且上述例子又再次印证了为什么nPage不需要映射中间页
//就像200不能够使用198去映射一样,中间页在Span中是没有被切割的,并且在这一层使用中间页映射是不合逻辑的

_pageMapSpan[nPages->_pageId] = nPages;
_pageMapSpan[nPages->_pageId + nPages->_n - 1] = nPages; //最后一页的页号 = _pageId + n - 1

//将 i - page页的Span插入相应的页桶中, 然后将page页的Span交付上层。
_pclist[nPages->_n].PushFront(nPages);

//PushVmap();

return kPages;
}

n++;
}

//走到这里,说明直到最后一个桶都没找到未分配的Span
//需要直接向堆申请内存页。直接申请一个最大内存页:128页
//Span* newSpan = new Span;
Span* newSpan = _spanPool.New();

void* ptr = SystemAlloc(NPage - 1);


newSpan->_pageId = (PAGE_ID)ptr >> 13; // 页号 = 地址 / 8k
newSpan->_n = NPage - 1;
//链接到128页对应的桶
_pclist[newSpan->_n].PushFront(newSpan);


return GetSpanToUpper(k); //再调一次该函数,自动完成切割并向上交付
}
}

 

回收的时候消耗过大:map查找有消耗 + 锁竞争。 并且map越慢,竞争越激烈

转至:https://mp.weixin.qq.com/s/_4gngkFM9glQY3tjV8d-6Q