美文网首页
MySQL组复制提交流程

MySQL组复制提交流程

作者: 真之棒2016 | 来源:发表于2019-08-16 00:45 被阅读0次

author:sufei

版本:8.0.16


一、简介

 本文主要介绍MySQL 组复制的提交流程。在理解组复制的提交流程之前,建议先明白MySQL的事务提交过程,可参考《MySQL事务提交分析》。这里假设已经明白了MySQL服务层的事物提交过程。基于此,来分析讲解MySQL组复制的提交过程。

 文中首先给出整个组复制的提交流程框图,然后再结合源码进行分步细致讲解,希望对各位组复制源码阅读有所帮助。

二、组复制提交流程图

组复制提交

三、流程说明

3.1 广播事务

  1. 事务进入commit前,调用组复制的注册函数group_replication_trans_before_commit。有关该函数的注册是在group replication在组件初始化时,注册事务观察者
if (register_trans_observer(&trans_observer, (void *)lv.plugin_info_ptr)) 

 这样既可在befor_commit挂钩处进入group replication。进入组复制中相关的函数为:group_replication_trans_before_commit

  1. 检测复制通道类型,这里主要是区分一般提交,还是组复制applier channel 或者组复制recovery channel的提交。针对不同情况进入不同逻辑。

  2. 如果是一般性组提交,检测组复制该成员的状态信息,状态为MEMBER_ONLINE才能进行相关提交

  3. 生成transaction context event,这是一个新的event类型,内容主要包含事务的write set集合以及snapshot_version,主要用于冲突检测。

  • write set集合

write_set集合本质上事务修改行主键按照一定格式生成的hash值,这样同一行对应相同的hash值,从而检测组内两个不同的mysql实例提交的事务是否存在冲突。如果想详细了解可参考《WriteSet并行复制》

  • snapshot_version

其实就是gtid set,其值为提交时刻gtid_executed值。相关的生成代码在transaction context event构造函数中

if (mysql_bin_log.get_gtid_executed(sid_map, snapshot_version)) goto err;
  1. GTID event生成。注意这里仅仅是生成一个gtid event,用于构建事务消息,用于广播事务。其中sid和gid的值都为1。除非指定了gtid值,不然gtid值的生成阶段在certification冲突检测之后。相关的代码如下:
const bool is_gtid_specified = param->gtid_info.type == ASSIGNED_GTID;
Gtid gtid = {param->gtid_info.sidno, param->gtid_info.gno};
if (!is_gtid_specified) {
  // Dummy values that will be replaced after certification.
  gtid.sidno = 1;
  gtid.gno = 1;
}
  1. 序列化事务信息

 为了组内通信,需要将事务event信息进行序列化,后续过程中,组内成员收到后进行反序列化。其中信息包含:

  • transaction context event:其中包含write set和snapshot_version
  • GTID event
  • 其他event
    • QUERY_EVENT
    • MAP_EVENT
    • DML EVENT
    • XID_EVENT

 注意上述event的顺序,在分析应用阶段按照该顺序进行。下面是序列化相关代码:

//序列化transaction context event
tcle = new Transaction_context_log_event(param->server_uuid,
                                           is_dml || param->is_atomic_ddl,
                                           param->thread_id, is_gtid_specified);
binary_event_serialize(tcle, transaction_msg);
//序列化gtid event
gle = new Gtid_log_event(
      param->server_id, is_dml || param->is_atomic_ddl, 0, 1,
      may_have_sbr_stmts, *(param->original_commit_timestamp), 0,
      gtid_specification, *(param->original_server_version),
      *(param->immediate_server_version));
gle->set_trx_length_by_cache_size(cache_log_position);
//序列化其他event
cache_log->copy_to(transaction_msg) //copy binlog cache到序列化事务信息

 这里还有一点需要注意,其事务信息序列化结构体的创建是根据组复制事务一致性参数group_replication_consistency不同而不同,不同参数的影响可以参考文章《有关MySQL组复制的事务一致性参数理解》。具体代码

if (consistency_level < GROUP_REPLICATION_CONSISTENCY_AFTER) {
  transaction_msg = new Transaction_message();
} else {
  transaction_msg = new Transaction_with_guarantee_message(consistency_level);
}
  1. 在组复制事务锁系统中,注册事务ticket。以便阻塞提交流程,等待相关事务广播并通过冲突检测后释放。
if (transactions_latch->registerTicket(param->thread_id))
  1. 广播序列化事务信息
// Broadcast the Transaction Message
send_error = gcs_module->send_message(*transaction_msg);
  1. 阻塞等待事务ticket状态完成
if (transactions_latch->waitTicket(param->thread_id))

3.2 XCOM通信系统事件注册

 为了理解广播之后,组复制如何接受到相关事务信息,需要明白XCOM回调注册过程。以及相应的回调过程。首先我们来看一下其注册过程。

plugin_group_replication_init          //初始化组复制插件
|--plugin_group_replication_start      //启动组复制插件
   |--initialize_plugin_and_join       //加入组
      |--configure_group_communication //配置底层通信模块
         |--Gcs_xcom_interface::initialize_xcom  //初始化底层通信模块XCOM
            /*
            在底层XCOM模块注册相关回调函数,用于处理各种事件
            这里主要关注cb_xcom_receive_data函数,其用于处理事务数据接收
            */
            |--::set_xcom_data_receiver(cb_xcom_receive_data);
               ::set_xcom_local_view_receiver(cb_xcom_receive_local_view);
               ::set_xcom_global_view_receiver(cb_xcom_receive_global_view);
               ::set_port_matcher(cb_xcom_match_port);
               ::set_app_snap_handler(cb_xcom_handle_app_snap);
               ::set_should_exit_getter(cb_xcom_get_should_exit);
               ::set_app_snap_getter(cb_xcom_get_app_snap);
               ::set_xcom_run_cb(cb_xcom_ready);
               ::set_xcom_comms_cb(cb_xcom_comms);
               ::set_xcom_exit_cb(cb_xcom_exit);
               ::set_xcom_expel_cb(cb_xcom_expel);
               ::set_xcom_socket_accept_cb(cb_xcom_socket_accept);
               ::set_xcom_input_try_pop_cb(cb_xcom_input_try_pop);

 然后通过XCOM机制,在接收到事务事件后,广播给所有组成员,并通过调用cb_xcom_receive_data回调函数进行处理。最后调用Gcs_xcom_communication::notify_received_message函数进行信息处理。

void Gcs_xcom_communication::notify_received_message(Gcs_message *message) {
  map<int, const Gcs_communication_event_listener &>::iterator callback_it =
      event_listeners.begin();
  //遍历事件监听类的on_message_received函数,以处理相关事件
  while (callback_it != event_listeners.end()) {
    callback_it->second.on_message_received(*message);
    MYSQL_GCS_LOG_TRACE("Delivered message to client handler= %d",
                        (*callback_it).first)
    ++callback_it;
  }
  stats->update_message_received(
      (long)(message->get_message_data().get_header_length() +
             message->get_message_data().get_payload_length()));
  MYSQL_GCS_LOG_TRACE("Delivered message from origin= %s",
                      message->get_origin().get_member_id().c_str())
  delete message;
}

那么事件监听类的添加是何时添加的呢?

 这里是相关add listener添加事件监听类的调用栈如下:

plugin_group_replication_init           //初始化组复制插件
|--plugin_group_replication_start       //启动组复制插件
   |--initialize_plugin_and_join        //加入组
      |--start_group_communication      //开启组通信系统模块
         /* 
         在gcs上注册事件监听类communication_event_listener
         在gcs上注册view视图监听类view_notifier
         */
         |--Gcs_operations::join(&communication_event_listener,
                                 &control_event_listener,
                                 *view_notifier)
            |--gcs_control->add_event_listener(control_event_listener);
               gcs_communication->add_event_listener(communication_event_listener);
    

 最终初始化插件后,事件监听类为Plugin_gcs_events_handler,视图监听类Plugin_gcs_view_modification_notifier。最后的处理有事件监听类的on_message_received函数处理。后续的处理流程也是从该函数开始(图中第14步)。

3.3 事务处理

  1. 回调函数最终调用applier模块的handle函数,将事务信息打包放入incomming队列中,等待applier主程序的处理
  2. applier模块主线程循环取出incoming中的数据,进行分类处理
while (!applier_error && !packet_application_error && !loop_termination) {
    if (is_applier_thread_aborted()) break;

    this->incoming->front(&packet);  // 阻塞等待队列中的事务信息到来

    switch (packet->get_packet_type()) {  //根据信息类型不同,进行相应处理
      case ACTION_PACKET_TYPE:
        this->incoming->pop();
        loop_termination = apply_action_packet((Action_packet *)packet);
        break;
      case VIEW_CHANGE_PACKET_TYPE:   //view change
        packet_application_error = apply_view_change_packet(
            (View_change_packet *)packet, fde_evt, cont);
        this->incoming->pop();
        break;
      case DATA_PACKET_TYPE:          //一般事务数据包
        packet_application_error =
            apply_data_packet((Data_packet *)packet, fde_evt, cont);
        // 当事务处理完成后再去清除队列中的数据
        this->incoming->pop();
        break;
      ……
      default:
        DBUG_ASSERT(0); /* purecov: inspected */
    }
    delete packet;
  }
  1. 反序列化数据信息包,每个event分别进入event处理管道。
int Applier_module::apply_data_packet(Data_packet *data_packet,
                                      Format_description_log_event *fde_evt,
                                      Continuation *cont) {
  int error = 0;
  uchar *payload = data_packet->payload;
  uchar *payload_end = data_packet->payload + data_packet->len;
  /*
  循环处理每个event,event的顺序如下:
  transaction context event
  GTID event
  其他event
  */
  while ((payload != payload_end) && !error) {
    uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET);

    Data_packet *new_packet = new Data_packet(payload, event_len);
    payload = payload + event_len;

    std::list<Gcs_member_identifier> *online_members = NULL;
    if (NULL != data_packet->m_online_members) {
      online_members =
          new std::list<Gcs_member_identifier>(*data_packet->m_online_members);
    }
    //构建event管道处理类
    Pipeline_event *pevent =
        new Pipeline_event(new_packet, fde_evt, UNDEFINED_EVENT_MODIFIER,
                           data_packet->m_consistency_level, online_members);
    error = inject_event_into_pipeline(pevent, cont); //将event放入管道进行处理

    delete pevent;
    DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", {
      if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) {
        error = 1;
      }
    });
  }

  return error;
}
int Applier_module::inject_event_into_pipeline(Pipeline_event *pevent,
                                               Continuation *cont) {
  int error = 0;
  pipeline->handle_event(pevent, cont); //进入管道处理 

  if ((error = cont->wait())) //等待管道处理,并放回错误码
    LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_EVENT_HANDLING_ERROR, error);

  return error;
}

 我们已经知道了每个event都需要进入管道,进行相应的处理。并且也知道event进入顺序为:

  • transaction context event:其中包含write set和snapshot_version
  • GTID event
  • 其他event

 管道处理分为三步,详细见管道的设置函数configure_pipeline

switch (pipeline_type) {
  case STANDARD_GROUP_REPLICATION_PIPELINE:
      (*pipeline_conf) = new Handler_id[3];
      (*pipeline_conf)[0] = CATALOGING_HANDLER;
      (*pipeline_conf)[1] = CERTIFICATION_HANDLER;
      (*pipeline_conf)[2] = SQL_THREAD_APPLICATION_HANDLER;
      DBUG_RETURN(3);
  default:
      /* purecov: begin inspected */
      LogPluginErr(ERROR_LEVEL,
                   ER_GRP_RPL_UNKNOWN_GRP_RPL_APPLIER_PIPELINE_REQUESTED);
      /* purecov: end */
}

int configure_pipeline(Event_handler **pipeline, Handler_id handler_list[],
                       int num_handlers) {
  DBUG_ENTER("configure_pipeline(pipeline, handler_list[], num_handlers)");
  int error = 0;
  //循环设置管道链表,这样每个event可以通过该管道链表进行相应处理
  for (int i = 0; i < num_handlers; ++i) {
    Event_handler *handler = NULL;

    switch (handler_list[i]) {
      case CATALOGING_HANDLER:
        handler = new Event_cataloger();
        break;
      case CERTIFICATION_HANDLER:
        handler = new Certification_handler();
        break;
      case SQL_THREAD_APPLICATION_HANDLER:
        handler = new Applier_handler();
        break;
      default:
        /* purecov: begin inspected */
        error = 1;
        LogPluginErr(
            ERROR_LEVEL,
            ER_GRP_RPL_FAILED_TO_BOOTSTRAP_EVENT_HANDLING_INFRASTRUCTURE,
            handler_list[i]);
        /* purecov: end */
    }
    ...

    if ((error = handler->initialize())) {
      /* purecov: begin inspected */
      LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FAILED_TO_INIT_APPLIER_HANDLER);
      DBUG_RETURN(error);
      /* purecov: end */
    }

    // Add the handler to the pipeline
    Event_handler::append_handler(pipeline, handler); //添加到管道链表中
  }
  DBUG_RETURN(0);
}

  总结一下:每个事务event,按照顺序依次经过Event_cataloger,Certification_handler,Applier_handler处理。相应的处理函数为handle_event。

16-17. 该线路主要是针对transaction context event事件的管道处理逻辑,主要包含标记事务开始,以及存储transaction context以便后续冲突检测阶段使用

18-23. 该部分是组复制的核心,主要包含冲突检测,gtid生成,以及通过判断是否为本地事务进行不同处理。下面分析冲突检测核心步骤19进行分析。

冲突验证(Certification)

每次提交阶段,被广播到各成员的不仅包含事务修改的数据本身,还保证由行的主键hash值以及相应的数据库版本号信息组成的write set。版本信息告知了事务的修改是基于哪个版本进行的;主键hash值告知了事务具体修改了哪些行。通过上述信息,在冲突检测中,可以对有冲突的事物进行甄别。

数据库版本是由GTID_EXECUTED生成,更准确的说,事物在执行时从GTID_EXECUTED变量中获得的连续gtid中最后一个数组,比如:GTID_EXECUTED为GTID_EXECUTED: UUID:1-10, UUID:12。则服务器版本为10。

通过

如上图所示:这是三个成员组成的组复制。事物update首先在服务器s1上执行,直到提交阶段,广播write set和data到各成员;接下来进入冲突验证阶段,然后对事物中每个write set的版本号进行比较,如果相同write set版本号小于验证系统中的版本号,则验证冲突;如果认证系统中没有存在相应write set的版本号,说明无冲突。

对于上图,事物write set的版本号为1(dbv),验证系统对于write set版本号也为1(cv)。相应write set的版本一致,即事务没有与任何在线的事物冲突,本地事物允许提交,并将验证系统版本号更新为cv:2。接下来就是提交过程。

对于本地事物(s1):

​ 提交事务,并放回客户端成功;

对于远程事务(s2,s3):

​ 将事务日志放入relaylog中,等待后续Applier模块执行。

GTID生成

为了组复制有一个统一的全局id,GTID的生成由Certification模块控制。其UUID取自组复制的group_replication_group_name参数。下面是在认证函数中,有关GTID生成代码

//在Certifier::certify函数中,有如下一段代码
if (generate_group_id) {
    /*
     确保group_sidno在快照版本号中
    */
    if (snapshot_version->ensure_sidno(group_sidno) != RETURN_STATUS_OK) {
      LogPluginErr(
          ERROR_LEVEL,
          ER_GRP_RPL_UPDATE_TRANS_SNAPSHOT_VER_ERROR); /* purecov: inspected */
      goto end;                                        /* purecov: inspected */
    }
    //得到下一个GTID的gno
    result = get_group_next_available_gtid(member_uuid);
    if (result < 0) goto end;

    /*
      更新验证系统版本号,即cv
    */
    snapshot_version->_add_gtid(group_sidno, result);

    /*
      更新状态变量
    */
    last_conflict_free_transaction.set(group_gtid_sid_map_group_sidno, result);

    DBUG_PRINT("info", ("Group replication Certifier: generated transaction "
                        "identifier: %llu",
                        result));
  } else {
 
//最终gtid的gno生成使用函数
/*
该函数基于gtid_executed变量,获得下一个可用的gtid
参数:
    [start end] :得到的gno必须在该范围内
*/
rpl_gno Certifier::get_group_next_available_gtid_candidate(rpl_gno start,
                                                           rpl_gno end) const {
  DBUG_ENTER("Certifier::get_group_next_available_gtid_candidate");
  DBUG_ASSERT(start > 0);
  DBUG_ASSERT(start <= end);
  mysql_mutex_assert_owner(&LOCK_certification_info);

  rpl_gno candidate = start;  //从start开始查找
  //获得gtid_executed迭代器ivit
  Gtid_set::Const_interval_iterator ivit(certifying_already_applied_transactions
                                             ? group_gtid_extracted
                                             : group_gtid_executed,
                                         group_gtid_sid_map_group_sidno);
  /*
    遍历gtid_executed,找出最小可用gno
  */
  while (true) {
    DBUG_ASSERT(candidate >= start);
    const Gtid_set::Interval *iv = ivit.get(); //获得一个gtid区间
    rpl_gno next_interval_start = iv != NULL ? iv->start : MAX_GNO;//得到下一个区间开始个gno

    // 判断选出的gno是否在可用范围,即candidate(选出的gno)< next_interval_start 且不超过参数end
    if (candidate < next_interval_start) {
      if (candidate <= end)
        DBUG_RETURN(candidate);
      else
        DBUG_RETURN(-2);
    }
    //如果到最后还没找到合适的,则报错
    if (iv == NULL) {
      LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_CANT_GENERATE_GTID);
      DBUG_RETURN(-1);
    }
    //更新候选gno
    candidate = std::max(candidate, iv->end);
    ivit.next(); 
  }
}

上面已经分析了未发生冲突的情况,下面对于发生冲突情况,也进行简单说明:

冲突

如上图,两个事物在不同的服务器上对相同行进行了修改。整个过程如下:

  1. 基于组复制通信系统确保有序,事务T1(携带版本号为dbv:1)首先到达各服务器的验证系统中,从而各验证系统将cv:2修改为了cv:2,允许提交;
  2. 随后到来的T2事务(同样携带版本号dbv:1),了如果修改了相同的行,则在认证系统中,由于认证系统对于行的版本号已经改为了cv:2,故需要回滚;
  3. 服务器s2本地事务T2回滚,其他服务器忽略T2事务。

源码分析

上一小节说明了冲突检测过程,这一节结合源码详细说明。

/*
冲突检测主要函数,用于检测组复制中的事物冲突,以判断提交与否
参数:
    snapshot_version : 待检测事务的版本号
    write_set :待检测事务的write set链表
    generate_group_id : 是否需要生成GTID
    member_uuid : 待检测事务来自member的uuid
    gle : GTID EVENT指针
    local_transaction : 是否是本地事务
返回值:
    >0 通过冲突检测,返回gno
    =0 存在冲突,拒绝提交
    <0 error
*/
rpl_gno Certifier::certify(Gtid_set *snapshot_version,
                           std::list<const char *> *write_set,
                           bool generate_group_id, const char *member_uuid,
                           Gtid_log_event *gle, bool local_transaction) {
  DBUG_ENTER("Certifier::certify");
  rpl_gno result = 0;
  const bool has_write_set = !write_set->empty();

  if (!is_initialized()) DBUG_RETURN(-1); /* purecov: inspected */

  mysql_mutex_lock(&LOCK_certification_info);
  int64 transaction_last_committed = parallel_applier_last_committed_global;

  DBUG_EXECUTE_IF("certifier_force_1_negative_certification", {
    DBUG_SET("-d,certifier_force_1_negative_certification");
    goto end;
  });
  /*
  如果开启了冲突检测(单主模式没有冲突检测),进行相应的检测
  遍历待检测事务的write set,得到Certifier系统相应的版本号,检测如果满足下面两个添加,则冲突。
  1、Certifier系统相应的版本号不为null
  2、Certifier系统相应的版本号不是待检测事务版本号的子集
  */
  if (conflict_detection_enable) {
    for (std::list<const char *>::iterator it = write_set->begin();
         it != write_set->end(); ++it) {
      Gtid_set *certified_write_set_snapshot_version =
          get_certified_write_set_snapshot_version(*it);

      if (certified_write_set_snapshot_version != NULL &&
          !certified_write_set_snapshot_version->is_subset(snapshot_version))
        goto end;
    }
  }

  if (certifying_already_applied_transactions &&
      !group_gtid_extracted->is_subset_not_equals(group_gtid_executed)) {
    certifying_already_applied_transactions = false;
  }

  /*
    生成gtid
  */
  if (generate_group_id) { //自己生成组gtid
    if (snapshot_version->ensure_sidno(group_sidno) != RETURN_STATUS_OK) {
      LogPluginErr(
          ERROR_LEVEL,
          ER_GRP_RPL_UPDATE_TRANS_SNAPSHOT_VER_ERROR); /* purecov: inspected */
      goto end;                                        /* purecov: inspected */
    }

    result = get_group_next_available_gtid(member_uuid);
    if (result < 0) goto end;

    snapshot_version->_add_gtid(group_sidno, result);
    last_conflict_free_transaction.set(group_gtid_sid_map_group_sidno, result);

    DBUG_PRINT("info", ("Group replication Certifier: generated transaction "
                        "identifier: %llu",
                        result));
  } else { //已经存在gtid
    ……
  }

  /*
    更新认证系统中的版本号Add 通过检测事务的write set到certification info中
  */
  if (has_write_set) {
    // Only consider remote transactions for parallel applier indexes.
    int64 transaction_sequence_number =
        local_transaction ? -1 : parallel_applier_sequence_number;
    Gtid_set_ref *snapshot_version_value = new Gtid_set_ref(
        certification_info_sid_map, transaction_sequence_number);
    if (snapshot_version_value->add_gtid_set(snapshot_version) !=
        RETURN_STATUS_OK) {
      result = 0;                    /* purecov: inspected */
      delete snapshot_version_value; /* purecov: inspected */
      LogPluginErr(
          ERROR_LEVEL,
          ER_GRP_RPL_UPDATE_TRANS_SNAPSHOT_REF_VER_ERROR); /* purecov: inspected
                                                            */
      goto end; /* purecov: inspected */
    }

    for (std::list<const char *>::iterator it = write_set->begin();
         it != write_set->end(); ++it) {
      int64 item_previous_sequence_number = -1;
      //真正更新cv版本信息
      add_item(*it, snapshot_version_value, &item_previous_sequence_number);

      /*
        Exclude previous sequence number that are smaller than global
        last committed and that are the current sequence number.
        transaction_last_committed is initialized with
        parallel_applier_last_committed_global on the beginning of
        this method.
      */
      if (item_previous_sequence_number > transaction_last_committed &&
          item_previous_sequence_number != parallel_applier_sequence_number)
        transaction_last_committed = item_previous_sequence_number;
    }
  }

  /*
    非本地事务,则修改GTID event中的last commit,以便在应用时,并行回放,加快速度
  */
  if (!local_transaction) {
    if (!has_write_set) {
      //没有write set,说明是ddl语句,则不能与其他事务并行回放。
      transaction_last_committed = parallel_applier_sequence_number - 1;
    }
    gle->last_committed = transaction_last_committed;
    gle->sequence_number = parallel_applier_sequence_number;
    DBUG_ASSERT(gle->last_committed >= 0);
    DBUG_ASSERT(gle->sequence_number > 0);
    DBUG_ASSERT(gle->last_committed < gle->sequence_number);
    increment_parallel_applier_sequence_number(!has_write_set);
  }

end:
  update_certified_transaction_count(result > 0, local_transaction);//更新状态变量

  mysql_mutex_unlock(&LOCK_certification_info);
  DBUG_PRINT("info", ("Group replication Certifier: certification result: %llu",
                      result));
  DBUG_RETURN(result);
}

本地事务

 如果通过了冲突检测,对于本地事务,mysql仅仅需要通知管道处理结束,唤醒事务等待即可。

if ((seq_number <= 0 || pevent->get_consistency_level() <
                              GROUP_REPLICATION_CONSISTENCY_AFTER) &&
      transactions_latch->releaseTicket(tcle->get_thread_id())) {  //唤醒等待的事务ticket
      /* purecov: begin inspected */
      LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_NOTIFY_CERTIFICATION_OUTCOME_FAILED);
      cont->signal(1, true);
      error = 1;
      goto end;
      /* purecov: end */
}
// 通知管道处理结束
cont->signal(0, true);

 当然,如果一致性级别大于AFTER,需要等待其他组成员进入提交阶段,相关代码如下:

//事务一致性级别大于AFTER
if (pevent->get_consistency_level() >= GROUP_REPLICATION_CONSISTENCY_AFTER) {
    //构建事务一致性信息
    Transaction_consistency_info *transaction_consistency_info =
            new Transaction_consistency_info(
                tcle->get_thread_id(), local_transaction, sid, sidno, gno,
                pevent->get_consistency_level(), pevent->get_online_members());
    pevent->release_online_members_memory_ownership();
    //成员之间协调将从这一点开始
    if (transaction_consistency_manager->after_certification(
                                           transaction_consistency_info)) {
      /* purecov: begin inspected */
      delete transaction_consistency_info;
      cont->signal(1, true);
      error = 1;
      goto end;
      /* purecov: end */
    }
}

远端事务

 对于远端事务,如果一致性级别大于AFTER,同样需要位置点同步。

if (pevent->get_consistency_level() >=
          GROUP_REPLICATION_CONSISTENCY_AFTER) {
        Transaction_consistency_info *transaction_consistency_info =
            new Transaction_consistency_info(
                tcle->get_thread_id(), local_transaction, sid, sidno, gno,
                pevent->get_consistency_level(), pevent->get_online_members());
        pevent->release_online_members_memory_ownership();
        if (transaction_consistency_manager->after_certification(
                transaction_consistency_info)) {
          /* purecov: begin inspected */
          delete transaction_consistency_info;
          cont->signal(1, true);
          error = 1;
          goto end;
          /* purecov: end */
        }

 与本地事务不同的是,对于本地事务,其调用transactions_latch->releaseTicket(tcle->get_thread_id())唤醒等待提交,进行接下来的提交过程。远端事务则是调用next函数,进入管道处理的下一阶段。

next(pevent, cont);   //进入下一阶段

23-25. 本质上就是将gtid event 和 事务其他event(不包含transaction context event)写入relaylog,以便applier复制通道的回放线程进行回放。相关代码在Applier_handler::handle_event中,如下

int Applier_handler::handle_event(Pipeline_event *event, Continuation *cont) {
  DBUG_ENTER("Applier_handler::handle_event");
  int error = 0;

  Data_packet *p = NULL;
  error = event->get_Packet(&p);
  DBUG_EXECUTE_IF("applier_handler_force_error_on_pipeline", error = 1;);
  if (error || (p == NULL)) {
    LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_TRANS_DATA_FAILED);
    error = 1;
    goto end;
  }

  /*
    There is no need to queue Transaction_context_log_event to
    server applier, this event is only need for certification,
    performed on the previous handler.
  */
  if (event->get_event_type() != binary_log::TRANSACTION_CONTEXT_EVENT) {
    //最终调用queue_event函数,将event放入relaylog进行回放
    error = channel_interface.queue_packet((const char *)p->payload, p->len); 

    if (event->get_event_type() == binary_log::GTID_LOG_EVENT &&
        local_member_info->get_recovery_status() ==
            Group_member_info::MEMBER_ONLINE) {
      applier_module->get_pipeline_stats_member_collector()
          ->increment_transactions_waiting_apply();
    }
  }
end:
  if (error)
    cont->signal(error); 
  else
    next(event, cont);

  DBUG_RETURN(error);
}

相关文章

  • MySQL组复制提交流程

    author:sufei 版本:8.0.16 一、简介  本文主要介绍MySQL 组复制的提交流程。在理解组复制的...

  • MySQL原生复制概述

    mysql5.7相对mysql5.6增加了多线程回放 复制流程 针对主库 Master提交事务GTID增加,主库b...

  • 数据库常问面试题

    [TOC] 1. mysql的复制原理以及流程 Mysql的复制原理以及流程 第一步Master记录二进制日志, ...

  • MySQL 主从复制

    1、复制概述 MySQL的复制原理大致如下: (1)首先,MySQL主库在事务提交时会把数据变更作为事件Event...

  • MySQL深入理解

    1、MySQL的复制原理以及流程(1)、复制基本原理流程 参考地址:https://www.cnblogs.com...

  • Mysql 异步/同步/半同步复制

    Mysql复制 异步 逻辑 MySQL 默认的复制就是异步的,主库再执行完客户端提交的事务后会立即将结果返回给客户...

  • MySQL MGR集群原理及实践

    一、MGR简介 MGR全称MySQL Group Replication(Mysql组复制),是MySQL官方于2...

  • MySQL MGR集群实践

    1、MGR说明 MGR全称MySQL Group Replication(Mysql组复制),是MySQL官方于2...

  • MySQL并发复制系列一:binlog组提交

    原文链接 MySQL Binary log在MySQL 5.1版本后推出主要用于主备复制的搭建,我们回顾下MySQ...

  • MySQL组提交

    RedoLog 写入机制 BinLog 写入机制 日志安全模式 组提交 参数配置 相关

网友评论

      本文标题:MySQL组复制提交流程

      本文链接:https://www.haomeiwen.com/subject/mnsxsctx.html