molong0 2016-11-26
主要用到了一个 JDBC importer for Elasticsearch的库。
想要增量同步,有一些先决条件。首先数据库中要维护一个update_time的时间戳,这个字段表示了该记录的最后更新时间。然后用上面的那个库,定时执行一个任务,这个任务中执行的sql就是根据时间戳判断该记录是否应该被更新。
这里先写一个最简单的例子来展示一下。
从上方插件官网中下载适合的dist包,然后解压。进入bin目录,可以看到一堆sh脚本。在bin目录下创建一个test.sh:
bin=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/bin lib=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/lib echo '{ "type" : "jdbc", "statefile" : "statefile.json", "jdbc": { "url" : "jdbc:mysql://myaddr", "user" : "myuser", "password" : "mypwd", "type" : "mytype", "index": "myindex", "schedule" : "0 * * * * ?", "metrics" : { "enabled" : true }, "sql" : [ { "statement" : "select * from gd_actor_info where update_time > ?", "parameter" : [ "$metrics.lastexecutionstart" ] } ] } }' | java \ -cp "${lib}/*" \ -Dlog4j.configurationFile=${bin}/log4j2.xml \ org.xbib.tools.Runner \ org.xbib.tools.JDBCImporter
schedule现在设置成每分钟都执行一次,是为了方便观察行为。statefile这一句是一定要加的。$metrics.lastexecutionstart就是这个脚本的关键所在了,这个指的是上一次脚本执行的时间,可以通过比较这个时间和数据库里的字段来判断是否要更新。
Elasticsearch mysql 增量同步 三表联合 脚本
上面简略的说了一下es同步数据脚本的大致情况,但是实际情况里肯定不会像上一篇里面的脚本那么简单。比如目前我就有三张表,两张实体表,一张关联表。大致实现如下:
bin目录建立一个statefile.json文件:
{ "type" : "jdbc", "statefile" : "statefile.json", "jdbc": { "url" : "jdbc:mysql://", "user" : "", "password" : "", "type" : "actor", "index": "test", "schedule" : "0 * * * * ?", "metrics" : { "lastexecutionstart" : "0", "lastexecutionend" : "0", "counter" : "1" }, "sql" : [ { "statement" : "select a.actor_id as _id ,a.*,GROUP_CONCAT(b.tag_name ) as tag_name from ( ( gd_actor_info as a left join gd_actor_tag as ab on a.actor_id = ab.actor_id ) left join gd_tag_actor as b on ab.tag_id = b.tag_id) where a.update_time >? or ab.update_time > ? group by a.actor_id ", "parameter" : [ "$metrics.lastexecutionstart" ,"$metrics.lastexecutionstart" ] } ] } }
主要是lastexecutionstart设置为0,为了让第一次执行能进行一次全量备份。
其实sh脚本信息也就都在上面了,再写一个就好了
ElasticSearch 的详细介绍:请点这里
ElasticSearch 的下载地址:请点这里
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。