author:sufei
版本:8.0.16
本文主要分析MySQL复制中从库IO线程的执行过程。当然MySQL复制过程分为基于gtid的io以及基于文件位置io,其实两种处理方式相差无几,这里主要分析基于gtid的io线程。从库复制线程(包含io线程和sql线程)的入口函数为start_slave
,io线程的主要逻辑函数为handle_slave_io
,之间的调用关系如下:
start_slave
->start_slave_threads //检测复制结构体是否初始化
->handle_slave_io
一、IO线程处理逻辑
io线程逻辑浅绿色代表线程进入的状态信息,也就是show slave status显示的;
橙色代表relay log io观察者类的调用接口
二、分步详解
- io线程结构体thd创建,主要用于管理以及保存该线程相关数据以及属性
- 初始化thd结构,这一步,主要是初始化该thd为系统线程,并且对一些客户端属性做一些修改,相关操作在
init_slave_thread
函数中
// 设置为系统线程SYSTEM_THREAD_SLAVE_IO
thd->system_thread = (thd_type == SLAVE_THD_WORKER)
? SYSTEM_THREAD_SLAVE_WORKER
: (thd_type == SLAVE_THD_SQL)
? SYSTEM_THREAD_SLAVE_SQL
: SYSTEM_THREAD_SLAVE_IO;
thd->security_context()->skip_grants(); //取消权限检测
thd->get_protocol_classic()->init_net(0); //关闭thd自身网络结构
thd->slave_thread = 1; //标记为salve线程
/*
Replication threads are:
- background threads in the server, not user sessions,
- yet still assigned a PROCESSLIST_ID,
for historical reasons (displayed in SHOW PROCESSLIST).
*/
thd->set_new_thread_id(); //分配thread id,主要为了在SHOW PROCESSLIST显示
/* Do not use user-supplied timeout value for system threads. */
thd->variables.lock_wait_timeout = LONG_TIMEOUT;
- 进入relay log io观察类的thread_start函数,半同步插件从库就是通过此挂钩实现半同步状态开启,具体如下:
//如果使能了半同步并且现在半同步状态变量为off,则设置半同步状态变量为on
bool semi_sync = getSlaveEnabled();
if (semi_sync && !rpl_semi_sync_slave_status) rpl_semi_sync_slave_status = 1;
- 连接主库,最终调用的函数为
connect_to_master
,主要的连接逻辑如下:
-> 将连接超时和读超时都设置为slave_net_timeout
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *)&slave_net_timeout);
mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *)&slave_net_timeout);
-> 循环尝试连接
while (!(slave_was_killed = io_slave_killed(thd, mi)) &&
(reconnect ? mysql_reconnect(mysql) != 0
: mysql_real_connect(mysql, mi->host, user, password, 0,
mi->port, 0, client_flag) == 0)) {
last_errno = mysql_errno(mysql);
suppress_warnings = 0;
mi->report(ERROR_LEVEL, last_errno,
"error %s to master '%s@%s:%d'"
" - retry-time: %d retries: %lu",
(reconnect ? "reconnecting" : "connecting"), mi->get_user(),
mi->host, mi->port, mi->connect_retry, err_count + 1);
//达到重试次数(参数MASTER_RETRY_COUNT指定)则关闭io线程,默认重试次数为3600*24=86400次
if (++err_count == mi->retry_count) {
slave_was_killed = 1;
break;
}
//两次失败之间的间隔(参数MASTER_CONNECT_RETRY指定),默认为60秒
slave_sleep(thd, mi->connect_retry, io_slave_killed, mi);
}
- 连接成功之后,则进入stage_checking_master_version阶段,该阶段主要是一下检测
- dump binlog前的相关检查,主要包含如下检测
- 主库版本检查;
- 执行SELECT UNIX_TIMESTAMP()指令,获取主从时间差;
mi->clock_diff_with_master = (long)(time((time_t *)0) - strtoul(master_row[0], 0, 10));
- 执行SELECT @@GLOBAL.SERVER_ID 获取主库server id,如果主从server id一致则报错;
- 执行SET @master_heartbeat_period= %s 设置心跳周期时间,有参数MASTER_HEARTBEAT_PERIOD指定;
- 执行SET @master_binlog_checksum= @@global.binlog_checksum 检查binlogchecksum逻辑是否一致;
- 执行SELECT @@GLOBAL.GTID_MODE 获取主库gtid模式,如果出现主从gtid模式不匹配则报错
/* 如果从库gtid模式为GTID_MODE_OFF,而主库>=GTID_MODE_ON_PERMISSIVE 或者从库gtid模式为GTID_MODE_ON,而主库<=GTID_MODE_OFF_PERMISSIVE 则报错 */ if ((slave_gtid_mode == GTID_MODE_OFF && master_gtid_mode >= GTID_MODE_ON_PERMISSIVE) || (slave_gtid_mode == GTID_MODE_ON && master_gtid_mode <= GTID_MODE_OFF_PERMISSIVE)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, "The replication receiver thread cannot start because " "the master has GTID_MODE = %.192s and this server has " "GTID_MODE = %.192s.", get_gtid_mode_string(master_gtid_mode), get_gtid_mode_string(slave_gtid_mode)); DBUG_RETURN(1); } //并且从库开启了auto position,而主库没有开启gtid模式也报错 if (mi->is_auto_position() && master_gtid_mode != GTID_MODE_ON) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, "The replication receiver thread cannot start in " "AUTO_POSITION mode: the master has GTID_MODE = %.192s " "instead of ON.", get_gtid_mode_string(master_gtid_mode)); DBUG_RETURN(1); }
- 执行SELECT @@GLOBAL.SERVER_UUID 得到主库的uuid,与从库对比一致则报错。
- 发送io线程初始化指令,实际上是执行SET @slave_uuid= '%s'语句,将从库自身的uuid设置为线程的用户变量,从而dump线程能够获得相应的从库uuid,在初始化dump线程通过该uuid查找并kill僵死的dump线程
- 进入stage_registering_slave_on_master阶段,也就是注册从库阶段
- 发送注册命令COM_REGISTER_SLAVE,从而使得在主库执行show slave hosts;能够查看从库信息,这里的注册包含如下信息
- server id
- report host
- report user
- report password
- report port
- 进入stage_requesting_binlog_dump阶段,向主库发送dump指令
- 进入注册在relay log io观察类的before_request_transmit函数,在半同步插件中,通过该挂钩进入半同步插件实现如下功能
1、如果自身半同步没有开启,则之间返回
if (!repl_semisync->getSlaveEnabled()) return 0;
2、检测主库是否支持半同步
query = "SELECT @@global.rpl_semi_sync_master_enabled"; if (mysql_real_query(mysql, query, static_cast<ulong>(strlen(query))) || !(res = mysql_store_result(mysql))){ …… } DBUG_ASSERT(mysql_error == ER_UNKNOWN_SYSTEM_VARIABLE || strtoul(row[0], 0, 10) == 0 || strtoul(row[0], 0, 10) == 1); //如果没有安装半同步插件则关闭从库半同步 if (mysql_error == ER_UNKNOWN_SYSTEM_VARIABLE) { /* Master does not support semi-sync */ LogPluginErr(WARNING_LEVEL, ER_SEMISYNC_NOT_SUPPORTED_BY_MASTER); rpl_semi_sync_slave_status = 0; mysql_free_result(res); return 0; }
3、告诉主库dump线程,希望使用半同步复制,并开启半同步状态变量
query = "SET @rpl_semi_sync_slave= 1"; if (mysql_real_query(mysql, query, static_cast<ulong>(strlen(query)))) { LogPluginErr(ERROR_LEVEL, ER_SEMISYNC_SLAVE_SET_FAILED); return 1; } mysql_free_result(mysql_store_result(mysql)); rpl_semi_sync_slave_status = 1;
- 发送dump指令,这一步需要发送gtid信息或者binlog文件位置信息给主库(有关dump指令的格式可参考《MySQL dump线程分析》)
- 如果gtid模式,则发送restirve_gtid+execute_gtid
//添加restirve_gtid到发送的gtid中 mi->rli->get_sid_lock()->wrlock(); if (gtid_executed.add_gtid_set(mi->rli->get_gtid_set()) != RETURN_STATUS_OK) { mi->rli->get_sid_lock()->unlock(); DBUG_RETURN(1); } mi->rli->get_sid_lock()->unlock(); //添加execute_gtid到发送的gtid中 global_sid_lock->wrlock(); if (gtid_executed.add_gtid_set(gtid_state->get_executed_gtids()) != RETURN_STATUS_OK) { global_sid_lock->unlock(); DBUG_RETURN(1); } global_sid_lock->unlock();
- 如果是文件位置模式
rpl->file_name = mi->get_master_log_name(); rpl->start_position = DBUG_EVALUATE_IF("request_master_log_pos_3", 3, mi->get_master_log_pos());
- 进入循环处理event逻辑,直到io线程被kill才退出
while (!io_slave_killed(thd, mi))
- 进入stage_waiting_for_master_to_send_event阶段
- mysql客户端阻塞读取一个event,底层的调用为
NET *net = &mysql->net;
if (net->vio != 0) len = my_net_read(net);
-
进入stage_queueing_master_event_to_the_relay_log阶段
-
调用注册在relay log io观察类的before_request_transmit函数,在半同步插件中,通过该接口实现了分离半同步包的头信息与event元数据,半同步包头中包含了主库是否在等待ack信息
-
进入存储event逻辑函数queue_event
-
这一步需要对不同event调用
feed_event
函数进行鉴别事务开始或者结束的event类型,从而确保只有gtid事务所有的event之后才更新Retrieved_Gtid_Set变量,从而避免意外中断造成的event丢失;其实也可以保证relay log的切换不会造成事务event分割到两个relaylog中。
相应的之后检测不同event是否合法,
- STOP_EVENT
这是一个主库关闭时发送了event,无需写入relay log
- ROTATE_EVENT
收到主库rotate事件时,执行相应的relaylog切换,重置master_log_name和master_log_pos变量
int ret = rotate_relay_log(mi, false, false, true); memcpy(const_cast<char *>(mi->get_master_log_name()), rev->new_log_ident, rev->ident_len + 1); mi->set_master_log_pos(rev->pos);
- FORMAT_DESCRIPTION_EVENT
保存到mi结构中,用于event解析,所以每次从库连接主库进行复制时,必然先发送一个FORMAT_DESCRIPTION_EVENT,用于从库event解析
- HEARTBEAT_LOG_EVENT
主库发送心跳事件的情况有两种:1.主库没有新的binlog写入时;2、在跳过一些gtid时。对于第一种忽略即可,对于第二种由于其携带了跳过event在主库的位置信息,需要用于更新io线程的master_log_pos变量,以及在让sql线程知道。
//如果心态事件log pos大于io线程的master log pos,说明其是第二种情况 if (mi->get_master_log_pos() < hb.common_header->log_pos && mi->get_master_log_name() != NULL) { DBUG_ASSERT(memcmp(const_cast<char *>(mi->get_master_log_name()), hb.get_log_ident(), hb.get_ident_len()) == 0); //更新io线程的master log pos位置点 mi->set_master_log_pos(hb.common_header->log_pos); //将心跳event变为rotate event写入relay log,从而其携带的位置信息能被sql线程获得 if (write_rotate_to_master_pos_into_relay_log(mi->info_thd, mi, false /* force_flush_mi_info */)) goto end; }
- PREVIOUS_GTIDS_LOG_EVENT
该event对于从库没有什么含义,仅仅更新io线程记录主库位置点信息,并将其转化为rotate event写入relaylog
- GTID_LOG_EVENT
检测该gtid事件的合法性,如果从库gtid mode为GTID_MODE_OFF则报错。从该event获得original_commit_timestamp和immediate_commit_timestamp信息
- ANONYMOUS_GTID_LOG_EVENT
检测该gtid事件的合法性,auto_position或者GTID_MODE_ON则报错。同样获得original_commit_timestamp和immediate_commit_timestamp信息
- 其他event
无需特别处理
-
检测该event是否是数据库自身创建的,也就是event中的server id与从库server id对比,如果一致,则跳过。
-
到了这一步,开始调用
MYSQL_BIN_LOG::write_buffer
真正写入relay log -
写入之后调用
after_write_to_relay_log
,下面看一下其具体做了什么
1、判断event是否为事务最后的event
bool can_rotate = mi->transaction_parser.is_not_inside_transaction();
2、调用flush_and_sync,该函数根据relay log sync参数设置决定是否做sync操作,注意单位是event的个数
unsigned int sync_period = get_sync_period(); if (force || (sync_period && ++sync_counter >= sync_period)) { sync_counter = 0; if (DBUG_EVALUATE_IF("simulate_error_during_sync_binlog_file", 1, m_binlog_file->is_open() && m_binlog_file->sync())) { THD *thd = current_thd; thd->commit_error = THD::CE_SYNC_ERROR; return std::make_pair(true, synced); } synced = true; }
3、如果can_rotate为true,这说明是事务最后一个event,需要进行Retrieved_Gtid_Set,以及如果relay log超过设置大小进行切换
if (can_rotate) { mysql_mutex_lock(&mi->data_lock); // 更新Retrieved_Gtid_Set if (!last_gtid_queued->is_empty()) { mi->rli->get_sid_lock()->rdlock(); DBUG_SIGNAL_WAIT_FOR(current_thd, "updating_received_transaction_set", "reached_updating_received_transaction_set", "continue_updating_received_transaction_set"); mi->rli->add_logged_gtid(last_gtid_queued->sidno, last_gtid_queued->gno); mi->rli->get_sid_lock()->unlock(); } //如果relay log过大,进行切换,并写入之前保存下来的description_event if (m_binlog_file->get_real_file_size() > DBUG_EVALUATE_IF("rotate_slave_debug_group", 500, max_size) || mi->is_rotate_requested()) { error = new_file_without_locking(mi->get_mi_description_event()); mi->clear_rotate_requests(); } }
4、广播relay log更新,以便唤醒sql线程处理
update_binlog_end_pos(false /*need_lock*/);
- 更新位置信息,这里更新的位置信息,主要是写入了relay log的event,那些没写入relay log的event,如果携带了master位置信息已经在前面更新好了
mi->set_master_log_pos(mi->get_master_log_pos() + inc_pos);
- flush master info,这里注意特别是对于非gtid的复制,每次relay log写入一个event都需要flush master info信息,如果没有很可能由于宕机而造成复制出错
if (res == QUEUE_EVENT_OK && do_flush_mi) {
if (flush_master_info(mi, false /*force*/, lock_count == 0 /*need_lock*/,
false /*flush_relay_log*/))
res = QUEUE_EVENT_ERROR_FLUSHING_INFO;
}
- 调用注册在relay log io观察类的after_queue_event函数,在半同步插件中,通过该接口实现从库发送ack消息回复给主库,最终调用函数为
slaveReply
。
完成上述步骤都又进入第15步,开始读取新的event事件。
网友评论