MySQLLelove 2019-06-25
MySQL使用了称之为psi/pfs的一系列文件和结构来进行performance监控。Psi全称为performance schema interface,pfs全称为performance storage。
该机制使用pthead来进行操作,其首先定义了pthread的线程存储变量(pfs.cc):
thread_local_key_t THR_PFS; thread_local_key_t THR_PFS_VG; // global_variables thread_local_key_t THR_PFS_SV; // session_variables thread_local_key_t THR_PFS_VBT; // variables_by_thread thread_local_key_t THR_PFS_SG; // global_status thread_local_key_t THR_PFS_SS; // session_status thread_local_key_t THR_PFS_SBT; // status_by_thread thread_local_key_t THR_PFS_SBU; // status_by_user thread_local_key_t THR_PFS_SBH; // status_by_host thread_local_key_t THR_PFS_SBA; // status_by_account bool THR_PFS_initialized= false;
这里的thread_local_key_t
实际上是pthread_key_t
,即pthread
线程存储变量。pthread_key_t
的使用就像一个全局变量,哪个线程都可以用,但是实际上对应了线程内部的变量值,可以参见该例:http://www.jianshu.com/p/d52c...。pthread规定,线程存储变量thread_local_key_t
必须要先初始化。MySQL在pfs_server.cc
中对这些变量统一初始化:
void pre_initialize_performance_schema() { pfs_initialized= false; init_all_builtin_memory_class(); PFS_table_stat::g_reset_template.reset(); global_idle_stat.reset(); global_table_io_stat.reset(); global_table_lock_stat.reset(); if (my_create_thread_local_key(&THR_PFS, destroy_pfs_thread)) return; if (my_create_thread_local_key(&THR_PFS_VG, NULL)) // global_variables return; if (my_create_thread_local_key(&THR_PFS_SV, NULL)) // session_variables return; if (my_create_thread_local_key(&THR_PFS_VBT, NULL)) // variables_by_thread return; if (my_create_thread_local_key(&THR_PFS_SG, NULL)) // global_status return; if (my_create_thread_local_key(&THR_PFS_SS, NULL)) // session_status return; if (my_create_thread_local_key(&THR_PFS_SBT, NULL)) // status_by_thread return; if (my_create_thread_local_key(&THR_PFS_SBU, NULL)) // status_by_user return; if (my_create_thread_local_key(&THR_PFS_SBH, NULL)) // status_by_host return; if (my_create_thread_local_key(&THR_PFS_SBA, NULL)) // status_by_account return; THR_PFS_initialized= true; }
注意,这个初始化只做一次,以后创建线程时直接使用即可。上的第一个变量THR_PFS就是我们要使用的。
使用上的方式初始化,首先要set相应的value:
/** @brief Execute the JOIN generated by parallel @param join [in] JOIN structure */ void execute_join(parallel_execution_thread_arg* parallel_arg) { /* * Get the parameter: * 1. JOIN * 2. pfs */ /// TODO: do we need to handle error? std::cout << "****************I am in worker thread*****************" << std::endl; /// Get join JOIN* join= parallel_arg->join; /// Get and Set pfs PSI_thread* pfs= parallel_arg->pfs; pfs_set_thread_v1(pfs); /// Delete delete parallel_arg; /// Set the new thread context my_thread_set_THR_THD(join->thd); /// Execute join->exec(); }
上面的函数是我在MySQL中新加入的代码,其中使用pfs_set_thread_v1进行set操作,即把当前THR_PFS对应的值设置为pfs。
get操作。由于我们加入了boost
线程库,所以当启动一个线程时需要把JOIN
结构和pfs
结构传入。思路是首先通过THR_PFS
获得pfs
线程句柄,作为参数传入到新的线程中。再新线程执行函数中,把pfs
线程句柄set进去。具体在sql_select.cc中,我们加入了如下代码:
/** Parallel execution. @details When a JOIN is parallel, its JOINs will execute parallelly. Put all JOINs into thread pool to execute. */ void JOIN::parallel_exec_joins() { for (uint i= 0; i < m_parallel_joins.size(); i ++) { /// Delete it in join->exec parallel_execution_thread_arg* parallel_arg= new parallel_execution_thread_arg(); /// Set join JOIN* join= m_parallel_joins[i]; parallel_arg->join= join; /// Set pfs PSI_thread* pfs= pfs_get_thread_v1(); parallel_arg->pfs= pfs; /// Thread pool generic_thread_pool.SubmitTask(execute_join, (parallel_execution_thread_arg* &&)parallel_arg); } }
可以看到,我们通过MySQL的pfs_get_thread_v1
获得pfs线程句柄传入到新的线程。
上面的例子是针对pfs的线程。对于MySQL普通线程的例子在上面的execute_join也能找到。注意里面有一行code:
/// Set the new thread context my_thread_set_THR_THD(join->thd);
这里就是把当前的thd设置到pthread中。所以我们看到,在MySQL中的很多地方都用到了这个东西,用法也已经明确了。