cullinans 2019-12-29
logstash 执行过程
input -->filter -->output
filter 可以对数据进行处理
输出插件
codec plugin
使用脚本将数据导入到ES
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/db_example" jdbc_user => root jdbc_password => ymruan123 #启用追踪,如果为true,则需要指定tracking_column use_column_value => true #指定追踪的字段, tracking_column => "last_updated" #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型 tracking_column_type => "numeric" #记录最后一次运行的结果 record_last_run => true #上面运行结果的保存位置 last_run_metadata_path => "jdbc-position.txt" statement => "SELECT * FROM user where last_updated >:sql_last_value;" schedule => " * * * * * *" } } output { elasticsearch { document_id => "%{id}" document_type => "_doc" index => "users" hosts => ["http://localhost:9200"] } stdout{ codec => rubydebug } }
使用 logstash 执行
logstash -f mysqltoes.conf
使用别名查询索引
POST /_aliases { "actions": [ { "add": { "index": "users", "alias": "view_users", "filter" : { "term" : { "is_deleted" : false } } } } ] }
创建一个索引别名,过滤掉 只显示 is_deleted 为未删除的数据。
通过别名查询数据
POST view_users/_search { "query": { "term": { "name.keyword": { "value": "Jack" } } } }