PSI内存跟踪详解(三)

发布时间 2023-08-02 17:59:27作者: 吃饭端住碗

内存监控接口则是PSI_memory_service_v2对象数组中的函数,具体的相关函数有:pfs_memory_alloc_vc、pfs_memory_realloc_vc、pfs_memory_claim_vc、pfs_memory_free_vc。

内存跟踪的相关数据结构

PFS监控信息的存储和聚合分为不同的维度,比如基于thread、account、host、user、global,其中thread为最上层的维度。而最重要的一个数据结构也是和用户线程关系最为密切的PFS_thread数据结构,每个用户连接都有一个该数据结构,用于用户PFS监控数据(包括但不限于Memory监控统计信息)和线程信息的存储。

PFS_thread数据结构在创建用户线程的时候在pfs_spawn_thread函数中被创建:

static void *pfs_spawn_thread(void *arg) {
  PFS_spawn_thread_arg *typed_arg = (PFS_spawn_thread_arg *)arg;
  void *user_arg;
  void *(*user_start_routine)(void *);
  PFS_thread *pfs;
  /* First, attach instrumentation to this newly created pthread. */
  // 初始化PFS元数据存储对象
  PFS_thread_class *klass = find_thread_class(typed_arg->m_child_key);
  if (likely(klass != nullptr)) {
    // 创建PFS_thread对象,分配相关内存,并初始化其成员对象
pfs = create_thread(klass, typed_arg->m_child_identity, 0);
...
  } else {
    pfs = nullptr;
  }
  my_thread_set_THR_PFS(pfs);
...
  /* Then, execute the user code for this thread. */
  (*user_start_routine)(user_arg);
  return nullptr;
}

在create_thread函数中分配PFS_thread对象内存,并进行基础对象成员的初始化:

PFS_thread *create_thread(PFS_thread_class *klass,
                          const void *identity MY_ATTRIBUTE((unused)),
                          ulonglong processlist_id) {
  PFS_thread *pfs;
  pfs_dirty_state dirty_state;
  pfs = global_thread_container.allocate(&dirty_state);
  if (pfs != nullptr) {
    pfs->m_thread_internal_id = thread_internal_id_counter.m_u64++;
    pfs->m_parent_thread_internal_id = 0;
    pfs->m_processlist_id = static_cast<ulong>(processlist_id);
    pfs->m_thread_os_id = 0;
    pfs->m_system_thread = !(klass->m_flags & PSI_FLAG_USER);
    pfs->m_event_id = 1;
    pfs->m_stmt_lock.set_allocated();
    pfs->m_session_lock.set_allocated();
    pfs->set_enabled(klass->m_enabled);
    pfs->set_history(klass->m_history);
    pfs->m_class = klass;
    pfs->m_events_waits_current = &pfs->m_events_waits_stack[WAIT_STACK_BOTTOM];
    pfs->m_waits_history_full = false;
...
  }
  return pfs;
}

除了PFS_thread数据结构外,还有其他的一些重要的数据结构,比如PFS_account、PFS_user、PFS_host等,其数据结构及关系图如下(建议放大看,4K高清无码):

最上层的数据结构为PFS_connection_slice,其描述了一组connections的统计信息,可以不用过多关心。

下面有四个核心的数据结构,分别是PFS_thread、PFS_account、PFS_user和PFS_host。这四个数据结构的关系为:PFS_thread为一级结构,PFS_account为二级结构,PFS_user和PFS_host为三级结构。其中的数据成员结构存储了内存监控统计信息,其中的成员函数分两种,一种用来进行统计信息的计算,一种用来进行统计信息的聚合。

存储内存监控统计信息的数据结构主要有两个:PFS_memory_safe_stat和PFS_memory_shared_stat。

PFS_thread中的PFS_memory_safe_stat的对象存储了每个thread的内存聚合统计信息,其可以看作是PERFORMANCE_SCHEMA.MEMORY_SUMMARY_BY_THREAD_BY_EVENT_NAME表的元数据对象。

PFS_user中的PFS_memory_shared_stat对象存储了每个user的内存聚合统计信息,其可以看作是PERFORMANCE_SCHEMA.MEMORY_SUMMARY_BY_USER_BY_EVENT_NAME表的元数据对象。

PFS_host中的PFS_memory_shared_stat对象存储了每个host的内存聚合统计信息,其可以看作是PERFORMANCE_SCHEMA.MEMORY_SUMMARY_BY_HOST_BY_EVENT_NAME表的元数据对象。

PFS_account中的PFS_memory_shared_stat对象存储了每个account的内存聚合统计信息,其可以看作是PERFORMANCE_SCHEMA.MEMORY_SUMMARY_BY_ACCOUNT_BY_EVENT_NAME表的元数据对象。

PFS_memory_safe_stat和PFS_memory_shared_stat主要有以下属性:

  • m_alloc_count:已分配内存的次数
  • m_free_count:已释放内存的次数
  • m_alloc_size:已分配的内存大小
  • m_free_size:已释放的内存大小
  • m_alloc_count_capacity:(在高水位下)能够分配内存的次数
  • m_free_count_capacity:(在高水位下)能够释放内存的次数
  • m_alloc_size_capacity:(在高水位下)能够分配内存的大小
  • m_free_size_capacity:(在高水位下)能够释放内存的大小

(PFS_memory_safe_stat和PFS_memory_shared_stat中数据的区别在于PFS_memory_shared_stat的属性是由std::atomic原子性的。)

从概念上来讲,维护了如下信息:

  • CURRENT_COUNT_USED
  • LOW_COUNT_USED
  • HIGH_COUNT_USED
  • CURRENT_SIZE_USED
  • LOW_SIZE_USED
  • HIGH_SIZE_USED

计算方式如下:

  • CURRENT_COUNT_USED = m_alloc_count - m_free_count
  • LOW_COUNT_USED + m_free_count_capacity = CURRENT_COUNT_USED
  • CURRENT_COUNT_USED + m_alloc_count_capacity = HIGH_COUNT_USED
  • CURRENT_SIZE_USED = m_alloc_size - m_free_size
  • LOW_SIZE_USED + m_free_size_capacity = CURRENT_SIZE_USED
  • CURRENT_SIZE_USED + m_alloc_size_capacity = HIGH_SIZE_USED

参考图示:

PFS_memory_stat_alloc_delta接口中包含了两个属性:

  • m_alloc_count_delta
  • m_alloc_size_delta

这两个属性标识了是否需要推进高水位线。

内存分配统计信息搜集

pfs_memory_alloc_vc函数为PSI的内存分配监控接口,从这里开始进入正题,相关堆栈如下(标记为[X]的为可忽略逻辑):

|pfs_memory_alloc_vc
|--stat->count_alloc(size, &delta_buffer)  
|--pfs_thread->carry_memory_stat_alloc_delta(delta, index)
|----m_account->carry_memory_stat_alloc_delta(delta, index)
|------stat->apply_alloc_delta(delta, &delta_buffer)  
|------m_user->carry_memory_stat_alloc_delta(remaining_delta, index)
|--------stat->apply_alloc_delta(delta, &delta_buffer)
|------m_host->carry_memory_stat_alloc_delta(remaining_delta, index)
|--------stat->apply_alloc_delta
|--------carry_global_memory_stat_alloc_delta(remaining_delta, index)
|----------stat->apply_alloc_delta
|------carry_global_memory_stat_alloc_delta(remaining_delta, index)[X]
|----m_user->carry_memory_stat_alloc_delta(delta, index)[X]
|----m_host->carry_memory_stat_alloc_delta(delta, index)[X]
|----carry_global_memory_stat_alloc_delta(delta, index)[X]
|--stat->count_global_alloc(size)[X]

从堆栈上看整个统计信息的核心逻辑主要在count_alloc函数中,如果判断有delta的话就从thread维度应用delta,应用过之后如果还存在delta,就继续推到account维度,继而user和host维度,直到global维度。

pfs_memory_alloc_vc函数代码如下(代码讲解看注释):

PSI_memory_key pfs_memory_alloc_vc(PSI_memory_key key, size_t size,
                                   PSI_thread **owner) {
  PFS_thread **owner_thread = reinterpret_cast<PFS_thread **>(owner);
  assert(owner_thread != nullptr);
  /*
有三种情况不会进行内存分配跟踪:
1.未开启Consumer
2.未找到对应的Performance Schema Key
3.相关Instrument未启用
  */
  if (!flag_global_instrumentation) { 
    *owner_thread = nullptr;
    return PSI_NOT_INSTRUMENTED;
  }
  PFS_memory_class *klass = find_memory_class(key); 
  if (klass == nullptr) {
    *owner_thread = nullptr;
    return PSI_NOT_INSTRUMENTED;
  }

  if (!klass->m_enabled) {    
    *owner_thread = nullptr;
    return PSI_NOT_INSTRUMENTED;
  }
/*
内存统计信息的搜集分两种:非global和global。
所谓非global可以简单理解为由用户线程触发;
global由MySQL后台线程触发或用户线程需要加载插件等。
不同的统计信息的搜集方式也决定了后续统计信息的聚合,这个放到后续再讲。
*/
 
  uint index = klass->m_event_name_index;
  // 非Global的内存统计信息搜集
  if (flag_thread_instrumentation && !klass->is_global()) {
    PFS_thread *pfs_thread = my_thread_get_THR_PFS();
    if (unlikely(pfs_thread == nullptr)) {
      *owner_thread = nullptr;
      return PSI_NOT_INSTRUMENTED;
    }
    if (!pfs_thread->m_enabled) {
      *owner_thread = nullptr;
      return PSI_NOT_INSTRUMENTED;
    }
    PFS_memory_safe_stat *event_name_array;
    PFS_memory_safe_stat *stat;
    PFS_memory_stat_alloc_delta delta_buffer;
    PFS_memory_stat_alloc_delta *delta;
    /* Aggregate to MEMORY_SUMMARY_BY_THREAD_BY_EVENT_NAME */ 
    event_name_array = pfs_thread->write_instr_class_memory_stats();
    stat = &event_name_array[index];
    delta = stat->count_alloc(size, &delta_buffer);
    if (delta != nullptr) {
      pfs_thread->carry_memory_stat_alloc_delta(delta, index);
    }
    /* Flag this memory as owned by the current thread. */
    *owner_thread = pfs_thread;
  } else {
    // Global的内存统计信息搜集
    PFS_memory_shared_stat *event_name_array;
    PFS_memory_shared_stat *stat;
    /* Aggregate to MEMORY_SUMMARY_GLOBAL_BY_EVENT_NAME */    
    event_name_array = global_instr_class_memory_array;
    stat = &event_name_array[index];
    stat->count_global_alloc(size);
    *owner_thread = nullptr;
  }
  return key;
}

Global内存分配统计信息搜集

在非Global的内存统计信息搜集逻辑中,首先获取相关的PFS_thead对象:

PFS_thread *pfs_thread = my_thread_get_THR_PFS();

后面则通过PFS_memory_stat_alloc_delta *PFS_memory_safe_stat::count_alloc函数进行统计信息的计算,且有delta的话返回delta(delta标记了是否需要推进High watermark):

PFS_memory_stat_alloc_delta *PFS_memory_safe_stat::count_alloc(
    size_t size, PFS_memory_stat_alloc_delta *delta) {
  m_used = true;
  m_alloc_count++;   
  m_free_count_capacity++;  
  m_alloc_size += size;     
  m_free_size_capacity += size;    
  if ((m_alloc_count_capacity >= 1) && (m_alloc_size_capacity >= size)) {         
    m_alloc_count_capacity--;
    m_alloc_size_capacity -= size;
    return nullptr;
  }
  if (m_alloc_count_capacity >= 1) {                                              
    m_alloc_count_capacity--;
    delta->m_alloc_count_delta = 0;   
  } else {                                                                        
    delta->m_alloc_count_delta = 1;   
  }
  if (m_alloc_size_capacity >= size) {                                            
    m_alloc_size_capacity -= size;
    delta->m_alloc_size_delta = 0;
  } else {                                                                        
    delta->m_alloc_size_delta = size - m_alloc_size_capacity;
    m_alloc_size_capacity = 0;
  }
  return delta;
}

在函数开始,会进行统计信息的增长计算。后面会判断是否需要推进高水位线。总共有4种情况:

  1. m_alloc_count_capacity >= 1 && m_alloc_size_capacity >= size: 不需要推进HIGH_COUNT_USED和HIGH_SIZE_USED,直接将m_alloc_count_capacity和m_alloc_size_capacity进行decrease。
  2. m_alloc_count_capacity >= 1 && m_alloc_size_capacity < size: 不需要推进HIGH_COUNT_USED,但是需要推进HIGH_SIZE_USED。对应场景:释放内存后的内存新分配,分配量较大。
  3. m_alloc_count_capacity < 1 && m_alloc_size_capacity < size: HIGH_COUNT_USED和HIGH_SIZE_USED都需要推进。对应场景:连续进行内存分配,且分配量较大。
  4. m_alloc_count_capacity < 1 && m_alloc_size_capacity >= size: 需要推进HIGH_COUNT_USED,不需要推进HIGH_SIZE_USED。对应场景:连续进行内存分配,但是分配量较小。

如果需要推进高水位线,那么需要将m_alloc_count_delta或m_alloc_size_delta设置为非0,用来标记需要推进高水位;且此时也需要将 m_alloc_size_capacity设置位0,代表到水位线下已经没有可用内存。

如果内存使用在高水位以下的话函数就直接返回了,如果存在delta的话后续还需要进行应用delta。

carry_memory_stat_alloc_delta函数主要是将delta从子级维度推到父级维度,比如将delta从thread维度推到account维度应用,然后再从account维度推到user、host维度,直到global维度。实际应用delta的函数主要是apply_alloc_delta函数,不同的维度对象delta应用逻辑都是相同的,其代码如下:

PFS_memory_stat_alloc_delta *PFS_memory_shared_stat::apply_alloc_delta(
    const PFS_memory_stat_alloc_delta *delta,
    PFS_memory_stat_alloc_delta *delta_buffer) {
  size_t val;
  size_t remaining_alloc_count = 0;
  size_t remaining_alloc_size = 0;
  bool has_remaining = false;
  m_used = true;
  val = delta->m_alloc_count_delta;
  if (val > 0) {
    if (val <= m_alloc_count_capacity) {
      m_alloc_count_capacity -= val;
      remaining_alloc_count = 0;
    } else {
      remaining_alloc_count = val - m_alloc_count_capacity;
      m_alloc_count_capacity = 0;
      has_remaining = true;
    }
  }

  val = delta->m_alloc_size_delta;
  if (val > 0) {
    if (val <= m_alloc_size_capacity) {
      m_alloc_size_capacity -= val;
      remaining_alloc_size = 0;
    } else {
      remaining_alloc_size = val - m_alloc_size_capacity;
      m_alloc_size_capacity = 0;
      has_remaining = true;
    }
  }
  if (!has_remaining) {
    return nullptr;
  }
  delta_buffer->m_alloc_count_delta = remaining_alloc_count;
  delta_buffer->m_alloc_size_delta = remaining_alloc_size;
  return delta_buffer;
}

其判断逻辑也比较简单,主要是将m_alloc_count_capacity和m_alloc_size_capacity的值和delta中的m_alloc_count_delta和m_alloc_size_delta属性进行对比,如果delta的属性值较大,就说明还存在delta,需要将delta推到父级维度再次进行对比,并将本维度的m_alloc_count_capacity和m_alloc_size_capacity值置为0,表示在高水位线下已经没有可用分配次数或可用分配大小。

Global内存统计信息搜集

前面讲过在内存分配监控结构pfs_memory_alloc_vc函数中,内存分配统计信息的搜集分为非Global和Global两种方式,上一章节中讨论了非Global的实现逻辑,下面讨论Global的实现逻辑。

其相关堆栈如下:

|pfs_memory_alloc_vc
|--PFS_memory_shared_stat::count_global_alloc
|----PFS_memory_shared_stat::count_builtin_alloc

主要的实现逻辑主要是在PFS_memory_shared_stat::count_builtin_alloc函数中,其相关代码如下:

void PFS_memory_shared_stat::count_builtin_alloc(size_t size) {
  m_used = true;
  m_alloc_count++;
  m_free_count_capacity++;
  m_alloc_size += size;
  m_free_size_capacity += size;
  size_t old_value;
  /* Optimistic */
  old_value = m_alloc_count_capacity.fetch_sub(1);
  /* Adjustment */
  if (old_value <= 0) {
    m_alloc_count_capacity++;
  }
  /* Optimistic */
  old_value = m_alloc_size_capacity.fetch_sub(size);
  /* Adjustment */
  if (old_value < size) {
    m_alloc_size_capacity = 0;
  }
  return;
}

Global的统计信息的统计相对简单,只是对相关统计信息进行了increase操作,并对m_alloc_count_capacity和m_alloc_size_capacity属性进行了适应性调整,防止出现负值。

内存释放统计信息搜集

内存释放时统计信息的搜集和内存分配时统计信息的搜集类似,区别在于分配时是对alloc统计信息的状态变量是进行increase,而内存释放是对free统计信息状态变量进行increase。其相关堆栈和内存分配时类似:

|pfs_memory_free_vc
|--PFS_memory_safe_stat::count_free
|--PFS_thread::carry_memory_stat_free_delta
|----PFS_account::carry_memory_stat_free_delta
|------PFS_memory_shared_stat::apply_free_delta
|------PFS_user::carry_memory_stat_free_delta
|--------PFS_memory_shared_stat::apply_free_delta
|------PFS_host::carry_memory_stat_free_delta
|--------PFS_memory_shared_stat::apply_free_delta
|--------carry_global_memory_stat_free_delta
|----------PFS_memory_shared_stat::apply_free_delta
|------carry_global_memory_stat_free_delta[X]
|----PFS_user::carry_memory_stat_free_delta[X]
|----PFS_host::carry_memory_stat_free_delta[X]
|----carry_global_memory_stat_free_delta[X]
|--PFS_memory_shared_stat::count_free[X]

内存释放的统计信息搜集接口函数是pfs_memory_free_vc,其也分为非Global和Global两种情况,其中非Global的核心实现是在PFS_memory_safe_stat::count_free函数中,其代码如下:

PFS_memory_stat_free_delta *PFS_memory_safe_stat::count_free(
    size_t size, PFS_memory_stat_free_delta *delta) {
  m_used = true;
  m_free_count++;              
  m_alloc_count_capacity++;     
  m_free_size += size;          
  m_alloc_size_capacity += size;        

  if ((m_free_count_capacity >= 1) && (m_free_size_capacity >= size)) {         
    m_free_count_capacity--;
    m_free_size_capacity -= size;
    return nullptr;
  }

  if (m_free_count_capacity >= 1) {                                             
    m_free_count_capacity--;
    delta->m_free_count_delta = 0;
  } else {                                                                      
    delta->m_free_count_delta = 1;
  }

  if (m_free_size_capacity >= size) {                                           
    m_free_size_capacity -= size;
    delta->m_free_size_delta = 0;
  } else {                                                                      
    delta->m_free_size_delta = size - m_free_size_capacity;
    m_free_size_capacity = 0;
  }
  return delta;
}

可以看到,内存分配时统计信息的计算主要是对free的统计信息进行increase,后面也会进行delta的计算,来判断是否需要拉低低低水位线,且如果有delta的话就返回delta,并将delta推到父级继续进行delta的计算。这里就不再展开详细再讲了。