sifeimeng 2020-01-09
安装logstash查看我的另一篇文章 Docker 部署 logstash
同步数据我们首先需要安装好对应的插件,然后下载对应的数据库链接jar包,下面是具体的步骤
1、进入容器中
docker exec it logstash bash
2、进入到bin 目录下,我这里是/usr/share/logstash/bin,可以看到logstash-plugin文件,然后安装插件
logstash-plugin install logstash-input-jdbc
3、看到如下输出,则表示安装成功
1、场景简介 比如我们需要检索资讯文章,单纯用mysql实现效率实在太低,特别是数据量大的时候。这时候我们就可以用到es,logstash定时把新增和更新的文章同步到es,业务上我们可以直接调用es的API检索文章。
2、在conf.d目录下配置jdbc.conf文件和jdbc.sql文件
(1)配置文件jdbc.conf
input { stdin {} jdbc { #连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连 jdbc_connection_string => "jdbc:mysql://192.168.16.241:3306/wsy_blockchain?characterEncoding=UTF-8&useSSL=false&autoReconnect=true&serverTimezone=UCT" jdbc_user => "root" jdbc_password => "12345678" #连接mysql的jar包存放路径 jdbc_driver_library => "/opt/mysql-connector-java-8.0.15.jar" #驱动 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" codec => plain { charset => "UTF-8" } #追踪的字段 我这里用的是创建时间(主要用来测试) tracking_column => createtime record_last_run => true #这个一直找不到原因为什么打开就会报错 last_run_metadata_path => "/usr/local/opt/logstash/lastrun/.logstash_jdbc_last_run" #这个一直找不到原因为什么打开就会报错 #jdbc_default_timezone => "Asia/Shanghai" #同步数据的语句存放位置 statement_filepath => "/usr/share/logstash/bin/jdbc.sql" #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录 clean_run => false #这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 不设置就是1分钟执行一次 schedule => "* * * * *" type => "std" } } filter { json { source => "message" remove_field => ["message"] } ruby { code => "event.set(‘timestamp‘, event.get(‘@timestamp‘).time.localtime + 8*60*60)" } ruby { code => "event.set(‘@timestamp‘,event.get(‘timestamp‘))" } mutate { remove_field => ["timestamp"] } } output { elasticsearch { hosts => "192.168.16.241:9200" index => "test_data" document_type => "users" document_id => "%{id}" } stdout { codec => json_lines } }
(2)配置jdbc.sql
select id,nickname,mobile, createtime from users where createtime > :sql_last_value
3、启动配置文件jdbc.conf文件,开始同步数据
logstash -f jdbc.conf
4、可以看到开始同步了
主要遇到的两个问题
1、查询出的数据库用户创建时间少了8个小时
解决:数据库配置链接需要加参数 serverTimezone=UCT
2、同步数据时间戳少了8个小时,
解决:同步时过滤对应字段timestsmp,获取后加上8小时,如下
filter { ruby { code => "event.set(‘timestamp‘, event.get(‘@timestamp‘).time.localtime + 8*60*60)" } ruby { code => "event.set(‘@timestamp‘,event.get(‘timestamp‘))" } mutate { remove_field => ["timestamp"] } }
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。