superviser000 2020-06-28
1. Overview
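Logstash is a log-processing system. The user defines an input, a filter, and an output in the configuration, and together these handle collecting, transforming, and storing the logs.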
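As a rough illustration of the input, filter, and output stages described above, a minimal pipeline might look like the sketch below. The stdin input, the added field and its value, and the rubydebug codec are choices made here for illustration only and are not taken from the original configuration.

```
# Minimal sketch of a three-stage pipeline (illustrative values only).
input {
  # Read events line by line from standard input.
  stdin {}
}

filter {
  mutate {
    # Hypothetical field, added only to show where filters sit in the flow.
    add_field => { "source_name" => "demo" }
  }
}

output {
  # Print each processed event to the console in a readable form.
  stdout { codec => rubydebug }
}
```

Saved to a file, a config like this is typically started with bin/logstash -f followed by the file name; each line typed on the console then becomes one event.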
2. Data types
bool: debug => true
bytes
Define the input
input {
  file {
    path => ["/usr/local/logstash/logstash-tutorial-dataset"]
    type => "file_monitor"
    tags => ["useful", "for identification"]
    start_position => "beginning"
  }
}
Define the output
output {
  jdbc {
    driver_jar_path => "D:\repo\mysql\mysql-connector-java\5.1.40\mysql-connector-java-5.1.40.jar"
    driver_class => "com.mysql.jdbc.Driver"
    connection_string => "jdbc:mysql://sss:8840/testcase"
    username => "sss"
    password => "csssd"
    statement => ["INSERT INTO job_function_20190621 ( code_val, name_val, level_val, source_name, version ) VALUES (?,?,?,?,?)","code","name","level","source_name","current_version"]
  }
  stdout {}
}
Define the filter
filter {
  grok {
    match => { "@timestamp" => "%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day}" }
    add_field => { "current_version" => "%{year}%{month}%{day}" }
  }
  jdbc_streaming {
    jdbc_driver_library => "D:\repo\mysql\mysql-connector-java\5.1.40\mysql-connector-java-5.1.40.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://xxx:3306/xxx"
    jdbc_user => "xxx"
    jdbc_password => "xxx"
    statement => "SELECT location_name_cn FROM dict_location WHERE location_code = :codeParam"
    parameters => { "codeParam" => "code" }
    target => "code"
  }
  if [code] and [code][0] and ("location_name_cn" in [code][0]) {
    ruby {
      code => "
        r = ''
        event.get('code').each do |variable|
          # puts variable['location_name_cn']
          r = r + variable['location_name_cn'] + ';'
        end
        event.set('code', r)
      "
    }
  } else {
    mutate {
      replace => { "code" => "" }
    }
  }
}
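The L in ELK stands for Logstash. In a Java system, a logback appender writes the log data to the network; on the Logstash side, a tcp input receives it and an Elasticsearch output stores it.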
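The Logstash side of that setup could be sketched roughly as follows. The port number, the json_lines codec (which assumes the logback appender sends one JSON document per line), the Elasticsearch address, and the index name are all assumptions made for illustration; none of them come from the original text.

```
# Sketch: receive logback events over TCP and forward them to Elasticsearch.
input {
  tcp {
    # Assumed port; it must match the destination set in the logback appender.
    port => 4560
    # Assumes the appender emits newline-delimited JSON.
    codec => json_lines
  }
}

output {
  elasticsearch {
    # Assumed local single-node address.
    hosts => ["http://localhost:9200"]
    # Hypothetical index name with a daily date suffix.
    index => "app-logs-%{+YYYY.MM.dd}"
  }
}
```

If the appender sends plain text lines instead of JSON, the codec would need to change to match, for example to line.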