CHINA华军 2017-04-23
POC的目的:
1、与MySQL的对接方式,配置文档
2、订阅的延迟
3、订阅后宕机消息会不会丢失
4、能不能从指定的点开始重新订阅
5、高并发写入的时候,日志的顺序是否还能保持,不考虑消费的情况订阅是否会延迟
官网地址:https://github.com/alibaba/canal
从上层来看,复制分成三步:
直接下载,访问:https://github.com/alibaba/canal/releases,也可以在linux上直接联网下载:
服务端包:https://github.com/alibaba/canal/releases/download/v1.0.23/canal.deployer-1.0.23.tar.gz
客户端包:https://github.com/alibaba/canal/releases/download/v1.0.23/canal.example-1.0.23.tar.gz
Mkdir /app/canal
Mkdir /app/canal-example
Tar zxvf canal.deployer-1.0.23.tar.gz -C /app/canal
Tar zxvf canal.example-1.0.23.tar.gz -C /app/canal-example
a. canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,建议配置binlog模式为row.
**针对阿里云RDS账号默认已经有binlog dump权限,不需要任何权限或者binlog设置,可以直接跳过这一步**
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
b. canal的原理是模拟自己为mysql slave,所以这里一定需要做为mysql slave的相关权限.
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
针对已有的账户可直接通过grant
vi conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info,需要改成自己的数据库信息
canal.instance.master.address = 172.16.0.158:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =canal
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
#################################################
说明:
sh bin/startup.sh 启动
sh bin/stop.sh 停止
vi logs/canal/canal.log 查看canal日志
vi logs/example/example.log 查看instance的日志
Cd /app/canal-example
sh bin/startup.sh 启动canal客户端
sh bin/stop.sh 停止canal客户端
tail -f /app/canal-example/logs/example/entry.log 查看canal客户端订阅的日志
尝试修改mysql数据库,如上述我们配置的库是canal,我们创建一个userinfo的用户表,可以在entry.log里面打印出userinfo的信息
到此为止整个canal环境搭建完成。
不过canal-example是一个已经编译好的包,如果我们需要对源码进行修改,输出一些我们自己想要的信息,可以重新开发canal客户端。
客户端源码官方下载地址:https://github.com/alibaba/canal/wiki/ClientExample
更多配置策略请参考官方文档:https://github.com/alibaba/canal/wiki/AdminGuide
a. 修改canal.properties,加上zookeeper配置
canal.zkServers=172.16.7.122:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
b. 创建example目录,并修改instance.properties
canal.instance.mysql.slaveId = 1234 ##另外一台机器改成1235,保证slaveId不重复即可
canal.instance.master.address = 172.16.0.158:3306
注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置
启动两台机器的canal,启动后,你可以查看logs/example/example.log,只会看到一台机器上出现了启动成功的日志。查看一下zookeeper中的节点信息,也可以知道当前工作的节点为172.16.0.157:11111
Canal pom版本需要1.0.22或以上,否则zkclient可能发生冲突
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.22</version> </dependency>
|
Canal POC
Canal client接收到日志之后要提交ack确认
connector.ack(batchId); // 提交确认 |
canal server在接收了客户端的ack后,就会记录客户端提交的最后位点,如果canal client没有提交位点,则下一次canal client启动的时候 会将最后记录的位点把日志重新推送过来,直到canal client提交ack确认为止。
Canal-server单点模式下,订阅延迟平均22.65毫秒,HA模式下,订阅延迟平均24.16毫秒,具体数据请参考附录。
停止正在工作的172.16.0.157的canal server,这时172.16.0.158会立马启动example instance,提供新的数据服务。与此同时,客户端也会随着canal server的切换,通过获取zookeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成。