author:sufei
版本:8.0.16
一、简介
本文主要介绍MySQL 组复制的提交流程。在理解组复制的提交流程之前,建议先明白MySQL的事务提交过程,可参考《MySQL事务提交分析》。这里假设已经明白了MySQL服务层的事物提交过程。基于此,来分析讲解MySQL组复制的提交过程。
文中首先给出整个组复制的提交流程框图,然后再结合源码进行分步细致讲解,希望对各位组复制源码阅读有所帮助。
二、组复制提交流程图
组复制提交三、流程说明
3.1 广播事务
- 事务进入commit前,调用组复制的注册函数group_replication_trans_before_commit。有关该函数的注册是在group replication在组件初始化时,注册事务观察者
if (register_trans_observer(&trans_observer, (void *)lv.plugin_info_ptr))
这样既可在befor_commit挂钩处进入group replication。进入组复制中相关的函数为:group_replication_trans_before_commit
-
检测复制通道类型,这里主要是区分一般提交,还是组复制applier channel 或者组复制recovery channel的提交。针对不同情况进入不同逻辑。
-
如果是一般性组提交,检测组复制该成员的状态信息,状态为MEMBER_ONLINE才能进行相关提交
-
生成transaction context event,这是一个新的event类型,内容主要包含事务的write set集合以及snapshot_version,主要用于冲突检测。
- write set集合
write_set集合本质上事务修改行主键按照一定格式生成的hash值,这样同一行对应相同的hash值,从而检测组内两个不同的mysql实例提交的事务是否存在冲突。如果想详细了解可参考《WriteSet并行复制》
- snapshot_version
其实就是gtid set,其值为提交时刻gtid_executed值。相关的生成代码在transaction context event构造函数中
if (mysql_bin_log.get_gtid_executed(sid_map, snapshot_version)) goto err;
- GTID event生成。注意这里仅仅是生成一个gtid event,用于构建事务消息,用于广播事务。其中sid和gid的值都为1。除非指定了gtid值,不然gtid值的生成阶段在certification冲突检测之后。相关的代码如下:
const bool is_gtid_specified = param->gtid_info.type == ASSIGNED_GTID;
Gtid gtid = {param->gtid_info.sidno, param->gtid_info.gno};
if (!is_gtid_specified) {
// Dummy values that will be replaced after certification.
gtid.sidno = 1;
gtid.gno = 1;
}
- 序列化事务信息
为了组内通信,需要将事务event信息进行序列化,后续过程中,组内成员收到后进行反序列化。其中信息包含:
- transaction context event:其中包含write set和snapshot_version
- GTID event
- 其他event
- QUERY_EVENT
- MAP_EVENT
- DML EVENT
- XID_EVENT
注意上述event的顺序,在分析应用阶段按照该顺序进行。下面是序列化相关代码:
//序列化transaction context event
tcle = new Transaction_context_log_event(param->server_uuid,
is_dml || param->is_atomic_ddl,
param->thread_id, is_gtid_specified);
binary_event_serialize(tcle, transaction_msg);
//序列化gtid event
gle = new Gtid_log_event(
param->server_id, is_dml || param->is_atomic_ddl, 0, 1,
may_have_sbr_stmts, *(param->original_commit_timestamp), 0,
gtid_specification, *(param->original_server_version),
*(param->immediate_server_version));
gle->set_trx_length_by_cache_size(cache_log_position);
//序列化其他event
cache_log->copy_to(transaction_msg) //copy binlog cache到序列化事务信息
这里还有一点需要注意,其事务信息序列化结构体的创建是根据组复制事务一致性参数group_replication_consistency不同而不同,不同参数的影响可以参考文章《有关MySQL组复制的事务一致性参数理解》。具体代码
if (consistency_level < GROUP_REPLICATION_CONSISTENCY_AFTER) {
transaction_msg = new Transaction_message();
} else {
transaction_msg = new Transaction_with_guarantee_message(consistency_level);
}
- 在组复制事务锁系统中,注册事务ticket。以便阻塞提交流程,等待相关事务广播并通过冲突检测后释放。
if (transactions_latch->registerTicket(param->thread_id))
- 广播序列化事务信息
// Broadcast the Transaction Message
send_error = gcs_module->send_message(*transaction_msg);
- 阻塞等待事务ticket状态完成
if (transactions_latch->waitTicket(param->thread_id))
3.2 XCOM通信系统事件注册
为了理解广播之后,组复制如何接受到相关事务信息,需要明白XCOM回调注册过程。以及相应的回调过程。首先我们来看一下其注册过程。
plugin_group_replication_init //初始化组复制插件
|--plugin_group_replication_start //启动组复制插件
|--initialize_plugin_and_join //加入组
|--configure_group_communication //配置底层通信模块
|--Gcs_xcom_interface::initialize_xcom //初始化底层通信模块XCOM
/*
在底层XCOM模块注册相关回调函数,用于处理各种事件
这里主要关注cb_xcom_receive_data函数,其用于处理事务数据接收
*/
|--::set_xcom_data_receiver(cb_xcom_receive_data);
::set_xcom_local_view_receiver(cb_xcom_receive_local_view);
::set_xcom_global_view_receiver(cb_xcom_receive_global_view);
::set_port_matcher(cb_xcom_match_port);
::set_app_snap_handler(cb_xcom_handle_app_snap);
::set_should_exit_getter(cb_xcom_get_should_exit);
::set_app_snap_getter(cb_xcom_get_app_snap);
::set_xcom_run_cb(cb_xcom_ready);
::set_xcom_comms_cb(cb_xcom_comms);
::set_xcom_exit_cb(cb_xcom_exit);
::set_xcom_expel_cb(cb_xcom_expel);
::set_xcom_socket_accept_cb(cb_xcom_socket_accept);
::set_xcom_input_try_pop_cb(cb_xcom_input_try_pop);
然后通过XCOM机制,在接收到事务事件后,广播给所有组成员,并通过调用cb_xcom_receive_data
回调函数进行处理。最后调用Gcs_xcom_communication::notify_received_message
函数进行信息处理。
void Gcs_xcom_communication::notify_received_message(Gcs_message *message) {
map<int, const Gcs_communication_event_listener &>::iterator callback_it =
event_listeners.begin();
//遍历事件监听类的on_message_received函数,以处理相关事件
while (callback_it != event_listeners.end()) {
callback_it->second.on_message_received(*message);
MYSQL_GCS_LOG_TRACE("Delivered message to client handler= %d",
(*callback_it).first)
++callback_it;
}
stats->update_message_received(
(long)(message->get_message_data().get_header_length() +
message->get_message_data().get_payload_length()));
MYSQL_GCS_LOG_TRACE("Delivered message from origin= %s",
message->get_origin().get_member_id().c_str())
delete message;
}
那么事件监听类的添加是何时添加的呢?
这里是相关add listener添加事件监听类的调用栈如下:
plugin_group_replication_init //初始化组复制插件
|--plugin_group_replication_start //启动组复制插件
|--initialize_plugin_and_join //加入组
|--start_group_communication //开启组通信系统模块
/*
在gcs上注册事件监听类communication_event_listener
在gcs上注册view视图监听类view_notifier
*/
|--Gcs_operations::join(&communication_event_listener,
&control_event_listener,
*view_notifier)
|--gcs_control->add_event_listener(control_event_listener);
gcs_communication->add_event_listener(communication_event_listener);
最终初始化插件后,事件监听类为Plugin_gcs_events_handler
,视图监听类Plugin_gcs_view_modification_notifier
。最后的处理有事件监听类的on_message_received函数处理。后续的处理流程也是从该函数开始(图中第14步)。
3.3 事务处理
- 回调函数最终调用applier模块的handle函数,将事务信息打包放入incomming队列中,等待applier主程序的处理
- applier模块主线程循环取出incoming中的数据,进行分类处理
while (!applier_error && !packet_application_error && !loop_termination) {
if (is_applier_thread_aborted()) break;
this->incoming->front(&packet); // 阻塞等待队列中的事务信息到来
switch (packet->get_packet_type()) { //根据信息类型不同,进行相应处理
case ACTION_PACKET_TYPE:
this->incoming->pop();
loop_termination = apply_action_packet((Action_packet *)packet);
break;
case VIEW_CHANGE_PACKET_TYPE: //view change
packet_application_error = apply_view_change_packet(
(View_change_packet *)packet, fde_evt, cont);
this->incoming->pop();
break;
case DATA_PACKET_TYPE: //一般事务数据包
packet_application_error =
apply_data_packet((Data_packet *)packet, fde_evt, cont);
// 当事务处理完成后再去清除队列中的数据
this->incoming->pop();
break;
……
default:
DBUG_ASSERT(0); /* purecov: inspected */
}
delete packet;
}
- 反序列化数据信息包,每个event分别进入event处理管道。
int Applier_module::apply_data_packet(Data_packet *data_packet,
Format_description_log_event *fde_evt,
Continuation *cont) {
int error = 0;
uchar *payload = data_packet->payload;
uchar *payload_end = data_packet->payload + data_packet->len;
/*
循环处理每个event,event的顺序如下:
transaction context event
GTID event
其他event
*/
while ((payload != payload_end) && !error) {
uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET);
Data_packet *new_packet = new Data_packet(payload, event_len);
payload = payload + event_len;
std::list<Gcs_member_identifier> *online_members = NULL;
if (NULL != data_packet->m_online_members) {
online_members =
new std::list<Gcs_member_identifier>(*data_packet->m_online_members);
}
//构建event管道处理类
Pipeline_event *pevent =
new Pipeline_event(new_packet, fde_evt, UNDEFINED_EVENT_MODIFIER,
data_packet->m_consistency_level, online_members);
error = inject_event_into_pipeline(pevent, cont); //将event放入管道进行处理
delete pevent;
DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", {
if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) {
error = 1;
}
});
}
return error;
}
int Applier_module::inject_event_into_pipeline(Pipeline_event *pevent,
Continuation *cont) {
int error = 0;
pipeline->handle_event(pevent, cont); //进入管道处理
if ((error = cont->wait())) //等待管道处理,并放回错误码
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_EVENT_HANDLING_ERROR, error);
return error;
}
我们已经知道了每个event都需要进入管道,进行相应的处理。并且也知道event进入顺序为:
- transaction context event:其中包含write set和snapshot_version
- GTID event
- 其他event
管道处理分为三步,详细见管道的设置函数configure_pipeline
:
switch (pipeline_type) {
case STANDARD_GROUP_REPLICATION_PIPELINE:
(*pipeline_conf) = new Handler_id[3];
(*pipeline_conf)[0] = CATALOGING_HANDLER;
(*pipeline_conf)[1] = CERTIFICATION_HANDLER;
(*pipeline_conf)[2] = SQL_THREAD_APPLICATION_HANDLER;
DBUG_RETURN(3);
default:
/* purecov: begin inspected */
LogPluginErr(ERROR_LEVEL,
ER_GRP_RPL_UNKNOWN_GRP_RPL_APPLIER_PIPELINE_REQUESTED);
/* purecov: end */
}
int configure_pipeline(Event_handler **pipeline, Handler_id handler_list[],
int num_handlers) {
DBUG_ENTER("configure_pipeline(pipeline, handler_list[], num_handlers)");
int error = 0;
//循环设置管道链表,这样每个event可以通过该管道链表进行相应处理
for (int i = 0; i < num_handlers; ++i) {
Event_handler *handler = NULL;
switch (handler_list[i]) {
case CATALOGING_HANDLER:
handler = new Event_cataloger();
break;
case CERTIFICATION_HANDLER:
handler = new Certification_handler();
break;
case SQL_THREAD_APPLICATION_HANDLER:
handler = new Applier_handler();
break;
default:
/* purecov: begin inspected */
error = 1;
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_FAILED_TO_BOOTSTRAP_EVENT_HANDLING_INFRASTRUCTURE,
handler_list[i]);
/* purecov: end */
}
...
if ((error = handler->initialize())) {
/* purecov: begin inspected */
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FAILED_TO_INIT_APPLIER_HANDLER);
DBUG_RETURN(error);
/* purecov: end */
}
// Add the handler to the pipeline
Event_handler::append_handler(pipeline, handler); //添加到管道链表中
}
DBUG_RETURN(0);
}
总结一下:每个事务event,按照顺序依次经过Event_cataloger,Certification_handler,Applier_handler处理。相应的处理函数为handle_event。
16-17. 该线路主要是针对transaction context event事件的管道处理逻辑,主要包含标记事务开始,以及存储transaction context以便后续冲突检测阶段使用
18-23. 该部分是组复制的核心,主要包含冲突检测,gtid生成,以及通过判断是否为本地事务进行不同处理。下面分析冲突检测核心步骤19进行分析。
冲突验证(Certification)
每次提交阶段,被广播到各成员的不仅包含事务修改的数据本身,还保证由行的主键hash值以及相应的数据库版本号信息组成的write set。版本信息告知了事务的修改是基于哪个版本进行的;主键hash值告知了事务具体修改了哪些行。通过上述信息,在冲突检测中,可以对有冲突的事物进行甄别。
通过数据库版本是由GTID_EXECUTED生成,更准确的说,事物在执行时从GTID_EXECUTED变量中获得的连续gtid中最后一个数组,比如:GTID_EXECUTED为GTID_EXECUTED: UUID:1-10, UUID:12。则服务器版本为10。
如上图所示:这是三个成员组成的组复制。事物update首先在服务器s1上执行,直到提交阶段,广播write set和data到各成员;接下来进入冲突验证阶段,然后对事物中每个write set的版本号进行比较,如果相同write set版本号小于验证系统中的版本号,则验证冲突;如果认证系统中没有存在相应write set的版本号,说明无冲突。
对于上图,事物write set的版本号为1(dbv),验证系统对于write set版本号也为1(cv)。相应write set的版本一致,即事务没有与任何在线的事物冲突,本地事物允许提交,并将验证系统版本号更新为cv:2。接下来就是提交过程。
对于本地事物(s1):
提交事务,并放回客户端成功;
对于远程事务(s2,s3):
将事务日志放入relaylog中,等待后续Applier模块执行。
GTID生成
为了组复制有一个统一的全局id,GTID的生成由Certification模块控制。其UUID取自组复制的group_replication_group_name参数。下面是在认证函数中,有关GTID生成代码
//在Certifier::certify函数中,有如下一段代码
if (generate_group_id) {
/*
确保group_sidno在快照版本号中
*/
if (snapshot_version->ensure_sidno(group_sidno) != RETURN_STATUS_OK) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_UPDATE_TRANS_SNAPSHOT_VER_ERROR); /* purecov: inspected */
goto end; /* purecov: inspected */
}
//得到下一个GTID的gno
result = get_group_next_available_gtid(member_uuid);
if (result < 0) goto end;
/*
更新验证系统版本号,即cv
*/
snapshot_version->_add_gtid(group_sidno, result);
/*
更新状态变量
*/
last_conflict_free_transaction.set(group_gtid_sid_map_group_sidno, result);
DBUG_PRINT("info", ("Group replication Certifier: generated transaction "
"identifier: %llu",
result));
} else {
//最终gtid的gno生成使用函数
/*
该函数基于gtid_executed变量,获得下一个可用的gtid
参数:
[start end] :得到的gno必须在该范围内
*/
rpl_gno Certifier::get_group_next_available_gtid_candidate(rpl_gno start,
rpl_gno end) const {
DBUG_ENTER("Certifier::get_group_next_available_gtid_candidate");
DBUG_ASSERT(start > 0);
DBUG_ASSERT(start <= end);
mysql_mutex_assert_owner(&LOCK_certification_info);
rpl_gno candidate = start; //从start开始查找
//获得gtid_executed迭代器ivit
Gtid_set::Const_interval_iterator ivit(certifying_already_applied_transactions
? group_gtid_extracted
: group_gtid_executed,
group_gtid_sid_map_group_sidno);
/*
遍历gtid_executed,找出最小可用gno
*/
while (true) {
DBUG_ASSERT(candidate >= start);
const Gtid_set::Interval *iv = ivit.get(); //获得一个gtid区间
rpl_gno next_interval_start = iv != NULL ? iv->start : MAX_GNO;//得到下一个区间开始个gno
// 判断选出的gno是否在可用范围,即candidate(选出的gno)< next_interval_start 且不超过参数end
if (candidate < next_interval_start) {
if (candidate <= end)
DBUG_RETURN(candidate);
else
DBUG_RETURN(-2);
}
//如果到最后还没找到合适的,则报错
if (iv == NULL) {
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_CANT_GENERATE_GTID);
DBUG_RETURN(-1);
}
//更新候选gno
candidate = std::max(candidate, iv->end);
ivit.next();
}
}
上面已经分析了未发生冲突的情况,下面对于发生冲突情况,也进行简单说明:
冲突如上图,两个事物在不同的服务器上对相同行进行了修改。整个过程如下:
- 基于组复制通信系统确保有序,事务T1(携带版本号为dbv:1)首先到达各服务器的验证系统中,从而各验证系统将cv:2修改为了cv:2,允许提交;
- 随后到来的T2事务(同样携带版本号dbv:1),了如果修改了相同的行,则在认证系统中,由于认证系统对于行的版本号已经改为了cv:2,故需要回滚;
- 服务器s2本地事务T2回滚,其他服务器忽略T2事务。
源码分析
上一小节说明了冲突检测过程,这一节结合源码详细说明。
/*
冲突检测主要函数,用于检测组复制中的事物冲突,以判断提交与否
参数:
snapshot_version : 待检测事务的版本号
write_set :待检测事务的write set链表
generate_group_id : 是否需要生成GTID
member_uuid : 待检测事务来自member的uuid
gle : GTID EVENT指针
local_transaction : 是否是本地事务
返回值:
>0 通过冲突检测,返回gno
=0 存在冲突,拒绝提交
<0 error
*/
rpl_gno Certifier::certify(Gtid_set *snapshot_version,
std::list<const char *> *write_set,
bool generate_group_id, const char *member_uuid,
Gtid_log_event *gle, bool local_transaction) {
DBUG_ENTER("Certifier::certify");
rpl_gno result = 0;
const bool has_write_set = !write_set->empty();
if (!is_initialized()) DBUG_RETURN(-1); /* purecov: inspected */
mysql_mutex_lock(&LOCK_certification_info);
int64 transaction_last_committed = parallel_applier_last_committed_global;
DBUG_EXECUTE_IF("certifier_force_1_negative_certification", {
DBUG_SET("-d,certifier_force_1_negative_certification");
goto end;
});
/*
如果开启了冲突检测(单主模式没有冲突检测),进行相应的检测
遍历待检测事务的write set,得到Certifier系统相应的版本号,检测如果满足下面两个添加,则冲突。
1、Certifier系统相应的版本号不为null
2、Certifier系统相应的版本号不是待检测事务版本号的子集
*/
if (conflict_detection_enable) {
for (std::list<const char *>::iterator it = write_set->begin();
it != write_set->end(); ++it) {
Gtid_set *certified_write_set_snapshot_version =
get_certified_write_set_snapshot_version(*it);
if (certified_write_set_snapshot_version != NULL &&
!certified_write_set_snapshot_version->is_subset(snapshot_version))
goto end;
}
}
if (certifying_already_applied_transactions &&
!group_gtid_extracted->is_subset_not_equals(group_gtid_executed)) {
certifying_already_applied_transactions = false;
}
/*
生成gtid
*/
if (generate_group_id) { //自己生成组gtid
if (snapshot_version->ensure_sidno(group_sidno) != RETURN_STATUS_OK) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_UPDATE_TRANS_SNAPSHOT_VER_ERROR); /* purecov: inspected */
goto end; /* purecov: inspected */
}
result = get_group_next_available_gtid(member_uuid);
if (result < 0) goto end;
snapshot_version->_add_gtid(group_sidno, result);
last_conflict_free_transaction.set(group_gtid_sid_map_group_sidno, result);
DBUG_PRINT("info", ("Group replication Certifier: generated transaction "
"identifier: %llu",
result));
} else { //已经存在gtid
……
}
/*
更新认证系统中的版本号Add 通过检测事务的write set到certification info中
*/
if (has_write_set) {
// Only consider remote transactions for parallel applier indexes.
int64 transaction_sequence_number =
local_transaction ? -1 : parallel_applier_sequence_number;
Gtid_set_ref *snapshot_version_value = new Gtid_set_ref(
certification_info_sid_map, transaction_sequence_number);
if (snapshot_version_value->add_gtid_set(snapshot_version) !=
RETURN_STATUS_OK) {
result = 0; /* purecov: inspected */
delete snapshot_version_value; /* purecov: inspected */
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_UPDATE_TRANS_SNAPSHOT_REF_VER_ERROR); /* purecov: inspected
*/
goto end; /* purecov: inspected */
}
for (std::list<const char *>::iterator it = write_set->begin();
it != write_set->end(); ++it) {
int64 item_previous_sequence_number = -1;
//真正更新cv版本信息
add_item(*it, snapshot_version_value, &item_previous_sequence_number);
/*
Exclude previous sequence number that are smaller than global
last committed and that are the current sequence number.
transaction_last_committed is initialized with
parallel_applier_last_committed_global on the beginning of
this method.
*/
if (item_previous_sequence_number > transaction_last_committed &&
item_previous_sequence_number != parallel_applier_sequence_number)
transaction_last_committed = item_previous_sequence_number;
}
}
/*
非本地事务,则修改GTID event中的last commit,以便在应用时,并行回放,加快速度
*/
if (!local_transaction) {
if (!has_write_set) {
//没有write set,说明是ddl语句,则不能与其他事务并行回放。
transaction_last_committed = parallel_applier_sequence_number - 1;
}
gle->last_committed = transaction_last_committed;
gle->sequence_number = parallel_applier_sequence_number;
DBUG_ASSERT(gle->last_committed >= 0);
DBUG_ASSERT(gle->sequence_number > 0);
DBUG_ASSERT(gle->last_committed < gle->sequence_number);
increment_parallel_applier_sequence_number(!has_write_set);
}
end:
update_certified_transaction_count(result > 0, local_transaction);//更新状态变量
mysql_mutex_unlock(&LOCK_certification_info);
DBUG_PRINT("info", ("Group replication Certifier: certification result: %llu",
result));
DBUG_RETURN(result);
}
本地事务
如果通过了冲突检测,对于本地事务,mysql仅仅需要通知管道处理结束,唤醒事务等待即可。
if ((seq_number <= 0 || pevent->get_consistency_level() <
GROUP_REPLICATION_CONSISTENCY_AFTER) &&
transactions_latch->releaseTicket(tcle->get_thread_id())) { //唤醒等待的事务ticket
/* purecov: begin inspected */
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_NOTIFY_CERTIFICATION_OUTCOME_FAILED);
cont->signal(1, true);
error = 1;
goto end;
/* purecov: end */
}
// 通知管道处理结束
cont->signal(0, true);
当然,如果一致性级别大于AFTER,需要等待其他组成员进入提交阶段,相关代码如下:
//事务一致性级别大于AFTER
if (pevent->get_consistency_level() >= GROUP_REPLICATION_CONSISTENCY_AFTER) {
//构建事务一致性信息
Transaction_consistency_info *transaction_consistency_info =
new Transaction_consistency_info(
tcle->get_thread_id(), local_transaction, sid, sidno, gno,
pevent->get_consistency_level(), pevent->get_online_members());
pevent->release_online_members_memory_ownership();
//成员之间协调将从这一点开始
if (transaction_consistency_manager->after_certification(
transaction_consistency_info)) {
/* purecov: begin inspected */
delete transaction_consistency_info;
cont->signal(1, true);
error = 1;
goto end;
/* purecov: end */
}
}
远端事务
对于远端事务,如果一致性级别大于AFTER,同样需要位置点同步。
if (pevent->get_consistency_level() >=
GROUP_REPLICATION_CONSISTENCY_AFTER) {
Transaction_consistency_info *transaction_consistency_info =
new Transaction_consistency_info(
tcle->get_thread_id(), local_transaction, sid, sidno, gno,
pevent->get_consistency_level(), pevent->get_online_members());
pevent->release_online_members_memory_ownership();
if (transaction_consistency_manager->after_certification(
transaction_consistency_info)) {
/* purecov: begin inspected */
delete transaction_consistency_info;
cont->signal(1, true);
error = 1;
goto end;
/* purecov: end */
}
与本地事务不同的是,对于本地事务,其调用transactions_latch->releaseTicket(tcle->get_thread_id())唤醒等待提交,进行接下来的提交过程。远端事务则是调用next函数,进入管道处理的下一阶段。
next(pevent, cont); //进入下一阶段
23-25. 本质上就是将gtid event 和 事务其他event(不包含transaction context event)写入relaylog,以便applier复制通道的回放线程进行回放。相关代码在Applier_handler::handle_event中,如下
int Applier_handler::handle_event(Pipeline_event *event, Continuation *cont) {
DBUG_ENTER("Applier_handler::handle_event");
int error = 0;
Data_packet *p = NULL;
error = event->get_Packet(&p);
DBUG_EXECUTE_IF("applier_handler_force_error_on_pipeline", error = 1;);
if (error || (p == NULL)) {
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_TRANS_DATA_FAILED);
error = 1;
goto end;
}
/*
There is no need to queue Transaction_context_log_event to
server applier, this event is only need for certification,
performed on the previous handler.
*/
if (event->get_event_type() != binary_log::TRANSACTION_CONTEXT_EVENT) {
//最终调用queue_event函数,将event放入relaylog进行回放
error = channel_interface.queue_packet((const char *)p->payload, p->len);
if (event->get_event_type() == binary_log::GTID_LOG_EVENT &&
local_member_info->get_recovery_status() ==
Group_member_info::MEMBER_ONLINE) {
applier_module->get_pipeline_stats_member_collector()
->increment_transactions_waiting_apply();
}
}
end:
if (error)
cont->signal(error);
else
next(event, cont);
DBUG_RETURN(error);
}
网友评论