csmnjk 2019-07-01
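(1) First, create a document
PUT /test_index/_doc/1
{ "test_field": "test test" }
(2) Simulate two clients that both fetch the same document
GET /test_index/_doc/1
(3) One of the clients updates the document first
PUT /test_index/_doc/1?version=1
{ "test_field": "test client 1" }
(4) The other client then tries to modify the document based on the version=1 data it read earlier, also passing the version number, so that optimistic locking controls the concurrent write
PUT /test_index/_doc/1?version=1
{ "test_field": "test client 2" }
This request fails with a version conflict error.
(5) Once optimistic locking has blocked the concurrent write, complete the update correctly
GET /test_index/_doc/1
The client whose update failed fetches the latest version number and then retries the change based on the latest data, passing that latest version number. This step may have to be repeated several times before it succeeds, especially when many threads update the same document frequently.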
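The read, write, retry cycle described above can be sketched in Python. This is only an in-memory simulation under stated assumptions, not an Elasticsearch client: VersionedStore, VersionConflict and update_with_retry are hypothetical names made up for illustration, and the store holds a single document whose version starts at 1.

```python
import threading


class VersionConflict(Exception):
    """Raised when the caller's version does not match the stored one."""


class VersionedStore:
    """Toy in-memory stand-in for one document with an internal version (hypothetical)."""

    def __init__(self, source):
        self._lock = threading.Lock()
        self._version = 1
        self._source = dict(source)

    def get(self):
        with self._lock:
            return self._version, dict(self._source)

    def put(self, source, version):
        # Compare-and-set: write only if nobody changed the document since it was read.
        with self._lock:
            if version != self._version:
                raise VersionConflict(
                    f"required version {version}, current version {self._version}"
                )
            self._source = dict(source)
            self._version += 1
            return self._version


def update_with_retry(store, modify, max_attempts=10):
    """Read, modify, write; on conflict re-read the latest version and try again."""
    for attempt in range(1, max_attempts + 1):
        version, source = store.get()
        try:
            return store.put(modify(source), version), attempt
        except VersionConflict:
            continue
    raise RuntimeError("gave up after too many version conflicts")


# Replay steps (1) to (5) against the toy store.
store = VersionedStore({"test_field": "test test"})
v_a, _ = store.get()  # client 1 reads version 1
v_b, _ = store.get()  # client 2 reads version 1
store.put({"test_field": "test client 1"}, v_a)  # client 1 wins

conflict_message = None
try:
    store.put({"test_field": "test client 2"}, v_b)  # stale version
except VersionConflict as exc:
    conflict_message = str(exc)

new_version, attempts = update_with_retry(
    store, lambda src: {**src, "test_field": "test client 2"}
)
```

The max_attempts cap is a design choice of this sketch: under heavy contention a client should eventually give up or back off rather than spin forever.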
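Important note: to reproduce the experiment above you need an Elasticsearch version earlier than 6.7, because the version parameter is deprecated from 6.7.0 on. The documentation says: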
version Deprecated in 6.7.0. Please use if_seq_no & if_primary_term instead. See Optimistic concurrency control for more details.
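That is clear enough: newer versions implement this with the two parameters if_seq_no and if_primary_term.
Below I repeat the same experiment on a newer version. I am using 7.0.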
(1) First, create a document
PUT /test_index/_doc/1
{ "test_field": "test test" }
Response:
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 3,
  "result" : "created",
  "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 },
  "_seq_no" : 2,
  "_primary_term" : 1
}
(2) Simulate two clients that both fetch the same document
GET /test_index/_doc/1
Response:
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 3,
  "_seq_no" : 2,
  "_primary_term" : 1,
  "found" : true,
  "_source" : { "test_field" : "test test" }
}
(3) One of the clients updates the document first
PUT /test_index/_doc/1?if_seq_no=2&if_primary_term=1
{ "test_field": "test client 1" }
Response:
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 4,
  "result" : "updated",
  "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 },
  "_seq_no" : 3,
  "_primary_term" : 1
}
(4) The other client tries to modify the document using the if_seq_no and if_primary_term values it read earlier, and optimistic concurrency control rejects the write
PUT /test_index/_doc/1?if_seq_no=2&if_primary_term=1
{ "test_field": "test client 2" }
Response:
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[1]: version conflict, required seqNo [2], primary term [1]. current document has seqNo [3] and primary term [1]",
        "index_uuid": "0jAS2GP1TPG5J8PlqGdYIQ",
        "shard": "4",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[1]: version conflict, required seqNo [2], primary term [1]. current document has seqNo [3] and primary term [1]",
    "index_uuid": "0jAS2GP1TPG5J8PlqGdYIQ",
    "shard": "4",
    "index": "test_index"
  },
  "status": 409
}
(5) Once optimistic locking has blocked the concurrent write, complete the update correctly
GET /test_index/_doc/1
Response:
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 4,
  "_seq_no" : 3,
  "_primary_term" : 1,
  "found" : true,
  "_source" : { "test_field" : "test client 1" }
}
Retry the change based on the latest data and the latest _seq_no and _primary_term values. This may have to be repeated several times before it succeeds, especially when many threads update the same document frequently.
PUT /test_index/_doc/1?if_seq_no=3&if_primary_term=1
{ "test_field": "test client 2" }
Response:
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 5,
  "result" : "updated",
  "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 },
  "_seq_no" : 4,
  "_primary_term" : 1
}
GET /test_index/_doc/1
Response:
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 5,
  "_seq_no" : 4,
  "_primary_term" : 1,
  "found" : true,
  "_source" : { "test_field" : "test client 2" }
}
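When a client retries, the two query parameters come straight from the _seq_no and _primary_term fields of the most recent GET response. A small Python sketch of that step follows. The helper name conditional_put_path is hypothetical, and the code only builds the request path; it does not talk to a cluster.

```python
import json
from urllib.parse import urlencode


def conditional_put_path(get_response):
    """Build a PUT path that succeeds only if the document is unchanged since get_response."""
    params = urlencode({
        "if_seq_no": get_response["_seq_no"],
        "if_primary_term": get_response["_primary_term"],
    })
    return f"/{get_response['_index']}/_doc/{get_response['_id']}?{params}"


# The GET response from step (5), as returned after client 1's update.
latest = json.loads("""
{ "_index": "test_index", "_type": "_doc", "_id": "1", "_version": 4,
  "_seq_no": 3, "_primary_term": 1, "found": true,
  "_source": { "test_field": "test client 1" } }
""")

path = conditional_put_path(latest)
print(path)
```

For the response above this yields the same path as the retried PUT in step (5).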
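The official documentation already describes if_seq_no and if_primary_term in some detail: https://www.elastic.co/guide/... . Here is a simple way to think about them. _primary_term does not identify a particular shard: it is a counter that is incremented each time a different shard copy becomes the primary, for example after a failover, so it tells operations accepted by an old primary apart from those accepted by the new one. _seq_no is a counter that the primary shard assigns to every operation it handles, and if_seq_no plays the same role that the version parameter played in older releases. Every document lives on exactly one primary shard, so letting that shard hand out the numbers is the more natural design, and pairing the sequence number with the primary term keeps the check reliable even when the primary changes.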
To ensure an older version of a document doesn’t overwrite a newer version, every operation performed to a document is assigned a sequence number by the primary shard that coordinates that change. The sequence number is increased with each operation and thus newer operations are guaranteed to have a higher sequence number than older operations. Elasticsearch can then use the sequence number of operations to make sure a newer document version is never overridden by a change that has a smaller sequence number assigned to it.
In short: the primary shard that coordinates a change stamps every operation on a document with an increasing sequence number, so a change carrying a smaller sequence number can never overwrite a newer version of the document.
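To make the mechanism concrete, here is a toy Python model of a primary shard that assigns sequence numbers and checks conditional writes. It is a simplified sketch, not how Elasticsearch is implemented: ToyPrimaryShard, ShardConflict and promote_new_primary are hypothetical names, and the counter starts at 0 here, so the numbers differ from the transcript above.

```python
class ShardConflict(Exception):
    """Raised when if_seq_no / if_primary_term do not match the stored document."""


class ToyPrimaryShard:
    """Simplified model: one counter per shard, shared by all documents on it."""

    def __init__(self):
        self.primary_term = 1
        self.next_seq_no = 0
        self.docs = {}  # doc_id -> (seq_no, primary_term, source)

    def promote_new_primary(self):
        # A new primary taking over bumps the term; sequence numbers keep increasing.
        self.primary_term += 1

    def index(self, doc_id, source, if_seq_no=None, if_primary_term=None):
        current = self.docs.get(doc_id)
        if if_seq_no is not None:
            # Conditional write: the document must still carry exactly these values.
            if current is None or current[:2] != (if_seq_no, if_primary_term):
                raise ShardConflict(
                    f"required seqNo [{if_seq_no}], primary term [{if_primary_term}]"
                )
        seq_no = self.next_seq_no
        self.next_seq_no += 1
        self.docs[doc_id] = (seq_no, self.primary_term, dict(source))
        return seq_no, self.primary_term


shard = ToyPrimaryShard()
first = shard.index("1", {"test_field": "test test"})
other = shard.index("2", {"test_field": "another doc"})  # same shard, next number
second = shard.index("1", {"test_field": "test client 1"},
                     if_seq_no=first[0], if_primary_term=first[1])

rejected = False
try:
    # Client 2 still holds the values from the first write, so it is rejected.
    shard.index("1", {"test_field": "test client 2"},
                if_seq_no=first[0], if_primary_term=first[1])
except ShardConflict:
    rejected = True
```

Note that the write to document "2" also consumes a sequence number: in this model the counter belongs to the shard, not to a single document.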
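I will not walk through external versioning hands-on. The core idea is simple: the version number is stored in your own database and is controlled by the developer. Be aware, though, that the update API does not support it. The documentation explains why: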
The update API does not support versioning other than internal
External (version types external and external_gte) or forced (version type force) versioning is not supported by the update API as it would result in Elasticsearch version numbers being out of sync with the external system.
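In other words, the update API only works with internal versioning. With external, external_gte or force, the version numbers in Elasticsearch would drift out of sync with the external system.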
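For reference, the acceptance rule behind the external and external_gte version types named in the quote can be sketched as a plain Python function. The function name accepts_external_write is hypothetical, and this models only the comparison as I understand it from the documentation (external requires a strictly higher version, external_gte an equal or higher one), not the actual server code.

```python
def accepts_external_write(version_type, stored_version, new_version):
    """Return True if a write carrying new_version would be accepted (toy model)."""
    if stored_version is None:
        # No existing document: there is nothing to conflict with.
        return True
    if version_type == "external":
        return new_version > stored_version
    if version_type == "external_gte":
        return new_version >= stored_version
    raise ValueError(f"unsupported version type: {version_type}")


results = [
    accepts_external_write("external", 5, 6),
    accepts_external_write("external", 5, 5),
    accepts_external_write("external_gte", 5, 5),
    accepts_external_write("external_gte", 5, 4),
]
```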
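The other part of the data first needs clustering and classification, and the resulting categories are stored in a clustering index in the ES cluster. The aggregation results from the data-processing layer go into a designated index in ES, and the data related to each aggregated topic is stored under a field of the corresponding document.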