mysqld通过 RUN_HOOK(server_state, before_handle_connection, (NULL)); 调用
/**
Thread handler for a connection
@param arg Connection object (Channel_info)
This function (normally) does the following:
- Initialize thread // 初始化线程
- Initialize THD to be used with this thread // 初始化THD 信息
- Authenticate user // 权限认证
- Execute all queries sent on the connection // 处理query
- Take connection down
- End thread / Handle next connection using thread from thread cache
*/
extern "C" void *handle_connection(void *arg)
{
Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
Connection_handler_manager *handler_manager=
Connection_handler_manager::get_instance();
Channel_info* channel_info= static_cast<Channel_info*>(arg);
bool pthread_reused MY_ATTRIBUTE((unused))= false;
// my_thread_init 给线程分配内存空间,如果正常返回false, 否则返回true
if (my_thread_init())
{
connection_errors_internal++;
channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false);
handler_manager->inc_aborted_connects();
Connection_handler_manager
::dec_connection_count(channel_info->is_on_extra_port());
delete channel_info;
my_thread_exit(0);
return NULL;
}
for (;;)
{
// Save this here as init_new_thd destroys channel_info
bool extra_port_connection= channel_info->is_on_extra_port();
THD *thd= init_new_thd(channel_info);
if (thd == NULL)
{
connection_errors_internal++;
handler_manager->inc_aborted_connects();
Connection_handler_manager::dec_connection_count(extra_port_connection);
break; // We are out of resources, no sense in continuing.
}
DBUG_EXECUTE_IF("after_thread_setup",
{
const char act[]=
"now signal thread_setup";
DBUG_ASSERT(!debug_sync_set_action(thd,
STRING_WITH_LEN(act)));
};);
#ifdef HAVE_PSI_THREAD_INTERFACE
if (pthread_reused)
{
/*
Reusing existing pthread:
Create new instrumentation for the new THD job,
and attach it to this running pthread.
*/
PSI_thread *psi= PSI_THREAD_CALL(new_thread)
(key_thread_one_connection, thd, thd->thread_id());
PSI_THREAD_CALL(set_thread_os_id)(psi);
PSI_THREAD_CALL(set_thread)(psi);
}
#endif
#ifdef HAVE_PSI_THREAD_INTERFACE
/* Find the instrumented thread */
PSI_thread *psi= PSI_THREAD_CALL(get_thread)();
/* Save it within THD, so it can be inspected */
thd->set_psi(psi);
#endif /* HAVE_PSI_THREAD_INTERFACE */
mysql_thread_set_psi_id(thd->thread_id());
mysql_thread_set_psi_THD(thd);
mysql_socket_set_thread_owner(
thd->get_protocol_classic()->get_vio()->mysql_socket);
// thd_manager 添加这个线程的thd信息
thd_manager->add_thd(thd);
// thd_prepare_connection 登录认证,ok 返回0, 出错返回1
if (thd_prepare_connection(thd, extra_port_connection))
handler_manager->inc_aborted_connects();
else
{
// 检测链接是否依然存活
while (thd_connection_alive(thd))
{
// 执行query,这个会阻塞等待客户端发query,等待超过"net_wait_timeout"后会断开链接
if (do_command(thd))
break;
}
// 结束链接
end_connection(thd);
}
close_connection(thd, 0, false, false);
thd->get_stmt_da()->reset_diagnostics_area();
thd->release_resources();
#if OPENSSL_VERSION_NUMBER < 0x10100000L
// Clean up errors now, before possibly waiting for a new connection.
ERR_remove_state(0);
#endif
thd_manager->remove_thd(thd);
Connection_handler_manager::dec_connection_count(extra_port_connection);
#ifdef HAVE_PSI_THREAD_INTERFACE
/*
Delete the instrumentation for the job that just completed.
*/
thd->set_psi(NULL);
PSI_THREAD_CALL(delete_current_thread)();
#endif /* HAVE_PSI_THREAD_INTERFACE */
delete thd;
if (abort_loop) // Server is shutting down so end the pthread.
break;
channel_info= Per_thread_connection_handler::block_until_new_connection();
if (channel_info == NULL)
break;
pthread_reused= true;
}
my_thread_end();
my_thread_exit(0);
return NULL;
}
网友评论