huangzhe0 2017-09-14
把 Elasticsearch 当数据库使 系列:
推销Elasticsearch
时间序列数据库的秘密(1)—— 介绍
时间序列数据库的秘密(2)——索引
时间序列数据库的秘密(3)——加载和分布式计算
用SQL查询Elasticsearch
https://github.com/taowen/es-monitor
【01】把 Elasticsearch 当数据库使:表结构定义
【02】把 Elasticsearch 当数据库使:过滤和排序
【03】把 Elasticsearch 当数据库使:简单指标
【04】把 Elasticsearch 当数据库使:按字段聚合
【05】把 Elasticsearch 当数据库使:HISTOGRAM聚合
【06】把 Elasticsearch 当数据库使:CASE WHEN 聚合
【07】把 Elasticsearch 当数据库使:聚合后排序
【08】把 Elasticsearch 当数据库使:计算后再聚合
【09】把 Elasticsearch 当数据库使:HAVING与Pipeline Aggregation
【10】把 Elasticsearch 当数据库使:Drill Down 下钻
【11】把 Elasticsearch 当数据库使:Filter 下钻
【12】把 Elasticsearch 当数据库使:聚合后再计算
【13】把 Elasticsearch 当数据库使:Join
使用 https://github.com/taowen/es-monitor 可以用 SQL 进行 elasticsearch 的查询。要真正把Elasticsearch当作数据库来使,Join是一个绕不过的话题。关于Elasticsearch如何支持join,这个slide总结得很好:http://www.slideshare.net/sirensolutions/searching-relational-data-with-elasticsearch。总体来说有这么几种方式:
完全不join,把关联表的字段融合到一张表里。当然这会造成数据的冗余
录入的时候join:使用 nested documents(nested document和主文档是同segment存储的,对于一个symbol,几千万个quote这样的场景就不适合了)
录入的时候join:使用 siren
查询时join:使用 parent/child (这个是elasticsearch的特性,要求parent/child同shard存在)
查询时join:使用 siren-joins(就是一个在服务端求值的filter,然后把结果发布给每个shard去做二次match)
查询时join:在客户端拼装第二个查询(和siren-joins差不多,但是多了一次客户端到服务器的来回)
查询时join:在coordinate节点上做两个查询的join合并(https://github.com/NLPchina/elasticsearch-sql)
我个人喜欢的是siren-joins和客户端拼装这两种方案。这两种方案都是先做了一次查询,把查询结果再次分发到每个分布式节点上再次去做分布式的聚合。相比在coordinate节点上去做join合并更scalable。
首先我来看如何在客户端完成结果集的求值
$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200 SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000; SAVE RESULT AS finance_symbols; EOF
这里引入的 SAVE RESULT AS 就是用于触发前面的SQL的求值,并把结果集命名为 finance_symbols。如果因为一些中间结果我们不需要,我们也可以用REMOVE 命令把求值结果删除
$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200 SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000; SAVE RESULT AS finance_symbols; REMOVE RESULT finance_symbols; EOF
甚至我们可以使用任意的python代码来修改result_map。
$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200 SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000; SAVE RESULT AS finance_symbols; result_map['finance_symbols'] = result_map['finance_symbols'][1:-1]; EOF
在客户端求值的基础上,我们可以利用客户端保留的结果集来发第二个请求。
cat << EOF | python2.6 es_query.py http://127.0.0.1:9200 SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 5; SAVE RESULT AS finance_symbols; SELECT MAX(adj_close) FROM quote JOIN finance_symbols ON quote.symbol = finance_symbols.symbol; REMOVE RESULT finance_symbols; EOF
这个产生的Elaticsearch请求是这样的两条:
{ "query": { "term": { "sector": "Finance" } }, "size": 5 }
然后根据其返回,产生了第二个请求
{ "query": { "bool": { "filter": [ {}, { "terms": { "symbol": [ "TFSC", "TFSCR", "TFSCU", "TFSCW", "PIH" ] } } ] } }, "aggs": { "MAX(adj_close)": { "max": { "field": "adj_close" } } }, "size": 0 }
可以看到,所谓客户端join,就是用前一次的查询结果拼出了第二次查询的条件(terms filter)。
有了 siren-join 插件(https://github.com/sirensolutions/siren-join),我们可以在服务端完成同样的join操作
cat << EOF | python2.6 es_query.py http://127.0.0.1:9200 WITH finance_symbols AS (SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 5); SELECT MAX(adj_close) FROM quote JOIN finance_symbols ON quote.symbol = finance_symbols.symbol; EOF
前面第一个查询是用SAVE RESULT AS求值并命名为finance_symbols,这里我们并没有求值而是给其取了一个名字(WITH AS),然后就可以引用了。
{ "query": { "bool": { "filter": [ {}, { "filterjoin": { "symbol": { "indices": "symbol*", "path": "symbol", "query": { "term": { "sector": "Finance" } } } } } ] } }, "aggs": { "MAX(adj_close)": { "max": { "field": "adj_close" } } }, "size": 0 }
可见产生的filterjoin把两步合为一步了。注意对于filterjoin查询,需要POST _coordinate_search 而不是_search这个URL。
Profile
[ { "query": [ { "query_type": "BoostQuery", "lucene": "ConstantScore(BytesFieldDataTermsQuery::[size=8272])^0.0", "time": "29.32334300ms", "breakdown": { "score": 0, "create_weight": 360426, "next_doc": 137906, "match": 0, "build_scorer": 15027540, "advance": 0 }, "children": [ { "query_type": "BytesFieldDataTermsQuery", "lucene": "BytesFieldDataTermsQuery::[size=8272]", "time": "13.79747100ms", "breakdown": { "score": 0, "create_weight": 14903, "next_doc": 168010, "match": 0, "build_scorer": 13614558, "advance": 0 } } ] } ], "rewrite_time": 30804, "collector": [ { "name": "MultiCollector", "reason": "search_multi", "time": "1.529236000ms", "children": [ { "name": "TotalHitCountCollector", "reason": "search_count", "time": "0.08967800000ms" }, { "name": "MaxAggregator: [MAX(adj_close)]", "reason": "aggregation", "time": "0.1675550000ms" } ] } ] } ]
从profile的结果来看,其原理也是 terms filter(BytesFieldDataTermsQuery)。所以这也就决定了这种join只是伪join。真正的join不仅仅可以用第一个表去filter第二个表,而且要能够在第二个查询的计算阶段引用第一个阶段的结果。这个是仅仅用terms filter无法完成的。当然所有这些join的努力仅仅是让数据维护变得更加容易而已,如果我们真的要求Elasticsearch的join和传统SQL一样强大,那么我们也无法指望那么复杂的join可以快到哪里去,也就失去了使用Elasticsearch的意义了。有了上面两种Join方式,我们可以在极度快速和极度灵活之间获得一定的选择权利。
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。