Cannal实现数据异构

oradbm 2017-08-15

问题:
在大型网站架构中,DB会采用分库分表来解决容量和性能的问题。但这带来个新的问题:比如不同维度的查询或者聚合查询
方案:
一般会通过数据异构机制来解决问题。

具体示例:
为提升系统的接单能力,需要对订单表进行分库分表,随之而来的问题:用户如何查询自己的订单列表?
方法1:扫描所有订单表,然后内存聚合,在大流量的架构中肯定是不行的;
方法2:双写,但是双写无法保证一致性;
方法3:订阅数据库变更日志,比如订阅mysql的binlog日志模拟数据库的主从同步机制,然后解析变更日志写到订单列表,从而实现数据异构,
这种机制也能保证数据的一致性。
比如,订单中心按照订单号分库分表,然后异构出:订单列表按照用户分库分表,商家订单,订单缓存,ES搜索

MYSQL主从复制
原理
--canal类似该原理



1.准备:
github:https://github.com/alibaba/canal
包括
1-canal的文档
2-server端
3-client端的
4-例子
5-源码包等等

2.canal概述
canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
说明:目前内部使用的同步,已经支持mysql5.x和Oracle部分版本的日志解析

canal server通过slave机制订阅数据库的binlong日志

基于日志增量订阅&消费支持的业务:
(1)数据库镜像
(2)数据库实时备份
(3)多级索引 (卖家和买家各自分库索引)
(4)search build
(5)业务cache刷新
(6)价格变化等重要业务消息

keyword:数据库同步,增量订阅&消费。


3.canal工作原理:
从上层来看,复制分成三步:
(1)  master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
(2) slave将master的binary log events拷贝到它的中继日志(relay log);
(3) slave重做中继日志中的事件,将改变反映它自己的数据。


4.部署canal:
4.0 前提:
Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH.
需要安装JDK


4.1 部署canal-server:
4.1.1数据库配置:
开启MySQL的binlog功能,并配置binlog模式为row。

在my.cnf 加入如下:
vi /etc/my.cnf
[mysqld] 
log-bin=mysql-bin #开启二进制日志 
binlog-format=ROW #选择row模式  ,不要使用statement或者mix模式
server_id=1  #配置主数据库ID,不能和从数据库重复,即不能和canal的slaveId重复,配置mysql replaction需要定义
binlog提供的三种记录模式:
见书,在使用Canal时建议使用row模式

另外在MYSQL中执行"show binary logs",将看到当前有哪些二进制文件及其大小

4.1.2在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)
CREATE USER canal IDENTIFIED BY 'canal';   
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; 
FLUSH PRIVILEGES;
说明:一定要重启,否则不生效,避免类似这样的错误

4.1.3下载canal https://github.com/alibaba/canal/releases
并解压到相应文件夹,比如我下载的是canal.deployer-1.0.24.tar.gz
mkdir /usr/server/canal
cd /usr/server
tar -zxvf canal.tar.gz -C  canal

canal 文件目录结构
drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin 
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf 
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib 
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs 

4.1.4修改配置-canal的数据库实例 instance.properties
说明:这里可以使用已经存在的 example,也可以新起实例,这个名字和客户端java类中写的名字需要一致
这里我们新配置一个canal Server实例

// vi /usr/server/canal/conf/example/instance.properties 
mkdir -p /usr/server/canal/conf/product
cp /usr/server/canal/conf/example/instance.properties  /usr/server/canal/conf/product/
vi  /usr/server/canal/conf/product/instance.properties

################################################# 
## mysql serverId  必须和master的SQL的ID不一致
canal.instance.mysql.slaveId = 101 
 
# position info:连接的数据库地址,从哪个二进制文件,哪个位置开始 
canal.instance.master.address = 127.0.0.1:3306
# MYSQL主库连接时,起始的binlog文件
canal.instance.master.journal.name =
# MYSQL主库连接时,起始的binlog文件 偏移量  
canal.instance.master.position =
# MYSQL主库连接时,起始的binlog文件 时间戳
canal.instance.master.timestamp =  
 
#从库
#canal.instance.standby.address =  
#canal.instance.standby.journal.name = 
#canal.instance.standby.position =  
#canal.instance.standby.timestamp =  
 
# username/password,需要改成自己的数据库信息 
canal.instance.dbUsername = canal   
canal.instance.dbPassword = canal 
canal.instance.defaultDatabaseName = canal_test 
canal.instance.connectionCharset = UTF-8

# 通过如下配置过滤订阅的是 哪个库中的哪些表,减少不必要的订阅;
# 比如只关注产品数据库,通过如下模式可只订阅产品数据库
# table regex 
# canal.instance.filter.regex = .*\\..* 
  canal.instance.filter.regex = product_\d+\\.*
 
################################################# 

说明:若多个库订阅,则需要配置多个实例,为每个数据库配置一个配置文件


4.1.5 进行canal Server 的配置,修改conf/canal.properties
vi /usr/server/canal/conf/canal.properties

canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=


# 当前canalserver上部署的实例,配置多个时用逗号分隔,此处配置product
canal.destinations= product

# 使用zk持久化模式,这样可以保证集群数据共享,共享HA
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

4.1.5然后cd到bin目录  启动和停止canal-server
启动 
/usr/server/canal/bin/startup.sh & tail -f /usr/server/canal/logs/canal/canal.log

停止
/usr/server/canal/bin/stop.sh 

验证启动状态,查看log文件
tail -f /usr/server/canal/logs/canal/canal.log

2014-07-18 10:21:08.525 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 
2014-07-18 10:21:08.609 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.12.109.201:11111] 
2014-07-18 10:21:09.037 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ...... 
---> 上述日志信息显示启动canal成功


*********canal server 集群**************
canal server可以部署一台,也可以部署多台,但是只有一台是活跃的,其它的作为备机;canalserver的高可用是通过zk维护的。

需要安装:zookeeper

配置文件如下修改:
vi /usr/server/canal/conf/canal.properties

canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=127.0.0.1:2181


4.2运行canal-client实例:
4.2.1 建立实例maven工程
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample 
[实践:手动创建Maven工程]

4.2.2 添加pom依赖:
<dependency> 
    <groupId>com.alibaba.otter</groupId> 
    <artifactId>canal.client</artifactId> 
    <version>1.0.12</version> 
</dependency> 

4.2.3 更新依赖 mvn install

4.2.4 ClientSamplet.Java 
实例代码
package canal.sample;
/**
 * Created by hp on 14-7-17.
 */ 
import java.net.InetSocketAddress; 
import java.util.List;
 
import com.alibaba.otter.canal.client.CanalConnector; 
import com.alibaba.otter.canal.common.utils.AddressUtils; 
import com.alibaba.otter.canal.protocol.Message; 
import com.alibaba.otter.canal.protocol.CanalEntry.Column; 
import com.alibaba.otter.canal.protocol.CanalEntry.Entry; 
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; 
import com.alibaba.otter.canal.protocol.CanalEntry.EventType; 
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; 
import com.alibaba.otter.canal.protocol.CanalEntry.RowData; 
import com.alibaba.otter.canal.client.*; 
import com.google.protobuf.InvalidProtocolBufferException;
//import org.jetbrains.annotations.NotNull;  
 
public class ClientSample { 
 
    public static void main(String args[]) throws Exception { 
        // 连接 canal Server 
        //CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.121",
                11111), "example", "", ""); 
                //11111), "chapter6", "", "");
       
        /**通过zk连接canal Server
        String zkServers = "192.168.1.121:2181";
        //目标实例:可以自定义一个,例如product
        String destination = "product";
        //连接 canal Server
        CanalConnector connector = CanalConnectors.newClusterConnector(zkServers, destination, "", "");
        **/
       
        int emptyCount = 0;  //空跑的次数
        int totalEmtryCount = 1200;//循环多少次为空时退出
        try { 
            connector.connect(); //连接
            connector.subscribe(".*\\..*"); //订阅所有,和不写此行一个效果
            //connector.subscribe("product_.*\\.product_.*");//订阅product数据库下的product表
            connector.rollback(); 
            while (emptyCount < totalEmtryCount) {
            //while(true){//一直循环
                //批量获取1000个日志(不确认模式)
                Message message = connector.getWithoutAck(1000);//这个值根据实际情况修改 
                long batchId = message.getId();
               
                //以下为空跑计数
                int size = message.getEntries().size(); 
                if (batchId == -1 || size == 0) { 
                    emptyCount++; 
                    System.out.println("empty count : " + emptyCount); 
                    try { 
                        Thread.sleep(1000); 
                    } catch (InterruptedException e) { 
                        e.printStackTrace(); 
                    } 
                } else { 
                    emptyCount = 0;
                    //做数据处理
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); 
                    printEntry(message.getEntries());
                } 
 
                connector.ack(batchId); // 提交确认 
                // connector.rollback(batchId); // 处理失败, 回滚数据 
            } 
 
            System.out.println("empty too many times, exit"); 
        } finally { 
            connector.disconnect(); 
        } 
    } 
 
    //private static void printEntry(@NotNull List<Entry> entrys) {
    private static void printEntry(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { 
                continue; 
            } 
 
            //如果是行数据
            if(entry.getEntryType() == EntryType.ROWDATA){
                //则解析行变更
                RowChange rowChage = null; 
                try { 
                    rowChage = RowChange.parseFrom(entry.getStoreValue()); 
                } catch (Exception e) { 
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); 
                } 
               
                EventType eventType = rowChage.getEventType();
                //这里捕获binlog变更信息
//                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", 
//                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), 
//                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), 
//                        eventType));
               
                for (RowData rowData : rowChage.getRowDatasList()) {
                    //如果是删除,则获取删除的数据进行业务处理
                    if (eventType == EventType.DELETE) { 
                        printColumn(rowData.getBeforeColumnsList());
                        //List<Column> columns = rowData.getBeforeColumnsList();
                        //delete(columns);
                       
                    //如果是新增修改则获取新增修改数据进行处理
                    } else if (eventType == EventType.INSERT || eventType == eventType.UPDATE) { 
                        //printColumn(rowData.getAfterColumnsList()); 
                        List<Column> columns = rowData.getAfterColumnsList();
                        save(columns);
                       
                    } else { 
                        System.out.println("-------> before"); 
                        printColumn(rowData.getBeforeColumnsList()); 
                        System.out.println("-------> after"); 
                        printColumn(rowData.getAfterColumnsList()); 
                    } 
                }                
               
            }
           
        } 
    } 
   
    //新增的异构操作
    private static void save(List<Column> columns) {
        for (Column col:columns) {
            String name = col.getName();
            String value = col.getValue();
            System.out.println("name: "+ name + ",value:" + value);
           
            //name: uid,value:4
            //name: name,value:10
        }
       
    }

    //private static void printColumn(@NotNull List<Column> columns) { 
    private static void printColumn(List<Column> columns) { 
        for (Column column : columns) { 
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated()); 
        } 
    } 

报错[有个jar包没找到注释掉了]
import org.jetbrains.annotations.NotNull;

通过以上代码,捕获数据库日志变更,然后进行相关业务的处理。无论是数据异构还是缓存更新


4.2.5 运行java实例

启动后看到控制端信息:
empty count : 1 
empty count : 2 
empty count : 3 
empty count : 4 


4.2.6触发数据库变更
create table test ( 
    uid int (4) primary key not null auto_increment, 
    name varchar(10) not null
); 
 
insert into test (name) values('10'); 


4.2.7 client 抓取mysql信息:
================> binlog[mysql-bin.000016:3281] , name[canal_test,test] , eventType : INSERT 
uid : 7    update=false 
name : 10    update=false 
empty count : 1 
empty count : 2 

[发现没有捕获到信息]
tail -f /usr/server/canal/logs/example/example.log

2017-06-03 13:11:28.802 [destination = chapter6 , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status
2017-06-03 13:11:28.817 [destination = chapter6 , address = /127.0.0.1:3306 , EventParser] ERROR c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - dump address /127.0.0.1:3306 has an error, retrying. caused by
com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation
2017-06-03 13:11:28.820 [destination = chapter6 , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:chapter6[com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation
]

原因很简单:因为Mysql需要开启binlog,我设置了,但是没重启。


这样可可以捕获到mysql信息了:
================> binlog[mysql-bin.000001:557] , name[chapter6,test] , eventType : INSERT
uid : 1    update=true
name : 10    update=true
empty count : 1
empty count : 2


5、部署过程中产生问题:

(1)启动失败,log日志中地址正在使用
1、11111端口正在被占用 可以用 ls -i:11111 查看监听进程谁占用端口 或者 用 ps -ef | grep 11111 查看哪个进程占用端口号  然后 kill -9 进程号  杀掉占用进程
2、可以编辑 canal/conf/canal.properties 中的端口号 ,改为不占用的端口

(2)canal无法抓取mysql触发数据库改变的信息
1、检查mysql是否打开binlog写入功能  检查binlog 是否为行模式。
    show variables like "binlog_format" 

2、检查my.cnf 和 instance.properties 等配置文件填写信息是否正确。

3、检查client 代码 调试实例代码

4、版本兼容问题,canal 1.8 换成 canal 1.7 继续测试

5、查看所有日志文件 分析日志

相关推荐