源码分析：CHANGE REPLICATION SOURCE TO

发布时间：2023-03-29 14:30:25  作者：吃饭端住碗

从MySQL 8.0.23版本开始，CHANGE MASTER TO语句被CHANGE REPLICATION SOURCE TO取代（旧语法仍可使用，但已被弃用）。下面基于MySQL 8.0.32的源码分析该语句的具体执行流程。

语句从入口函数mysql_execute_command开始执行。在真正执行命令之前，首先会检查执行该语句的用户是否拥有REPLICATION_SLAVE_ADMIN或SUPER权限：

case SQLCOM_CHANGE_MASTER: {
      // Dispatch for CHANGE REPLICATION SOURCE TO; the command code still
      // carries the older CHANGE MASTER name.
      Security_context *sctx = thd->security_context();
      // The executing user needs either SUPER or the REPLICATION_SLAVE_ADMIN
      // global grant; otherwise the statement fails with
      // ER_SPECIFIC_ACCESS_DENIED_ERROR before anything is changed.
      if (!sctx->check_access(SUPER_ACL) &&
          !sctx->has_global_grant(STRING_WITH_LEN("REPLICATION_SLAVE_ADMIN"))
               .first) {
        my_error(ER_SPECIFIC_ACCESS_DENIED_ERROR, MYF(0),
                 "SUPER or REPLICATION_SLAVE_ADMIN");
        goto error;
      }
      // Validate the target channel and apply the statement; the outcome is
      // stored in res.
      res = change_master_cmd(thd);
      break;
}

随后进入change_master_cmd函数。该函数主要负责检查channel name的有效性，在channel尚不存在时将其添加到channel map中，然后调用change_master完成实际的修改（下面的代码节选到调用change_master为止）：

/*
  Entry point of CHANGE REPLICATION SOURCE TO (excerpt).

  Validates the target channel name, creates the channel in channel_map if it
  does not exist yet, and then delegates the actual work to change_master().

  NOTE: in the original excerpt every line after the group_replication_applier
  check had been collapsed onto one line, so all of that code ended up inside
  a "//" comment. The line breaks are restored here; no tokens were changed.
  The excerpt still stops at the call to change_master(); the rest of the
  function is not shown.
*/
bool change_master_cmd(THD *thd) {
  DBUG_TRACE;

  Master_info *mi = nullptr;
  LEX *lex = thd->lex;
  bool res = false;

  channel_map.wrlock();

  // Check whether this server can be configured as a replica at all. Per the
  // author's notes it cannot when, for example, server_id is 0, or when a
  // multi-source setup whose channels were stored in TABLE repositories is
  // restarted with the repository switched to FILE, so the default channel
  // fails to load.
  if (!is_slave_configured()) {
    my_error(ER_SLAVE_CONFIGURATION, MYF(0));
    res = true;
    goto err;
  }

  // Target is the group_replication_applier channel?
  if (channel_map.is_group_replication_channel_name(lex->mi.channel, true)) {
    // Reject options that are not allowed on group_replication_applier (per
    // the author's notes only PRIVILEGE_CHECKS_USER is accepted; the exact
    // list lives in the helper).
    LEX_MASTER_INFO *lex_mi = &thd->lex->mi;
    if (is_invalid_change_master_for_group_replication_applier(lex_mi)) {
      my_error(ER_SLAVE_CHANNEL_OPERATION_NOT_ALLOWED, MYF(0),
               "CHANGE MASTER with the given parameters", lex->mi.channel);
      res = true;
      goto err;
    }

    // Group Replication itself must be stopped.
    if (is_group_replication_running()) {
      my_error(ER_GRP_OPERATION_NOT_ALLOWED_GR_MUST_STOP, MYF(0));
      res = true;
      goto err;
    }
  }

  // A group replication channel that is not the applier channel (i.e.
  // group_replication_recovery) only accepts a restricted set of options;
  // per the author's notes only SOURCE_USER / SOURCE_PASSWORD may be changed.
  if (channel_map.is_group_replication_channel_name(lex->mi.channel) &&
      !channel_map.is_group_replication_channel_name(lex->mi.channel, true)) {
    LEX_MASTER_INFO *lex_mi = &thd->lex->mi;
    if (is_invalid_change_master_for_group_replication_recovery(lex_mi)) {
      my_error(ER_SLAVE_CHANNEL_OPERATION_NOT_ALLOWED, MYF(0),
               "CHANGE MASTER with the given parameters", lex->mi.channel);
      res = true;
      goto err;
    }
  }

  /*
    Error out if number of replication channels are > 1 if FOR CHANNEL
    clause is not provided in the CHANGE MASTER command.
  */
  if (!lex->mi.for_channel && channel_map.get_num_instances() > 1) {
    // With more than one channel (multi-source replication) the statement
    // must name the channel explicitly.
    my_error(ER_SLAVE_MULTIPLE_CHANNELS_CMD, MYF(0));
    res = true;
    goto err;
  }

  /* Get the Master_info of the channel */
  mi = channel_map.get_mi(lex->mi.channel);

  /* create a new channel if doesn't exist */
  if (!mi && strcmp(lex->mi.channel, channel_map.get_default_channel())) {
    /* The mi will be returned holding mi->channel_lock for writing */
    // Initializes the channel's Master_info and adds it to channel_map.
    // NOTE(review): res is still false on this failure path; presumably
    // add_new_channel() raises the error itself — confirm the caller also
    // checks thd->is_error().
    if (add_new_channel(&mi, lex->mi.channel)) goto err;
  }

  if (mi) {
    bool configure_filters = !Master_info::is_configured(mi);

    if (!(res = change_master(thd, mi, &thd->lex->mi)))
      // ... (the rest of change_master_cmd is not included in this excerpt)

随后进入change_master函数。该函数主要负责检查CHANGE REPLICATION SOURCE TO所指定选项的有效性以及选项之间是否存在冲突，然后更新Master Info和Relay Log Info信息并将其刷盘：

/**
  Executes CHANGE REPLICATION SOURCE TO for a single channel.

  Checks that the requested options are allowed given which replication
  threads are running, and that they are consistent with each other and with
  GTID_MODE. Then it loads the channel's repositories, applies the new values
  to Master_info / Relay_log_info, optionally purges the relay logs, and
  flushes the updated metadata.

  The channel write lock and the replication-thread locks are taken at the
  start and released on the single exit path (label err).

  @param thd            session executing the statement
  @param mi             Master_info of the target channel
  @param lex_mi         options parsed from the statement
  @param preserve_logs  when true (and mi->rli is initialized) the relay logs
                        are not purged

  @return 0 on success; non-zero on failure (for example an ER_* code, or 1
          when option validation or the option update fails)
*/
int change_master(THD *thd, Master_info *mi, LEX_MASTER_INFO *lex_mi,
                  bool preserve_logs) {
  int error = 0;

  // True when the statement sets an option that affects the receiver (I/O)
  // thread.
  bool have_receive_option = false;
  // True when the statement sets an option that affects the applier (SQL and
  // worker) threads.
  bool have_execute_option = false;
  // True when the statement sets an option that affects both the receiver and
  // the applier threads.
  bool have_both_receive_execute_option = false;
  bool validation_error = false;
  // Filled in by validate_change_replication_source_options(); when true the
  // worker info is reset at the end (the author notes this is the case when
  // there are no MTS gaps).
  bool mta_remove_worker_info = false;
  // Bit mask of the running replication threads (SLAVE_IO / SLAVE_SQL).
  int thread_mask;
  // Whether relay logs will be purged. Cleared below when any replication
  // thread is running or preserve_logs applies, and possibly updated by
  // have_change_replication_source_execute_option() (per the author, when
  // RELAY_LOG_FILE / RELAY_LOG_POS are given).
  bool need_relay_log_purge = true;
  // Previous source settings, kept so the change can be written to the error
  // log once the new values have been applied.
  char saved_host[HOSTNAME_LENGTH + 1], saved_bind_addr[HOSTNAME_LENGTH + 1];
  uint saved_port = 0;
  char saved_log_name[FN_REFLEN];
  my_off_t saved_log_pos = 0;

  DBUG_TRACE;

  // Skip the read-only check: this statement must be able to update the
  // replication metadata (e.g. mysql.slave_master_info) on a read-only server.
  thd->set_skip_readonly_check();
  // Prevent other threads from modifying this channel's Master_info.
  mi->channel_wrlock();

  // Prevent other threads from changing the state of the replication threads
  // while this statement runs.
  lock_slave_threads(mi);

  // Find out which replication threads (I/O, SQL) are currently running.
  init_thread_mask(&thread_mask, mi, false);

  // While any replication thread is running, this statement does not purge
  // relay logs (to avoid losing data). Per the author's notes, purging later
  // is still governed by the relay_log_purge setting.
  if (thread_mask)
  {
    need_relay_log_purge = false;
  }

  /*
    Does the statement set any option that affects the receiver (I/O) thread?
    For example:
    - host
    - user
    - password
    - port
    - log_file_name
    - pos
    - connect_retry
    - SSL-related options
    ...
  */
  have_receive_option = have_change_replication_source_receive_option(lex_mi);

  /*
    Does the statement set any option that affects the applier (SQL / worker)
    threads? For example:
    - relay_log_name
    - relay_log_pos
    - sql_delay
    - privilege_checks_username
    The helper also receives need_relay_log_purge and may update it.
  */
  have_execute_option = have_change_replication_source_execute_option(
      lex_mi, &need_relay_log_purge);

  /*
    Does the statement set any option that affects both the receiver and the
    applier threads? For example:
    - assign_gtids_to_anonymous_transactions_type
    - auto_position
    - source_connection_auto_failover
    - gtid_only
  */
  have_both_receive_execute_option =
      have_change_replication_source_applier_and_receive_option(lex_mi);

  // An option cannot be changed while a thread it affects is running.
  // The whole channel must be stopped when an option affects both threads and
  // either thread is running, or when receiver and applier options are mixed
  // while both threads are running.
  if ((have_both_receive_execute_option &&
       ((thread_mask & SLAVE_IO) || (thread_mask & SLAVE_SQL))) ||
      (have_receive_option && have_execute_option && (thread_mask & SLAVE_IO) &&
       (thread_mask & SLAVE_SQL))) {
    error = ER_SLAVE_CHANNEL_MUST_STOP;
    my_error(ER_SLAVE_CHANNEL_MUST_STOP, MYF(0), mi->get_channel());
    goto err;
  }

  // Receiver options require the I/O thread to be stopped.
  if (have_receive_option && (thread_mask & SLAVE_IO)) {
    error = ER_SLAVE_CHANNEL_IO_THREAD_MUST_STOP;
    my_error(ER_SLAVE_CHANNEL_IO_THREAD_MUST_STOP, MYF(0), mi->get_channel());
    goto err;
  }

  // Applier options require the SQL thread to be stopped.
  if (have_execute_option && (thread_mask & SLAVE_SQL)) {
    error = ER_SLAVE_CHANNEL_SQL_THREAD_MUST_STOP;
    my_error(ER_SLAVE_CHANNEL_SQL_THREAD_MUST_STOP, MYF(0), mi->get_channel());
    goto err;
  }


  /*
    When GTID_MODE is not ON, reject options that depend on GTIDs. Per the
    author's notes (see validate_gtid_option_restrictions() for the exact
    rules):
    - SOURCE_AUTO_POSITION = 1 needs GTIDs enabled
    - ASSIGN_GTIDS_TO_ANONYMOUS_TRANSACTIONS != OFF needs GTID_MODE = ON
    - GTID_ONLY = 1 needs GTID_MODE = ON
    - SOURCE_CONNECTION_AUTO_FAILOVER = 1 needs GTID_MODE = ON
  */
  if (global_gtid_mode.get() != Gtid_mode::ON) {
    if ((error = validate_gtid_option_restrictions(lex_mi, mi))) {
      goto err;
    }
  }

  /*
    Check that the options are compatible with each other and with the
    channel's current configuration. Per the author's notes, among others:
    - SOURCE_LOG_FILE / SOURCE_LOG_POS and RELAY_LOG_FILE / RELAY_LOG_POS
      conflict with SOURCE_AUTO_POSITION
    - ASSIGN_GTIDS_TO_ANONYMOUS_TRANSACTIONS != OFF conflicts with
      SOURCE_AUTO_POSITION
    - GTID_ONLY = 1 requires SOURCE_AUTO_POSITION = 1 and REQUIRE_ROW_FORMAT = 1
    - SOURCE_CONNECTION_AUTO_FAILOVER = 1 requires SOURCE_AUTO_POSITION = 1
    - while GTID_ONLY is enabled, SOURCE_AUTO_POSITION cannot be disabled
    - while GTID_ONLY is enabled, REQUIRE_ROW_FORMAT cannot be disabled
    ...
  */
  if ((error = evaluate_inter_option_dependencies(lex_mi, mi))) {
    goto err;
  }

  // Keep the relay logs when the caller asked to preserve them and the
  // channel's relay log info has been initialized. (The author notes that
  // preserve_logs is false on the CHANGE REPLICATION SOURCE TO path.)
  if (need_relay_log_purge &&
      preserve_logs &&
      mi->rli->inited)
  {
    need_relay_log_purge = false;
  }

  THD_STAGE_INFO(thd, stage_changing_source);

  int thread_mask_stopped_threads;

  // Bit mask of the replication threads that are stopped (the inverse of
  // thread_mask, per the author's notes).
  init_thread_mask(&thread_mask_stopped_threads, mi, true);

  // Load Master_info and Relay_log_info from their repositories (per the
  // author's notes, the repositories are created first if they do not exist).
  if (load_mi_and_rli_from_repositories(mi, false, thread_mask_stopped_threads,
                                        need_relay_log_purge)) {
    error = ER_MASTER_INFO;
    my_error(ER_MASTER_INFO, MYF(0));
    goto err;
  }
  // Validate the remaining options (per the author's notes this includes the
  // PRIVILEGE_CHECKS_USER user/host syntax and privileges); the second result
  // says whether the worker info must be reset.
  std::tie(validation_error, mta_remove_worker_info) =
      validate_change_replication_source_options(thd, lex_mi, mi, thread_mask);

  if (validation_error) {
    error = 1;
    goto err;
  }

  // Save the current host, bind address, port and source log file/position
  // so the change can be logged below.
  if (have_receive_option) {
    strmake(saved_host, mi->host, HOSTNAME_LENGTH);
    strmake(saved_bind_addr, mi->bind_addr, HOSTNAME_LENGTH);
    saved_port = mi->port;
    strmake(saved_log_name, mi->get_master_log_name(), FN_REFLEN - 1);
    saved_log_pos = mi->get_master_log_pos();
  }

  /*
    Apply the new values, e.g. to Master_info (user, host, password, port,
    auto_position, source log file/position, ...) and to Relay_log_info
    (relay log file/position, ...).
  */
  if (update_change_replication_source_options(
          thd, lex_mi, mi, have_both_receive_execute_option,
          have_execute_option, have_receive_option)) {
    error = 1;
    goto err;
  }

  /*
    If the relay logs will be purged, the statement sets none of host, port,
    log_file_name and pos (so the source itself is unchanged), and the
    applier's source position info is valid: copy the applier's group source
    log name/position (never below BIN_LOG_HEADER_SIZE) into Master_info so
    the receiver fetches the binary log again from where the applier stopped.
  */
  if (need_relay_log_purge) {
    if (!lex_mi->host && !lex_mi->port && !lex_mi->log_file_name &&
        !lex_mi->pos && !mi->rli->is_applier_source_position_info_invalid()) {
      mi->set_master_log_pos(max<ulonglong>(
          BIN_LOG_HEADER_SIZE, mi->rli->get_group_master_log_pos()));
      mi->set_master_log_name(mi->rli->get_group_master_log_name());
    }
  }

  // Record the old and new host, port, source log file/position and bind
  // address in the error log.
  if (have_receive_option)
    LogErr(SYSTEM_LEVEL, ER_SLAVE_CHANGE_MASTER_TO_EXECUTED,
           mi->get_for_channel_str(true), saved_host, saved_port,
           saved_log_name, (ulong)saved_log_pos, saved_bind_addr, mi->host,
           mi->port, mi->get_master_log_name(), (ulong)mi->get_master_log_pos(),
           mi->bind_addr);

  // Flush Master_info, unless the receiver (I/O) thread is running.
  if ((thread_mask & SLAVE_IO) == 0 && flush_master_info(mi, true)) {
    error = ER_RELAY_LOG_INIT;
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
    goto err;
  }

  // The relay-log handling below is done only when the applier (SQL) thread
  // is stopped.
  if ((thread_mask & SLAVE_SQL) == 0)
  {
    /*
      If the relay logs are to be purged, purge them and then copy the
      receiver's source log file/position from Master_info back into
      Relay_log_info (the author notes the purge invalidates the applier's
      coordinates). Otherwise check that the current group relay log name is
      valid (per the author, that it is listed in the relay log index).
    */
    if (need_relay_log_purge) {
      const char *errmsg = nullptr;
      THD_STAGE_INFO(thd, stage_purging_old_relay_logs);
      if (mi->rli->purge_relay_logs(thd, &errmsg)) {
        error = ER_RELAY_LOG_FAIL;
        my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
        goto err;
      }
      if (!mi->is_receiver_position_info_invalid()) {
        mi->rli->set_group_master_log_pos(mi->get_master_log_pos());
        mi->rli->set_group_master_log_name(mi->get_master_log_name());
        DBUG_PRINT("info", ("master_log_pos: %llu", mi->get_master_log_pos()));
      }
    } else {
      const char *errmsg = nullptr;
      // Relay logs are kept: the applier's current relay log must be valid.
      if (mi->rli->is_group_relay_log_name_invalid(&errmsg)) {
        error = ER_RELAY_LOG_INIT;
        my_error(ER_RELAY_LOG_INIT, MYF(0), errmsg);
        goto err;
      }
    }

    char *var_group_master_log_name =
        const_cast<char *>(mi->rli->get_group_master_log_name());

    // No group source log name is recorded: unless the applier position info
    // is flagged invalid, reset the group source log position to 0.
    if (!var_group_master_log_name[0] &&
        !mi->rli->is_applier_source_position_info_invalid())
      mi->rli->set_group_master_log_pos(0);

    // Per the author's notes, bumping abort_pos_wait interrupts pending
    // SOURCE_POS_WAIT() calls.
    mi->rli->abort_pos_wait++;

    // Clear the applier's error state, and each worker's when the worker
    // array is initialized.
    mi->rli->clear_error();
    if (mi->rli->workers_array_initialized) {
      for (size_t i = 0; i < mi->rli->get_worker_count(); i++) {
        mi->rli->get_worker(i)->clear_error();
      }
    }

    // Flush Relay_log_info, passing RLI_FLUSH_IGNORE_SYNC_OPT and
    // RLI_FLUSH_IGNORE_GTID_ONLY.
    if (mi->rli->flush_info(Relay_log_info::RLI_FLUSH_IGNORE_SYNC_OPT |
                            Relay_log_info::RLI_FLUSH_IGNORE_GTID_ONLY)) {
      error = ER_RELAY_LOG_INIT;
      my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush relay info file.");
      goto err;
    }

  }

  // Presumably emits a warning when the resulting positions are invalid —
  // see log_invalid_position_warning() (not shown) to confirm.
  log_invalid_position_warning(thd, lex_mi, mi);

  // Reset the worker info when validation asked for it (per the author, when
  // there are no MTS gaps).
  if (mta_remove_worker_info)
    if (Rpl_info_factory::reset_workers(mi->rli)) {
      error = ER_MTS_RESET_WORKERS;
      my_error(ER_MTS_RESET_WORKERS, MYF(0));
      goto err;
    }
err:
  // Single exit: release the replication-thread locks and the channel lock
  // taken at the top of the function.
  unlock_slave_threads(mi);
  mi->channel_unlock();
  return error;
}