mengyue 2020-04-30
Elasticsearch在文档更新时默认使用的是乐观锁方案,而Elasticsearch利用文档的一些create限制条件,也能达到悲观锁的效果,我们一起来看一看。
ES默认实现乐观锁,所有的数据更新默认使用乐观锁机制。document更新时,必须要带上currenct version,更新时与document的version进行比较,如果相同进行更新操作,不相同表示已经被别的线程更新过了,此时更新失败,并且重新获取新的version再尝试更新。
我们举一个这样的例子:Elasticsearch存储文件系统的目录、文件名信息,有多个线程需要对/home/workspace/ReadMe.txt进行追加修改,而且是并发执行的,有先后顺序之分,跟之前的库存更新案例有点不一样,此时单纯使用乐观锁,可能会出现乱序的问题。
这种场景就需要使用悲观锁控制,保证线程的执行顺序,有一个线程在修改,其他的线程只能挂起等待。悲观锁通过/index/lock/实现,只有一个线程能做修改操作,其他线程block掉。
悲观锁有三种,分别对应三种粒度,由粗到细可为分:
我们使用锁的基本步骤都是一样的,无论是关系型数据库、Redis/Memcache/Zookeeper分布式锁,还是今天介绍的Elasticsearch实现的锁机制,都有如下三步:
假定有两个线程,线程1和线程2
PUT /files/file/global/_create {}
POST /files/file/global/_update { "doc": { "name":"ReadMe.txt" } }
# 请求: PUT /files/file/global/_create {} # 响应: { "error": { "root_cause": [ { "type": "version_conflict_engine_exception", "reason": "[file][global]: version conflict, document already exists (current version [1])", "index_uuid": "_6E1d7BLQmy9-7gJptVp7A", "shard": "2", "index": "files" } ], "type": "version_conflict_engine_exception", "reason": "[file][global]: version conflict, document already exists (current version [1])", "index_uuid": "_6E1d7BLQmy9-7gJptVp7A", "shard": "2", "index": "files" }, "status": 409 }
DELETE files/file/global
PUT /files/file/global/_create {}
响应
{ "_index": "files", "_type": "file", "_id": "global", "_version": 3, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "_seq_no": 2, "_primary_term": 1 }
全局锁本质上是所有线程都用_create语法来创建id为global的文档,利用Elasticsearch对_create语法的校验来实现锁的目的。
优点:操作简单,容易使用,成本低。
缺点:直接锁住整个索引,除了加锁的那个线程,其他所有对此索引的线程都block住了,并发量较低。
适用场景:读多写少的数据,并且加解锁的时间非常短,类似于数据库的表锁。
注意事项:加锁解锁的控制必须严格在程序里定义,因为单纯基于doc的锁控制,如果id固定使用global,在有锁的情况,任何线程执行delete操作都是可以成功的,因为大家都知道id。
document level级别的锁是更细粒度的锁,以文档为单位进行锁控制。
我们新建一个索引专门用于加锁操作:
PUT /files-lock/_mapping/lock { "properties": { } }
我们先创建一个script脚本,ES6.0以后默认使用painless脚本:
POST _scripts/document-lock { "script": { "lang": "painless", "source": "if ( ctx._source.process_id != params.process_id ) { Debug.explain(‘already locked by other thread‘); } ctx.op = ‘noop‘;" } }
Debug.explain表示抛出一个异常,内容为already locked by other thread。
ctx.op = ‘noop‘表示不执行更新。
POST /files-lock/lock/1/_update { "upsert": { "process_id": "181ab3ee-28cc-4339-ba35-69802e06fe42" }, "script": { "id": "document-lock", "params": { "process_id": "181ab3ee-28cc-4339-ba35-69802e06fe42" } } }
响应结果:
{ "_index": "files-lock", "_type": "lock", "_id": "1", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "_seq_no": 0, "_primary_term": 1 }
{ "_index": "files-lock", "_type": "lock", "_id": "1", "_version": 1, "found": true, "_source": { "process_id": "181ab3ee-28cc-4339-ba35-69802e06fe42" } }
POST /files-lock/lock/1/_update { "upsert": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" }, "script": { "id": "document-lock", "params": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" } } }
提示该文档已经被别的线程(线程1)锁住了,你不能更新了,响应报文如下:
{ "error": { "root_cause": [ { "type": "remote_transport_exception", "reason": "[node-1][192.168.17.137:9300][indices:data/write/update[s]]" } ], "type": "illegal_argument_exception", "reason": "failed to execute script", "caused_by": { "type": "script_exception", "reason": "runtime error", "painless_class": "java.lang.String", "to_string": "already locked by other thread", "java_class": "java.lang.String", "script_stack": [ "Debug.explain(‘already locked by other thread‘); } ", " ^---- HERE" ], "script": "judge-lock", "lang": "painless", "caused_by": { "type": "painless_explain_error", "reason": null } } }, "status": 400 }
POST /files/file/1/_update { "doc": { "name":"README1.txt" } }
DELETE /files-lock/lock/1
POST /files-lock/lock/1/_update { "upsert": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" }, "script": { "id": "document-lock", "params": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" } } }
结果:
{ "_index": "files-lock", "_type": "lock", "_id": "1", "_version": 3, "found": true, "_source": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" } }
此时锁的process_id变成线程2传入的"a6d13529-86c0-4422-b95a-aa0a453625d5"
{ "_index": "files-lock", "_type": "lock", "_id": "1", "_version": 3, "found": true, "_source": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" } }
这样基于ES的行锁操作控制过程就完成了。
update+upsert操作,如果该记录没加锁(此时document为空),执行upsert操作,设置process_id,如果已加锁,执行script
script内的逻辑是:判断传入参数与当前doc的process_id,如果不相等,说明有别的线程尝试对有锁的doc进行加锁操作,Debug.explain表示抛出一个异常。
process_id可以由Java应用系统里生成,如UUID。
如果两个process_id相同,说明当前执行的线程与加锁的线程是同一个,ctx.op = ‘noop‘表示什么都不做,返回成功的响应,Java客户端拿到成功响应的报文,就可以继续下一步的操作,一般这里的下一步就是执行事务方法。
文档级别的锁颗粒度小,并发性高,吞吐量大,类似于数据库的行锁。
共享锁:允许多个线程获取同一条数据的共享锁进行读操作
排他锁:同一条数据只能有一个线程获取排他锁,然后进行增删改操作
互斥性:共享锁与排他锁是互斥的,如果这条数据有共享锁存在,那么排他锁无法加上,必须得共享锁释放完了,排他锁才能加上。
反之也成立,如果这条数据当前被排他锁锁信,那么其他的排他锁不能加,共享锁也加不上。必须等这个排他锁释放完了,其他锁才加得上。
有人在改数据,就不允许别人来改,也不让别人来读。
读写锁的分离
如果只是读数据,每个线程都可以加一把共享锁,此时该数据的共享锁数量一直递增,如果这时有写数据的请求(写请求是排他锁),由于互斥性,必须等共享锁全部释放完,写锁才加得上。
有人在读数据,就不允许别人来改。
我们先创建一个共享锁的脚本:
# 读操作加锁脚本 POST _scripts/rw-lock { "script": { "lang": "painless", "source": "if (ctx._source.lock_type == ‘exclusive‘) { Debug.explain(‘one thread is writing data, the lock is exclusive now‘); } ctx._source.lock_count++" } } # 读操作完毕释放锁脚本 POST _scripts/rw-unlock { "script": { "lang": "painless", "source": "if ( --ctx._source.lock_count == 0) { ctx.op = ‘delete‘ }" } }
POST /files-lock/lock/1/_update { "upsert": { "lock_type": "shared", "lock_count": 1 }, "script": { "id": "rw-lock" } }
在多个页面上尝试,可以看到lock_count在逐一递增,模拟多个线程同时读一个文档的操作。
PUT /files-lock/lock/1/_create { "lock_type": "exclusive" }
结果肯定会报错:
{ "error": { "root_cause": [ { "type": "version_conflict_engine_exception", "reason": "[lock][1]: version conflict, document already exists (current version [8])", "index_uuid": "XD7LFToWSKe_6f1EvLNoFw", "shard": "3", "index": "files-lock" } ], "type": "version_conflict_engine_exception", "reason": "[lock][1]: version conflict, document already exists (current version [8])", "index_uuid": "XD7LFToWSKe_6f1EvLNoFw", "shard": "3", "index": "files-lock" }, "status": 409 }
POST /files-lock/lock/1/_update { "script": { "id": "rw-unlock" } }
释放1次lock_count减1,减到0时,说明所有的共享锁已经释放完毕,就把这个doc删除掉
PUT /files-lock/lock/1/_create { "lock_type": "exclusive" }
此时能够加锁成功,响应报文:
{ "_index": "files-lock", "_type": "lock", "_id": "1", "_version": 1, "found": true, "_source": { "lock_type": "exclusive" } }
{ "error": { "root_cause": [ { "type": "remote_transport_exception", "reason": "[node-1][192.168.17.137:9300][indices:data/write/update[s]]" } ], "type": "illegal_argument_exception", "reason": "failed to execute script", "caused_by": { "type": "script_exception", "reason": "runtime error", "painless_class": "java.lang.String", "to_string": "one thread is writing data, the lock is exclusive now", "java_class": "java.lang.String", "script_stack": [ "Debug.explain(‘one thread is writing data, the lock is exclusive now‘); } ", " ^---- HERE" ], "script": "rw-lock", "lang": "painless", "caused_by": { "type": "painless_explain_error", "reason": null } } }, "status": 400 }
DELETE /files-lock/lock/1
读锁的加锁脚本和释放锁脚本,成对出现,用来统计线程的数量。
写锁利用_create
语法来实现,如果有线程对某一文档有读取操作,那么对这个文档执行_create操作肯定报错。
利用Elasticsearch一些语法的特性,加上painless脚本的配合,也能完整的复现全局锁、行锁、读写锁的特性,实现的思路还是挺有意思的,跟使用redis、zookeeper实现分布式锁有异曲同工之处,只是生产案例上用redis实现分布式锁是比较成功的实践,Elasticsearch的对这种分布式锁的实现方式可能不是最佳实践,但也可以了解一下。
专注Java高并发、分布式架构,更多技术干货分享与心得,请关注公众号:Java架构社区
可以扫左边二维码添加好友,邀请你加入Java架构社区微信群共同探讨技术
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。