JAVA记事 2019-06-30
作者:张学程
TiDB-DM(Data Migration)是用于将数据从 MySQL/MariaDB 迁移到 TiDB 的工具。该工具既支持以全量备份文件的方式将 MySQL/MariaDB 的数据导入到 TiDB,也支持通过解析执行 MySQL/MariaDB binlog 的方式将数据增量同步到 TiDB。特别地,对于有多个 MySQL/MariaDB 实例的分库分表需要合并后同步到同一个 TiDB 集群的场景,DM 提供了良好的支持。如果你需要从 MySQL/MariaDB 迁移到 TiDB,或者需要将 TiDB 作为 MySQL/MariaDB 的从库,DM 将是一个非常好的选择。
DM 是集群模式的,其主要由 DM-master、DM-worker 与 DM-ctl 三个组件组成,能够以多对多的方式将多个上游 MySQL 实例的数据同步到多个下游 TiDB 集群,其架构图如下:
单个 DM 集群可以同时运行多个数据同步任务;对于每一个同步任务,可以拆解为多个子任务同时由多个 DM-worker 节点承担,其中每个 DM-worker 节点负责同步来自对应的上游 MySQL 实例的数据。对于单个 DM-worker 节点上的单个数据同步子任务,其数据迁移流程如下,其中上部的数据流向为全量数据迁移、下部的数据流向为增量数据同步:
在每个 DM-worker 节点内部,对于特定的数据同步子任务,主要由 dumper、loader、relay 与 syncer(binlog replication)等数据同步处理单元执行具体的数据同步操作。
为加快数据导入速度,在 DM 中不论是全量数据迁移,还是增量数据同步,都在其中部分阶段使用了并发处理。
对于全量数据迁移,在导出阶段,dumper 单元调用 mydumper 导出工具执行实际的数据导出操作,对应的并发模型可以直接参考 mydumper 的源码。在使用 loader 单元执行的导入阶段,对应的并发模型结构如下:
使用 mydumper 执行导出时,可以通过 --chunk-filesize
等参数将单个表拆分成多个 SQL 文件,这些 SQL 文件对应的都是上游 MySQL 某一个时刻的静态快照数据,且各 SQL 文件间的数据不存在关联。因此,在使用 loader 单元执行导入时,可以直接在一个 loader 单元内启动多个 worker 工作协程,由各 worker 协程并发、独立地每次读取一个待导入的 SQL 文件进行导入。即 loader 导入阶段,是以 SQL 文件级别粒度并发进行的。在 DM 的任务配置中,对于 loader 单元,其中的 pool-size
参数即用于控制此处 worker 协程数量。
对于增量数据同步,在从上游拉取 binlog 并持久化到本地的阶段,由于上游 MySQL 上 binlog 的产生与发送是以 stream 形式进行的,因此这部分只能串行处理。在使用 syncer 单元执行的导入阶段,在一定的限制条件下,可以执行并发导入,对应的模型结构如下:
当 syncer 读取与解析本地 relay log 时,与从上游拉取 binlog 类似,是以 stream 形式进行的,因此也只能串行处理。当 syncer 解析出各 binlog event 并构造成待同步的 job 后,则可以根据对应行数据的主键、索引等信息经过 hash 计算后分发到多个不同的待同步 job channel 中;在 channel 的另一端,与各个 channel 对应的 worker 协程并发地从 channel 中取出 job 后同步到下游的 TiDB。即 syncer 导入阶段,是以 binlog event 级别粒度并发进行的。在 DM 的任务配置中,对于 syncer 单元,其中的 worker-count
参数即用于控制此处 worker 协程数量。
但 syncer 并发同步到下游 TiDB 时,存在一些限制,主要包括:
在使用 MySQL 支撑大量数据时,经常会选择使用分库分表的方案。但当将数据同步到 TiDB 后,通常希望逻辑上进行合库合表。DM 为支持合库合表的数据同步,主要实现了以下的一些功能。
为说明 DM 中 table router(表名路由)功能,先看如下图所示的一个例子:
在这个例子中,上游有 2 个 MySQL 实例,每个实例有 2 个逻辑库,每个库有 2 个表,总共 8 个表。当同步到下游 TiDB 后,希望所有的这 8 个表最终都合并同步到同一个表中。
但为了能将 8 个来自不同实例、不同库且有不同名的表同步到同一个表中,首先要处理的,就是要能根据某些定义好的规则,将来自不同表的数据都路由到下游的同一个表中。在 DM 中,这类规则叫做 router-rules。对于上面的示例,其规则如下:
name-of-router-rule: schema-pattern: "schema_*" table-pattern: "table_*" target-schema: "schema" target-table: "table"
name-of-router-rule
:规则名,用户指定。当有多个上游实例需要使用相同的规则时,可以只定义一条规则,多个不同的实例通过规则名进行引用。schema-pattern
:用于匹配上游库(schema)名的模式,支持在尾部使用通配符(*)。这里使用 schema_*
即可匹配到示例中的两个库名。table-pattern
:用于匹配上游表名的模式,与 schema-pattern
类似。这里使用 table_*
即可匹配到示例中的两个表名。target-schema
:目标库名。对于库名、表名匹配的数据,将被路由到这个库中。target-table
:目标表名。对于库名、表名匹配的数据,将被路由到 target-schema
库下的这个表中。在 DM 内部实现上,首先根据 schema-pattern
/ table-pattern
构造对应的 trie 结构,并将规则存储在 trie 节点中;当有 SQL 需要同步到下游时,通过使用上游库名、表名查询 trie 即可得到对应的规则,并根据规则替换原 SQL 中的库名、表名;通过向下游 TiDB 执行替换后的 SQL 即完成了根据表名的路由同步。有关 router-rules
规则的具体实现,可以阅读 TiDB-Tools 下的 table-router pkg 源代码。
有了 table router 功能,已经可以完成基本的合库合表数据同步了。但在数据库中,我们经常会使用自增类型的列作为主键。如果多个上游分表的主键各自独立地自增,将它们合并同步到下游后,就很可能会出现主键冲突,造成数据的不一致。我们可看一个如下的例子:
在这个例子中,上游 4 个需要合并同步到下游的表中,都存在 id 列值为 1 的记录。假设这个 id 列是表的主键。在同步到下游的过程中,由于相关更新操作是以 id 列作为条件来确定需要更新的记录,因此会造成后同步的数据覆盖前面已经同步过的数据,导致部分数据的丢失。
在 DM 中,我们通过 column mapping 功能在数据同步的过程中依据指定规则对相关列的数据进行转换改写来避免数据冲突与丢失。对于上面的示例,其中 MySQL 实例 1 的 column mapping 规则如下:
mapping-rule-of-instance-1: schema-pattern: "schema_*" table-pattern: "table_*" expression: "partition id" source-column: "id" target-column: "id" arguments: ["1", "schema_", "table_"]
mapping-rule-of-instance-1
:规则名,用户指定。由于不同的上游 MySQL 实例需要转换得到不同的值,因此通常每个 MySQL 实例使用一条专有的规则。schema-pattern
/ table-pattern
:上游库名、表名匹配模式,与 router-rules
中的对应配置项一致。expression
:进行数据转换的表达式名。目前常用的表达式即为 "partition id"
,有关该表达式的具体说明见下文。source-column
:转换表达式的输入数据对应的来源列名,"id"
表示这个表达式将作用于表中名为 id 的列。暂时只支持对单个来源列进行数据转换。target-column
:转换表达式的输出数据对应的目标列名,与 source-column
类似。暂时只支持对单个目标列进行数据转换,且对应的目标列必须已经存在。arguments
:转换表达式所依赖的参数。参数个数与含义依具体表达式而定。partition id
是目前主要受支持的转换表达式,其通过为 bigint 类型的值增加二进制前缀来解决来自不同表的数据合并同步后可能产生冲突的问题。partition id
的 arguments 包括 3 个参数,分别为:
"1"
表示匹配该规则的数据来自于 MySQL 实例 1,且这个标识将被转换成数值后以二进制的形式作为前缀的一部分添加到转换后的值中。"schema_"
应用于 schema_2
逻辑库时,表示去除前缀后剩下的部分(数字 2
)将以二进制的形式作为前缀的一部分添加到转换后的值中。"table_"
应用于 table_3
表时,表示去除前缀后剩下的部分(数字 3
)将以二进制的形式作为前缀的一部分添加到转换后的值中。各部分在经过转换后的数值中的二进制分布如下图所示(各部分默认所占用的 bits 位数如图所示):
假如转换前的原始数据为 123
,且有如上的 arguments 参数设置,则转换后的值为:
1<<(64-1-4) | 2<<(64-1-4-7) | 3<<(64-1-4-7-8) | 123
另外,arguments 中的 3 个参数均可设置为空字符串(""
),即表示该部分不被添加到转换后的值中,且不占用额外的 bits。比如将其设置为["1", "", "table_"]
,则转换后的值为:
1 << (64-1-4) | 3<< (64-1-4-8) | 123
有关 column mapping 功能的具体实现,可以阅读 TiDB-Tools 下的 column-mapping pkg 源代码。
有了 table router 和 column mapping 功能,DML 的合库合表数据同步已经可以正常进行了。但如果在增量数据同步的过程中,上游待合并的分表上执行了 DDL 操作,则可能出现问题。我们先来看一个简化后的在分表上执行 DDL 的例子。
在上图的例子中,分表的合库合表简化成了上游只有两个 MySQL 实例,每个实例内只有一个表。假设在开始数据同步时,将两个分表的表结构 schema 的版本记为 schema V1
,将 DDL 执行完成后的表结构 schema 的版本记为 schema V2
。
现在,假设数据同步过程中,从两个上游分表收到的 binlog 数据有如下的时序:
schema V1
的 DML。schema V2
的 DML;但从实例 2 收到的仍是 schema V1
的 DML。schema V2
的 DML。假设在数据同步过程中,不对分表的 DDL 进行处理。当将实例 1 的 DDL 同步到下游后,下游的表结构会变更成为 schema V2
。但对于实例 2,在 t2 时刻到 t3 时刻这段时间内收到的仍然是 schema V1
的 DML。当尝试把这些与 schema V1
对应的 DML 同步到下游时,就会由于 DML 与表结构的不一致而发生错误,造成数据无法正确同步。
继续使用上面的例子,来看看我们在 DM 中是如何处理合库合表过程中的 DDL 同步的。
在这个例子中,DM-worker-1 用于同步来自 MySQL 实例 1 的数据,DM-worker-2 用于同步来自 MySQL 实例 2 的数据,DM-master 用于协调多个 DM-worker 间的 DDL 同步。从 DM-worker-1 收到 DDL 开始,简化后的 DDL 同步流程为:
根据上面 DM 处理多个 DM-worker 间的 DDL 同步的流程,归纳一下 DM 内处理多个 DM-worker 间 sharding DDL 同步的特点:
从 DM 处理 DM-worker 间 sharding DDL 同步的特点,可以看出该功能存在以下一些限制:
a
后再增加列 b
,而表 2 先增加列 b
后再增加列 a
,这种不同顺序的 DDL 执行方式是不支持的。在上面的示例中,每个 DM-worker 对应的上游 MySQL 实例中只有一个需要进行合并的分表。但在实际场景下,一个 MySQL 实例可能有多个分库内的多个分表需要进行合并,比如前面介绍 table router 与 column mapping 功能时的例子。当一个 MySQL 实例中有多个分表需要合并时,sharding DDL 的协调同步过程增加了更多的复杂性。
假设同一个 MySQL 实例中有 table_1
和 table_2
两个分表需要进行合并,如下图:
由于数据来自同一个 MySQL 实例,因此所有数据都是从同一个 binlog 流中获得。在这个例子中,时序如下:
schema V1
的 DML。table_1
的 DDL。table_2
schema V1
的 DML。table_2
的 DDL。schema V2
的 DML。假设在数据同步过程中不对 DDL 进行特殊处理,当 table_1
的 DDL 同步到下游、变更下游表结构后,table_2 schema V1
的 DML 将无法正常同步。因此,在单个 DM-worker 内部,我们也构造了与 DM-master 内类似的逻辑 sharding group,但 group 的成员是同一个上游 MySQL 实例的不同分表。
但 DM-worker 内协调处理 sharding group 的同步不能完全与 DM-master 处理时一致,主要原因包括:
table_1
的 DDL 时,同步不能暂停,需要继续解析 binlog 才能获得后续 table_2
的 DDL,即需要从 t2 时刻继续向前解析直到 t3 时刻。table_1
的 schema V2
的 DML 不能向下游同步;但在 sharding DDL 同步并执行成功后,这些 DML 需要同步到下游。在 DM 中,简化后的 DM-worker 内 sharding DDL 同步流程为:
table_1
的 DDL,记录 DDL 信息及此时的 binlog 位置点信息。table_1
的 schema V2
DML,忽略;对于属于 table_2
的 schema V1
DML,正常同步到下游。table_2
的 DDL,记录 DDL 信息及此时的 binlog 位置点信息。table_1
的 schema V2
DML,正常同步到下游;对于属于 table_2
的 shema V1
DML,忽略。从上面的分析可以知道,DM 在处理 sharding DDL 同步时,主要通过两级 sharding group 来进行协调控制,简化的流程为:
在进行数据同步的过程中,有时可能并不需要将上游所有的数据都同步到下游,这时一般期望能在同步过程中根据某些规则,过滤掉部分不期望同步的数据。在 DM 中,支持 2 种不同级别的同步过滤方式。
DM 在 dumper、loader、syncer 三个处理单元中都支持配置规则只同步/不同步部分库或表。
对于 dumper 单元,其实际调用 mydumper 来 dump 上游 MySQL 的数据。比如只期望导出 test 库中的 t1、t2 两个表的数据,则可以为 dumper 单元配置如下规则:
name-of-dump-rule: extra-args: "-B test -T t1,t2"
name-of-dump-rule
:规则名,用户指定。当有多个上游实例需要使用相同的规则时,可以只定义一条规则,多个不同的实例通过规则名进行引用。extra-args
:dumper 单元额外参数。除 dumper 单元中明确定义的配置项外的其他所有 mydumper 配置项都通过此参数传入,格式与使用 mydumper 时一致。有关 mydumper 对库表黑白名单的支持,可查看 mydumper 的参数及 mydumper 的源码。
对于 loader 和 syncer 单元,其对应的库表黑白名单规则为 black-white-list
。假设只期望同步 test 库中的 t1、t2 两个表的数据,则可配置如下规则:
name-of-bwl-rule: do-tables: - db-name: "test" tbl-name: "t1" - db-name: "test" tbl-name: "t2"
示例中只使用了该规则的部分配置项,完整的配置项及各配置项的含义,可阅读该功能对应的用户文档。DM 中该规则与 MySQL 的主从同步过滤规则类似,因此也可参考 Evaluation of Database-Level Replication and Binary Logging Options 与 Evaluation of Table-Level Replication Options。
对于 loader 单元,在解析 SQL 文件名获得库名表名后,会与配置的黑白名单规则进行匹配,如果匹配结果为不需要同步,则会忽略对应的整个 SQL 文件。对于 syncer 单元,在解析 binlog 获得库名表名后,会与配置的黑白名单规则进行匹配,如果匹配结果为不需要同步,则会忽略对应的(部分)binlog event 数据。
在进行增量数据同步时,有时会期望过滤掉某些特定类型的 binlog event,两个典型的场景包括:
TRUNCATE TABLE
时不希望清空下游表中的数据。DROP TABLE
时不希望 DROP
下游合并后的表。在 DM 中支持根据 binlog event 的类型进行过滤,对于需要过滤 TRUNCATE TABLE
与 DROP TABLE
的场景,可配置规则如下:
name-of-filter-rule: schema-pattern: "test_*" table-pattern: "t_*" events: ["truncate table", "drop table"] action: Ignore
规则的匹配模式与 table router、column mapping 类似,具体的配置项可阅读该功能对应的用户文档。
在实现上,当解析 binlog event 获得库名、表名及 binlog event 类型后,与配置的规则进行匹配,并在匹配后依据 action 配置项来决定是否需要进行过滤。有关 binlog event 过滤功能的具体实现,可以阅读 TiDB-Tools 下的 binlog-filter pkg 源代码。