日志平台(网关层) - 基于Openresty+ELKF+Kafka

偏头痛杨 2019-06-28

背景介绍

1、问题现状与尝试

没有做日志记录的线上系统,绝对是给系统运维人员留下的坑。尤其是前后端分离的项目,后端的接口日志可以解决对接、测试和运维时的很多问题。之前项目上发布的接口都是通过Oracle Service Bus(OSB)来做统一编排,在编排时加上日志记录,并将接口日志存储到数据库中。最后基于接口日志数据开发日志平台,来统一的接口日志分析。
但我们总不能为了记录日志而使用OSB,这样很不自由。今年我们有很多后台接口使用Spring来开发,后台程序的部署环境也不局限于Oracle中间件的环境。当某些场景时,脱离了OSB,我们该如何记录接口日志,这是本文要解决的问题。

在我写的Spring系列的文章中,有尝试过使用Spring的AOP来记录日志。在每个项目的代码中,定义一个记录日志的切面,该切面会对该项目下的所有接口做日志记录。
对于一个周期很长、规模很大的一个独立项目来说,这个方案是可行的。因为项目周期很长,花个两天做日志记录的AOP开发没啥问题,而且这个日志更契合该系统的业务特征。
但我们团队所面对的开发,基本上都是数量多、周期短的一些小项目。一个项目的开发周期可能只有十天,就算每个项目在日志记录上只用一天的工作量,所占的比重也有十分之一。如果我们每个项目都要独立的记录日志,累积的工作量也挺大的,而且重复这样的工作很枯燥。
就像面向切面编程(AOP),在一个项目的所有接口上设置“切面”统一编程。如果我们的能在所有的项目上设置“切面”统一编程,就能解决我们现在的问题。这个“切面”就是网关。

2、方案设计

这个方案是公司内的两位技术大佬讨论出来的,这样惊奇的想法,让之前困扰的一切迷雾都豁然开朗了起来。我花了两天做了个Demo,验证方案的确行得通,下文会附上本次Demo中实战操作的代码。
简单来说,所有项目接口都通过Nginx的网关,而我们不需要在代码层面上收集日志,而是在Nginx上获取想要的日志信息,配合ELKF(Elasticsearch、Logstash、Kibana、Filebeat)的解决方案,实现统一的日志平台搭建:

  1. Nginx+Lua编程,按照我们定义的格式,所有通过网关的接口都会留下日志信息,写入log文件。
  2. Filebeat收集数据,Filebeat实时监测目标log文件,收集数据推送给Logstash。
  3. Logstash过滤处理数据,Logstash过滤处理数据后,会将数据同时推送给Elasticsearch和Kafka。
  4. Elasticsearch+Kibana,Elasticsearch作为数据的搜索引擎,而且利用Kibana的可视化界面,将日志数据以报表的形式显示出来。
  5. Kafka消息队列中间件,日志的数据被推送到Kafka上之后发布消息,而所有订阅者就能从队列中读数据。本次就是写程序实时的读取队列中的数据,存入数据库。

日志平台(网关层) - 基于Openresty+ELKF+Kafka

3、系统环境

在本次Demo中,由于资源限制,所有的产品服务都将部署在一台服务器上,服务器上的相关环境如下:

配置项环境配置信息
服务器阿里云服务器ECS(公网:47.96.238.21 ,私网:172.16.187.25)
服务器配置2 vCPU + 4 GB内存
JDK版本JDK 1.8.0_181
操作系统CentOS 7.4 64位
OpenResty1.13.6.2
Filebeat6.2.4
Elasticsearch6.2.4
Logstash6.2.4
Kibana6.2.4
Kafka2.10-0.10.2.1

基于OpenResty的日志记录

OpenResty® 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。
我们选择OpenResty的目的有两个:(1)使用Lua编程,可以在Nginx上更好的拿到想要的日志信息;(2)系统其它功能模块的集成,例如Jwt的集成,可参考同事写的文章《Nginx实现JWT验证-基于OpenResty实现》。

1、OpenResty安装

在安装OpenResty之前需要先安装好依赖库,OpenResty 依赖库有: perl 5.6.1+, libreadline, libpcre, libssl。我们是CentOS系统,可以直接yum来安装。

[root@Kerry ~]# yum install readline-devel pcre-devel openssl-devel perl

接下来我们在当前CentOS系统上使用新的官方 yum 源

[root@Kerry ~]# yum install yum-utils
[root@Kerry ~]# yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo

这时我们就可以直接安装OpenResty

[root@Kerry ~]# yum install openresty
[root@Kerry ~]# yum install openresty-resty

这样OpenResty就安装完成了,默认情况下程序会被安装到 /usr/local/openresty 目录

# 可查看安装成功
[root@Kerry ~]# cd /usr/local/openresty/bin/
[root@Kerry bin]# ./openresty -v
nginx version: openresty/1.13.6.2

# 设置环境变量
[root@Kerry sbin]# vi /etc/profile
# 在文件最后面加上 export PATH=${PATH}:/usr/local/openresty/nginx/sbin
[root@Kerry sbin]# source /etc/profile

2、记录Nginx日志

OpenResty 安装之后就有配置文件及相关的目录的,为了工作目录与安装目录互不干扰,我们单独建一个工作目录。我在根目录下新建了 /openrestyTest/v1/ 的文件夹,并在该目录下创建 logs 和 conf 子目录分别用于存放日志和配置文件。

[root@Kerry ~]# mkdir /openrestyTest /openrestyTest/v1 /openrestyTest/v1/conf /openrestyTest/v1/logs
[root@Kerry ~]# cd /openrestyTest/v1/conf/
# 创建并编辑 nginx.conf
[root@Kerry conf]# vi nginx.conf

在nginx.conf中复制以下文本作为测试

worker_processes  1;        #nginx worker 数量
error_log logs/error.log;   #指定错误日志文件路径
events {
    worker_connections 1024;
}

http {
    server {
        #监听端口,若你的6699端口已经被占用,则需要修改
        listen 6699;
        location / {
            default_type text/html;

            content_by_lua_block {
                ngx.say("HelloWorld")
            }
        }
    }
}

该语法是基于Lua,监听6699端口,输出HelloWorld。我们现在启动Openresty中的Nginx。

[root@Kerry ~]# /usr/local/openresty/nginx/sbin/nginx -p '/openrestyTest/v1/' -c conf/nginx.conf
# 由于配置或环境变量,也可以直接使用
[root@Kerry ~]# nginx -p '/openrestyTest/v1/' -c conf/nginx.conf
[root@Kerry conf]# curl http://localhost:6699
HelloWorld

访问该端口地址,成功的显示HelloWorld。我提前在本服务器的Tomcat上部署了一个接口,端口是8080。我的想法是将8080反向代理成9000,将所有通过8080端口的服务的日志信息获取到,并输出到本地的log文件中。
我暂时需要记录的日志内容包括:接口地址,请求内容,请求时间,响应内容,响应时间等。代码写好了,直接替换 /openrestyTest/v1/conf/nginx.conf 的文件内容。

worker_processes  1;
error_log logs/error.log;

events {
    worker_connections 1024;
}

http {
log_format myformat '{"status":"$status","requestTime":"$requestTime","responseTime":"$responseTime","requestURL":"$requestURL","method":"$method","requestContent":"$request_body","responseContent":"$responseContent"}';
access_log logs/test.log myformat;

upstream tomcatTest {
    server 47.96.238.21:8080;
}

server {
        server_name 47.96.238.21;
        listen 9000;
        # 默认读取 body
        lua_need_request_body on;

        location / {
                log_escape_non_ascii off;
                proxy_pass  http://tomcatTest;
                set $requestURL '';
                set $method '';
                set $requestTime '';
                set $responseTime '';
                set $responseContent '';

                body_filter_by_lua '
                        ngx.var.requestTime=os.date("%Y-%m-%d %H:%M:%S")

                        ngx.var.requestURL=ngx.var.scheme.."://"..ngx.var.server_name..":"..ngx.var.server_port..ngx.var.request_uri
                        ngx.var.method=ngx.var.request_uri

                        local resp_body = string.sub(ngx.arg[1], 1, 1000)
                        ngx.ctx.buffered = (ngx.ctx.buffered or"") .. resp_body
                        if ngx.arg[2] then
                                ngx.var.responseContent = ngx.ctx.buffered
                        end

                        ngx.var.responseTime=os.date("%Y-%m-%d %H:%M:%S")
                  ';

        }

    }
}

重新启动Nginx,然后进行验证

[root@Kerry conf]# nginx -p '/openrestyTest/v1/' -c conf/nginx.conf -s reload

我准备好的接口地址为:http://47.96.238.21:8080/springboot-demo/hello ,该接口返回的结果都是“Hello!Spring boot”。
现在用POST方式调用接口http://47.96.238.21:9000/springboot-demo/hello,Request中使用application/json方式输入内容:“segmentFault《日志平台(网关层) - 基于Openresty+ELKF+Kafka》”。然后查看logs文件夹,发现多了个 test.log 文件,我们查看该文件。就可以发现,当我们每调用一次接口,就会同步的输出接口日志到该文件中。

[root@Kerry conf]#  tail -500f /openrestyTest/v1/logs/test.log
{"status":"200","requestTime":"2018-10-11 18:09:02","responseTime":"2018-10-11 18:09:02","requestURL":"http://47.96.238.21:9000/springboot-demo/hello","method":"/springboot-demo/hello","requestContent":"segmentFault《日志平台(网关层) - 基于Openresty+ELKF+Kafka》","responseContent":"Hello!Spring boot!"}

到此为止,提取经过Nginx网关的接口信息,并将其写入日志文件就完成了,所有的接口日志都写入了 test.log 文件中。

E+L+K+F=日志数据处理

ELKF是 Elastic + Logstash + Kibana + FileBeat 四个组件的组合,可能ELK对于大家来说更熟悉,ELKF只不过多了Filebeat,它们都是Elastic公司推出的开源产品。刚好这几天Elastic公司成功上市,掀起了一波ELKF产品讨论的热潮。
原ELK架构中,Logstash负责收集日志信息并上报,但后来Elastic公司又推出了Filebeat,大家发现Filebeat在日志文件收集上效果更好,就只让Logstash负责日志的处理和上报了。在这个系统中,Elastic充当一个搜索引擎,Logstash为日志分析上报系统,FileBeat为日志文件收集系统,Kibana为此系统提供可视化的Web界面。
日志平台(网关层) - 基于Openresty+ELKF+Kafka

1、Filebeat安装配置

Filebeat:轻量型日志采集器,负责采集文件形式的日志,并将采集来的日志推送给logstash进行处理。

[root@Kerry ~]# cd /u01/install/
[root@Kerry install]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.4-x86_64.rpm
[root@Kerry install]# yum localinstall -y filebeat-6.2.4-x86_64.rpm

安装完成后,我们开始配置Filebeat来采集日志,并推送给Logstash。

[root@Kerry install]# cd /etc/filebeat/
[root@Kerry filebeat]# vi filebeat.yml

该filebeat.yml是filebeat的配置文件,里面大部分的模块都被注释了,本次配置放开的代码有;

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /openrestyTest/v1/logs/*.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
output.logstash:
  hosts: ["47.96.238.21:5044"]

监听 /openrestyTest/v1/logs/ 目录下的log文件,采集的日志信息输出到logstash,该hosts等我们安装启动了Logstash再说,先启动Filebeat。

[root@Kerry filebeat]# cd /usr/share/filebeat/bin/
[root@Kerry bin]# touch admin.out
[root@Kerry bin]# nohup ./filebeat -e -c /etc/filebeat/filebeat.yml > admin.out &
# 查看admin.out 日志,是否启动成功

2、Logstash安装配置

Logstash:日志处理工具,负责日志收集、转换、解析等,并将解析后的日志推送给ElasticSearch进行检索。

[root@Kerry ~]# cd /u01/install/
[root@Kerry install]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.rpm
[root@Kerry install]# yum localinstall -y logstash-6.2.4.rpm
#Logstash不建议用root启动
[root@Kerry install]# group add logstash
[root@Kerry install]# useradd -g logstash logstash
[root@Kerry install]# passwd logstash
# 设置密码
[root@Kerry install]# su logstash
[root@Kerry install]# mkdir -pv /data/logstash/{data,logs}
[root@Kerry install]# chown -R logstash.logstash /data/logstash/
[root@Kerry install]# vi /etc/logstash/conf.d/logstash.conf

创建并编辑/etc/logstash/conf.d/logstash.conf 文件,配置如下:

input {
  beats {
    port => 5044
    codec => plain {
          charset => "UTF-8"
    }
  }
}

output {
  elasticsearch {
    hosts => "47.96.238.21:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

1、input:是指Logstash的数据来源,启动后使用5044来监听,是否很熟悉,就是上节Filebeat推送日志的hosts。
2、output;是Logstash输出数据的位置,我们这里定义为elasticsearch,下文中会说到,用于ELK架构中的日志分析

接下来我们修改/etc/logstash/logstash.yml

#vim /etc/logstash/logstash.yml
path.data: /data/logstash/data
path.logs: /data/logstash/logs

现在可以启动Logstash了

[root@Kerry install]# su logstash
[logstash@Kerry root]$ cd /usr/share/logstash/bin/
[logstash@Kerry bin]$ touch admin.out
[logstash@Kerry bin]$ nohup ./logstash -f /etc/logstash/conf.d/logstash.conf >admin.out &

3、Elasticsearch安装配置

ElasticSearch:是一个分布式的RESTful风格的搜索和数据分析引擎,同时还提供了集中存储功能,它主要负责将logstash抓取来的日志数据进行检索、查询、分析等。

[root@Kerry ~]# cd /u01/install/
[root@Kerry install]# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.4.rpm
[root@Kerry install]# yum localinstall -y elasticsearch-6.2.4.rpm
#Elasticsearch不建议用root启动
[root@Kerry install]# group add elsearch
[root@Kerry install]# useradd -g elsearch elsearch
[root@Kerry install]# passwd elsearch
# 设置密码
[root@Kerry install]# su elsearch
[elsearch@Kerry bin]$  mkdir -pv /data/elasticsearch/{data,logs}
[elsearch@Kerry bin]$  chown -R elsearch.elsearch /data/elasticsearch/
[elsearch@Kerry bin]$  vi /etc/elasticsearch/elasticsearch.yml
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
network.host: 0.0.0.0
http.port: 9200

如果想要外网能访问,host就必须要设成0.0.0.0。Elasticsearch的启动如下

[root@Kerry install]# su elsearch
[elsearch@Kerry bin]$ cd /usr/share/elasticsearch/bin/
[elsearch@Kerry bin]$ ./elasticsearch -d
# -d 保证后台启动

4、Kibana安装配置

Kibana:Web前端,可以将ElasticSearch检索后的日志转化为各种图表,为用户提供数据可视化支持。

[root@Kerry ~]# cd /u01/install/
[root@Kerry install]# wget https://artifacts.elastic.co/downloads/kibana/kibana-6.2.4-x86_64.rpm
[root@Kerry install]# yum localinstall -y kibana-6.2.4-x86_64.rpm
[root@Kerry install]# vi /etc/kibana/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.url: "http://47.96.238.21:9200"

同样的,host为0.0.0.0,保证外网能访问。Kibana只作为前端展示,日志数据的获取还是借助于elasticsearch,所以这里配置了elasticsearch.url。接着启动Kibana,就能通过页面看到日志的报表。

[root@Kerry ~]# cd /usr/share/kibana/bin/
[root@Kerry bin]# touch admin.out
[root@Kerry bin]# nohup ./kibana >admin.out &

我们在浏览器上访问 http://47.96.238.21:5601/ ,正常来说就能访问Kibana的页面。如果 ELKF一整套配置没问题,就能在Kibana的页面上实时的看到所有日志信息。

日志平台(网关层) - 基于Openresty+ELKF+Kafka

从Kafka到数据库

在拿到日志的数据后,通过Elasticsearch和Kibana,已经完成了一个日志查看的平台。但我们自己项目内部也已经开发了日志平台,希望把这些日志接入到之前的日志平台中;或者我们希望定制化一个更符合实际使用的日志平台,这些都需要把拿到的日志数据存储到数据库里。
但所有日志的记录,很明显处于高并发环境,很容易由于来不及同步处理,导致请求发生堵塞。比如说,大量的insert,update之类的请求同时到达数据库,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。在比对市场上开源的消息中间件后,我选择了Kafka。
Apache Kafka是一个分布式的发布-订阅消息系统,能够支撑海量数据的数据传递。在离线和实时的消息处理业务系统中,Kafka都有广泛的应用。Kafka将消息持久化到磁盘中,并对消息创建了备份保证了数据的安全。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

日志平台(网关层) - 基于Openresty+ELKF+Kafka

  • Broker:Kafka的broker是无状态的,broker使用Zookeeper维护集群的状态。Leader的选举也由Zookeeper负责。
  • Zookeeper:Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
  • Producer:生产者将数据推送到broker上,当集群中出现新的broker时,所有的生产者将会搜寻到这个新的broker,并自动将数据发送到这个broker上。
  • Consumer:因为Kafka的broker是无状态的,所以consumer必须使用partition
    offset来记录消费了多少数据。如果一个consumer指定了一个topic的offset,意味着该consumer已经消费了该offset之前的所有数据。consumer可以通过指定offset,从topic的指定位置开始消费数据。consumer的offset存储在Zookeeper中。

1、Kafka安装与配置

我们开始Kafka的安装和启动

# 安装
[root@Kerry ~]# cd /u01/install/
[root@Kerry install]# wget http://apache.fayea.com/kafka/0.10.2.1/kafka_2.10-0.10.2.1.tgz
[root@Kerry install]# tar -zvxf kafka_2.10-0.10.2.1.tgz -C /usr/local/
[root@Kerry install]# cd /usr/local/
[root@Kerry local]# mv kafka_2.10-0.10.2.1 kafka
# 启动
[root@Kerry local]# cd /usr/local/kafka/bin/
[root@Kerry bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
[root@Kerry bin]# touch admin.out
[root@Kerry bin]# nohup ./kafka-server-start.sh ../config/server.properties >admin.out &

创建一个topic,命名为 kerry

[root@Kerry bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kerry
# topic创建成功,下面查看一下
[root@Kerry bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
kerry

我们往这个topic中发送信息

[root@Kerry bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic kerry
Hello Kerry!this is the message for test

我们再开一个窗口,从topic中接受消息

[root@Kerry bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kerry --from-beginning
Hello Kerry!this is the message for test
# 能成功接收到

2、生产者:Logstash

Kafka已经安装好了,也建好了topic,而我希望往topic中发送消息的对象(生产者)是Logstash。即Logstash从Filebeat中获取数据后,除了输出给Elasticsearch以外,还输出给Logstash,Logstash作为Kafka的生产者。
这里需要修改一下Logstash的配置文件,在output中再加上kafka的信息

vi /etc/logstash/conf.d/logstash.conf
input {
  beats {
    port => 5044
    codec => plain {
          charset => "UTF-8"
    }
  }
}

output {
  elasticsearch {
    hosts => "47.96.238.21:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
  kafka {
    bootstrap_servers => "localhost:9092"    #生产者
    topic_id => "kerry"    #设置写入kafka的topic
    compression_type => "snappy"
    codec => plain {
            format => "%{message}"
        }
  }
}

重启Logstash

[root@Kerry bin]# cd /usr/share/logstash/bin
[root@Kerry bin]# ps -ef|grep logstash 
# kill 进程
[root@Kerry bin]# nohup ./logstash -f /etc/logstash/conf.d/logstash.conf >admin.out &

我们再用POST方式调用之前的测试接口http://47.96.238.21:9000/springboot-demo/hello,请求request为:“这是对kafka的测试”。然后再查看从topic中接受消息

[root@Kerry bin]#./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kerry --from-beginning
{"status":"200","requestTime":"2018-10-12 09:40:02","responseTime":"2018-10-12 09:40:02","requestURL":"http://47.96.238.21:9000/springboot-demo/hello","method":"/springboot-demo/hello","requestContent":"这是对kafka的测试","responseContent":"Hello!Spring boot!"}

可以成功的接收到推送过来的日志消息

3、消费者:Springboot编程

日志已经可以保证能够持续不断的推送到Kafka中,那么就需要有消费者订阅这些消息,写入到数据库。我用Spring boot写了个程序,用来订阅Kafka的日志,重要代码如下:
1、application.yml

spring:
  # kafka
  kafka:
    # kafka服务器地址(可以多个)
    bootstrap-servers: 47.96.238.21:9092
    consumer:
      # 指定一个默认的组名
      group-id: kafka1
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
      # 服务器地址
      bootstrap-servers: 47.96.238.21:9092

2、POM.xml

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.0.6.RELEASE</version>
        </dependency>

3、KafkaController.java

package df.log.kafka.nginxlog.controller;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.naming.InitialContext;
import javax.sql.DataSource;
import java.sql.Connection;


@RestController
@EnableAutoConfiguration
public class KafkaController {

    @RequestMapping("/hello")
    public String hello(){
        return "Hello!Kerry. This is NginxLog program";
    }
    /**
     * 监听信息
     */
    @KafkaListener(topics = "kerry" )
    public void receive(ConsumerRecord<?, ?> consumer) {
        // kafkaLog 就是获取到的日志信息
        String kafkaLog = (String) consumer.value();
        System.out.println("收到一条消息:"+kafkaLog);
        // 存入数据库的代码省略
    }

}

当程序部署之后,@KafkaListener(topics = "kerry") 会持续监听topics 为kerry的消息。我们再调用之前的测试接口,会发现新的接口日志会被持续监听到,在控制台上打印出来,并存入数据库。

日志平台(网关层) - 基于Openresty+ELKF+Kafka

尾声

本次操作文档是记录Demo的过程,很多地方并不成熟,例如:如何在 Nginx+Lua 时获取更加全面的日志信息;在Logstash上对日志进行再加工;写出漂亮的Spring boot 代码,使得能够很平缓的做写入数据库,用好Kibana的图表等等。
我们下一步就是在项目的生产环境上正式的搭建日志平台,我们已经有了rancher环境,这套架构计划用微服务的方式实现。后续的搭建文档会持续更新。

相关推荐