wenwentana 2019-12-21
Logstash:把MySQL数据导入到Elasticsearch中
需要安装好Elasticsearch及Kibana。
根据不同的操作系统我们分别对MySQL进行安装。我们可以访问网页来对MySQL进行安装。等我们安装完我们的MySQL后,在我们的terminal中,打入如下的命令来检查MySQL的版本:
$ /usr/local/mysql/bin/mysql -V /usr/local/mysql/bin/mysql Ver 8.0.17 for macos10.14 on x86_64 (MySQL Community Server - GPL)
在上一步中,已经知道了mysql的版本信息。需要下载相应的JDBC connector。在地址https://dev.mysql.com/downloads/connector/j/
下载最新的Connector。下载完这个Connector后,把这个connector存入到Logstash安装目录下的如下子目录中。
$ ls logstash-core/lib/jars/mysql-connector-java-8.0.17.jar logstash-core/lib/jars/mysql-connector-java-8.0.17.jar
这样我们的安装就完成了。
采用把一个CSV文件导入到MySQL中的办法来形成一个MySQL的数据库。CSV文件下载地址:
https://github.com/liu-xiao-guo/sample_csv
在上面的sample_csv中,有一个SalesJan2009.csv文件。通过MySQL的前端工具把这个导入到MySQL数据库中。
这样MySQL的数据库data里含有一个叫做SalesJan2009的数据就建立好了。
对Logstash做如下的配置sales.conf:
input { jdbc { jdbc_connection_string => "jdbc:mysql://localhost:3306/data" jdbc_user => "root" jdbc_password => "YourMyQLPassword" jdbc_validate_connection => true jdbc_driver_library => "" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" parameters => { "Product_id" => "Product1" } statement => "SELECT * FROM SalesJan2009 WHERE Product = :Product_id" } } filter { mutate { rename => { "longitude" => "[location][lon]" "latitude" => "[location][lat]" } } } output { stdout { } elasticsearch { index => "sales" hosts => "localhost:9200" document_type => "_doc" } }
在这里,必须替换jdbc_user和jdbc_password为自己的MySQL账号的用户名及密码。特别值得指出的是jdbc_driver_library按elastic的文档是可以放入JDBC驱动的路径及驱动名称。实践证明如果这个驱动不在JAVA的classpath里,也是不能被正确地加载。正因为这样的原因,在上一步里把驱动mysql-connector-java-8.0.17.jar放入到Logstash的jar目录里,所以这里就直接填入空字符串。
接下来我们运行Logstash来加载我们的MySQL里的数据到Elasticsearch中:
./bin/logstash --debug -f ~/data/sales.conf
在这里把sales.conf置于用户home目录下的data子目录中。
我们可以在Kibana中查看到最新的导入到Elasticsearch中的数据
这里显示在sales索引中有847个文档。一旦数据进入到我们的Elastic,我们可以对数据进行分析:
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。