msmysql 2020-06-26
这个官方文档一段对MySQL内核分析的一个向导。是对MySQL一条insert语句写入到MySQL数据库的分析。
但是,对于MySQL 5.7版本来说,基本上都是写入到innodb引擎。但也还是有借鉴意义,大的框架没有太大变化。
后面的文档,会通过mysqld --debug 和gdb
等工具,通过分析mysqld.trace来分析insert语句在MySQL 5.7中怎么写入数据库。
官方文档给出的一段结构,如下:
/sql/mysqld.cc /sql/sql_parse.cc /sql/sql_prepare.cc /sql/sql_insert.cc /sql/ha_myisam.cc /myisam/mi_write.c
上述梳理一个过程,是说从客户段执行一条简单的insert语句,然后到达MySQL服务器端,并通过MyISAM存储层。写入到MyISAM文件的过程。
由于,我们现在的主流都是InnoDB存储引擎,所以我们分析的写入到存储层应该是InnoDB的源代码。但是上述的一个框架也有借鉴意义。虽然,走的是InnoDB存储引擎插入数据,但是也还是需要通过SQL层的ha_*这样的接口进行接入。
正题开始!!!!!!!!!!!!!!!!!!!!!!!
第一步,进入MySQL大门的地方。梦开始的地方。众所周知,C语言都是需要main方法作为主入口。而MySQL的主入口如下:
代码位置/sql/mysqld.cc
int main(int argc, char **argv) { _cust_check_startup(); (void) thr_setconcurrency(concurrency); init_ssl(); server_init(); // ‘bind‘ + ‘listen‘ init_server_components(); start_signal_handler(); acl_init((THD *)0, opt_noacl); init_slave(); create_shutdown_thread(); create_maintenance_thread(); handle_connections_sockets(0); // ! 这里也代表着我们进入下一个门的地方 DBUG_PRINT("quit",("Exiting main thread")); exit(0); }
这里可以看到很多的init_*
或者server_init()
。通过名字我们可以猜测出,这里做了很多初始化的工作。例如:启动过程中一些初始化的检查和MySQL配置变量的加载和一些组件的初始化等。
这里重要的函数是handle_connections_sockets
继续跟踪/sql/mysqld.cc
handle_connections_sockets (arg __attribute__((unused)) { if (ip_sock != INVALID_SOCKET) { FD_SET(ip_sock,&clientFDs); DBUG_PRINT("general",("Waiting for connections.")); while (!abort_loop) { new_sock = accept(sock, my_reinterpret_cast(struct sockaddr*) (&cAddr), &length); thd= new THD; if (sock == unix_sock) thd->host=(char*) localhost; create_new_thread(thd); // ! }
从简易的思维,忽视其他的判断语句。可以看到这里做的是典型的client/server架构。服务器有一个主线程,它总是侦听来自新客户机的请求。一旦它接收到这样的请求,它将分配资源。特别是,主线程将生成一个新线程来处理连接。然后主服务器将循环并侦听新连接——但我们将保留它并跟踪新线程。
这里创建新线程的方法是:create_new_thread(thd);
继续跟踪/sql/mysqld.cc
create_new_thread(THD *thd) { pthread_mutex_lock(&LOCK_thread_count); pthread_create(&thd->real_id,&connection_attrib, handle_one_connection, // ! (void*) thd)); pthread_mutex_unlock(&LOCK_thread_count); }
可以看到这里获得一个新线程加入一个互斥锁,避免冲突。
继续跟踪/sql/mysqld.cc
handle_one_connection(THD *thd) { init_sql_alloc(&thd->mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); while (!net->error && net->vio != 0 && !thd->killed) { if (do_command(thd)) // ! break; } close_connection(net); end_thread(thd,1); packet=(char*) net->read_pos;
从这里开始,我们即将脱离mysqld.cc
文件,因为我们获得了thread,且分配一小段内存资源,给与我们来处理我们的SQL语句了。
我们会走向何方呢,可以开始观察do_command(thd)
方法。
继续跟踪/sql/sql_parse.cc
bool do_command(THD *thd) { net_new_transaction(net); packet_length=my_net_read(net); packet=(char*) net->read_pos; command = (enum enum_server_command) (uchar) packet[0]; dispatch_command(command,thd, packet+1, (uint) packet_length); // ! }
其中从这里可以看到,do_command(THD *thd)
把它串联起来的是一个叫作THD的东西,也就是thread。所以后面的工作和行为,基本都是通过thread进行牵线搭桥的。
my_net_read函数位于另一个名为net_servlet .cc的文件中。该函数从客户端获取一个包,解压缩它,并去除头部。
一旦完成,我们就得到了一个名为packet的多字节变量,它包含客户端发送的内容。第一个字节很重要,因为它包含标识消息类型的代码。
说明了packet第一个字节很重要。debug也有证据进行一个佐证。
packet_header: Memory: 0x7f7fc000a4b0 Bytes: (4) 21 00 00 00
然后把packet第一个字节和余下的部分传递给dispatch_command
继续跟踪/sql/sql_parse.cc
bool dispatch_command(enum enum_server_command command, THD *thd, char* packet, uint packet_length) { switch (command) { case COM_INIT_DB: ... case COM_REGISTER_SLAVE: ... case COM_TABLE_DUMP: ... case COM_CHANGE_USER: ... case COM_EXECUTE: mysql_stmt_execute(thd,packet); case COM_LONG_DATA: ... case COM_PREPARE: mysql_stmt_prepare(thd, packet, packet_length); // ! /* and so on for 18 other cases */ default: send_error(thd, ER_UNKNOWN_COM_ERROR); break; }
这里sql_parser .cc中有一个非常大的switch语句
switch语句中代码有:code for prepare, close statement, query, quit, create database, drop database, dump binary log, refresh, statistics, get process info, kill process, sleep, connect, and several minor commands
除了COM_EXECUTE和COM_PREPARE两种情况外,我们删除了所有情况下的代码细节。
可以看到
COM_EXECUTE 会调用mysql_stmt_execute(thd,packet);
COM_PREPARE 会调用mysql_stmt_prepare(thd, packet, packet_length);
这里就像一个中转站一般,看我们去向什么地方。这里去的门是:COM_PREPARE:mysql_stmt_prepare
跟踪/sql/sql_prepare.cc
下面是一段prepare的注释
"Prepare: Parse the query Allocate a new statement, keep it in ‘thd->prepared statements‘ pool Return to client the total number of parameters and result-set metadata information (if any)"
继续回到主线COM_EXECUTE
跟踪/sql/sql_parse.cc
bool dispatch_command(enum enum_server_command command, THD *thd, char* packet, uint packet_length) { switch (command) { case COM_INIT_DB: ... case COM_REGISTER_SLAVE: ... case COM_TABLE_DUMP: ... case COM_CHANGE_USER: ... case COM_EXECUTE: mysql_stmt_execute(thd,packet); // ! case COM_LONG_DATA: ... case COM_PREPARE: mysql_stmt_prepare(thd, packet, packet_length); /* and so on for 18 other cases */ default: send_error(thd, ER_UNKNOWN_COM_ERROR); break; }
现在``COM_EXECUTE 中的
mysql_stmt_execute`是我们关注的重点,我们来看看
跟踪/sql/sql_prepare.cc
代码
void mysql_stmt_execute(THD *thd, char *packet) { if (!(stmt=find_prepared_statement(thd, stmt_id, "execute"))) { send_error(thd); DBUG_VOID_RETURN; } init_stmt_execute(stmt); mysql_execute_command(thd); // ! }
这里做一个判断,看是否是execute
,然后初始化语句,并开始执行mysql_execute_command(thd);
可以看到,是通过thread来调用动作。
跟踪/sql/sql_parse.cc
代码
void mysql_execute_command(THD *thd) switch (lex->sql_command) { case SQLCOM_SELECT: ... case SQLCOM_SHOW_ERRORS: ... case SQLCOM_CREATE_TABLE: ... case SQLCOM_UPDATE: ... case SQLCOM_INSERT: ... // ! case SQLCOM_DELETE: ... case SQLCOM_DROP_TABLE: ... }
lex 解析sql语句。然后进入SQLCOM_INSERT。
跟踪/sql/sql_parse.cc
代码
case SQLCOM_INSERT: { my_bool update=(lex->value_list.elements ? UPDATE_ACL : 0); ulong privilege= (lex->duplicates == DUP_REPLACE ? INSERT_ACL | DELETE_ACL : INSERT_ACL | update); if (check_access(thd,privilege,tables->db,&tables->grant.privilege)) goto error; if (grant_option && check_grant(thd,privilege,tables)) goto error; if (select_lex->item_list.elements != lex->value_list.elements) { send_error(thd,ER_WRONG_VALUE_COUNT); DBUG_VOID_RETURN; } res = mysql_insert(thd,tables,lex->field_list,lex->many_values, select_lex->item_list, lex->value_list, (update ? DUP_UPDATE : lex->duplicates)); // ! if (thd->net.report_error) res= -1; break; }
对于插入数据,我们要做的第一件事情是:检查用户是否具有对表进行插入的适当特权,服务器通过调用check_access和check_grant函数在这里进行检查。
有了权限才可以做【插入】动作。
我们可以导航 /sql 目录,如下:
Program Name SQL statement type ------------ ------------------ sql_delete.cc DELETE sql_do.cc DO sql_handler.cc HANDLER sql_help.cc HELP sql_insert.cc INSERT // ! sql_load.cc LOAD sql_rename.cc RENAME sql_select.cc SELECT sql_show.cc SHOW sql_update.cc UPDATE
sql_insert.cc
是具体执行插入的操作。
上面的mysql_insert() 的方法具体实现,在sql_insert.cc
文件中。
跟踪 /sql/sql_insert.cc
代码
int mysql_insert(THD *thd,TABLE_LIST *table_list, List<Item> &fields, List<List_item> &values_list,enum_duplicates duplic) { table = open_ltable(thd,table_list,lock_type); if (check_insert_fields(thd,table,fields,*values,1) || setup_tables(table_list) || setup_fields(thd,table_list,*values,0,0,0)) goto abort; fill_record(table->field,*values); error=write_record(table,&info); // ! query_cache_invalidate3(thd, table_list, 1); if (transactional_table) error=ha_autocommit_or_rollback(thd,error); query_cache_invalidate3(thd, table_list, 1); mysql_unlock_tables(thd, thd->lock); }
这里就要开始,打开一张表。然后各种检查,看插入表的字段是否有问题。不行就abort。
然后,开始填充记录数据。最终调用write_record 写记录的方法。
由于write_record
会对应不同的存储引擎,所以这里有分支的。我这里讲解两种
继续跟踪/sql/sql_insert.cc
int write_record(TABLE *table,COPY_INFO *info) { table->file->write_row(table->record[0]; // ! }
终于,要写文件了。调用那个存储引擎呢?看handler.h
/* The handler for a table type. Will be included in the TABLE structure */ handler(TABLE *table_arg) : table(table_arg),active_index(MAX_REF_PARTS), ref(0),ref_length(sizeof(my_off_t)), block_size(0),records(0),deleted(0), data_file_length(0), max_data_file_length(0), index_file_length(0), delete_length(0), auto_increment_value(0), raid_type(0), key_used_on_scan(MAX_KEY), create_time(0), check_time(0), update_time(0), mean_rec_length(0), ft_handler(0) {} ... virtual int write_row(byte * buf)=0;
官方文档默认调用的是ha_myisam::write_row
代码 /sql/ha_myisam.cc
如下:
int ha_myisam::write_row(byte * buf) { statistic_increment(ha_write_count,&LOCK_status); /* If we have a timestamp column, update it to the current time */ if (table->time_stamp) update_timestamp(buf+table->time_stamp-1); /* If we have an auto_increment column and we are writing a changed row or a new row, then update the auto_increment value in the record. */ if (table->next_number_field && buf == table->record[0]) update_auto_increment(); return mi_write(file,buf); // ! }
这些以字母ha开头的程序是处理程序的接口,而这个程序是myisam处理程序的接口。我们这里就开始调用MyISAM了。
可以看到这里调用了mi_write(file,buf);
跟踪/myisam/mi_write.c
int mi_write(MI_INFO *info, byte *record) { _mi_readinfo(info,F_WRLCK,1); _mi_mark_file_changed(info); /* Calculate and check all unique constraints */ for (i=0 ; i < share->state.header.uniques ; i++) { mi_check_unique(info,share->uniqueinfo+i,record, mi_unique_hash(share->uniqueinfo+i,record), HA_OFFSET_ERROR); } ... to be continued in next snippet
这里有很多唯一性的校验,继续看下面
... continued from previous snippet /* Write all keys to indextree */ for (i=0 ; i < share->base.keys ; i++) { share->keyinfo[i].ck_insert(info,i,buff, _mi_make_key(info,i,buff,record,filepos) } (*share->write_record)(info,record); if (share->base.auto_key) update_auto_increment(info,record); }
这里就是我们写入到文件的地方。至此,MySQL的插入操作结束。
路径为:
main in /sql/mysqld.cc handle_connections_sockets in /sql/mysqld.cc create_new_thread in /sql/mysqld.cc handle_one_connection in /sql/sql_parse.cc do_command in /sql/sql_parse.cc dispatch_command in /sql/sql_parse.cc mysql_stmt_execute in /sql/sql_prepare.cc mysql_execute_command in /sql/sql_parse.cc mysql_insert in /sql/mysql_insert.cc write_record in /sql/mysql_insert.cc ha_myisam::write_row in /sql/ha_myisam.cc mi_write in /myisam/mi_write.c
1.进入主函数入口
2.建立socket connection的请求
3.创建一个新的线程
4.处理线程,分配内存资源
5.do_command,是获取packet第一字节,看做什么操作,并接受余下字节。
6.dispatch_command,分发操作,这里分发的是insert。
7.mysql_stmt_execute,检查是否为execute,初始化,准备做execute动作。
8.mysql_execute_command ,lex解析SQL语句,进入到SQLCOM_INSERT
9.mysql_insert ,开始做插入操作。调用write_record
10.write_record,准备写入,看调用哪个存储引擎,写入前期准备工作
11.ha_myisam::write_row,ha_myisam进行插入写入。
12.mi_write,最后做写入操作。