娜娜 2020-07-20
input { jdbc { #jdbc驱动包位置 jdbc_driver_library => "D:\tools\elk\logstash-7.6.1\ojdbc8-12.2.0.1.jar" #jdbc驱动类 jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" # 数据库相关配置 jdbc_connection_string => "jdbc:oracle:thin:callcard///callcardsitoracle01.beta.hic.cloud:1521/callcardsit_srv" jdbc_user => "callcard" jdbc_password => "huawei123" # 是否清除sql_last_value的记录,需要增量同步时此字段必须为false; clean_run => true # 同步频率(分 时 天 月 年),默认每分钟同步一次; schedule => "*/10 * * * * *" use_column_value => true tracking_column_type => timestamp tracking_column => updated_at # 同步SQL statement =>"select * from user n where updated_at>:sql_last_value and updated_at<sysdate order by updated_at desc" # 索引类型,不需要指定type,否则会在同步ES后生成type字段 #type => "user" # 设置时区 #jdbc_default_timezone =>"Asia/Shanghai" } } filter { # 删除无用字段 mutate { remove_field => "@timestamp" remove_field => "@version" } # 时间+8个时区,思想是找临时变量,最后+8后替换 ruby { code => "event.set(‘@created_at‘, event.get(‘created_at‘).time.localtime + 8*60*60)" } ruby { code => "event.set(‘created_at‘,event.get(‘@created_at‘))" } mutate { remove_field => ["@created_at"] } } output { stdout { codec => rubydebug } #if [type]=="user" { elasticsearch { # ES host:port hosts => ["127.0.0.1:9200"] #将mysql数据加入blog索引下,会自动创建 index => "user" # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号_id document_id => "%{id}" } # } }