ElasticSearch MySQL 增量同步

molong0 2016-11-26

主要用到了一个 JDBC importer for Elasticsearch的库。

想要增量同步,有一些先决条件。首先数据库中要维护一个update_time的时间戳,这个字段表示了该记录的最后更新时间。然后用上面的那个库,定时执行一个任务,这个任务中执行的sql就是根据时间戳判断该记录是否应该被更新。

这里先写一个最简单的例子来展示一下。

从上方插件官网中下载适合的dist包,然后解压。进入bin目录,可以看到一堆sh脚本。在bin目录下创建一个test.sh:

bin=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/bin
lib=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/lib

echo '{
    "type" : "jdbc",
    "statefile" : "statefile.json",
   "jdbc": {
        "url" : "jdbc:mysql://myaddr",
        "user" : "myuser",
        "password" : "mypwd",
        "type" : "mytype",
        "index": "myindex",
        "schedule" : "0 * * * * ?",
        "metrics" : {
            "enabled" : true
        },
        
       "sql" : [
            {
                "statement" : "select * from gd_actor_info where update_time > ?",
                "parameter" : [ "$metrics.lastexecutionstart" ]
            }
        ]
      
    }
}' | java \
       -cp "${lib}/*" \
       -Dlog4j.configurationFile=${bin}/log4j2.xml \
       org.xbib.tools.Runner \
       org.xbib.tools.JDBCImporter

schedule现在设置成每分钟都执行一次,是为了方便观察行为。statefile这一句是一定要加的。$metrics.lastexecutionstart就是这个脚本的关键所在了,这个指的是上一次脚本执行的时间,可以通过比较这个时间和数据库里的字段来判断是否要更新。

Elasticsearch mysql 增量同步 三表联合 脚本

上面简略的说了一下es同步数据脚本的大致情况,但是实际情况里肯定不会像上一篇里面的脚本那么简单。比如目前我就有三张表,两张实体表,一张关联表。大致实现如下:

bin目录建立一个statefile.json文件:

{
    "type" : "jdbc",
    "statefile" : "statefile.json",
   "jdbc": {
        "url" : "jdbc:mysql://",
        "user" : "",
        "password" : "",
        "type" : "actor",
        "index": "test",
           "schedule" : "0 * * * * ?",
        "metrics" : { 
      "lastexecutionstart" : "0",
      "lastexecutionend" : "0",
      "counter" : "1" 
    },  
        
       "sql" : [
            {
                "statement" : "select a.actor_id as _id ,a.*,GROUP_CONCAT(b.tag_name ) as tag_name from ( ( gd_actor_info as a  left join gd_actor_tag as ab on a.actor_id = ab.actor_id ) left join gd_tag_actor as b on ab.tag_id = b.tag_id)   where a.update_time >? or ab.update_time > ? group by a.actor_id ",
                "parameter" : [ "$metrics.lastexecutionstart" ,"$metrics.lastexecutionstart" ]
            }
        ]
      
    }
} 

主要是lastexecutionstart设置为0,为了让第一次执行能进行一次全量备份。

其实sh脚本信息也就都在上面了,再写一个就好了

ElasticSearch 的详细介绍:请点这里
ElasticSearch 的下载地址:请点这里 

相关推荐