zhaojp0 2020-01-18
我们使用Elasticsearch索引文档时,最理想的情况是文档JSON结构是确定的,数据源源不断地灌进来即可,但实际情况中,没人能够阻拦需求的变更,在项目的某个版本,可能会对原有的文档结构造成冲击,增加新的字段还好,如果要修改原有的字段,只能重建索引了。
本篇以实战方式讲解如何零停机完成索引重建的三种方案。
系统架构设计中,有关系型数据库用来存储数据,Elasticsearch在系统架构里起到查询加速的作用,如果遇到索引重建的操作,待系统模块发布新版本后(若重建索引不是因为客户端修改导致的,可以不停机发版,直接操作),可以从数据库将数据查询出来,重新灌到Elasticsearch即可。
建议的功能方案:数据库 + MQ + 应用模块 + Elasticsearch,可以在MQ控制台发送MQ消息来触发重导数据,按批次对数据进行导入,整个过程异步化处理,请求操作示意如下所示:
详细操作步骤:
这样就可以完成索引的重建工作。
MQ中间件的选型不做具体要求,常见的rabitmq、activemq、rocketmq等均可。
在微服务模块方面,提供MQ消息处理接口、数据处理模块需要事先开发的,一般是创建新的索引时,配套把重建的功能也一起做好。整体功能共用一个topic,针对每个索引,有单独的结构定义和MQ消息处理tag,代码尽可能复用。处理的批次大小需要根据实际的情况设置。
微服务模块实例会部署多个,数据是分批处理的,批次信息会一次性全部先发送给MQ,各个实例处理的数据相互不重叠,利用MQ消息的异步处理机制,可以充分利用并发的优势,加快数据重建的速度。
利用Elasticsearch自带的一些工具完成索引的重建工具,当然在方案实际落地时,可能也会依赖客户端的一些功能,比如用Java客户端持续的做scroll查询、bulk命令的封装等,但与上一方案相比,最明显的区别就是:数据完全自给自足,不依赖其他数据源。
假设原索引名称是music,新的索引名称为music_new,Java客户端使用别名music_alias连接Elasticsearch,该别名指向原索引music。
PUT /music/_alias/music_alias
GET /music/_search?scroll=1m { "query": { "match_all": {} }, "sort": ["_doc"], "size": 1000 }
POST /_bulk { "index": { "_index": "music_new", "_type": "children", "_id": "1" }} { "name": "wake me, shake me" }
POST /_aliases { "actions": [ { "remove": { "index": "music", "alias": "music_alias" }}, { "add": { "index": "music_new", "alias": "music_alias" }} ] }
在数据传输上基本自给自足,不依赖于其他数据源,Java客户端不需要停机等待数据迁移,网络传输占用带宽较小。
只是scroll查询和bulk提交这部分,数据量大时需要依赖一些客户端工具。
在Java客户端或其他客户端访问Elasticsearch集群时,使用别名是一个好习惯。
Elasticsearch v6.3.1已经支持Reindex API,它对scroll、bulk做了一层封装,能够 对文档重建索引而不需要任何插件或外部工具。
最基础的命令:
POST _reindex { "source": { "index": "music" }, "dest": { "index": "music_new" } }
响应结果:
{ "took": 180, "timed_out": false, "total": 4, "updated": 0, "created": 4, "deleted": 0, "batches": 1, "version_conflicts": 0, "noops": 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": -1, "throttled_until_millis": 0, "failures": [] }
注意:
如果不手动创建新索引music_new的mapping信息,那么Elasticsearch将启动自动映射模板对数据进行类型映射,可能不是期望的类型,这点要注意一下。
使用reindex api也是创建快照后再执行迁移的,这样目标索引的数据可能会与原索引有差异,version_type属性可以决定乐观锁并发处理的规则。
reindex api可以设置version_type属性,如下:
POST _reindex { "source": { "index": "music" }, "dest": { "index": "music_new" "version_type": "internal" } }
version_type属性含义如下:
如果op_type设置为create,那么迁移时只在目标索引中创建ID不存在的文档,已存在的文档,会提示错误,如下请求:
POST _reindex { "source": { "index": "music" }, "dest": { "index": "music_new", "op_type": "create" } }
有错误提示的响应,节选部分:
{ "took": 11, "timed_out": false, "total": 5, "updated": 0, "created": 1, "deleted": 0, "batches": 1, "version_conflicts": 4, "noops": 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": -1, "throttled_until_millis": 0, "failures": [ { "index": "music_new", "type": "children", "id": "2", "cause": { "type": "version_conflict_engine_exception", "reason": "[children][2]: version conflict, document already exists (current version [17])", "index_uuid": "dODetUbATTaRL-p8DAEzdA", "shard": "2", "index": "music_new" }, "status": 409 } ] }
如果加上"conflicts": "proceed"配置项,那么冲突信息将不展示,只展示冲突的文档数量,请求和响应结果将变成这样:
请求:
POST _reindex { "conflicts": "proceed", "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "op_type": "create" } }
响应:
{ "took": 12, "timed_out": false, "total": 5, "updated": 0, "created": 1, "deleted": 0, "batches": 1, "version_conflicts": 4, "noops": 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": -1, "throttled_until_millis": 0, "failures": [] }
reindex api支持数据过滤、数据排序、size设置、_source选择等,也支持脚本执行,这里提供一个简单示例:
POST _reindex { "size": 100, "source": { "index": "music", "query": { "term": { "language": "english" } }, "sort": { "likes": "desc" } }, "dest": { "index": "music_new" } }
本篇介绍了零停机索引重建操作的三个方案,从自研功能、scroll+bulk到reindex,我们作为Elasticsearch的使用者,三个方案的参与度是逐渐弱化的,但稳定性却是逐渐上升的,我们需要清楚地去了解各个方案的优劣,适宜的场景,然后根据实际的情况去权衡,哪个方案更适合我们的业务模型,仅供参考,谢谢。
专注Java高并发、分布式架构,更多技术干货分享与心得,请关注公众号:Java架构社区
可以扫左边二维码添加好友,邀请你加入Java架构社区微信群共同探讨技术
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。