jiaomrswang 2019-06-26
进行文章的第二次修改,包括了之前的简单方案的升级过程。
因为业务的不断更新升级,为了保证线上业务也能正常使用elk服务,并且使得elk的服务和线业务流解耦(即避免直接写入es的方式可能会带来的耗时影响)所以我们采用了下面最新的方案,也是常规方案
业务层 >> kafka队列 >> logstash 消费 >> elasticsearch
业务层将日志写入到kafka队列,同事logstash可以开启多个线程,启用同一个group_id来对kafka进行消费,将读取到的日志进行解析后,写入到elasticsearch中,并且按照索引模板进行解析。
业务层可以直接写入到kafka队列中,不用担心elasticsearch的写入效率问题。
比起之前的简单版本,需要保证kafka队列、logstash的高可用(虽然logstash挂掉后,可以重启后重新读取队列日志)
整个搭建过程,写入kafka是非常简单的,这里遇到的问题是logstash和elasticsearch索引模板带来的困扰。
如何确定使用谁的模板呢?
网上找到的资料,建议采用将模板配置在elasticsearch侧,这样就不用每个logstash进行一个模板配置文件的维护。
官网的文档kafka的input插件
https://www.elastic.co/guide/...
input {
kafka {
// 需要读取的kafka队列集群配置
bootstrap_servers => "xxx.xxx.xxx.xxx:9092"
// 配置的消费者的group名称,因为同一个组内的消费消息不会重复
group_id => "logstash-group"
// 主题配置
topics => "kibana_log"
// 从未消费过的偏移量开始
auto_offset_reset =>"earliest"
}
}
// 重要,下面单独讲
filter {
json {
source => "message"
}
}
output {
// stdout可以省略,这个是为了命令行模式下方便调试
stdout{ codec => rubydebug }
elasticsearch {
// es 集群
hosts=>"xxx.xxx.xxx.xxx:9200"
// 重要:取消logstash自定义模板功能,进而强制使用es的内置模板
manage_template=>false
// 需要匹配的模板名称
index=>"logstash-dev-%{+YYYY.MM.dd}"
}
}先解释下filter配置,当我们从kafka读取消息的时候,消息体是通过message字段来进行传递的,所以message是一个字符串,但是我们的es索引模板可能会非常复杂,所以我们需要对其进行json解析后,再交给es。否则es收到的之后一个message字段。
filter {
json {
source => "message"
}
}再说下模板配置,首先通过kibana的devtool向es中写入了一个模板,我区分了两套环境dev、prod。
这里字段都进行了strval转义,为什么呢?这和下面要讲的动态模板有关联的。往下看
$position = YnUtil::getPosition();
$urlData = parse_url(\Wii::app()->request->url);
$path = $urlData['path'] ?? '';
$params = [
'category' => strval($category),
'appType' => strval(YnUtil::getAppType()),
'appVersion' => strval(YnUtil::getAppVersion()),
'host' => strval(\Wii::app()->request->hostInfo),
'uri' => strval(\Wii::app()->request->url),
'uid' => strval(\Wii::app()->user->getUid()),
'path' => strval($path),
'server' => strval(gethostname()),
'geoip' => [
'ip' => strval(\Yii::$app->request->userIP),
'location' => [
'lat' => floatval($position['latitude']),
'lon' => floatval($position['longitude']),
],
],
'userAgent' => strval(\Wii::app()->request->userAgent),
'message' => is_array($message) ? Json::encode($message) : strval($message),
// '@timestamp' => intval(microtime(true) * 1000),
];下面的模板是写入到es里面的自定义模板,为了防止索引规则名称冲突,这里将order置为1。
我们先来看下第一个模板(这个是不推荐的,因为很繁琐,但是类型很强制有效)
PUT _template/logstash-dev
{
"index_patterns": "logstash-dev*",
"aliases": {},
"order":1,
"mappings": {
// 这里使用logs是因为logstash默认的type类型
"logs": {
// 动态模板
"dynamic_templates": [
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
},
"norms": false,
"type": "text"
}
}
}
],
// 这里对属性进行了类型设置
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "keyword"
},
"appType": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"appVersion": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"category": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"geoip": {
"dynamic": "true",
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"host": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"message": {
"type": "text",
"norms": false
},
"server": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"uid": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"uri": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"path": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"userAgent": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
},
"settings": {
"index": {
"number_of_shards": "1"
}
}
}上面的模板不是最优的,但是却是一种尝试,模板里面针对每个属性做了设置,这样客户端只需要写入对应的属性就好了。但是如何动态配置呢?如果想加入一个字段,难道还要修改模板么? 这里引入了动态模板的概念
Only the following datatypes can be automatically detected: boolean, date, double, long, object, string. It also accepts * to match all datatypes.
动态映射
https://www.elastic.co/guide/...
动态模板(注意看match和match_mapping_type)
https://www.elastic.co/guide/...
映射属性
https://www.elastic.co/guide/...
然后我们给出了一个全新的模板
这个模板里面只针对特殊的属性进行了设置,其他的都是通过动态模板扩展的,下面看下效果。
PUT _template/logstash-dev
{
"index_patterns": "logstash-dev*",
"aliases": {},
"order":1,
"mappings": {
"logs": {
"dynamic_templates": [
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
},
"norms": false,
"type": "text"
}
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "keyword"
},
"geoip": {
"dynamic": "true",
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
}
}
}
},
"settings": {
"index": {
"number_of_shards": "1"
}
}
}我们上面说了,全都进行了strval转义,为什么呢?因为动态模板里面,匹配的是string类型,如果我们写入的是一个int类型,那么就不会进行自动扩展了。试验后表明,会生成一个int类型的message字段,这样是不合理的。最终生成的效果是如下图的。

分割线 =============================
新项目短时间来实现日志采集。
一台8G 4核 500G硬盘 服务器
项目部署在4台服务器,每台服务器通过phpsdk直接写入一台es服务器中。
(在本次部署中,没有使用logstash的功能)
没有使用异步队列,导致直接写入es可能会影响业务逻辑,但是目前只会在开发和测试环境使用。
主要利用elasticsearch 和 kibana
要使用x-pack做安全校验,包括给kibana加入登录授权功能
Elasticsearch
https://www.elastic.co/cn/pro...
Elasticsearch-clients
这里包含的多种语言的sdk包
https://www.elastic.co/guide/...
Kibana
https://www.elastic.co/cn/pro...
Logstash
https://www.elastic.co/cn/pro...
X-pack
安装流程说明很详细,秘钥生成后记得保存下,并且加入x-pack后,kibana和elasticsearch的通讯,需要修改配置文件。另外phpsdk也需要加入秘钥,后面说明。
https://www.elastic.co/cn/pro...
es的模板超级复杂的,所以我们要利用标准的现有的模板,需要从logstash中提取一个。
解压下载的logstash-6.1.2.tar,执行搜索命令
$ find ./ -name 'elasticsearch-template*' ./vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.0.2-java/lib/logstash/outputs/elasticsearch/elasticsearch-template-es2x.json ./vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.0.2-java/lib/logstash/outputs/elasticsearch/elasticsearch-template-es5x.json ./vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.0.2-java/lib/logstash/outputs/elasticsearch/elasticsearch-template-es6x.json
会发现有2x 5x 6x三个模板,这里我们选择6x,要根据你的es版本来选择。
然后创建索引,索引要在数据写入之前创建,因为要给每个字段设置类型。
https://www.elastic.co/guide/...
按照文档的方式,将获取到的6x通过curl的方式写入到es
因为默认的es配置是开启了localhost:9200端口,用于执行RESTFUL,但是本次我们采用php-sdk的方式,直接写入es,就要求每台业务服务器,都能访问到es。
# ---------------------------------- Network ----------------------------------- # # Set the bind address to a specific IP (IPv4 or IPv6): # 修改此处的host配置为0.0.0.0,这样所有的请求都可以接入进来 network.host: 0.0.0.0 # # Set a custom port for HTTP: # #http.port: 9200
下载地址
https://www.elastic.co/guide/...
try {
$hosts = [
// 这个地方要填写x-pack分配的密码和用户名
'http://{用户名}:{密码}@192.168.1.11:9200', // HTTP Basic Authentication
// 'http://user2:[email protected]:9200' // Different credentials on different host
];
$client = ClientBuilder::create()->setHosts($hosts)->build();
$position = YnUtil::getPosition();
$params = [
'index' => 'logstash-yn-' . date('Ymd'),
// elastic6版本有个bug,每个索引只能有一个type类型
'type' => 'xxxx',
'id' => md5(rand(0, 999999999)) . (microtime(true) * 1000),
'body' => [
// 索引创建好后,写入数据一定要注意类型,不然会报错,我这里都会进行格式化一遍
// 类别作为一个主要字段用于区分日志
'category' => strval($category),
'appType' => strval(YnUtil::getAppType()),
'appVersion' => strval(YnUtil::getAppVersion()),
'host' => strval($hostInfo),
'uri' => strval($url),
'uid' => strval($user->getUid()),
'server' => strval(gethostname()),
'geoip' => [
'ip' => strval($ip),
// 这个很重要,可以实现geo可视化图形
'location' => [
'lat' => floatval($position['latitude']),
'lon' => floatval($position['longitude']),
],
],
'userAgent' => strval($userAgent),
'message' => Json::encode($message),
// 这里一定要写入毫秒时间戳
'@timestamp' => intval(microtime(true) * 1000),
],
];
$client->index($params);
} catch (\Exception $e) {
}官方说明
https://www.elastic.co/guide/...
我们这里以6x作为模板
{
// 将这个模板应用于所有以 logstash- 为起始的索引。
"template": "logstash-*",
"version": 60001,
"settings": {
"index.refresh_interval": "5s"
},
"mappings": {
"_default_": {
// 动态模板说明,很重要,配置了动态模板后,我们可以添加任意字段
// https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html
"dynamic_templates": [{
// 信息字段 官方说这里可以自定义 The template name can be any string value.
"message_field": {
"path_match": "message",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false
}
}
}, {
// 字符串字段说明
"string_fields": {
// 匹配所有
"match": "*",
// 并且字段类型是string的
"match_mapping_type": "string",
//
"mapping": {
"type": "text",
// 这里应该是和受欢迎程度评分相关
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}],
// 定义好的属性说明
"properties": {
// 时间字段,这个很重要,不然kibana不会出现时间相关的查询控件
"@timestamp": {
"type": "date"
},
"@version": {
"type": "keyword"
},
// 这个可以只写入properies里面的任意一个字段
"geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
// 我只是用了这个location
"location": {
"type": "geo_point"
},
"latitude": {
"type": "half_float"
},
"longitude": {
"type": "half_float"
}
}
}
}
}
}
}使用logstash内置的模板
检查是否是毫秒时间戳
需要在链接中添加用户名和密码
写入类型,一定要和索引模板中定义的一致,不然肯定报错!
elasticsearch6的 bug,官方承诺在7进行修复
安装x-pack吧
使用supervisor
我的方案是:定时脚本来清理7天之前的索引DELETE logstash-xxxx
在kibana中有一个Dev Tools 可以执行curl,并且看到结果
在kibana菜单的Management->Index Patterns中可以管理
不详细的地方,可以留言